Redis之延时队列

    技术2026-06-12  10

    1、实现方案:

    延时队列可以通过redis的zset来实现。可以将消息序列化成一个字符串作为zset的value,这个消息的到期处理时间作为score,然后用多个线程轮询zset获取到期的任务进行处理。多个线程是为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。又因为有多个线程,所以需要考虑并发争抢任务,确保任务不会被多次消费。

    2、常用应用场景:

    1、超过30分钟未支付,订单自动作废; 2、向注册N天用户发送短信; 3、微信公众号关注后N时间发送消息; ………………等等 以下模拟订单场景进行实现;

    3、具体实现(java版)

    package com.example.demo.queue; import com.alibaba.fastjson.JSON; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.math.BigDecimal; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Set; @Service public class RedisDelayQueue { @Resource private RedisTemplate redisTemplate; /** * 延时队列名 */ private static final String QUEUE_KEY = "queueKey"; /** * 将数据放入延时队列 */ public void delay(Order order) { String orderStr = JSON.toJSONString(order); // 延时时间,为方便测试观看,故按id递增 int integer = Integer.parseInt(order.getId()); int time = integer * 2000; // 塞入延时队列 redisTemplate.opsForZSet().add(QUEUE_KEY, orderStr, System.currentTimeMillis() + time); } /** * 取出到期数据 */ public void loop() { while (!Thread.interrupted()) { // 每次取一条 Set<String> value = redisTemplate.opsForZSet().rangeByScore(QUEUE_KEY, 0, System.currentTimeMillis(), 0, 1); if (value.isEmpty()) { // do something... continue; } String orderStr = value.iterator().next(); // 防止多线程下多次消费,规定谁抢到谁消费 if (redisTemplate.opsForZSet().remove(QUEUE_KEY, orderStr) > 0) { Order order = JSON.parseObject(orderStr, Order.class); this.handle(order); } } } /** * 处理逻辑 */ public void handle(Order order) { // do something... Thread thread = Thread.currentThread(); System.out.println("线程:" + thread.getId() + "--" + thread.getName() + " 时间:第" + LocalDateTime.now().getSecond() + "秒,消费订单:" + order.toString()); } /** * 模拟订单实体 */ public static class Order { public Order(String id, BigDecimal amount, String goods) { this.id = id; this.amount = amount; this.goods = goods; } public String id; public BigDecimal amount; public String goods; public String getId() { return id; } public void setId(String id) { this.id = id; } public BigDecimal getAmount() { return amount; } public void setAmount(BigDecimal amount) { this.amount = amount; } public String getGoods() { return goods; } public void setGoods(String goods) { this.goods = goods; } @Override public String toString() { return "Order{" + "id='" + id + '\'' + ", amount=" + amount + ", goods='" + goods + '\'' + '}'; } } }

    4、单元测试:

    @Resource private RedisDelayQueue redisDelayQueue; @Test public void delayQueue() throws InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(6); // 防止单测主线程退出 CountDownLatch latch = new CountDownLatch(20); // producer pool.execute(() -> { IntStream.range(0, 10).forEach(i -> { RedisDelayQueue.Order order = new RedisDelayQueue.Order(String.valueOf(i), new BigDecimal(i), "商品"); redisDelayQueue.delay(order); latch.countDown(); }); }); // consumer IntStream.range(0, 5).forEach(i -> { pool.execute(() -> { redisDelayQueue.loop(); latch.countDown(); }); }); latch.await(); }

    5、测试结果:

    从图中看到,订单随着时间到期而被消费,符合预期。

    Processed: 0.009, SQL: 9