从AMQP协议可以看出,MessageQueue、Exchange 和 Binding 构成了 AMQP 协议的核心,下面从应用使用的角度全面的介绍如何利用 Rabbit MQ 构建消息队列以及使用过程中的注意事项。
声明 MessageQueue
在RabbitMQ 中,无论是生产者发送消息还是消费这接受消息,都首先需要声明一个 MessageQueue。这就存在一个问题:这个消息队列是生产者声明还是消费这声明?解决这个问题之前需要先明确下面两点:
消费者是无法订阅或者获取不存在的 MessageQueue 中消息消息被 Exchange 接受以后,如果没有匹配的Queue,则会被丢弃。明白上述两点后,就容易理解如果是消费者去声明 Queue,就可能会出现声明 Queue 之前生产者发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用次方案没有问题。但如果不允许消息丢弃,这就需要无论是生产者还是消费者,在发送或者接受消息前都需要去尝试建立消息队列。这里还有一点需要明确:如果客户端尝试建立一个已经存在的消息队列,RabbitMQ 不会做任何事情,并返回客户端建立成功。
 如果一个消费者在一个信道中在监听某个队列的消息,RabbitMQ 是不允许该消费者在同一个 channel 去声明其他队列的。
RabbitMQ 中可以通过 queue.declare 命令声明一个队列,可以设置该队列以下属性:
Exclusive: 排他队列,如果体格队列被声明为排他队列,该队列仅对首次声明它的链接可见,并在链接断开时自动删除。这里需要注意三点: 排他队列是基于链接可见的,同一链接的不同信道是可以同时访问同一个链接创建的排他队列”首次“,如果一个链接已经声明了一个排他队列,其他链接是不允许建立同名的排队列的,这个与普通队列不同即使该队列是持久化的,一旦链接关闭或者客户端推出,该队列都会被自动删除的,这种队列适用只限于一个客户端发送读取消息的应用场景 Auto-delete: 自动创建,如果该队列没有任何订阅的消费者,该队列会被自动删除。这种队列适用于临时队列。Durable:持久化,这个后面在讲其他选项:例如用户仅仅想查询某一个队列是否已存在,如果存在不想建立该队列,仍然可以调用 queue.declare,只不过需要讲参数 passive 设为 true,传给 queue.declare,如果该队列已存在则会返回 true;如果不存在,则会返回 error,但不会创建新的队列生产者发送的消息
 在 AMQP 模型中,Exchange 是接受生产者消息并将消息路由到消息队列的关键组件。ExchangeType 和Binding 决定了消息的路由规则。所以生产者想要发送消息首先必须声明一个 Exchange 和该 Exchange 对应的 Binding。可以通过 ExchangeDeclare 和 BindingDeclare 完成。在 RabbitMQ 中,声明一个 Exchange 需要三个参数:ExchangeName、ExchangeType 和 Durable。ExchangeName 是该 Exchange 的名字,该属性在创建 Binding 和生产者通过 publish 推送消息是需要指定。ExchangeType 是指定 Exchange 的类型,在 RabbitMQ 中有三种类型的Exchange:direct ,fanout和topic,
