RocketMQ 6: Global Ordered Messages

    Tech, 2022-07-11

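    1. By default RocketMQ does not guarantee message order. To get global ordering, set the Topic's read and write queue counts to 1 and keep both the producer and the consumer at a concurrency of 1, with no multi-threading. The cost is that RocketMQ's high-concurrency, high-throughput capabilities go unused.

    This mode suits scenarios with modest performance requirements in which every message must be published and consumed in strict FIFO order. From the bin directory of the RocketMQ installation, run the following command so that the AllOrder topic has only one queue (-n is the NameServer address; the Java code in this post connects to 192.168.42.112:9876, so use whichever address matches your environment):

        ./mqadmin updateTopic -t AllOrder -c DefaultCluster -r 1 -w 1 -n 127.0.0.1:9876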
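To see why the queue count has to be 1, here is a minimal sketch that uses only the JDK. It is a toy model, not RocketMQ code: plain in-memory queues stand in for a topic's queues, and the class name `QueueCountSketch`, the method `sendAndConsume`, and the round-robin routing are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model only: in-memory queues stand in for the queues of a topic. */
class QueueCountSketch {

    /** Sends messages 0..count-1 round-robin over queueCount queues, then drains them queue by queue. */
    static List<Integer> sendAndConsume(int count, int queueCount) {
        List<Queue<Integer>> queues = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            queues.add(new ArrayDeque<>());
        }
        // Producer side: spread messages across the queues (round-robin is an assumption of this sketch).
        for (int i = 0; i < count; i++) {
            queues.get(i % queueCount).add(i);
        }
        // Consumer side: each queue is FIFO on its own, but nothing orders one queue against another.
        List<Integer> consumed = new ArrayList<>();
        for (Queue<Integer> queue : queues) {
            while (!queue.isEmpty()) {
                consumed.add(queue.poll());
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("1 queue : " + sendAndConsume(10, 1)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println("4 queues: " + sendAndConsume(10, 4)); // [0, 4, 8, 1, 5, 9, 2, 6, 3, 7]
    }
}
```

With one queue the consumed order equals the send order. With four queues every queue is still FIFO internally, yet the overall sequence is interleaved, which is why global ordering needs a single queue.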

    2. Create the consumer ConsumerAllOrder and register a MessageListenerOrderly as its listener.

    package org.example.orderMessage.all;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.*;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    /**
     * Consumer, push mode.
     */
    public class ConsumerAllOrder {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("AllOrderGroup");
            consumer.setNamesrvAddr("192.168.42.112:9876");
            consumer.subscribe("AllOrder", "*");
            // Resume from the last consumed offset (a brand-new group starts at the end of the queue).
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            // The orderly listener handles the messages of a queue one at a time, in queue order.
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    // Print every message in the batch, not only the first one.
                    for (MessageExt msg : msgs) {
                        System.out.printf("%s:Messages:%s %n", Thread.currentThread().getName(),
                                new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("ConsumerAllOrder Started.%n");
        }
    }
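The effect of consuming a queue in order can be approximated with a single worker thread. The sketch below uses only the JDK and does not reproduce RocketMQ's internal implementation; the class `OrderlyWorkerSketch` and the method `consumeWithSingleWorker` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy model only: one worker thread handles the messages of one queue sequentially. */
class OrderlyWorkerSketch {

    /** Hands each message to a single-thread executor and returns the order in which they were handled. */
    static List<String> consumeWithSingleWorker(List<String> queue) {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        // A single-thread executor runs tasks one after another, in submission order.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        for (String message : queue) {
            worker.execute(() -> handled.add(message));
        }
        worker.shutdown();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        List<String> queue = Arrays.asList("Hello RocketMQ 0", "Hello RocketMQ 1", "Hello RocketMQ 2");
        System.out.println(consumeWithSingleWorker(queue));
    }
}
```

Replacing the single worker with a multi-thread pool would let later messages finish before earlier ones, which is the situation the orderly listener is meant to avoid.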

    3. Create the producer ProduceAllOrder.

    package org.example.orderMessage.all;

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;

    import java.nio.charset.StandardCharsets;

    /**
     * Producer, synchronous send.
     */
    public class ProduceAllOrder {
        public static void main(String[] args) throws Exception {
            // "AllOrder" here is the producer group name; it happens to match the topic name.
            DefaultMQProducer producer = new DefaultMQProducer("AllOrder");
            producer.setNamesrvAddr("192.168.42.112:9876");
            producer.start();
            for (int i = 0; i < 10; i++) {
                // Arguments: topic, tag, key, body.
                Message msg = new Message("AllOrder", "TagC", "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));
                // send() blocks until the broker responds, so message i + 1 is sent only after message i.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult.getSendStatus()
                        + ":(MsgId):" + sendResult.getMsgId()
                        + ":(queueId):" + sendResult.getMessageQueue().getQueueId()
                        + "(value):" + new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            producer.shutdown();
        }
    }

    4. Start the consumer.

    5. Start the producer.

    6. Check the consumer's output.

    ConsumerAllOrder Started.
    ConsumeMessageThread_1:Messages:Hello RocketMQ 0
    ConsumeMessageThread_1:Messages:Hello RocketMQ 1
    ConsumeMessageThread_1:Messages:Hello RocketMQ 2
    ConsumeMessageThread_1:Messages:Hello RocketMQ 3
    ConsumeMessageThread_1:Messages:Hello RocketMQ 4
    ConsumeMessageThread_1:Messages:Hello RocketMQ 5
    ConsumeMessageThread_1:Messages:Hello RocketMQ 6
    ConsumeMessageThread_1:Messages:Hello RocketMQ 7
    ConsumeMessageThread_1:Messages:Hello RocketMQ 8
    ConsumeMessageThread_1:Messages:Hello RocketMQ 9
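For longer runs it may be easier to check the order programmatically than by eye. The following is a small sketch, assuming each consumed line ends with the message index as in the output above. The class `OutputOrderCheck` and the method `isInSendOrder` are hypothetical helpers, not part of RocketMQ.

```java
import java.util.Arrays;
import java.util.List;

/** Checks that the trailing index on each consumed line counts up from 0 without gaps. */
class OutputOrderCheck {

    static boolean isInSendOrder(List<String> lines) {
        int expected = 0;
        for (String line : lines) {
            String trimmed = line.trim();
            // The message index is assumed to be the last space-separated token of the line.
            int lastSpace = trimmed.lastIndexOf(' ');
            int index = Integer.parseInt(trimmed.substring(lastSpace + 1));
            if (index != expected) {
                return false;
            }
            expected++;
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "ConsumeMessageThread_1:Messages:Hello RocketMQ 0 ",
                "ConsumeMessageThread_1:Messages:Hello RocketMQ 1 ",
                "ConsumeMessageThread_1:Messages:Hello RocketMQ 2 ");
        System.out.println(isInSendOrder(lines));
    }
}
```

The "ConsumerAllOrder Started." line carries no index, so it should be left out of the list passed to the check.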