1、实现方案:
延时队列可以通过redis的zset来实现。可以将消息序列化成一个字符串作为zset的value,这个消息的到期处理时间作为score,然后用多个线程轮询zset获取到期的任务进行处理。多个线程是为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。又因为有多个线程,所以需要考虑并发争抢任务,确保任务不会被多次消费。
2、常用应用场景:
1、超过30分钟未支付,订单自动作废; 2、向注册N天用户发送短信; 3、微信公众号关注后N时间发送消息; ………………等等 以下模拟订单场景进行实现;
3、具体实现(java版)
package com
.example
.demo
.queue
;
import com
.alibaba
.fastjson
.JSON
;
import org
.springframework
.data
.redis
.core
.RedisTemplate
;
import org
.springframework
.stereotype
.Service
;
import javax
.annotation
.Resource
;
import java
.math
.BigDecimal
;
import java
.time
.LocalDateTime
;
import java
.time
.format
.DateTimeFormatter
;
import java
.util
.Set
;
@Service
public class RedisDelayQueue {
@Resource
private RedisTemplate redisTemplate
;
private static final String QUEUE_KEY
= "queueKey";
public void delay(Order order
) {
String orderStr
= JSON
.toJSONString(order
);
int integer
= Integer
.parseInt(order
.getId());
int time
= integer
* 2000;
redisTemplate
.opsForZSet().add(QUEUE_KEY
, orderStr
, System
.currentTimeMillis() + time
);
}
public void loop() {
while (!Thread
.interrupted()) {
Set
<String> value
=
redisTemplate
.opsForZSet().rangeByScore(QUEUE_KEY
, 0, System
.currentTimeMillis(), 0, 1);
if (value
.isEmpty()) {
continue;
}
String orderStr
= value
.iterator().next();
if (redisTemplate
.opsForZSet().remove(QUEUE_KEY
, orderStr
) > 0) {
Order order
= JSON
.parseObject(orderStr
, Order
.class);
this.handle(order
);
}
}
}
public void handle(Order order
) {
Thread thread
= Thread
.currentThread();
System
.out
.println("线程:" + thread
.getId() + "--" + thread
.getName() + " 时间:第"
+ LocalDateTime
.now().getSecond() + "秒,消费订单:" + order
.toString());
}
public static class Order {
public Order(String id
, BigDecimal amount
, String goods
) {
this.id
= id
;
this.amount
= amount
;
this.goods
= goods
;
}
public String id
;
public BigDecimal amount
;
public String goods
;
public String
getId() {
return id
;
}
public void setId(String id
) {
this.id
= id
;
}
public BigDecimal
getAmount() {
return amount
;
}
public void setAmount(BigDecimal amount
) {
this.amount
= amount
;
}
public String
getGoods() {
return goods
;
}
public void setGoods(String goods
) {
this.goods
= goods
;
}
@Override
public String
toString() {
return "Order{" +
"id='" + id
+ '\'' +
", amount=" + amount
+
", goods='" + goods
+ '\'' +
'}';
}
}
}
4、单元测试:
@Resource
private RedisDelayQueue redisDelayQueue
;
@Test
public void delayQueue() throws InterruptedException
{
ExecutorService pool
= Executors
.newFixedThreadPool(6);
CountDownLatch latch
= new CountDownLatch(20);
pool
.execute(() -> {
IntStream
.range(0, 10).forEach(i
-> {
RedisDelayQueue
.Order order
= new RedisDelayQueue.Order(String
.valueOf(i
), new BigDecimal(i
), "商品");
redisDelayQueue
.delay(order
);
latch
.countDown();
});
});
IntStream
.range(0, 5).forEach(i
-> {
pool
.execute(() -> {
redisDelayQueue
.loop();
latch
.countDown();
});
});
latch
.await();
}
5、测试结果:
从图中看到,订单随着时间到期而被消费,符合预期。