exchange的广播类型(交换类型)exchange_type:
fanout: 所有bind到此exchange的queue都可以接收消息,广播模式direct: 通过routing_key决定此exchange的那些匹配的queue可以接收消息,组播模式topic:所有匹配routing_key的queue可以接收消息,此时type可以是一个表达式headers: 通过headers 来决定把消息发给哪些queue,比较少用。注意⚠️:
1. 发布者在发布时,如果订阅者是在之后才订阅的,肯定是看不到之前的广播或组播的。广播或组播是实时的,就像现实中的广播和听众一样。
2.如果将消息发送到一个没有队列绑定的exchange上面,那么该消息将会丢失,这是因为在rabbitMQ中exchange不具备存储消息的能力,只有队列具备存储消息的能力。
关于队列、exchange:
无论哪一种广播类型,发布者都无需要声明和指定队列,而是声明和指定exchange,同时指定广播类型。
无论哪一种广播类型,订阅者都需要使用系统随机分配的队列(可以确保唯一性,如果自定义的queue可能重名)来接收消息,并将队列绑定到exchange;同时声明和指定exchange,同时指定广播类型。
关于routing_key:
routing_key,即路由的标识符,可以为任意字符;也可以使用队列名作为标识符,但它本身的含义不是队列名。
不同类型的广播,区别在于exchange_type不同,使得不同类型的routing_key不同。
fanout类型,发布者无需要指定routing_key。direct类型,发布者需要指定routing_key,订阅者使用相同的routing_key匹配,就像分组一样。topic类型,发布者需要指定routing_key,订阅者可使用带通配符的routing_key去匹配,是比分组更精细的匹配。不同的 Exchange 会表现出不同路由行为。Durable 是该 Exchange 的持久化属性。声明一个 Binding 需要提供一个 QueueName、ExchangeName、BindingKey。下面我们分析下不同 ExchangeType 表现出的不同路由规则。
如果是 Direct 类型,则将消息中的 RoutingKey 与该 Exchange 关联所有该 Binding 中的 BindingKey 进行比较,如果相等,则发送该 Bingding 对应的 Queue 如果是 Fanout 类型,则会将消息发送给所有与 Exchange 定义过 Binding 的所有 Queues 中去,是一种广播行为 如果是 Topic 类型,则会按照正则表达式,对 RoutingKey 与 BindingKey 进行匹配,如果匹配成功,则会发送到对应的 Queue 中。消费者订阅消息
在 RbbitMQ 中消费者有2种方式获取队列中的消息:
一种是通过 basic.consume 命令,订阅某一个队列中的消息,channel 会自动在处理完上一条消息后接受下一条消息。(同一个channel 消息处理是串行的)。除非关闭 channel 或者取消订阅。否则客户端将会一直接受队列的消息。另一种方式是通过 basic.get 命令主动获取队列中的消息,但是绝对不可以通过循环调用 basic.get 来替换 basic.consume,这是因为 basic.get RabbitMQ 在实际执行的时候是首先 consume 某一个队列,然后检索第一条消息,如果是高吞吐率的消费者,最好还是建议使用 basic.consume. 如果有多个消费者同时订阅同一个队列的话,RabbitMQ是采用循环的方式分发消息的,每一条消息只能被一个订阅者接收。例如,有队列Queue,其中ClientA和ClientB都Consume了该队列,MessageA到达队列后,被分派到ClientA,ClientA服务器收到响应,服务器删除MessageA;再有一条消息MessageB抵达队列,服务器根据“循环推送”原则,将消息会发给ClientB,然后收到ClientB的确认后,删除MessageB;等到再下一条消息时,服务器会再将消息发送给ClientA。
 这里我们可以看出,消费者再接到消息以后,都需要给服务器发送一条确认命令,这个即可以在handleDelivery里显示的调用basic.ack实现,也可以在Consume某个队列的时候,设置autoACK属性为true实现。这个ACK仅仅是通知服务器可以安全的删除该消息,而不是通知生产者,与RPC不同。 如果消费者在接到消息以后还没来得及返回ACK就断开了连接,消息服务器会重传该消息给下一个订阅者,如果没有订阅者就会存储该消息。
 既然RabbitMQ提供了ACK某一个消息的命令,当然也提供了Reject某一个消息的命令。当客户端发生错误,调用basic.reject命令拒绝某一个消息时,可以设置一个requeue的属性,如果为true,则消息服务器会重传该消息给下一个订阅者;如果为false,则会直接删除该消息。当然,也可以通过ack,让消息服务器直接删除该消息并且不会重传。
持久化
Rabbit MQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,所有已声明的队列,Exchange,Binding以及队列中的消息都会丢失。通过设置Exchange和MessageQueue的durable属性为true,可以使得队列和Exchange持久化,但是这还不能使得队列中的消息持久化,这需要生产者在发送消息的时候,将delivery mode设置为2,只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。这里需要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,否则在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能造成比较大的影响,可能会下降10倍不止。
事物
 对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。
Confirm机制
 使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。
 exchanges, queues, and bindings是三个基础的概念, 他们的作用是: exchanges: 是生产者发布消息的地方,queues: 是消息最终到达并被消费者接收的地方,bindings: 是消息如何从交换路由到特定队列的地方。
 Vhosts也是AMQP的一个基础概念,连接到RabbitMQ默认就有一个名为"/“的vhost可用,本地调试的时候可以直接使用这个默认的vhost.这个”/"的访问可以使用guest用户名(密码guest)访问.可以使用rabbitmqctl工具修改这个账户的权限和密码,这在生产环境是必须要关注的. 出于安全和可移植性的考虑,一个vhost内的exchange不能绑定到其他的vhost.
 可以按照业务功能组来规划vhost,在集群环境中只要在某个节点创建vhost就会在整个集群内的节点都创建该vhost.VHost和权限都不能通过AMQP协议创建,在RabbitMQ中都是使用rabbitmqctl进行创建,管理.
 消息由两部分组成: payload and label. "payload"是实际要传输的数据,至于数据的格式RabbitMQ并不关心,"label"描述payload,包括exchange name 和可选的topic tag.消息一旦到了consumer那里就只有payload部分了,label部分并没有带过来.RabbitMQ并不告诉你消息是谁发出的.这好比你收到一封信但是信封上是空白的.当然想知道是谁发的还是有办法的,在消息内容中包含发送者的信息就可以了.
 所有接收到的消息都要求发送响应消息(ACK).这里有两种方式一种是Consumer使用basic.ack明确发送ACK,一种是订阅queue的时候指定auto_ack为true,这样消息一到Consumer那里RabbitMQ就会认为消息已经得到ACK.
