RocketMQ——消息发送与消息消费

    技术2022-07-10  125

    总结

    文章目录

    总结 / 一.广播模式 / 1.producer / 2.consumer

    一.广播模式

    1.producer

    /** * @Author: wangzb * @Date: 2020/06/30 * @Desciption: 发送消息 * @Params: */ public class RocketMqTest { public static void main(String[] args) { //1.消息提供者 传入参数:生产者组名 DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); //2.rocketMQ的服务器地址,可以指定多个,由逗号分隔 producer.setNamesrvAddr("ip:port"); //3.生产者实例名称 producer.setInstanceName(UUID.randomUUID().toString()); //4.开启生产者 producer.start(); //5.创建消息模型:Topic:主题; Tag:目标; Key:键; 默认字符集:utf-8 Message msg = new Message(topic, tag, key, (消息内容).getBytes(RemotingHelper.DEFAULT_CHARSET)); //6.发送消息:0L:队列选择 queue:默认4个队列 SendResult result = producer.send(msg, new MessageQueueSelector() { //7.队列选择器 @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { /*Integer id = Integer.parseInt(o+""); Integer index = id % list.size();*/ //返回选中的队列 return list.get(0); } },0L); //8.资源关闭 if (null != producer) { producer.shutdown(); } } }

    2.consumer

    /** * @Author: wangzb * @Date: 2020/06/30 * @Desciption: 消费消息 * @Params: */ public class MqConsumerRunner implements ApplicationRunner { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Resource private RocketMqConfigurationProperties rocketMqConfigurationProperties; @Autowired private MqConsumerService mqConsumerService; @Override public void run(ApplicationArguments applicationArguments) { initConsumer(); } public void initConsumer() { try { //1.消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rocketMqConfigurationProperties.getConsumerGroupName()); //2.rocketMQ的服务器地址 consumer.setNamesrvAddr(rocketMqConfigurationProperties.getNamesrvAddr()); //3.订阅模式:"*":tag,多个用||分隔 consumer.subscribe(rocketMqConfigurationProperties.getProducerTopicName(), "*"); //4.从何处开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //5.消费模式:广播 consumer.setMessageModel(MessageModel.BROADCASTING); //6.消息监听:顺序的是:new MessageListenerOrderly() consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { //7.消费消息 mqConsumerService.consumerMessage(list); } catch (Exception e) { e.printStackTrace(); logger.error(e.getMessage()); //8.异常时重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } catch (Exception e) { e.printStackTrace(); logger.error("消息中间件消费初始化异常", e); } } }
    Processed: 0.011, SQL: 12