RocketMQ 7:部分顺序消息

    技术2022-07-11  86

    1.使用Tag区分同一个Topic下的不同Queue:同一个Tag的消息固定发往同一个Queue,从而保证该Tag内的消息有序(部分顺序)

    2.创建生产者ProducePartOrder,使用selector选择器控制消息往哪个queue发

    package org.example.orderMessage.part; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.List; /** * 同步发送 */ public class ProducePartOrder { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("orderGroup"); producer.setNamesrvAddr("192.168.42.112:9876"); producer.start(); String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD"}; for (int i = 0; i < 12; i++) { final int index = i % tags.length; Message msg = new Message("PartOrder" ,tags[index] ,"KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); //使用selector选择器控制消息往哪个queue发 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> list, Message message, Object o) { return list.get(index); } },i); System.out.println("queueId:"+sendResult.getMessageQueue().getQueueId()+";Tag:"+msg.getTags()+";message:"+new String(msg.getBody())); } producer.shutdown(); } }

    3.创建消费者ConsumerTagA,ConsumerTagB,ConsumerTagC,ConsumerTagD,区别在于设置不同的consumerGroup和subExpression

    package org.example.orderMessage.part; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 消费者-推模式 **/ public class ConsumerTagA { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("partA"); consumer.setNamesrvAddr("192.168.42.112:9876"); consumer.subscribe("PartOrder","TagA"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//每次从最后一次消费的偏移量开始消费 consumer.registerMessageListener(new MessageListenerOrderly() { public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { System.out.printf("%s:Messages:%s %n", Thread.currentThread().getName(), new String(list.get(0).getBody())); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("ConsumerPartOrder Started."); } }

    4.启动4个消费者,再启动生产者

    5.查看消费者打印

    ConsumerTagA

    ConsumerPartOrder Started. ConsumeMessageThread_1:Messages:Hello RocketMQ 0 ConsumeMessageThread_2:Messages:Hello RocketMQ 4 ConsumeMessageThread_3:Messages:Hello RocketMQ 8

    ConsumerTagB

    ConsumerPartOrder Started. ConsumeMessageThread_1:Messages:Hello RocketMQ 1 ConsumeMessageThread_2:Messages:Hello RocketMQ 5 ConsumeMessageThread_3:Messages:Hello RocketMQ 9

    ConsumerTagC

    ConsumerPartOrder Started. ConsumeMessageThread_1:Messages:Hello RocketMQ 2 ConsumeMessageThread_2:Messages:Hello RocketMQ 6 ConsumeMessageThread_3:Messages:Hello RocketMQ 10

    ConsumerTagD

    ConsumerPartOrder Started. ConsumeMessageThread_1:Messages:Hello RocketMQ 3 ConsumeMessageThread_2:Messages:Hello RocketMQ 7 ConsumeMessageThread_3:Messages:Hello RocketMQ 11
    Processed: 0.012, SQL: 9