要注意的是这里的响应和消息的发送者没有丝毫关系,ACK只是Consumer向RabbitMQ确认消息已经正确的接收到消息,RabbitMQ可以安全移除该消息,仅此而已.
消息如何发送到队列
消息是如何发送到队列的?这就要说到AMQP bindings and exchanges. 投递消息到queue都是经由exchange完成的,和生活中的邮件投递一样也需要遵循一定的规则,在RabbitMQ中规则是通过routing key把queue绑定到exchange上,这种绑定关系即binding.消息发送到RabbitMQ都会携带一个routing key(哪怕是空的key),RabbitMQ会根据bindings匹配routing key,如果匹配成功消息会转发到指定Queue,如果没有匹配到queue消息就会被扔到黑洞.
如何发送到多个队列
消息是分发到多个队列的?AMQP协议里面定义了几种不同类型的exchange:direct, fanout, topic, and headers. 每一种都实现了一种 routing 算法. header的路由消息并不依赖routing key而是去匹配AMQP消息的header部分,这和下面提到的direct exchange如出一辙,但是性能要差很多,在实际场景中几乎不会被用到.
direct exchange routing key完全匹配才转发fanout exchange 不理会routing key,消息直接广播到所有绑定的queuetopic exchange 对routing key模式匹配exchange持久化
创建queue和exchange默认情况下都是没有持久化的,节点重启之后queue和exchange就会消失,这里需要特别指定queue和exchange的durable属性.
 无论是要发布消息还是要获取消息 ,应用程序都需要通过TCP连接到RabbitMQ.应用程序连接并通过权限认证之后就要创建Channel来执行AMQP命令.Channel是建立在实际TCP连接之上通信管道,这里之所以引入channel的概念而不是直接通过TCP链接直接发送AMQP命令,是出于两方面的考虑:建立上成百上千的TCP链接,一方面浪费了TCP链接,一方面很快会触及系统瓶颈.引入了Channel之后多个进程与RabbitMQ的通信可以在一条TCP链接上完成.我们可以把TCP类比做光缆,那么Channel就像光缆中的一根根光纤.
在发布、订阅模式中,生产者采用广播、组播,多个消费者收取。
在发布、订阅模式中,每个消费者一个队列,生产者将消息提交给exchange交换机,谁订阅了消息,exchange就将消息转发到谁的队列中(使用系统随机分配的队列)。
无需routing_key。订阅者将queue绑定在exchange之上。
消费者
import pika def on_message_callback(ch, method, properties, body): print('[x] %r' % body) connection = pika.BlockingConnection( pika.ConnectionParameters( host='localhost' ) ) channel = connection.channel() channel.exchange_declare( exchange='fanout', exchange_type='fanout' ) # exclusive=True 设置后会自动生成 queue,采用这种形式当消费者退出 queue 即被销毁 result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue print('>>>', queue_name) # 订阅:将消费者自己的 queue 绑定到 交换机上 channel.queue_bind( exchange='fanout', queue=queue_name ) channel.basic_consume( queue_name, on_message_callback ) channel.start_consuming()使用 exclusive=True 随机生成 queue
生产者
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost' )) channel = connection.channel() # 声明交换机 使用广播形式(无需声明 queue) channel.exchange_declare(exchange='fanout', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or 'hello world' # 在交换机上发送消息,即广播0 channel.basic_publish( exchange='fanout', routing_key='', body = message ) print('send message success') connection.close()采用广播形式的发布订阅模式:生产者可以不声明 queue 以及 routing_key,消息会被发送到绑定的 exchange 中,所有绑定该 exchange 的消费者都可以消费该交换机上的消息。这种方式应该先启动消费者,每一个消费者都拥有自己的一个队列去绑定 exchange(e xchange 不具备存储消息的能力)
发布者与订阅者使用相同的关键字作为routing_key使用。订阅者将queue和routing_key绑定在exchange之上。
生产者:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost' )) channel = connection.channel() # 声明交换机 使用广播形式(无需声明 queue) channel.exchange_declare(exchange='direct', exchange_type='direct') message = ' '.join(sys.argv[1:]) or 'hello world' channel.basic_publish( exchange='direct', routing_key='direct', body = message ) print('send message success') connection.close()消费者:
import pika def on_message_callback(ch, method, properties, body): print('>>>') print('[x] %r' % body) connection = pika.BlockingConnection( pika.ConnectionParameters( host='localhost' ) ) channel = connection.channel() channel.exchange_declare( exchange='direct', exchange_type='direct' ) result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue print('>>>', queue_name) # 订阅:将消费者自己的 queue 绑定到 交换机上且和生产者相同的 routing_key channel.queue_bind( exchange='direct', queue=queue_name, routing_key='direct' ) channel.basic_consume( queue_name, on_message_callback ) channel.start_consuming()