RabbitMq系列(一):服务器搭建
RabbitMq系列(二):最简单的例子
RabbitMq系列(三):工作队列
RabbitMq系列(四):消息确认和持久性
RabbitMq系列(五):公平派遣
RabbitMq系列(六):交换类型以及例子
RabbitMq系列(七):直接交换Direct exchange
RabbitMq系列(八):扇出交换Fanout Exchange
RabbitMq系列(九):主题交换Topic Exchange
RabbitMq系列(十):标头交换Headers exchange
RabbitMq系列(十一):远程调用RPC
前言
实际演示
示例1
示例2
总结
扇出交换Fanout Exchang是一种不依赖于路由键的交换方式,当消息到达交换机的时候,交换机会将消息副本分别路由到绑定在它上面的所有队列中。 扇出交换是一种适用于系统公告、大事件通知等需要广播消息的模式,它也通常也被称为广播模式。
广播模式路由规则是相对简单的 ,我们采用两个消费者一个生产者的方式来进行简单的演示
1)因为扇出交换跟路由键没有关系,所以这里忽略两个消费者和生产者的路由键
2)扇出交换只跟绑定队列有关系,这里有两个队列 fanout.queue 和 fanout.queue.one 通过 bingding 绑定在交换机上面,消息到达时,交换机将通过路由发送到两个队列中,所以消费者1和消费者2都收到了消息。
扇出模式也可以直接查看有哪些绑定队列——通过命令或者管理UI这里声明的方式更加贴合扇出交换的定义
环境 身份交换机队列routeKey消费者1fanout.exchangefanout.queue空字符串消费者2fanout.exchangefanout.queue.one空字符串生产者fanout.exchange无空字符串 消费者1 /** * 消费者1 * * @Tag 扇出交换 Fanout exchange */ public class MQConsumerOne { public static void main(String[] args) { try { consumerMsg("fanout.exchange","fanout.queue",""); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchange,String queue,String routeKey) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,false); //声明队列 channel.queueDeclare(queue,false,false,true,null); //绑定队列 channel.queueDeclare() channel.queueBind(queue,exchange,routeKey,null); //创建消费者,消费消息 channel.basicConsume(queue, true, (consumerTag, message) -> { //消费消息 String msg = new String(message.getBody(), "UTF-8"); System.out.println(msg); }, (consumerTag) -> { System.out.println(consumerTag); }); //关闭连接 /* channel.close(); connection.close();*/ } } 消费者2 /** * 消费者2 * * @Tag 扇出交换 Fanout exchange */ public class MQConsumerTwo { public static void main(String[] args) { try { consumerMsg("fanout.exchange","fanout.queue.one",""); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchange,String queue,String routeKey) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,false); //声明队列 channel.queueDeclare(queue,false,false,true,null); //绑定队列 channel.queueBind(queue,exchange,routeKey,null); //创建消费者,消费消息 channel.basicConsume(queue, true, (consumerTag, message) -> { //消费消息 String msg = new String(message.getBody(), "UTF-8"); System.out.println(msg); }, (consumerTag) -> { System.out.println(consumerTag); }); //关闭连接 /* channel.close(); connection.close();*/ } } 唯一生产者 /** * 消费者2 * * @Tag 扇出交换 Fanout exchange */ public class MQConsumerTwo { public static void main(String[] args) { try { consumerMsg("fanout.exchange","fanout.queue.one",""); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchange,String queue,String routeKey) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,false); //声明队列 channel.queueDeclare(queue,false,false,true,null); //绑定队列 channel.queueBind(queue,exchange,routeKey,null); //创建消费者,消费消息 channel.basicConsume(queue, true, (consumerTag, message) -> { //消费消息 String msg = new String(message.getBody(), "UTF-8"); System.out.println(msg); }, (consumerTag) -> { System.out.println(consumerTag); }); //关闭连接 /* channel.close(); connection.close();*/ } } 同实列1一样,分别启动后查看结果:这里分析省略