消息队列-RabbitMq

    技术2022-07-11  89

    消息队列-RabbitMq


    1、AMQP 简介 AMQP (Advanced Message Queuing Protocol,高级消息队列协议)是一个线路层的协议规范, 而不是 API 规范(例如 JMS)。由于 AMQP 是一个线路层协议规范,因此它天然就是跨平台的, 就像 SMTP、 HTTP 等协议一样,只要开发者按照规范的格式发送数据,任何平台都可以通过 AMQP 进行消息交互。像目前流行的 StormMQ、 RabbitMQ 等都实现了 AMQP。

    2、安装RabbitMq

    自行百度

    3、使用

    依赖

    <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.2.RELEASE</version> </dependency>

    配置

    spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest

    RabbitTopicConfig

    package org.sang.rabbitmq; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitTopicConfig { public final static String TOPICNAME = "sang-topic"; @Bean TopicExchange topicExchange() { return new TopicExchange(TOPICNAME, true, false); } @Bean Queue xiaomi() { return new Queue("xiaomi"); } @Bean Queue huawei() { return new Queue("huawei"); } @Bean Queue phone() { return new Queue("phone"); } @Bean Binding xiaomiBinding() { return BindingBuilder.bind(xiaomi()).to(topicExchange()) .with("xiaomi.#"); } @Bean Binding huaweiBinding() { return BindingBuilder.bind(huawei()).to(topicExchange()) .with("huawei.#"); } @Bean Binding phoneBinding() { return BindingBuilder.bind(phone()).to(topicExchange()) .with("#.phone.#"); } }

    首先创建 TopicExchange,参数和前面的一致。然后创建三个 Queue,第一个 Queue 用来存储 和“xiaom1”有关的消息,第二个 Queue 用来存储和“huawei”有关的消息,第三个 Queue 用来存储和“phone” 有关的消息。

    • 将三个 Queue 分别绑定到 TopicExchange 上,第一个 Binding 中的“xiaomi.#'’表示消息的 routingkey 凡是以“xiaomi”开头的,都将被路由到名称为“xiaomi”的 Queue 上;第二个 Binding 中的“huawei.#”表示消息的 routingkey 凡是以“huawei”开头的,都将被路由到名 称为“huawei”的 Queue 上;第三个 Binding 中的“#.phone.#"则表示消息的 routingkey 中凡 是包含“phone”的,都将被路由到名称为“phone”的 Queue 上。


    接下来针对三个 Queue 创建三个消费

    TopicReceiver

    package org.sang.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TopicReceiver { @RabbitListener(queues = "phone") public void handler1(String message) { System.out.println("PhoneReceiver:"+ message); } @RabbitListener(queues = "xiaomi") public void handler2(String message) { System.out.println("XiaoMiReceiver:"+message); } @RabbitListener(queues = "huawei") public void handler3(String message) { System.out.println("HuaWeiReceiver:"+message); } }
    package org.sang.rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.core.ChannelCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; import java.util.Map; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test public void headerTest() { /* Message nameMsg = MessageBuilder .withBody("hello header! name-queue".getBytes()) .setHeader("name", "sang").build(); Message ageMsg = MessageBuilder .withBody("hello header! age-queue".getBytes()) .setHeader("age", "99").build(); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);*/ rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻.."); } }

    根据 RabbitTopicConfig 中的配置,第一条消息将被路由到名称为“xiaomi”的 Queue 上, 第 二条消息将被路由到名为“huawei”的 Queue 上,第三条消息将被路由到名为“xiaomi”以及名为 “phone”的 Queue 上,第四条消息将被路由到名为“huawei'’以及名为“phone”的 Queue 上,最 后一条消息则将被路由到名为“phone”的 Queue 上。

    Processed: 0.011, SQL: 9