springboot对接rabbitMQ开发简单示例

    技术2022-07-10  141

    1.三种Exchange类型

    (1)fanout(扇形)

    把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,

    生产者:定义1个FanoutExchange及多个Queue,并把Queue绑定到该Exchange,发送时发送到该Exchange中即可,routingKey传空值,消息会同时到这多个Queue中

    消费者:监听指定Queue并处理

    (2)direct(直连)

    把消息路由到那些binding key与routing key完全匹配的Queue中

    生产者:定义1个DirectExchange及1个或多个Queue,并把Queue通过binding key(其实代码里是routing key)绑定到该Exchange,发送时发送到该Exchange中,并指定routingKey,消息到这routingKey相同的1个或多个Queue中

    消费者:监听指定Queue并处理

    (3)topic(主题)

    direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

    routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”binding key与routing key一样也是句点号“. ”分隔的字符串binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

    生产者:定义1个TopicExchange及1个或多个Queue,也是把Queue通过binding key(其实代码里是routing key)绑定到该Exchange,但此时的binding key是可以模糊的,发送时发送到该Exchange中,并指定routingKey(精确的),消息到这匹配binding key的1个或多个Queue中

    消费者:监听指定Queue并处理

    注意:多个消费者同时监听同一个Queue,消息会平均分摊给多个消费者

    2.开发示例-生产者

    springboot开发

    (1)引入jar依赖

    <!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

    (2)引入配置(因为本次使用了HAProxy代理,否则默认端口应该5672)

    spring: application: name: rabbitmq-provider rabbitmq: host: 192.168.166.159 port: 9189 username: admin password: admin

    (3)fanout(扇形)类型的代码示例

    ①定义队列及FanoutExchange

    import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutRabbitConfig { /** * 创建三个队列 :fanout.A fanout.B fanout.C * 将三个队列都绑定在交换机 fanoutExchange 上 * 因为是扇型交换机, 路由键无需配置,配置也不起作用 */ @Bean public Queue queueA() { return new Queue("fanout.A"); } @Bean public Queue queueB() { return new Queue("fanout.B"); } @Bean public Queue queueC() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); } @Bean Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); } @Bean Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); } }

    ②controller示例

    import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; @RestController public class SendMessageController { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法 @GetMapping("/sendFanoutMessage") public String sendFanoutMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: testFanoutMessage "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); rabbitTemplate.convertAndSend("fanoutExchange", null, map); return "ok"; } }

    (4)direct(直连型)类型的代码示例

    ①定义队列及DirectExchange

    import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue TestDirectQueue1() { return new Queue("TestDirectQueue1",true); //true 是否持久 } @Bean public Queue TestDirectQueue2() { return new Queue("TestDirectQueue2",true); //true 是否持久 } @Bean public Queue TestDirectQueue3() { return new Queue("TestDirectQueue3",true); //true 是否持久 } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { return new DirectExchange("TestDirectExchange"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect1() { return BindingBuilder.bind(TestDirectQueue1()).to(TestDirectExchange()).with("TestDirectRouting"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect2() { return BindingBuilder.bind(TestDirectQueue2()).to(TestDirectExchange()).with("TestDirectRouting"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting3 @Bean Binding bindingDirect3() { return BindingBuilder.bind(TestDirectQueue3()).to(TestDirectExchange()).with("TestDirectRouting3"); } }

    ②controller示例

    import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; @RestController public class SendMessageController { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法 @GetMapping("/sendDirectMessage") public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message: hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange //因TestDirectQueue1、TestDirectQueue2队列都通过TestDirectRouting绑定到TestDirectExchange,所以此处会发送到TestDirectQueue1、TestDirectQueue2 //rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map); //因TestDirectQueue3队列都通过TestDirectRouting3绑定到TestDirectExchange,所以此处会发送到TestDirectQueue3 rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting3", map); return "ok"; } }

    (4)direct(直连型)类型的代码示例

    ①定义队列及TopicExchange

    import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue TestDirectQueue1() { return new Queue("TestDirectQueue1",true); //true 是否持久 } @Bean public Queue TestDirectQueue2() { return new Queue("TestDirectQueue2",true); //true 是否持久 } @Bean public Queue TestDirectQueue3() { return new Queue("TestDirectQueue3",true); //true 是否持久 } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { return new DirectExchange("TestDirectExchange"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect1() { return BindingBuilder.bind(TestDirectQueue1()).to(TestDirectExchange()).with("TestDirectRouting"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect2() { return BindingBuilder.bind(TestDirectQueue2()).to(TestDirectExchange()).with("TestDirectRouting"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting3 @Bean Binding bindingDirect3() { return BindingBuilder.bind(TestDirectQueue3()).to(TestDirectExchange()).with("TestDirectRouting3"); } }

    ②controller示例

    import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; @RestController public class SendMessageController { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法 @GetMapping("/sendTopicMessage1") public String sendTopicMessage1() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: M A N "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> manMap = new HashMap<>(); manMap.put("messageId", messageId); manMap.put("messageData", messageData); manMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap); return "ok"; } @GetMapping("/sendTopicMessage2") public String sendTopicMessage2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: woman "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> womanMap = new HashMap<>(); womanMap.put("messageId", messageId); womanMap.put("messageData", messageData); womanMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap); return "ok"; } }

    3.开发示例-消费者

    springboot开发

    (1)引入jar依赖

    <!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

    (2)引入配置(因为本次使用了HAProxy代理,否则默认端口应该5672)

    spring: application: name: rabbitmq-provider rabbitmq: host: 192.168.166.159 port: 9189 username: admin password: admin

    (3)直接监听对应队列即可,例如监听队列名叫fanout.A的消息

    import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverA消费者收到消息 : " +testMessage.toString()); } }
    Processed: 0.012, SQL: 9