1.使用consumer.setMaxReconsumeTimes()方法可以设置最大重试次数(并发消费模式下默认16次)。监听器返回ConsumeConcurrentlyStatus.RECONSUME_LATER表示本次消费失败:消息会先进入名为%RETRY%+ConsumerGroup的重试Topic,之后再重新投递给这个ConsumerGroup消费。如果这样重复消费仍持续失败并超过最大重试次数,消息就会被投递到DLQ死信队列。
2.创建消费者RetryConsumer
package org
.example
.retry
;
import org
.apache
.rocketmq
.client
.consumer
.DefaultMQPushConsumer
;
import org
.apache
.rocketmq
.client
.consumer
.listener
.ConsumeConcurrentlyContext
;
import org
.apache
.rocketmq
.client
.consumer
.listener
.ConsumeConcurrentlyStatus
;
import org
.apache
.rocketmq
.client
.consumer
.listener
.MessageListenerConcurrently
;
import org
.apache
.rocketmq
.client
.exception
.MQClientException
;
import org
.apache
.rocketmq
.common
.consumer
.ConsumeFromWhere
;
import org
.apache
.rocketmq
.common
.message
.MessageExt
;
import java
.util
.List
;
public class RetryConsumer {
public static void main(String
[] args
) throws MQClientException
{
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("group1");
consumer
.subscribe("TopicA", "*");
consumer
.setNamesrvAddr("192.168.42.112:9876");
consumer
.setConsumeFromWhere(ConsumeFromWhere
.CONSUME_FROM_LAST_OFFSET
);
consumer
.setMaxReconsumeTimes(3);
consumer
.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus
consumeMessage(List
<MessageExt> list
, ConsumeConcurrentlyContext consumeConcurrentlyContext
) {
int count
= list
.get(0).getReconsumeTimes();
System
.out
.println("重试第"+count
+"次");
System
.out
.println("queueID:" + list
.get(0).getQueueId() + ",ThreadName:" + Thread
.currentThread().getName()
+ ",Messages:" + new String(list
.get(0).getBody()));
return ConsumeConcurrentlyStatus
.RECONSUME_LATER
;
}
});
consumer
.start();
System
.out
.println("ConsumerPartOrder Started.");
}
}
3.启动消费者
4.启动生产者,使用单向发送,发送3条消息
package org
.example
.normal
;
import org
.apache
.rocketmq
.client
.producer
.DefaultMQProducer
;
import org
.apache
.rocketmq
.common
.message
.Message
;
import org
.apache
.rocketmq
.remoting
.common
.RemotingHelper
;
/**
 * Demo producer that publishes three messages to {@code TopicA} using one-way sends.
 */
public class OneWayProducer {

    /**
     * Starts a producer in group {@code oneway}, sends three {@code "Hello RocketMQ<i>"}
     * messages tagged {@code TagA}, echoes each payload to stdout and shuts down.
     *
     * @param args command-line arguments (unused)
     * @throws Exception propagated from the producer calls or from charset handling
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("oneway");
        producer.setNamesrvAddr("192.168.42.112:9876");
        producer.start();
        try {
            for (int i = 0; i < 3; i++) {
                Message msg = new Message("TopicA", "TagA",
                        ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
                // Decode with the same charset used for encoding, not the platform default.
                System.out.println(new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
            }
        } finally {
            // Release the producer even when encoding or sending fails part-way through.
            producer.shutdown();
        }
    }
}
5.查看打印
ConsumerPartOrder Started.
重试第0次
queueID:2,ThreadName:ConsumeMessageThread_1,Messages:Hello RocketMQ1
重试第0次
queueID:3,ThreadName:ConsumeMessageThread_2,Messages:Hello RocketMQ2
重试第0次
queueID:1,ThreadName:ConsumeMessageThread_3,Messages:Hello RocketMQ0
重试第1次
queueID:0,ThreadName:ConsumeMessageThread_4,Messages:Hello RocketMQ1
重试第1次
queueID:0,ThreadName:ConsumeMessageThread_5,Messages:Hello RocketMQ0
重试第1次
queueID:0,ThreadName:ConsumeMessageThread_6,Messages:Hello RocketMQ2
重试第2次
重试第2次
queueID:0,ThreadName:ConsumeMessageThread_8,Messages:Hello RocketMQ0
重试第2次
queueID:0,ThreadName:ConsumeMessageThread_9,Messages:Hello RocketMQ2
queueID:0,ThreadName:ConsumeMessageThread_7,Messages:Hello RocketMQ1
重试第3次
queueID:0,ThreadName:ConsumeMessageThread_10,Messages:Hello RocketMQ0
重试第3次
queueID:0,ThreadName:ConsumeMessageThread_11,Messages:Hello RocketMQ2
重试第3次
queueID:0,ThreadName:ConsumeMessageThread_12,Messages:Hello RocketMQ1