SpringBoot+RabbitMq延迟插件实现延时队列

    技术2022-07-11  126

    SpringBoot+RabbitMq延迟插件实现延时队列

    目录:
    一.安装RabbitMq
    二.话不多说直接上代码
    1.导入maven依赖
    2.在application.yml配置文件中引入RabbitMq配置信息
    3.rabbitMq配置类
    4.消息发送确认
    5.创建控制器并向队列发送消息
    6.发送队列消息的service
    7.发送队列消息的service实现类
    8.配置队列的监听者(完成延时操作)

    一.安装RabbitMq

    链接地址:https://blog.csdn.net/liang1gsdsdfd/article/details/109242939

    二.话不多说直接上代码

    1.导入maven依赖

    <!-- rabbitMq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

    2.在application.yml配置文件中引入RabbitMq配置信息

    spring:
      rabbitmq:
        host: 127.0.0.1          # 你的RabbitMq服务地址
        port: 5672               # 端口
        username: super          # 用户名
        password: Jmy2019.       # 密码
        virtual-host: ecosphere  # 虚拟主机

    3.rabbitMq配置类

    package com.nuvole.merchant.conf.mq;

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * RabbitMQ configuration: declares the exchange, the queue and the binding
     * that ties them together.
     *
     * @author lc
     * @date 2020/7/1 11:15
     */
    @Configuration
    public class AmqpConfig {

        /** Exchange name. */
        public static final String TEST_EXCHANGE_KEY = "exchange.pay";

        /** Queue name (test). */
        public static final String TEST_QUEUE_KEY = "test.pay";

        /** Routing key used to bind the test queue to the exchange. */
        public static final String TEST_ROUTK = "test.pay";

        /**
         * Declares the durable test queue.
         *
         * @return the queue named {@link #TEST_QUEUE_KEY}
         */
        @Bean
        public Queue testQueue() {
            final boolean durable = true;
            return new Queue(TEST_QUEUE_KEY, durable);
        }

        /**
         * Declares the delayed-message exchange.
         *
         * <p>The exchange is created with type {@code x-delayed-message} and the
         * argument {@code x-delayed-type=direct}; it is durable and not auto-deleted.
         *
         * @return the exchange named {@link #TEST_EXCHANGE_KEY}
         * @author lc
         * @date 2020/6/30 15:06
         */
        @Bean
        public CustomExchange testExchange() {
            final boolean durable = true;
            final boolean autoDelete = false;

            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type", "direct");

            return new CustomExchange(TEST_EXCHANGE_KEY, "x-delayed-message", durable, autoDelete, arguments);
        }

        /**
         * Binds the test queue to the delayed exchange under {@link #TEST_ROUTK}.
         *
         * @param testExchange the delayed exchange bean
         * @param testQueue    the test queue bean
         * @return the binding between the two
         */
        @Bean
        public Binding testBinding(CustomExchange testExchange, Queue testQueue) {
            return BindingBuilder
                    .bind(testQueue)
                    .to(testExchange)
                    .with(TEST_ROUTK)
                    .noargs();
        }
    }

    4.消息发送确认

    package com.nuvole.merchant.conf.mq;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;
    import java.nio.charset.StandardCharsets;

    /**
     * Publisher confirm / return handling for messages sent through the
     * {@link RabbitTemplate}.
     *
     * @author lc
     * @date 2020/7/1 11:14
     */
    @Component
    public class AmqpAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

        /** AMQP reply code the broker uses for NO_ROUTE returns. */
        private static final int NO_ROUTE = 312;

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /** Registers this bean as both the confirm and the return callback of the template. */
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnCallback(this);
        }

        /**
         * Reports a message the broker returned to the publisher.
         *
         * <p>A message published with a positive delay is held by the delayed
         * exchange instead of being routed straight away, so the broker answers the
         * publish with NO_ROUTE even though the message is delivered once the delay
         * elapses. Those returns are not failures and are skipped here; every other
         * return is printed.
         *
         * @param message    the returned message
         * @param replyCode  broker reply code
         * @param replyText  broker reply text
         * @param exchange   exchange the message was published to
         * @param routingKey routing key the message was published with
         */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            if (replyCode == NO_ROUTE && isDelayed(message)) {
                return;
            }
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("return--message:" + body + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
        }

        /**
         * Reports a negative publisher confirm; positive confirms need no action.
         *
         * @param correlationData correlation data supplied at publish time, may be {@code null}
         * @param ack             {@code true} if the broker accepted the message
         * @param cause           failure reason when {@code ack} is {@code false}
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (!ack) {
                System.out.println("消息发送确认失败:" + cause);
            }
        }

        /** Returns whether the returned message carries a positive delay header. */
        private static boolean isDelayed(Message message) {
            // Typed as Number so the check does not depend on the exact numeric return type.
            Number delay = message.getMessageProperties().getReceivedDelay();
            return delay != null && delay.longValue() > 0;
        }
    }

    5.创建控制器并向队列发送消息

    package com.nuvole.merchant.controller.v1;

    import com.alibaba.fastjson.JSON;
    import com.nuvole.merchant.conf.mq.AmqpConfig;
    import com.nuvole.merchant.service.mq.QueueMessageService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    import javax.annotation.Resource;
    import java.util.Date;
    import java.util.HashMap;

    /**
     * Test endpoint that publishes a delayed message to the test exchange.
     */
    @Slf4j
    @RestController
    @RequestMapping("/test")
    public class TestController {

        @Resource
        private QueueMessageService queueMessageService;

        /**
         * Publishes {@code msg} to the delayed exchange with the requested delay.
         *
         * @param msg  message payload taken from the request
         * @param msec delay before delivery, in milliseconds
         */
        @GetMapping("/do")
        public void dos(String msg, int msec) {
            log.info("开始发送延时队列。。。。。。。。。。。。。");
            log.info("{}----------当前时间", new Date());
            // Publish with the routing-key constant, not the queue-name constant.
            queueMessageService.delayedSend(AmqpConfig.TEST_EXCHANGE_KEY, AmqpConfig.TEST_ROUTK, msg, msec);
        }
    }

    6.发送队列消息的service

    package com.nuvole.merchant.service.mq;

    /**
     * Publishes messages to RabbitMQ.
     *
     * @author lc
     * @date 2020/7/1 11:26
     */
    public interface QueueMessageService {

        /**
         * Sends a regular (non-delayed) message.
         *
         * @param exchangeKey name of the exchange to publish to
         * @param routingKey  routing key to publish with
         * @param message     message payload
         * @author lc
         * @date 2020/7/1 11:26
         */
        void send(String exchangeKey, String routingKey, Object message);

        /**
         * Sends a delayed message.
         *
         * @param exchangeKey name of the exchange to publish to
         * @param routingKey  routing key to publish with
         * @param message     message payload
         * @param msec        delay before delivery, in milliseconds
         * @author lc
         * @date 2020/6/30 11:47
         */
        void delayedSend(String exchangeKey, String routingKey, Object message, int msec);
    }

    7.发送队列消息的service实现类

    package com.nuvole.merchant.service.mq; import com.nuvole.util.IdGenerator; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class QueueMessageServiceImpl implements QueueMessageService { @Autowired private RabbitTemplate rabbitTemplate; @Override public void send(String exchangeKey, String routingKey, Object message) { CorrelationData correlationData = new CorrelationData(IdGenerator.getUUID()); rabbitTemplate.convertAndSend(exchangeKey, routingKey, message, correlationData); } @Override public void delayedSend(String exchangeKey, String routingKey, Object msg,final int xdelay) { rabbitTemplate.convertAndSend(exchangeKey, routingKey, msg, message -> { // 设置延迟时间 message.getMessageProperties().setDelay(xdelay); return message; }); } }

    8.配置队列的监听者(完成延时操作)

    package com.nuvole.merchant.conf.mq; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Date; /** * 监听队列消息 * * @Author: lc * @Date: 2020/5/20 16:09 */ @Slf4j @Component @EnableRabbit public class TestReceiver { @RabbitListener(queues = AmqpConfig.TEST_QUEUE_KEY) public void process(String msg, Channel channel, Message message) { log.info("======================延时队列开始执行。。。。。。。。。。。。。。"); log.info(new Date().toString() + ",延时收到了信息 message = " + msg); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); } } } 至此一切准备完毕,启动项目来看一下效果。 依次访问接口: http://localhost:9097/test/do?msg=1&msec=10000 http://localhost:9097/test/do?msg=2&msec=5000 2020/07/01-14:11:58 [http-nio-9097-exec-5] INFO com.nuvole.merchant.controller.v1.TestController- 开始发送延时队列。。。。。。。。。。。。。 2020/07/01-14:11:58 [http-nio-9097-exec-5] INFO com.nuvole.merchant.controller.v1.TestController- Wed Jul 01 14:11:58 CST 2020----------当前时间 return--message:1,replyCode:312,replyText:NO_ROUTE,exchange:exchange.pay,routingKey:test.pay 2020/07/01-14:12:00 [http-nio-9097-exec-7] INFO com.nuvole.merchant.controller.v1.TestController- 开始发送延时队列。。。。。。。。。。。。。 2020/07/01-14:12:00 [http-nio-9097-exec-7] INFO com.nuvole.merchant.controller.v1.TestController- Wed Jul 01 14:12:00 CST 2020----------当前时间 return--message:2,replyCode:312,replyText:NO_ROUTE,exchange:exchange.pay,routingKey:test.pay 2020/07/01-14:12:05 [SimpleAsyncTaskExecutor-1] INFO com.nuvole.merchant.conf.mq.TestReceiver- ======================延时队列开始执行。。。。。。。。。。。。。。 2020/07/01-14:12:05 [SimpleAsyncTaskExecutor-1] INFO com.nuvole.merchant.conf.mq.TestReceiver- Wed Jul 01 14:12:05 CST 2020,延时收到了信息 message = 2 
2020/07/01-14:12:08 [SimpleAsyncTaskExecutor-1] INFO com.nuvole.merchant.conf.mq.TestReceiver- ======================延时队列开始执行。。。。。。。。。。。。。。
2020/07/01-14:12:08 [SimpleAsyncTaskExecutor-1] INFO com.nuvole.merchant.conf.mq.TestReceiver- Wed Jul 01 14:12:08 CST 2020,延时收到了信息 message = 1

可以看到第一个消息10秒后才被消费,第二个消息5秒后被消费,符合预期结果。
注意:日志中的 replyCode:312 / NO_ROUTE 并不代表消息丢失。延迟交换机在延迟期间先暂存消息、尚未路由到队列,因此发送时会立即触发 return 回调;延迟到期后消息仍会正常投递到队列并被消费(如上面的日志所示)。

    **下一篇:Linux下安装RabbitMq以及延时插件**

    Processed: 0.011, SQL: 9