RocketMQ 5: Message Retry

    Technology, 2022-07-11

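    1. Call consumer.setMaxReconsumeTimes() to set the maximum number of retries. If it is not set, a concurrent consumer retries up to 16 times. When the listener returns ConsumeConcurrentlyStatus.RECONSUME_LATER, consumption counts as failed: the message is sent back to the broker, goes into the retry topic named %RETRY% + consumer group, and is then delivered to the same consumer group again. If consumption keeps failing until the retry limit is exceeded, the message is moved to the dead-letter queue (DLQ).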
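    The path a permanently failing message takes can be sketched in plain Java, without a broker. This is only a toy model of the flow described above, not RocketMQ code: the class RetryFlowSketch and its methods are hypothetical, and "TopicA" and "group1" are example names. The %RETRY% and %DLQ% prefixes are the ones RocketMQ uses for the per-group retry and dead-letter topics.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the retry flow; not RocketMQ code. All names here are hypothetical. */
class RetryFlowSketch {
    // Prefixes RocketMQ uses for the per-group retry and dead-letter topics.
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    static String dlqTopic(String consumerGroup) {
        return DLQ_PREFIX + consumerGroup;
    }

    /** Lists the topics a message passes through when every consumption attempt fails. */
    static List<String> route(String topic, String consumerGroup, int maxReconsumeTimes) {
        List<String> hops = new ArrayList<>();
        hops.add(topic); // first delivery, reconsume count 0
        for (int reconsumeTimes = 1; reconsumeTimes <= maxReconsumeTimes; reconsumeTimes++) {
            hops.add(retryTopic(consumerGroup)); // each retry is delivered from the retry topic
        }
        hops.add(dlqTopic(consumerGroup)); // retries exhausted: dead-letter queue
        return hops;
    }

    public static void main(String[] args) {
        System.out.println(route("TopicA", "group1", 3));
    }
}
```

    With a limit of 3, the model yields one original delivery, three deliveries from the retry topic, and a final hop into the dead-letter topic.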

    2. Create the consumer RetryConsumer.

    package org.example.retry;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;

    import java.util.List;

    /**
     * Consumer (push mode)
     */
    public class RetryConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
            consumer.subscribe("TopicA", "*");
            consumer.setNamesrvAddr("192.168.42.112:9876");
            // Start consuming from the last consumed offset
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            // Maximum number of retries, overriding the default; failed messages first go to %RETRY%group1
            consumer.setMaxReconsumeTimes(3);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                        ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    // Number of times this message has already been retried (0 on first delivery)
                    int count = list.get(0).getReconsumeTimes();
                    System.out.println("Retry #" + count);
                    System.out.println("queueID:" + list.get(0).getQueueId()
                            + ",ThreadName:" + Thread.currentThread().getName()
                            + ",Messages:" + new String(list.get(0).getBody()));
                    // Returning RECONSUME_LATER sends the message back to the RETRY topic on the broker
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
            consumer.start();
            System.out.println("ConsumerPartOrder Started.");
        }
    }
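    The listener in RetryConsumer returns RECONSUME_LATER unconditionally so that retries can be observed. A real listener would normally return CONSUME_SUCCESS when processing succeeds and ask for a retry only on failure. Below is a minimal sketch of that decision, written in plain Java so it compiles without the RocketMQ client. The Status enum is a local stand-in for ConsumeConcurrentlyStatus, and RetryDecisionSketch, consume and the handler parameter are hypothetical names.

```java
import java.util.function.Consumer;

/** Sketch of the success/retry decision; not RocketMQ code. All names here are hypothetical. */
class RetryDecisionSketch {
    /** Local stand-in for RocketMQ's ConsumeConcurrentlyStatus so the sketch compiles on its own. */
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Runs the business handler and maps its outcome to a consume status. */
    static Status consume(String body, Consumer<String> handler) {
        try {
            handler.accept(body);
            return Status.CONSUME_SUCCESS; // processed: the message is acknowledged
        } catch (RuntimeException e) {
            return Status.RECONSUME_LATER; // failed: ask for redelivery
        }
    }

    public static void main(String[] args) {
        System.out.println(consume("ok", body -> { }));
        System.out.println(consume("bad", body -> {
            throw new IllegalStateException("processing failed");
        }));
    }
}
```

    In a real MessageListenerConcurrently the same try/catch would wrap the processing inside consumeMessage.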

    3. Start the consumer.

    4. Start the producer and send 3 messages using one-way send.

    package org.example.normal;

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    /**
     * One-way send
     */
    public class OneWayProducer {
        public static void main(String[] args) throws Exception {
            // Instantiate the producer
            DefaultMQProducer producer = new DefaultMQProducer("oneway");
            // Set the name server address
            producer.setNamesrvAddr("192.168.42.112:9876");
            // Start the producer instance
            producer.start();
            for (int i = 0; i < 3; i++) {
                // Create a message with a topic, a tag and a body
                Message msg = new Message("TopicA", "TagA",
                        ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Send the message without waiting for a broker response
                producer.sendOneway(msg);
                System.out.println(new String(msg.getBody()));
            }
            // Shut down the producer instance
            producer.shutdown();
        }
    }

    5. Check the console output.

    ConsumerPartOrder Started.
    Retry #0
    queueID:2,ThreadName:ConsumeMessageThread_1,Messages:Hello RocketMQ1
    Retry #0
    queueID:3,ThreadName:ConsumeMessageThread_2,Messages:Hello RocketMQ2
    Retry #0
    queueID:1,ThreadName:ConsumeMessageThread_3,Messages:Hello RocketMQ0
    Retry #1
    queueID:0,ThreadName:ConsumeMessageThread_4,Messages:Hello RocketMQ1
    Retry #1
    queueID:0,ThreadName:ConsumeMessageThread_5,Messages:Hello RocketMQ0
    Retry #1
    queueID:0,ThreadName:ConsumeMessageThread_6,Messages:Hello RocketMQ2
    Retry #2
    Retry #2
    queueID:0,ThreadName:ConsumeMessageThread_8,Messages:Hello RocketMQ0
    Retry #2
    queueID:0,ThreadName:ConsumeMessageThread_9,Messages:Hello RocketMQ2
    queueID:0,ThreadName:ConsumeMessageThread_7,Messages:Hello RocketMQ1
    Retry #3
    queueID:0,ThreadName:ConsumeMessageThread_10,Messages:Hello RocketMQ0
    Retry #3
    queueID:0,ThreadName:ConsumeMessageThread_11,Messages:Hello RocketMQ2
    Retry #3
    queueID:0,ThreadName:ConsumeMessageThread_12,Messages:Hello RocketMQ1
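    In the output, each message is delivered four times, with reconsume counts 0 through 3, which matches setMaxReconsumeTimes(3). The log does not show how long the broker waits between attempts. As a rough sketch, assuming a RocketMQ 4.x broker with its default delay levels unchanged, where retries start at delay level 3, the wait before each retry can be looked up as follows. RetryDelaySketch and delayBeforeRetry are hypothetical names, and the level list is an assumption about the broker configuration, not something taken from this run.

```java
import java.util.Arrays;
import java.util.List;

/** Looks up the assumed default wait before each retry; not RocketMQ code. */
class RetryDelaySketch {
    // Assumed default delay levels of a RocketMQ 4.x broker (levels 1 to 18).
    static final List<String> DELAY_LEVELS = Arrays.asList(
            "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
            "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h");

    /** Delay before retry number n (1-based), assuming retries start at delay level 3. */
    static String delayBeforeRetry(int retryNumber) {
        int level = 2 + retryNumber;        // retry 1 uses level 3, retry 2 uses level 4, ...
        return DELAY_LEVELS.get(level - 1); // the list is 0-based
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 3; n++) {
            System.out.println("retry " + n + " after " + delayBeforeRetry(n));
        }
    }
}
```

    Under these assumptions the three retries above would be spaced roughly 10 seconds, 30 seconds and 1 minute apart.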