SpringBoot+RabbitMq延迟插件实现延时队列
一.安装RabbitMq二.话不多说直接上代码1.导入maven依赖2.在application.yml配置文件中引入RabbitMq配置信息3.rabbitMq配置类4.消息发送确认5.创建控制器并向队列发送消息6.发送队列消息的service7.发送队列消息的service实现类8.配置队列的监听者(完成延时操作)
一.安装RabbitMq
链接地址:https://blog.csdn.net/liang1gsdsdfd/article/details/109242939
二.话不多说直接上代码
1.导入maven依赖
<!-- rabbitMq
-->
<dependency
>
<groupId
>org
.springframework
.boot
</groupId
>
<artifactId
>spring
-boot
-starter
-amqp
</artifactId
>
</dependency
>
2.在application.yml配置文件中引入RabbitMq配置信息
spring
:
rabbitmq
:
host
: 127.0.0.1(你的RabbitMq配置地址)
port
: 5672(端口)
username
: super(用户名)
password
: Jmy2019
.(密码)
virtual
-host
: ecosphere(主机
)
3.rabbitMq配置类
package com
.nuvole
.merchant
.conf
.mq
;
import org
.springframework
.amqp
.core
.*;
import org
.springframework
.context
.annotation
.Bean
;
import org
.springframework
.context
.annotation
.Configuration
;
import java
.util
.HashMap
;
import java
.util
.Map
;
@Configuration
public class AmqpConfig {
public static final String
TEST_EXCHANGE_KEY = "exchange.pay";
public static final String
TEST_QUEUE_KEY = "test.pay";
public static final String
TEST_ROUTK = "test.pay";
@Bean
public Queue
testQueue() {
return new Queue(TEST_QUEUE_KEY, true);
}
@Bean
public CustomExchange
testExchange() {
Map
<String
, Object
> args
= new HashMap<>();
args
.put("x-delayed-type", "direct");
return new CustomExchange(TEST_EXCHANGE_KEY, "x-delayed-message", true, false, args
);
}
@Bean
public Binding
testBinding(CustomExchange testExchange
, Queue testQueue
) {
Binding binding
= BindingBuilder
.bind(testQueue
).to(testExchange
).with(TEST_ROUTK).noargs();
return binding
;
}
}
4.消息发送确认
package com
.nuvole
.merchant
.conf
.mq
;
import org
.springframework
.amqp
.core
.Message
;
import org
.springframework
.amqp
.rabbit
.core
.RabbitTemplate
;
import org
.springframework
.amqp
.rabbit
.support
.CorrelationData
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.stereotype
.Component
;
import javax
.annotation
.PostConstruct
;
@Component
public class AmqpAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate
.ReturnCallback
{
@Autowired
private RabbitTemplate rabbitTemplate
;
@PostConstruct
public void init() {
rabbitTemplate
.setConfirmCallback(this);
rabbitTemplate
.setReturnCallback(this);
}
@Override
public void returnedMessage(Message message
, int replyCode
, String replyText
, String exchange
, String routingKey
) {
System
.out
.println("return--message:" + new String(message
.getBody()) + ",replyCode:" + replyCode
+ ",replyText:" + replyText
+ ",exchange:" + exchange
+ ",routingKey:" + routingKey
);
}
@Override
public void confirm(CorrelationData correlationData
, boolean ack
, String cause
) {
if (ack
) {
} else {
System
.out
.println("消息发送确认失败:" + cause
);
}
}
}
5.创建控制器并向队列发送消息
package com
.nuvole
.merchant
.controller
.v1
;
import com
.alibaba
.fastjson
.JSON;
import com
.nuvole
.merchant
.conf
.mq
.AmqpConfig
;
import com
.nuvole
.merchant
.service
.mq
.QueueMessageService
;
import lombok
.extern
.slf4j
.Slf4j
;
import org
.springframework
.web
.bind
.annotation
.GetMapping
;
import org
.springframework
.web
.bind
.annotation
.RequestMapping
;
import org
.springframework
.web
.bind
.annotation
.RestController
;
import javax
.annotation
.Resource
;
import java
.util
.Date
;
import java
.util
.HashMap
;
@Slf4j
@RestController
@
RequestMapping("/test")
public class TestController {
@Resource
private QueueMessageService queueMessageService
;
@
GetMapping("/do")
public void dos(String msg
, int msec
) {
log
.info("开始发送延时队列。。。。。。。。。。。。。");
log
.info(new Date().toString() + "----------当前时间");
HashMap
<String
, Object
> param
= new HashMap<>();
param
.put("params", 1);
param
.put("order", 2);
queueMessageService
.delayedSend(AmqpConfig
.TEST_EXCHANGE_KEY, AmqpConfig
.TEST_QUEUE_KEY, msg
, msec
);
}
}
6.发送队列消息的service
package com
.nuvole
.merchant
.service
.mq
;
public interface QueueMessageService {
void send(String exchangeKey
, String routingKey
, Object message
);
void delayedSend(String exchangeKey
, String routingKey
, Object message
, int msec
);
}
7.发送队列消息的service实现类
package com
.nuvole
.merchant
.service
.mq
;
import com
.nuvole
.util
.IdGenerator
;
import org
.springframework
.amqp
.rabbit
.core
.RabbitTemplate
;
import org
.springframework
.amqp
.rabbit
.support
.CorrelationData
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.stereotype
.Service
;
@Service
public class QueueMessageServiceImpl implements QueueMessageService {
@Autowired
private RabbitTemplate rabbitTemplate
;
@Override
public void send(String exchangeKey
, String routingKey
, Object message
) {
CorrelationData correlationData
= new CorrelationData(IdGenerator
.getUUID());
rabbitTemplate
.convertAndSend(exchangeKey
, routingKey
, message
, correlationData
);
}
@Override
public void delayedSend(String exchangeKey
, String routingKey
, Object msg
,final int xdelay
) {
rabbitTemplate
.convertAndSend(exchangeKey
, routingKey
, msg
, message
-> {
message
.getMessageProperties().setDelay(xdelay
);
return message
;
});
}
}
8.配置队列的监听者(完成延时操作)
package com
.nuvole
.merchant
.conf
.mq
;
import com
.rabbitmq
.client
.Channel
;
import lombok
.extern
.slf4j
.Slf4j
;
import org
.springframework
.amqp
.core
.Message
;
import org
.springframework
.amqp
.rabbit
.annotation
.EnableRabbit
;
import org
.springframework
.amqp
.rabbit
.annotation
.RabbitListener
;
import org
.springframework
.stereotype
.Component
;
import java
.io
.IOException
;
import java
.util
.Date
;
@Slf4j
@Component
@EnableRabbit
public class TestReceiver {
@
RabbitListener(queues
= AmqpConfig
.TEST_QUEUE_KEY)
public void process(String msg
, Channel channel
, Message message
) {
log
.info("======================延时队列开始执行。。。。。。。。。。。。。。");
log
.info(new Date().toString() + ",延时收到了信息 message = " + msg
);
try {
channel
.basicAck(message
.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e
) {
e
.printStackTrace();
}
}
}
至此一切准备完毕,启动项目来看一下效果。 依次访问接口:
http://localhost:9097/test/do?msg=1&msec=10000
http://localhost:9097/test/do?msg=2&msec=5000
2020/07/01-14:11:58 [http
-nio
-9097-exec
-5] INFO com
.nuvole
.merchant
.controller
.v1
.TestController
- 开始发送延时队列。。。。。。。。。。。。。
2020/07/01-14:11:58 [http
-nio
-9097-exec
-5] INFO com
.nuvole
.merchant
.controller
.v1
.TestController
- Wed Jul
01 14:11:58 CST 2020----------当前时间
return--message
:1,replyCode
:312,replyText
:NO_ROUTE,exchange
:exchange
.pay
,routingKey
:test
.pay
2020/07/01-14:12:00 [http
-nio
-9097-exec
-7] INFO com
.nuvole
.merchant
.controller
.v1
.TestController
- 开始发送延时队列。。。。。。。。。。。。。
2020/07/01-14:12:00 [http
-nio
-9097-exec
-7] INFO com
.nuvole
.merchant
.controller
.v1
.TestController
- Wed Jul
01 14:12:00 CST 2020----------当前时间
return--message
:2,replyCode
:312,replyText
:NO_ROUTE,exchange
:exchange
.pay
,routingKey
:test
.pay
2020/07/01-14:12:05 [SimpleAsyncTaskExecutor
-1] INFO com
.nuvole
.merchant
.conf
.mq
.TestReceiver
- ======================延时队列开始执行。。。。。。。。。。。。。。
2020/07/01-14:12:05 [SimpleAsyncTaskExecutor
-1] INFO com
.nuvole
.merchant
.conf
.mq
.TestReceiver
- Wed Jul
01 14:12:05 CST 2020,延时收到了信息 message
= 2
2020/07/01-14:12:08 [SimpleAsyncTaskExecutor
-1] INFO com
.nuvole
.merchant
.conf
.mq
.TestReceiver
- ======================延时队列开始执行。。。。。。。。。。。。。。
2020/07/01-14:12:08 [SimpleAsyncTaskExecutor
-1] INFO com
.nuvole
.merchant
.conf
.mq
.TestReceiver
- Wed Jul
01 14:12:08 CST 2020,延时收到了信息 message
= 1
可以看到第一个消息10秒后才被消费,第二个消息5秒消费,符合预期结果。
**下一篇:Linux下安装RabbitMq以及延时插件**