RabbitMQ - 消息中间件

    技术2024-05-16  88

    一、引言

    什么RabbitMQ?

    RabbitMQ是基于amqp协议,实现的一种MQ理念的服务。类似的服务 RocketMQ、ActiveMQ、Kafka等

    为什么在分布式项目中需要一款消息中间件?

    消息中间件能够实现一些Feign(同步调用)无法实现的效果:

    1、服务的异步调用 2、消息的广播(事件总线) 3、消息的延迟处理 4、分布式事务 5、请求削峰(处理高并发)

     

    二、RabbitMQ的Docker安装

    1)拉取镜像

    docker pull rabbitmq:3.8.5-management

    2)准备docker-compose模板

    .... rabbitmq:   image: rabbitmq:3.8.5-management   container_name: rabbitmq   ports:     - 5672:5672     - 15672:15672   restart: always

    3)启动rabbitmq容器

    docker-compose up -d rabbitmq

    4)访问rabbitmq的管理页面

     

    5)登录rabbitmq的管理页面 (账号:guest 密码:guest)

    看到这个页面说明安装成功

    三、RabbitMQ常用模型

    1)模型一

    P -> Provider(提供者) 红色方块 -> 队列(存储消息) C -> Consumer(消费者)

    2)模型二

    一个提供者对应多个消费者,消息会轮训发送给两个消费者

    起到一个消费端负载均衡的目的,减轻消费端的消费压力

    3)模型三

    发布/订阅模式 - 消息广播

    多个消费者会同时收到提供者发布的消息

    X -> Exchange(交换机,消息的复制转发,不能存储消息)

    4)模型四

    路由键 -> 交换机和队列绑定若干路由键,发布的消息可以指定路由键发送

    5)模型五

    通配符的路由键

    6)模型六

    Rabbitmq的同步调用模型

    四、JavaAPI调用RabbitMQ

    添加依赖

    <dependency>    <groupId>com.rabbitmq</groupId>    <artifactId>amqp-client</artifactId>    <version>5.9.0</version> </dependency>

     

    1)模型一的实现

    服务的提供者:

    public static void main(String[] args) throws IOException, TimeoutException {    //1、连接RabbitMQ    Connection connection = ConnectionUtil.getConnection(); ​    //2、通过连接获得管道对象(后面所有的操作都是通过管道对象操作)    Channel channel = connection.createChannel(); ​    //3、创建队列    channel.queueDeclare("test_queue1", false, false, false, null); ​    //4、给队列中发布消息    String msg = "Hello RabbitMQ!!!!";    channel.basicPublish("", "test_queue1", null, msg.getBytes("utf-8")); ​    //关闭连接    connection.close(); }

    服务的消费者:

    public static void main(String[] args) throws IOException {        //1、连接RabbitMQ        Connection connection = ConnectionUtil.getConnection(); ​        //2、获得连接的channel对象        Channel channel = connection.createChannel(); ​        //3、监听队列        channel.basicConsume("test_queue1", true, new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println("接收到消息:" + new String(body, "utf-8"));           }       }); ​        //关闭连接??? //       connection.close();   }

    思考:

    1、队列应该在消费者端创建还是提供者端创建?

        - 消费者通常创建队列,提供者创建交换机

    2、消费端是同步消费消息还是异步消费消息?- 同步消费,必须消费完一条消息,才能继续消费下一条消息,在实际开发过程中,为了提高消费者的消费速率,往往会引入线程池的方式,进行多线程消费。 

    //创建一个线程池 - 线程数量为5 private static ExecutorService executorService = Executors.newFixedThreadPool(5); .... channel.basicConsume("test_queue1", true, new DefaultConsumer(channel){   @Override   public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ​             executorService.submit(new Runnable() {                 public void run() {                     try {                         System.out.println("接收到消息:"                                               + new String(body, "utf-8"));                       } catch (UnsupportedEncodingException e) {                            e.printStackTrace();                       }                        try {                            Thread.sleep(2000);                       } catch (InterruptedException e) {                            e.printStackTrace();                       }                   }               }); ​           }       });  

     

    Processed: 0.016, SQL: 9