RabbitMQ 教程

    技术2022-07-10  148

    RabbitMQ 教程

    文章目录

    RabbitMQ 教程消息中间件安装及管理windows安装:RabbitMQ Linux安装Mac安装 基本概念主要概念Exchange的类型RabbitMQ的工作模式及代码示例简单模式 Simple2.工作模式 work (资源竞争消费)3.发布订阅 publish/subscribe (广播)4.路由 routing5.主题订阅 topic 消息确认机制(ack)spring-boot连接RabbitMQ 集群搭建

    消息中间件

    消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下提供应用解耦(降偶)、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等等功能,其作为分布式系统架构中的一个重要组件,有着举足轻重的地位。

    基本模型: 消息发布者 --> 消息中间件 —> 消息消费者

    同步调用是 A调用B后阻塞等待,此时A的后续操作是不能进行。异步调用的一种实现方式是引入MQ,A将调用信息发到消息中间件,然后B从消息中间件获取到消息内容执行。此时A不再需要等待B。

    这个前提下,就可以实现异步调用,冗余存储(指可以在MQ中多存一些数据,等待B来消费,MQ可以看作是个缓冲区)、流量削峰(如果A的调用请求数量大于B的消费能力,则消息中间件可以通过冗余存储来缓冲,B可以按照以比较慢的速度来处理请求)等等功能。

    安装及管理

    RabbitMQ是使用Erlang开发的,所以使用RabbitMQ需要先安装Erlang运行时环境

    windows安装:

    RabbitMQ

    先安装erlang并配置环境变量

    配置ERLANG环境变量(这里配置的是ERLANG,不是ELANG,PDF有错):

    1、配置home路径,ERLANG_HOME=E:\erl10.2

    2、配置到path路径下:%ERLANG_HOME%\bin;

    cmd 输入 erl

    安装rabbitmq并配置环境变量

    1、配置home路径,RABBITMQ_HOME=E:\RabbitMQ Server\rabbitmq_server\-3.7. 12

    2、配置到path路径下:%RABBITMQ_HOME%\sbin;

    激活rabbitmq_management

    新开CMD,键入如下命令: rabbitmq-plugins enable rabbitmq_management

    启动RabbitMQ服务

    net start RabbitMQ

    访问 http://localhost:15672/ 登录页的账户名密码都是guest,登陆进去就是成功了

    rabbitmq启动方式有2种

    1、以应用方式启动

    rabbitmq-server -detached 后台启动

    Rabbitmq-server 直接启动,如果你关闭窗口或者需要在改窗口使用其他命令时应用就会停止

    关闭:rabbitmqctl stop

    2、以服务方式启动(安装完之后在任务管理器中服务一栏能看到RabbtiMq)

    rabbitmq-service install 安装服务

    rabbitmq-service start 开始服务

    Rabbitmq-service stop 停止服务

    Rabbitmq-service enable 使服务有效

    Rabbitmq-service disable 使服务无效

    rabbitmq-service help 帮助

    当rabbitmq-service install之后默认服务是enable的,如果这时设置服务为disable的话,rabbitmq-service start就会报错。

    当rabbitmq-service start正常启动服务之后,使用disable是没有效果的

    关闭:rabbitmqctl stop

    3、Rabbitmq 管理插件启动,可视化界面

    rabbitmq-plugins enable rabbitmq_management 启动

    rabbitmq-plugins disable rabbitmq_management 关闭

    4、Rabbitmq节点管理方式

    Rabbitmqctl

    详见rabbitmq安装和使用方法.pdf

    Linux安装

    Centos 6.5 32位太老,官方支持不佳。要求换成64位,再执行以下操作

    # 1)使用packagecloud的仓库来安装,最省事的方案 curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash # 2)安装erlang,-y命令表示默认同意 yum install -y erlang # 3)下载安装rabbitmq-server #3.1 下载要用wget所以先安装wget yum install wget #3.2 下载 wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/6/rabbitmq-server-3.7.23-1.el6.noarch.rpm/download.rpm #3.3 先添加epel仓库,避免相关依赖包socat找不到 yum -y install epel-release #3.4 安装rabbitmq-server yum install -y rabbitmq-server-3.7.23-1.el6.noarch.rpm # 启用管理插件 rabbitmq-plugins enable rabbitmq_management # 开机启动 chkconfig rabbitmq-server on # 启动rabbitmq service rabbitmq-server start # 无法远程访问的办法 找到这个文件rabbit.app /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.7/ebin/rabbit.app 将:{loopback_users, [<<”guest”>>]}, 改为:{loopback_users, []}

    Mac安装

    https://www.rabbitmq.com/install-homebrew.html

    brew install rabbitmq

    启动:rabbitmq-server

    管理控制台:http://localhost:15672/#/

    基本概念

    主要概念

    RabbitMQ实现了AMQP(高级消息队列协议),下面介绍AMQP的一些基本概念

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-525lUesv-1593570284700)(D:\sync\docs\讲课\Java开发课程\phrase4\rabbit\rabbit0.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zxV375sB-1593570284704)(D:\java\rabbitMQ\rabbit0.png)]

    Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

    Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。

    Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

    Routing key 就是路由键,服务器会根据路由键将消息从交换器路由到队列上去。

    Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

    Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

    Connection 网络连接,比如一个TCP连接。

    Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

    Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

    Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

    Broker 表示消息队列服务器实体。

    参考自:https://www.jianshu.com/p/79ca08116d57

    简单来说,发布者将消息发给 交换器,交换器根据binding把信息路由到相应队列,队列里的消息会发给消费者消息。

    Exchange的类型

    Exchange 主要用于将消息发送到指定的队列。

    Exchange有多个种类,常用的有direct,fanout,topic。前三种类似集合对应关系那样,(direct)1:1,(fanout)1:N,(topic)N:1

    direct: 1:1类似完全匹配

    fanout:1:N 可以把一个消息并行发布到多个队列上去,简单的说就是,当多个队列绑定到fanout的交换器,那么交换器一次性拷贝多个消息分别发送到绑定的队列上,每个队列有这个消息的副本。

    topic N:1 ,多个交换器可以路由消息到同一个队列。根据模糊匹配,比如一个队列的routing key 为*.test ,那么凡是到达交换器的消息中的routing key 后缀.test都被路由到这个队列上。

    RabbitMQ的工作模式及代码示例

    简单模式 Simple

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WsuUDW0a-1593570284708)(rabbit-工作模式1.jpg)]

    打个比方,寄信的例子,你将信件投入邮筒,那非常确定邮递员同志会帮你将信取走并投递到目的地。

    当然消息中间件投递的是信息而不是邮件。生产者就是写信的人,队列就是邮筒,rabbitMQ就是邮递员,消费者就是收件人。

    消息产生消息,将消息放入队列

    消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。

    2.工作模式 work (资源竞争消费)

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WQM7vBNH-1593570284711)(rabbit-工作模式2.jpg)]

    消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。

    通过 设置prefetchCount 指定每个消费者从队列预取的消息数量(消费者的处理能力),如果autoAck设为false,需要每次手动确认,多个worker协同工作。

    3.发布订阅 publish/subscribe (广播)

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-d2RMHtAC-1593570284713)(rabbit-工作模式3.jpg)]

    每个消费者监听自己的队列;生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。交换器为FANOUT。

    4.路由 routing

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uwYastIi-1593570284715)(rabbit-工作模式4.jpg)]

    1.消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

    2.根据业务功能定义路由字符串

    3.从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

    4.业务场景:

    error 通知;

    EXCEPTION;错误通知的功能;

    传统意义的错误通知;

    客户通知;

    利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;

    5.主题订阅 topic

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tRaYcTcJ-1593570284717)(rabbit-工作模式5.jpg)]

    * # 代表通配符,* 代表一个单词,#代表零个或多个单词

    路由功能添加模糊匹配

    消息产生者产生消息,把消息交给交换机

    交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

    消息确认机制(ack)

    消息确认机制分为 发布者确认 及 消费者确认

    spring-boot连接RabbitMQ

    订单完成,发送成功消息,不同程序监听和处理

    计算会员等级记账…

    步骤:

    创建spring-boot项目,选择Spring for RabbitMQ spring-web lombok

    配置application.yml

    spring: rabbitmq: host: localhost port: 5672 username: guest password: guest

    创建rabbitmq的配置类

    package com.woniuxy.rabbit.rabbitboot32.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 发布者: 交换机+队列+绑定 */ @Configuration public class RabbitMQConfig { public static final String EXCHANGE = "order.finish.exchange"; public static final String QUEUE = "order.finish.queue"; //创建交换机 @Bean public TopicExchange topicExchange(){ return new TopicExchange(EXCHANGE); } //创建队列 @Bean public Queue queue(){ return new Queue(QUEUE); } //将队列绑定到交换机 @Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(topicExchange()).with("order.finish.*"); } }

    发送消息

    @Autowired private RabbitTemplate rabbitTemplate; // Map<String,String> data = new HashMap<>(); data.put("status","success"); data.put("orderNo",orderNo); //发送时,指定交换机,routing-key rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"order.finish.success",data); return data;

    接收消息

    配置类中添加 @Autowired private CachingConnectionFactory cachingConnectionFactory; //接收消息的listener @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(cachingConnectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.NONE); factory.setPrefetchCount(1); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); return factory; }

    接收使用@RabbitListener实现

    package com.woniuxy.rabbit.rabbitboot32consumer.component; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @Slf4j public class OrderFinishListener { @RabbitListener(queues = "order.finish.queue") public void recieveOrderFinish(Map<String,String> data){ log.info("接收到消息:{}",data); } }

    集群搭建

    Processed: 0.013, SQL: 12