RabbitMQ死信队列对超时未支付订单进行交易关闭处理

    技术2024-08-17  67

    一、前言介绍

    死信队列由三大核心组件组成:死信交换机+死信路由+TTL(消息存活时间~非必需的),而死信队列又可以由“面向生产者的基本交换机+基本路由”绑定而成,故而生产者首先是将消息发送至“基本交换机+基本路由”所绑定而成的消息模型中,即间接性地进入到死信队列中,当过了TTL,消息将“挂掉”,从而进入下一个中转站,即“面下那个消费者的死信交换机+死信路由”所绑定而成的消息模型中。如下图所示:

    二、具体实现

    1、项目目录结构 2、引入相关依赖

    <!--SpringBoot 整合RabbitMq--> <dependencies> <!--springboot整合 mybatis --> <!--mybatis-plus--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.2</version> </dependency> <!--mysql依赖--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> </dependencies>

    3、application.yml

    #rabbitmq配置 rabbitmq: virtual-host: /guest host: 192.168.61.xxx username: guest password: guest port: 5672 #模拟演示死信队列 mayikt: dlx: exchange: mayikt_order_dlx_exchange queue: mayikt_order_dlx_queue routingKey: dlx ##订单交换机 order: exchange: mayikt_order_exchange queue: mayikt_order_queue routingKey: mayikt.order

    3、在DeadLetterMQConfig配置类创建死信队列的消息模型

    @Component public class DeadLetterMQConfig { /** * 订单交换机 */ @Value("${mayikt.order.exchange}") private String orderExchange; /** * 订单队列 */ @Value("${mayikt.order.queue}") private String orderQueue; /** * 订单路由key */ @Value("${mayikt.order.routingKey}") private String orderRoutingKey; /** * 死信交换机 */ @Value("${mayikt.dlx.exchange}") private String dlxExchange; /** * 死信队列 */ @Value("${mayikt.dlx.queue}") private String dlxQueue; /** * 死信路由 */ @Value("${mayikt.dlx.routingKey}") private String dlxRoutingKey; //死信交换机 @Bean public DirectExchange dlxExchange() { return new DirectExchange(dlxExchange); } //声明死信队列 (真正的队列) @Bean public Queue dlxQueue() { return new Queue(dlxQueue); } /** * 声明订单业务交换机(基本交换机) */ @Bean public DirectExchange orderExchange() { return new DirectExchange(orderExchange); } /** * 声明订单队列(用于基本交换机和基本路由到死信队列的绑定) */ @Bean public Queue orderQueue() { Map<String, Object> arguments = new HashMap<>(2); // 绑定我们的死信交换机 arguments.put("x-dead-letter-exchange", dlxExchange); // 绑定我们的路由key arguments.put("x-dead-letter-routing-key", dlxRoutingKey); return new Queue(orderQueue, true, false, false, arguments); } /** * (创建基本交换机+基本路由 -> 死信队列 的绑定) */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(orderRoutingKey); } /** * (死信交换机+死信路由->真正队列 的绑定) */ @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(dlxRoutingKey); } }

    OrderController:

    /* * 创建订单 * */ @PostMapping("/createOrder") @ApiOperation(value = "创建订单接口,返回订单编号", notes = "创建订单") public R createOrder(@RequestBody Order order, HttpSession session) { User user = (User)session.getAttribute(CONSTANT.LOGIN_USER); String id = this.orderService.createOrder(order,user); return R.ok().data("orderId",id).message("创建订单成功"); }

    成功创建了消息模型之后,紧接着,我们需要在通用的RabbitMQ发送消息服务类RabbitSenderService中开发“发送消息入死信队列”的功能,在该功能方法中,我们指定了消息的存活时间TTL。

    @Service public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService { @Autowired private ShippingService shippingService; @Autowired private OrderItemService orderItemService; @Autowired private ProductService productService; @Autowired private RabbitTemplate rabbitTemplate; @Value("${mayikt.order.exchange}") private String orderExchange; //订单交换机 @Value("${mayikt.order.routingKey}") private String orderRoutingKey; //订单路由key //创建订单 @Override @Transactional public String createOrder(Order order, User user) { IdWorker idWorker = new IdWorker(); if(order == null){ throw new XmallException(20001,"创建订单失败"); } if(user == null){ throw new XmallException(20001,"用户未登录,创建订单失败"); } //创建订单编号 String orderId = String.valueOf(idWorker.nextId()); order.setId(orderId); order.setUserId(user.getId()); order.setStatus(CONSTANT.ORDER_SATAUS.NOT_PAY); //根据用户查询收货地址 /*QueryWrapper<Shipping> wrapper = new QueryWrapper<>(); wrapper.eq("id",user.getId()); Shipping shipping = shippingService.getOne(wrapper); order.setShippingId(shipping.getId());*/ //设置支付类型 order.setPaymentType(CONSTANT.PAYMENT_TYPE.ONLINE); // //设置订单条目 // order.setOrderItems(); order.setPostage(0); //插入订单表 int insert = baseMapper.insert(order); if(insert ==0 ){ throw new XmallException(20001,"创建订单失败"); } //order_item表中插入订单id(批量更新) order.getOrderItems().forEach(orderItem -> { orderItem.setOrderId(orderId); //商品表减库存 QueryWrapper<Product> wrapper = new QueryWrapper<>(); wrapper.eq("id", orderItem.getProductId()); Product product = productService.getOne(wrapper); if(orderItem.getQuantity() <= product.getStock()){ product.setStock(product.getStock() - orderItem.getQuantity()); this.productService.updateById(product); }else { throw new XmallException(20001,product.getName()+"商品库存不足,下单失败"); } }); orderItemService.saveorderItemBatch(order.getOrderItems()); //下单成功后-发送信息入死信队列,等待着一定时间失效超时未支付的订单 rabbitTemplate.convertAndSend(orderExchange,orderRoutingKey,orderId,messagePostProcessor()); return orderId; } //处理待发送消息() private MessagePostProcessor messagePostProcessor(){ return new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置有效期30分钟 //message.getMessageProperties().setExpiration("1800000"); message.getMessageProperties().setExpiration("900000"); //15min后过期 return message; } }; } }

    最后,是需要在RabbitMQ通用的消息监听服务类OrderServiceImpl中监听“真正队列”中的消息并进行处理:在这里我们是对该订单进行失效处理(前提是还没付款的情况下!),其完整的源代码如下所示:

    消费者OrderDlxConsumer 监听死信队列

    @Component //死信队列 public class OrderDlxConsumer { @Autowired private OrderService orderService; @Autowired private ProductService productService; /** * 监听我们的死信队列(用户秒杀成功后超时未支付-监听者) */ @RabbitListener(queues = "mayikt_order_dlx_queue") public void orderConsumer(String orderId) { System.out.println("死信队列获取消息:" + orderId); if (StringUtils.isEmpty(orderId)) { return; } //根据id查询 QueryWrapper<Order> wrapper = new QueryWrapper<>(); wrapper.eq("id",orderId); Order order = orderService.getOne(wrapper); if (null == order) { return; } //获取状态 Integer orderStatus=order.getStatus(); //判断未支付 , 关闭订单 if(orderStatus == CONSTANT.ORDER_SATAUS.NOT_PAY){ order.setStatus(CONSTANT.ORDER_SATAUS.TRAN_CLOSE); order.setCloseTime(new Date()); orderService.updateById(order); //库存返还 order.getOrderItems().forEach(orderItem -> { QueryWrapper<Product> productQueryWrapper = new QueryWrapper<>(); productQueryWrapper.eq("id",orderItem.getProductId()); Product product = productService.getOne(productQueryWrapper); product.setStock(product.getStock() + orderItem.getQuantity()); boolean update = productService.updateById(product); if(!update){ throw new XmallException(20001,product.getName()+"商品库存返还失败--订单取消失败"); } }); } } }

    三、前端订单未支付状态倒计时

    js代码

    countDown(){ let countDown = setInterval(() => { // --this.seconds; //计算剩余时间 var time = (new Date(this.orderDetail.createTime).getTime() + 15* 60 * 1000) - (new Date().getTime()); //这里设置的和后台一样,都是15分钟过期时间 if(time>0){ //计算剩余的分钟 var minutes = parseInt(time / 1000 / 60 % 60, 10); //计算剩余的秒数 var seconds = parseInt(time / 1000 % 60, 10); //判断分钟和秒数小于10要在前面加个0. if(minutes<10){ minutes = '0' + minutes; } if (seconds < 10) { seconds = '0' + seconds; } var timer2 = minutes + "分:" + seconds+"秒"; this.timer = timer2; // return timer }else{ this.timer = "" // clearInterval(countDown) // return "" } }, 1000); }
    欢迎关注公众号Java技术大联盟,会不定期分享BAT面试资料等福利。


    Processed: 0.018, SQL: 9