RabbitMq系列(十):标头交换Headers exchange

    技术2022-07-11  142

    系列文章

    RabbitMq系列(一):服务器搭建

    RabbitMq系列(二):最简单的例子

    RabbitMq系列(三):工作队列

    RabbitMq系列(四):消息确认和持久性

    RabbitMq系列(五):公平派遣

    RabbitMq系列(六):交换类型以及例子

    RabbitMq系列(七):直接交换Direct exchange

    RabbitMq系列(八):扇出交换Fanout Exchange

    RabbitMq系列(九):主题交换Topic Exchange

    RabbitMq系列(十):标头交换Headers exchange

    RabbitMq系列(十一):远程调用RPC

     

    目录

    前言

    实际演示

    示例1

    总结


    前言

    标头交换是四个主要交换模式中较为特殊的一种模式,她的路由规则不是基于路由键 routeKey,而是基于一种新的方式——Headers。

    标头交换的路由规则主要由Headers头部所带的参数来决定,Headers是一个Map集合,她里面包含了一个特殊参数和零个或多个一般参数。

    特殊参数是 x-match,x-match 有两个值:any 和 all

    any:表示匹配任意一个参数与其值

    all:表示匹配所有参数与其值

    这里的匹配,主要是消息携带的参数与消费者定义的参数比较,占据主动的是消费者,最终解释权归属消费者。

    需要注意的是:x- 开头的参数都不算匹配项

    实际演示

    标头交换的规则类似于主题交换,主要起决定性作用的参数不一样,一个是Header,另一个是routeKey,相对于主题交换,变化没那么多,这里采用两个消费者和一个生产者来测试。

    示例1

    环境 身份交换机队列routeKey消费者1header.exchangeamq.gen-Z7hxEJzQ-gnS0mxbQ7bHiQ空字符串消费者2header.exchangeamq.gen-lT2NBCuLpX-1Oh1Sjcd1CQ空字符串生产者header.exchange空字符串 消费者1  /** * 消费者1 * @Tag 主题交换Topic exchange */ public class MQConsumerOne { public static void main(String[] args) { try { consumerMsg("header.exchange"); } catch (Exception e) { e.printStackTrace(); } } /** * 构建参数 */ public static Map buildArgs() { HashMap<String,String> args = new HashMap<>(); args.put("name","张三"); args.put("age","32"); args.put("x-match","any"); return args; } /** * 消费者逻辑 * @param exchangeName * @throws IOException * @throws TimeoutException */ public static void consumerMsg(String exchangeName) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //设置连接端口 connectionFactory.setPort(5672); //设置连接的虚拟机 connectionFactory.setVirtualHost("mqtest"); //设置连接用户 connectionFactory.setUsername("mqtest"); //设置连接用户密码 connectionFactory.setPassword("test123"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS); //声明队列(临时排它队列) String queueName = channel.queueDeclare().getQueue(); //构建参数 Map args = buildArgs(); //绑定交换机 channel.queueBind(queueName,exchangeName,"",args); //消费消息(主动确认) channel.basicConsume(queueName,true,(tag,msg)->{ System.out.println(new String(msg.getBody(),"UTF-8")); },(cancel)->{ }); /* //关闭连接 channel.close(); connection.close();*/ } } 消费者2 /** * 消费者2 * @Tag 主题交换Topic exchange */ public class MQConsumerTwo { public static void main(String[] args) { try { consumerMsg("header.exchange"); } catch (Exception e) { e.printStackTrace(); } } /** * 构建参数 */ public static Map buildArgs() { HashMap<String,String> args = new HashMap<>(); args.put("name","李四"); args.put("age","32"); args.put("x-match","all"); return args; } /** * 消费者逻辑 * @param exchangeName * @throws IOException * @throws TimeoutException */ public static void consumerMsg(String exchangeName) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //设置连接端口 connectionFactory.setPort(5672); //设置连接的虚拟机 connectionFactory.setVirtualHost("mqtest"); //设置连接用户 connectionFactory.setUsername("mqtest"); //设置连接用户密码 connectionFactory.setPassword("test123"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS); //声明队列(临时排它队列) String queueName = channel.queueDeclare().getQueue(); //构建标头参数 Map map = buildArgs(); //绑定交换机 channel.queueBind(queueName,exchangeName,"",map); //消费消息(主动确认) channel.basicConsume(queueName,true,(tag,msg)->{ System.out.println(new String(msg.getBody(),"UTF-8")); },(cancel)->{ }); /* //关闭连接 channel.close(); connection.close();*/ } } 唯一生产者 /** * 生产者 * @Tag 主题交换Topic exchange */ public class MQProducer { public static void main(String[] args) { try { producerMsg("header.exchange","想要我的消息?就看你有没得这个本事!"); } catch (Exception e) { e.printStackTrace(); } } /** * 构建参数 */ public static Map buildArgs() { HashMap<String,String> args = new HashMap<>(); args.put("name","李四"); args.put("age","32"); args.put("weight","180"); return args; } /** * 生产者 * @param exchangeName * @param msg * @throws IOException * @throws TimeoutException */ public static void producerMsg(String exchangeName,String msg) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //设置连接端口 connectionFactory.setPort(5672); //设置连接的虚拟机 connectionFactory.setVirtualHost("mqtest"); //设置连接用户 connectionFactory.setUsername("mqtest"); //设置连接用户密码 connectionFactory.setPassword("test123"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明临时交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS,false); //构建标头参数 Map map = buildArgs(); AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().headers(map).build(); //创建生产者 channel.basicPublish(exchangeName,"",basicProperties,msg.getBytes("UTF-8")); //关闭连接 channel.close(); connection.close(); } } 完成代码逻辑过后,分别启动消费者1、消费者2和唯一的生产者,观察消息的收发情况。这里消费者1、消费者2都收到了消息

    分析一下:

                1)  消费者1的 Headers: 

                   name=张三                age=32                x-match=any

                2)  消费者2的 Headers:

                  name=李四               age=32               x-match=all

                3)唯一生产者的 Headers:

                  name=李四               age=32               weight=180

    咱们一个一个来,往下看👇

    消费者1:消费者1的Headers中特殊参数 x-match 为any,表示只要来的消息只要任意匹配她的任意一个参数与其值就可以了,满足一个就收了这个消息。生产者的Headers集合中刚好有一个age与消费者1的age参数匹配,消费者1该收到消息。

    消费者2:消费者1的Headers中特殊参数 x-match 为all,表示要全部满足消费者1的参数要求,她才会将对应的消息收下,要求还不小,但是咱不慌。这里生产者的Headers集合中有两个参数满足消费者2的要求,name和age两个的参数与值都匹配,看上去还差 x-match 没匹配到,但是上面提过,x- 开头的不算进匹配项,特殊参数匹配的时候略过,忽略 x-match 后这里全部匹配,消费者2该收到消息。

     

    总结

    标头交换路由规则不是基于 routeKey,而是 headershearers有一个特殊参数,该特殊参数有两个值:any 和 all特殊情况1:当消费者没有参数的时候,消息都会被路由到该消费者特殊情况2:当消费者只有 x- 开头的特殊匹配参数时,根据 x- 开头的特殊匹配项匹配

     

     

     

     

    Processed: 0.020, SQL: 9