RabbitMq系列(八):扇出交换Fanout Exchange

    技术2022-07-11  122

    系列文章

    RabbitMq系列(一):服务器搭建

    RabbitMq系列(二):最简单的例子

    RabbitMq系列(三):工作队列

    RabbitMq系列(四):消息确认和持久性

    RabbitMq系列(五):公平派遣

    RabbitMq系列(六):交换类型以及例子

    RabbitMq系列(七):直接交换Direct exchange

    RabbitMq系列(八):扇出交换Fanout Exchange

    RabbitMq系列(九):主题交换Topic Exchange

    RabbitMq系列(十):标头交换Headers exchange

    RabbitMq系列(十一):远程调用RPC

     

    目录

    前言

    实际演示

    示例1

    示例2

    总结


    前言

    扇出交换Fanout Exchang是一种不依赖于路由键的交换方式,当消息到达交换机的时候,交换机会将消息副本分别路由到绑定在它上面的所有队列中。 扇出交换是一种适用于系统公告、大事件通知等需要广播消息的模式,它也通常也被称为广播模式。

    实际演示

    广播模式路由规则是相对简单的 ,我们采用两个消费者一个生产者的方式来进行简单的演示

    示例1

    环境 身份交换机队列routeKey消费者1fanout.exchangefanout.queue空字符串消费者2fanout.exchangefanout.queue.onefanout.test生产者fanout.exchangequeue.route.one 消费者1  ** * 消费者1 * * @Tag 扇出交换 Fanout exchange */ public class MQConsumerOne { public static void main(String[] args) { try { consumerMsg("fanout.exchange","fanout.queue",""); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchange,String queue,String routeKey) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("10.0.10.3"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,false); //声明队列 channel.queueDeclare(queue,false,false,true,null); //绑定队列 channel.queueBind(queue,exchange,routeKey,null); //创建消费者,消费消息 channel.basicConsume(queue, true, (consumerTag, message) -> { //消费消息 String msg = new String(message.getBody(), "UTF-8"); System.out.println(msg); }, (consumerTag) -> { System.out.println(consumerTag); }); //关闭连接 /* channel.close(); connection.close();*/ } } 消费者2 ** * 消费者2 * * @Tag 扇出交换 Fanout exchange */ public class MQConsumerTwo { public static void main(String[] args) { try { consumerMsg("fanout.exchange","fanout.queue.one","fanout.test"); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchange,String queue,String routeKey) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("10.0.10.3"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,false); //声明队列 channel.queueDeclare(queue,false,false,true,null); //绑定队列 channel.queueBind(queue,exchange,routeKey,null); //创建消费者,消费消息 channel.basicConsume(queue, true, (consumerTag, message) -> { //消费消息 String msg = new String(message.getBody(), "UTF-8"); System.out.println(msg); }, (consumerTag) -> { System.out.println(consumerTag); }); //关闭连接 /* channel.close(); connection.close();*/ } } 唯一生产者  /** * 生产者 * * @Tag 扇出交换 Fanout exchange */ public class MQProducer { public static void main(String[] args) { try { consumerMsg("fanout.exchange","queue.route.one","接收到这条消息的的人,你通过考核了!"); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchange,String routeKey,String msg) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("10.0.10.3"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,false); //创建生产者,发送消息 channel.basicPublish(exchange,routeKey,null,msg.getBytes()); //关闭连接 channel.close(); connection.close(); } } 相关逻辑和声明完成后,分别启动消费者1、消费者2和唯一的生产者,这里生产者没有写重复发送消息逻辑,所以这里模拟发送重复消息,生产者启动3次,查看结果如下:

    分析一下:

             1)因为扇出交换跟路由键没有关系,所以这里忽略两个消费者和生产者的路由键

             2)扇出交换只跟绑定队列有关系,这里有两个队列 fanout.queue 和  fanout.queue.one 通过 bingding 绑定在交换机上面,消息到达时,交换机将通过路由发送到两个队列中,所以消费者1和消费者2都收到了消息。

    扇出模式也可以直接查看有哪些绑定队列——通过命令或者管理UI

    示例2

    这里声明的方式更加贴合扇出交换的定义

    环境 身份交换机队列routeKey消费者1fanout.exchangefanout.queue空字符串消费者2fanout.exchangefanout.queue.one空字符串生产者fanout.exchange空字符串 消费者1  /** * 消费者1 * * @Tag 扇出交换 Fanout exchange */ public class MQConsumerOne { public static void main(String[] args) { try { consumerMsg("fanout.exchange","fanout.queue",""); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchange,String queue,String routeKey) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,false); //声明队列 channel.queueDeclare(queue,false,false,true,null); //绑定队列 channel.queueDeclare() channel.queueBind(queue,exchange,routeKey,null); //创建消费者,消费消息 channel.basicConsume(queue, true, (consumerTag, message) -> { //消费消息 String msg = new String(message.getBody(), "UTF-8"); System.out.println(msg); }, (consumerTag) -> { System.out.println(consumerTag); }); //关闭连接 /* channel.close(); connection.close();*/ } } 消费者2 /** * 消费者2 * * @Tag 扇出交换 Fanout exchange */ public class MQConsumerTwo { public static void main(String[] args) { try { consumerMsg("fanout.exchange","fanout.queue.one",""); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchange,String queue,String routeKey) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,false); //声明队列 channel.queueDeclare(queue,false,false,true,null); //绑定队列 channel.queueBind(queue,exchange,routeKey,null); //创建消费者,消费消息 channel.basicConsume(queue, true, (consumerTag, message) -> { //消费消息 String msg = new String(message.getBody(), "UTF-8"); System.out.println(msg); }, (consumerTag) -> { System.out.println(consumerTag); }); //关闭连接 /* channel.close(); connection.close();*/ } } 唯一生产者 /** * 消费者2 * * @Tag 扇出交换 Fanout exchange */ public class MQConsumerTwo { public static void main(String[] args) { try { consumerMsg("fanout.exchange","fanout.queue.one",""); } catch (Exception e) { e.printStackTrace(); } } public static void consumerMsg(String exchange,String queue,String routeKey) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接地址 connectionFactory.setHost("192.168.239.128"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,false); //声明队列 channel.queueDeclare(queue,false,false,true,null); //绑定队列 channel.queueBind(queue,exchange,routeKey,null); //创建消费者,消费消息 channel.basicConsume(queue, true, (consumerTag, message) -> { //消费消息 String msg = new String(message.getBody(), "UTF-8"); System.out.println(msg); }, (consumerTag) -> { System.out.println(consumerTag); }); //关闭连接 /* channel.close(); connection.close();*/ } } 同实列1一样,分别启动后查看结果:

    这里分析省略

    总结

    扇出模式定义一定要 exchangeType = fanout扇出模式可以不用在意路由键 routeKey 但是不能为null请注意这里定义的都是临时队列,啥叫临时队列?临时存在随时GG的队列,重启服务器就消失特殊需要临时队列可以采用简洁的写法(如果队列名不重要的话) //声明一个临时的排他的队列 String queueName = channel.queueDeclare().getQueue(); //绑定队列 channel.queueBind(queueName,exchange,routeKey);

     

    Processed: 0.009, SQL: 9