总结
文章目录
总结一.广播模式1.producer2.consumer
一.广播模式
1.producer
public class RocketMqTest {
public static void main(String
[] args
) {
DefaultMQProducer producer
= new DefaultMQProducer("producerGroup");
producer
.setNamesrvAddr("ip:port");
producer
.setInstanceName(UUID
.randomUUID().toString());
producer
.start();
Message msg
= new Message(topic
, tag
, key
, (消息内容
).getBytes(RemotingHelper
.DEFAULT_CHARSET
));
SendResult result
= producer
.send(msg
, new MessageQueueSelector() {
@Override
public MessageQueue
select(List
<MessageQueue> list
, Message message
, Object o
) {
return list
.get(0);
}
},0L
);
if (null
!= producer
) {
producer
.shutdown();
}
}
}
2.consumer
public class MqConsumerRunner implements ApplicationRunner {
private Logger logger
= LoggerFactory
.getLogger(this.getClass());
@Resource
private RocketMqConfigurationProperties rocketMqConfigurationProperties
;
@Autowired
private MqConsumerService mqConsumerService
;
@Override
public void run(ApplicationArguments applicationArguments
) {
initConsumer();
}
public void initConsumer() {
try {
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer(rocketMqConfigurationProperties
.getConsumerGroupName());
consumer
.setNamesrvAddr(rocketMqConfigurationProperties
.getNamesrvAddr());
consumer
.subscribe(rocketMqConfigurationProperties
.getProducerTopicName(), "*");
consumer
.setConsumeFromWhere(ConsumeFromWhere
.CONSUME_FROM_FIRST_OFFSET
);
consumer
.setMessageModel(MessageModel
.BROADCASTING
);
consumer
.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List
<MessageExt> list
, ConsumeConcurrentlyContext consumeConcurrentlyContext
) {
try {
mqConsumerService
.consumerMessage(list
);
} catch (Exception e
) {
e
.printStackTrace();
logger
.error(e
.getMessage());
return ConsumeConcurrentlyStatus
.RECONSUME_LATER
;
}
return ConsumeConcurrentlyStatus
.CONSUME_SUCCESS
;
}
});
consumer
.start();
} catch (Exception e
) {
e
.printStackTrace();
logger
.error("消息中间件消费初始化异常", e
);
}
}
}