package org.example.orderMessage.part; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 消费者-推模式 **/ public class ConsumerTagA { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("partA"); consumer.setNamesrvAddr("192.168.42.112:9876"); consumer.subscribe("PartOrder","TagA"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//每次从最后一次消费的偏移量开始消费 consumer.registerMessageListener(new MessageListenerOrderly() { public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { System.out.printf("%s:Messages:%s %n", Thread.currentThread().getName(), new String(list.get(0).getBody())); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("ConsumerPartOrder Started."); } }
ConsumerTagA
ConsumerPartOrder Started. ConsumeMessageThread_1:Messages:Hello RocketMQ 0 ConsumeMessageThread_2:Messages:Hello RocketMQ 4 ConsumeMessageThread_3:Messages:Hello RocketMQ 8ConsumerTagB
ConsumerPartOrder Started. ConsumeMessageThread_1:Messages:Hello RocketMQ 1 ConsumeMessageThread_2:Messages:Hello RocketMQ 5 ConsumeMessageThread_3:Messages:Hello RocketMQ 9ConsumerTagC
ConsumerPartOrder Started. ConsumeMessageThread_1:Messages:Hello RocketMQ 2 ConsumeMessageThread_2:Messages:Hello RocketMQ 6 ConsumeMessageThread_3:Messages:Hello RocketMQ 10ConsumerTagD
ConsumerPartOrder Started. ConsumeMessageThread_1:Messages:Hello RocketMQ 3 ConsumeMessageThread_2:Messages:Hello RocketMQ 7 ConsumeMessageThread_3:Messages:Hello RocketMQ 11