系列文章
RabbitMq系列(一):服务器搭建
RabbitMq系列(二):最简单的例子
RabbitMq系列(三):工作队列
RabbitMq系列(四):消息确认和持久性
RabbitMq系列(五):公平派遣
RabbitMq系列(六):交换类型以及例子
RabbitMq系列(七):直接交换Direct exchange
RabbitMq系列(八):扇出交换Fanout Exchange
RabbitMq系列(九):主题交换Topic Exchange
RabbitMq系列(十):标头交换Headers exchange
RabbitMq系列(十一):远程调用RPC
目录
前言
实际演示
示例1
总结
前言
标头交换是四个主要交换模式中较为特殊的一种模式,她的路由规则不是基于路由键 routeKey,而是基于一种新的方式——Headers。
标头交换的路由规则主要由Headers头部所带的参数来决定,Headers是一个Map集合,她里面包含了一个特殊参数和零个或多个一般参数。
特殊参数是 x-match,x-match 有两个值:any 和 all
any:表示匹配任意一个参数与其值
all:表示匹配所有参数与其值
这里的匹配,主要是消息携带的参数与消费者定义的参数比较,占据主动的是消费者,最终解释权归属消费者。
需要注意的是:x- 开头的参数都不算匹配项
实际演示
标头交换的规则类似于主题交换,主要起决定性作用的参数不一样,一个是Header,另一个是routeKey,相对于主题交换,变化没那么多,这里采用两个消费者和一个生产者来测试。
示例1
环境
身份交换机队列routeKey消费者1header.exchangeamq.gen-Z7hxEJzQ-gnS0mxbQ7bHiQ空字符串消费者2header.exchangeamq.gen-lT2NBCuLpX-1Oh1Sjcd1CQ空字符串生产者header.exchange无空字符串
消费者1
/**
* 消费者1
* @Tag 主题交换Topic exchange
*/
public class MQConsumerOne {
public static void main(String[] args) {
try {
consumerMsg("header.exchange");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 构建参数
*/
public static Map buildArgs()
{
HashMap<String,String> args = new HashMap<>();
args.put("name","张三");
args.put("age","32");
args.put("x-match","any");
return args;
}
/**
* 消费者逻辑
* @param exchangeName
* @throws IOException
* @throws TimeoutException
*/
public static void consumerMsg(String exchangeName) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接地址
connectionFactory.setHost("192.168.239.128");
//设置连接端口
connectionFactory.setPort(5672);
//设置连接的虚拟机
connectionFactory.setVirtualHost("mqtest");
//设置连接用户
connectionFactory.setUsername("mqtest");
//设置连接用户密码
connectionFactory.setPassword("test123");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS);
//声明队列(临时排它队列)
String queueName = channel.queueDeclare().getQueue();
//构建参数
Map args = buildArgs();
//绑定交换机
channel.queueBind(queueName,exchangeName,"",args);
//消费消息(主动确认)
channel.basicConsume(queueName,true,(tag,msg)->{
System.out.println(new String(msg.getBody(),"UTF-8"));
},(cancel)->{
});
/* //关闭连接
channel.close();
connection.close();*/
}
}
消费者2
/**
* 消费者2
* @Tag 主题交换Topic exchange
*/
public class MQConsumerTwo {
public static void main(String[] args) {
try {
consumerMsg("header.exchange");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 构建参数
*/
public static Map buildArgs()
{
HashMap<String,String> args = new HashMap<>();
args.put("name","李四");
args.put("age","32");
args.put("x-match","all");
return args;
}
/**
* 消费者逻辑
* @param exchangeName
* @throws IOException
* @throws TimeoutException
*/
public static void consumerMsg(String exchangeName) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接地址
connectionFactory.setHost("192.168.239.128");
//设置连接端口
connectionFactory.setPort(5672);
//设置连接的虚拟机
connectionFactory.setVirtualHost("mqtest");
//设置连接用户
connectionFactory.setUsername("mqtest");
//设置连接用户密码
connectionFactory.setPassword("test123");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS);
//声明队列(临时排它队列)
String queueName = channel.queueDeclare().getQueue();
//构建标头参数
Map map = buildArgs();
//绑定交换机
channel.queueBind(queueName,exchangeName,"",map);
//消费消息(主动确认)
channel.basicConsume(queueName,true,(tag,msg)->{
System.out.println(new String(msg.getBody(),"UTF-8"));
},(cancel)->{
});
/* //关闭连接
channel.close();
connection.close();*/
}
}
唯一生产者
/**
* 生产者
* @Tag 主题交换Topic exchange
*/
public class MQProducer {
public static void main(String[] args) {
try {
producerMsg("header.exchange","想要我的消息?就看你有没得这个本事!");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 构建参数
*/
public static Map buildArgs()
{
HashMap<String,String> args = new HashMap<>();
args.put("name","李四");
args.put("age","32");
args.put("weight","180");
return args;
}
/**
* 生产者
* @param exchangeName
* @param msg
* @throws IOException
* @throws TimeoutException
*/
public static void producerMsg(String exchangeName,String msg) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接地址
connectionFactory.setHost("192.168.239.128");
//设置连接端口
connectionFactory.setPort(5672);
//设置连接的虚拟机
connectionFactory.setVirtualHost("mqtest");
//设置连接用户
connectionFactory.setUsername("mqtest");
//设置连接用户密码
connectionFactory.setPassword("test123");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//声明临时交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS,false);
//构建标头参数
Map map = buildArgs();
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().headers(map).build();
//创建生产者
channel.basicPublish(exchangeName,"",basicProperties,msg.getBytes("UTF-8"));
//关闭连接
channel.close();
connection.close();
}
}
完成代码逻辑过后,分别启动消费者1、消费者2和唯一的生产者,观察消息的收发情况。这里消费者1、消费者2都收到了消息
分析一下:
1) 消费者1的 Headers:
name=张三 age=32 x-match=any
2) 消费者2的 Headers:
name=李四 age=32 x-match=all
3)唯一生产者的 Headers:
name=李四 age=32 weight=180
咱们一个一个来,往下看👇
消费者1:消费者1的Headers中特殊参数 x-match 为any,表示只要来的消息只要任意匹配她的任意一个参数与其值就可以了,满足一个就收了这个消息。生产者的Headers集合中刚好有一个age与消费者1的age参数匹配,消费者1该收到消息。
消费者2:消费者1的Headers中特殊参数 x-match 为all,表示要全部满足消费者1的参数要求,她才会将对应的消息收下,要求还不小,但是咱不慌。这里生产者的Headers集合中有两个参数满足消费者2的要求,name和age两个的参数与值都匹配,看上去还差 x-match 没匹配到,但是上面提过,x- 开头的不算进匹配项,特殊参数匹配的时候略过,忽略 x-match 后这里全部匹配,消费者2该收到消息。
总结
标头交换路由规则不是基于 routeKey,而是 headershearers有一个特殊参数,该特殊参数有两个值:any 和 all特殊情况1:当消费者没有参数的时候,消息都会被路由到该消费者特殊情况2:当消费者只有 x- 开头的特殊匹配参数时,根据 x- 开头的特殊匹配项匹配