RabbitMq系列(九):主题交换Topic Exchange

    技术2022-07-11  126

    系列文章

    RabbitMq系列(一):服务器搭建

    RabbitMq系列(二):最简单的例子

    RabbitMq系列(三):工作队列

    RabbitMq系列(四):消息确认和持久性

    RabbitMq系列(五):公平派遣

    RabbitMq系列(六):交换类型以及例子

    RabbitMq系列(七):直接交换Direct exchange

    RabbitMq系列(八):扇出交换Fanout Exchange

    RabbitMq系列(九):主题交换Topic Exchange

    RabbitMq系列(十):标头交换Headers exchange

    RabbitMq系列(十一):远程调用RPC

     

    目录

    前言

    实际演示

    示例1

    示例2

    总结


    前言

    主题交换是四个主要交换模式中,变化最多、最灵活的交换模式。主题交换的消息传递主要依赖于路由键,路由键不同,交换机匹配的方式不同。

    主题交换的主要关注点在路由键,路由键通常是由零个或者多个有意义的单词通过点号( . )分隔拼接而成,类似于: topic.route.one topic.routetopic 等等,路由键最多只能有255个字节。

    主题交换中一般的路由键规则跟直接交换路由规则大致相同,都是直接比较是否相等,但是主题交换有特殊的路由键规则。

    主题交换中有个两个特殊的匹配符号: * : 匹配任意一个单词

    # :匹配零个或者多个单词

    不带两个特殊符号的路由键匹配规则的同直接交换匹配规则一样,带两个特殊符号的类似于模糊匹配,只带单个 # 的就是扇出交换啦,带特殊符号的路由键类似于: topic.#.#*  topic.route.#.#topic.route.*#  topic.route.one # * 等等

    实际演示

    主题交换作为四个主要交换中最灵活的交换,路由键的定义非常丰富,所以这里采用三个消费者和一个生产者来进行测试,路由键从简单的复杂。

    示例1

    环境 身份交换机队列routeKey消费者1topic.exchangeamq.gen-7V2hbBS1AIyJkSOSCuWdRAtopic.route.two消费者2topic.exchangeamq.gen-P4DJ32qk9nvX3UE7PM4T_Atopic.route.*消费者3topic.exchangeamq.gen-X_JrOM4KZP_KvMk3IcaH1gtopic.#生产者topic.exchangetopic.route.one 消费者1  /** * 消费者1 * @Tag 主题交换Topic exchange */ public class MQConsumerOne { public static void main(String[] args) { try { consumerMsg("topic.exchange","topic.route.two"); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchangeName,String routeKey) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("10.0.10.3"); //设置连接端口 connectionFactory.setPort(5672); //设置连接的虚拟机 connectionFactory.setVirtualHost("mqtest"); //设置连接用户 connectionFactory.setUsername("mqtest"); //设置连接用户密码 connectionFactory.setPassword("test123"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC); //声明队列(临时排它队列) String queueName = channel.queueDeclare().getQueue(); //绑定交换机 channel.queueBind(queueName,exchangeName,routeKey); //消费消息(主动确认) channel.basicConsume(queueName,true,(tag,msg)->{ System.out.println(new String(msg.getBody(),"UTF-8")); },(cancel)->{ }); /* //关闭连接 channel.close(); connection.close();*/ } } 消费者2 /** * 消费者2 * @Tag 主题交换Topic exchange */ public class MQConsumerTwo { public static void main(String[] args) { try { consumerMsg("topic.exchange","topic.route.*"); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchangeName,String routeKey) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("10.0.10.3"); //设置连接端口 connectionFactory.setPort(5672); //设置连接的虚拟机 connectionFactory.setVirtualHost("mqtest"); //设置连接用户 connectionFactory.setUsername("mqtest"); //设置连接用户密码 connectionFactory.setPassword("test123"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC); //声明队列(临时排它队列) String queueName = channel.queueDeclare().getQueue(); //绑定交换机 channel.queueBind(queueName,exchangeName,routeKey); //消费消息(主动确认) channel.basicConsume(queueName,true,(tag,msg)->{ System.out.println(new String(msg.getBody(),"UTF-8")); },(cancel)->{ }); /* //关闭连接 channel.close(); connection.close();*/ } } 消费者3 /** * 消费者3 * @Tag 主题交换Topic exchange */ public class MQConsumerThree { public static void main(String[] args) { try { consumerMsg("topic.exchange","topic.#"); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchangeName,String routeKey) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("10.0.10.3"); //设置连接端口 connectionFactory.setPort(5672); //设置连接的虚拟机 connectionFactory.setVirtualHost("mqtest"); //设置连接用户 connectionFactory.setUsername("mqtest"); //设置连接用户密码 connectionFactory.setPassword("test123"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC); //声明队列(临时排它队列) String queueName = channel.queueDeclare().getQueue(); //绑定交换机 channel.queueBind(queueName,exchangeName,routeKey); //消费消息(主动确认) channel.basicConsume(queueName,true,(tag,msg)->{ System.out.println(new String(msg.getBody(),"UTF-8")); },(cancel)->{ }); /* //关闭连接 channel.close(); connection.close();*/ } } 唯一的生产者 /** * 生产者 * @Tag 主题交换Topic exchange */ public class MQProducer { public static void main(String[] args) { try { producerMsg("topic.exchange","topic.route.one","想要我的消息?就看你有没得这个本事!"); } catch (Exception e) { e.printStackTrace(); } } public static void producerMsg(String exchangeName,String routeKey,String msg) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("10.0.10.3"); //设置连接端口 connectionFactory.setPort(5672); //设置连接的虚拟机 connectionFactory.setVirtualHost("mqtest"); //设置连接用户 connectionFactory.setUsername("mqtest"); //设置连接用户密码 connectionFactory.setPassword("test123"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明临时交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,false); //创建生产者 channel.basicPublish(exchangeName,routeKey,null,msg.getBytes("UTF-8")); //关闭连接 channel.close(); connection.close(); } } 完成代码逻辑过后,分别启动消费者1、消费者2、消费者3和唯一的生产者,观察消息的收发情况。这里消费者2、消费者3收到消息,消费者1未收到消息。

    分析一下:

                1)  消费者1的 routeKey  topic.route.two

                2)  消费者2的 routeKey : topic.route.*

                3)消费者3的 routeKey : topic.#

                4)唯一生产者的 routeKey : topic.route.one

    咱们一个一个来,往下看👇

    消费者1:消费者1的 routeKey 和生产者的 routeKey 都没有特殊字符,进行完全匹配,最后一个单词 two 和 one 不相等,所以两个路由键不相等,消费者1不应该收到消息,分析和结果一致

    消费者2:消费者2的 routeKey带有特殊字符——*,*  匹配任意一个单词,这里消费者 routeKey 同生产者 routeKey 进行模糊匹配,前两个单词  topic.route 完全匹配相等,* 同 one 模糊匹配成功,消费者2该收到消息,分析和结果一致

    消费者3:消费者3的 routeKey 匹配规则同消费者2一样,这里带有特殊字符——#,# 匹配零个或者多个字符,消费者3的路由键routeKey 同生产者 routeKey 第一个单词完全匹配, # 模糊匹配生产者后两个单词 route.one,消费者3应该收到消息,分析和结果一致

    主题交换的匹配模式跟正则表达式差不多,还更简单,变化更少,下面进行更丰富的匹配测试

    示例2

    环境 身份交换机队列routeKey消费者1topic.exchangeamq.gen-9nxs1twFiLAg1Wf6cxiz7w topic.route.*#消费者2topic.exchangeamq.gen-I4M0gTQleiD46zd5M5cyOQ #消费者3topic.exchangeamq.gen-t1eW32lJGVWAPUkuQy0w4g topic.#.##生产者topic.exchangetopic.route.# 消费者1 /** * 消费者1 * @Tag 主题交换Topic exchange */ public class MQConsumerOne { public static void main(String[] args) { try { consumerMsg("topic.exchange","topic.route.*#"); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchangeName,String routeKey) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //设置连接端口 connectionFactory.setPort(5672); //设置连接的虚拟机 connectionFactory.setVirtualHost("mqtest"); //设置连接用户 connectionFactory.setUsername("mqtest"); //设置连接用户密码 connectionFactory.setPassword("test123"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC); //声明队列(临时排它队列) String queueName = channel.queueDeclare().getQueue(); //绑定交换机 channel.queueBind(queueName,exchangeName,routeKey); //消费消息(主动确认) channel.basicConsume(queueName,true,(tag,msg)->{ System.out.println(new String(msg.getBody(),"UTF-8")); },(cancel)->{ }); /* //关闭连接 channel.close(); connection.close();*/ } } 消费者2 /** * 消费者2 * @Tag 主题交换Topic exchange */ public class MQConsumerTwo { public static void main(String[] args) { try { consumerMsg("topic.exchange","#"); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchangeName,String routeKey) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //设置连接端口 connectionFactory.setPort(5672); //设置连接的虚拟机 connectionFactory.setVirtualHost("mqtest"); //设置连接用户 connectionFactory.setUsername("mqtest"); //设置连接用户密码 connectionFactory.setPassword("test123"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC); //声明队列(临时排它队列) String queueName = channel.queueDeclare().getQueue(); //绑定交换机 channel.queueBind(queueName,exchangeName,routeKey); //消费消息(主动确认) channel.basicConsume(queueName,true,(tag,msg)->{ System.out.println(new String(msg.getBody(),"UTF-8")); },(cancel)->{ }); /* //关闭连接 channel.close(); connection.close();*/ } } 消费者3 /** * 消费者3 * @Tag 主题交换Topic exchange */ public class MQConsumerThree { public static void main(String[] args) { try { consumerMsg("topic.exchange","topic.#.##"); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchangeName,String routeKey) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //设置连接端口 connectionFactory.setPort(5672); //设置连接的虚拟机 connectionFactory.setVirtualHost("mqtest"); //设置连接用户 connectionFactory.setUsername("mqtest"); //设置连接用户密码 connectionFactory.setPassword("test123"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC); //声明队列(临时排它队列) String queueName = channel.queueDeclare().getQueue(); //绑定交换机 channel.queueBind(queueName,exchangeName,routeKey); //消费消息(主动确认) channel.basicConsume(queueName,true,(tag,msg)->{ System.out.println(new String(msg.getBody(),"UTF-8")); },(cancel)->{ }); /* //关闭连接 channel.close(); connection.close();*/ } } 唯一生产者 /** * 生产者 * @Tag 主题交换Topic exchange */ public class MQProducer { public static void main(String[] args) { try { producerMsg("topic.exchange","topic.route.#","想要我的消息?就看你有没得这个本事!"); } catch (Exception e) { e.printStackTrace(); } } public static void producerMsg(String exchangeName,String routeKey,String msg) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //设置连接端口 connectionFactory.setPort(5672); //设置连接的虚拟机 connectionFactory.setVirtualHost("mqtest"); //设置连接用户 connectionFactory.setUsername("mqtest"); //设置连接用户密码 connectionFactory.setPassword("test123"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明临时交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,false); //创建生产者 channel.basicPublish(exchangeName,routeKey,null,msg.getBytes("UTF-8")); //关闭连接 channel.close(); connection.close(); } } 分别启动消费者1、2、3和生产者,观察消息的收发情况。这里只有消费者2收到消息,消费者1、3未收到消息。

    分析一下:

                 1)消费者1的 routeKey:topic.route.*#

                 2)消费者2的 routeKey:#

                 3)消费者3的 routeKey:topic.#.##

                 4)生产者的 routeKey:topic.route.#

    消费者1分析:消费者1的 routeKey 中带有特殊字符,但是并非是在每一个节点上单独成一个单词,对于消费者而言,当两个特殊字符( * 和 # )组合在一起成一个单词的时候,这个单词就是一个普通的、不具备任何模糊匹配能力的单词。此外,不要被生产者 routeKey 中的 # 字符迷惑,生产者的任何单词都不具备模糊匹配能力,仅仅是一个具备某种意义的字符组合而已,这里 topic.route.*# 和 topic.route.# 进行匹配,前面两个单词完全匹配成功,消费者1、生产者最后一个单词 *# 和 #  完全匹配失败,消费者1不该收到消息,分析和结果一致

    消费者2分析:消费者2单独一个路由键 # 将队列绑定到 交换机 exchange 上面,# 号匹配零个或者多个单词,单个 # 形成的路由键 routeKey 类似于广播模式,所以,这里可以不看生产者的 routeKey,只要发送到 topic.exchange 这个交换机的消息,消费者2绑定的队列都会收到,而监听这个队列的消费者自然而言就会拿到这些消息,分析和结果一致

    消费者3分析:这里消费者3的 routeKey—— topic.#.##,第一个单词 topic 和生产者的第一个单词完全匹配,第二个单词 # 和 route 模糊匹配,但是第三个单词 ## 根据消费者1的分析过程(看上面👆),同生产者的第三个单词 # 完全匹配失败,所以消费者3不该收到消息,分析同结果一致

     

    总结

    主题交换路由键匹配模式中,不带特殊字符的匹配规则同直接交换一样,进行完全匹配带特殊字符 * 和 # 的路由键进行模糊匹配,* 匹配任意一个单词,# 匹配零个或者多个单词两个特殊字符组合在一起形成的路由键,其单独进行模糊匹配能力失效,就是单纯的组合性单词生产者的路由键不具备任何模糊匹配能力,更重要的是,消息具体的筛选和匹配规则,决定方是消费者——这个决定是否需要消费消息的人单个 # 形成的路由键 routeKey 类似于广播模式,进行筛选不关注生产者的路由键路由键有最大长度限制,最多255个字节

    除了上面路由规则之外,觉得有必要记录下来的还有,amq.gen-7V2hbBS1AIyJkSOSCuWdRA 这种格式的消费者名称是系统帮我们生成的,这里主要是通过下面调用方式产生的,当然,它有一点点限制。

    //声明队列(临时排它队列) String queueName = channel.queueDeclare().getQueue();

     

     

    Processed: 0.009, SQL: 9