RocketMQ 10:整合 Spring Boot

    技术2025-02-26  12

    1.创建一个新的 Spring Boot 工程,引入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <!-- Inherit dependency and plugin management from the Spring Boot parent POM. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.1.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>

        <groupId>com.example</groupId>
        <artifactId>rockectmq-springboot</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rockectmq-springboot</name>
        <description>Demo project for Spring Boot</description>

        <properties>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <!-- Web starter: provides the REST endpoint used to trigger message sending. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>

            <!-- Plain RocketMQ client; its version is declared explicitly here. -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.4.0</version>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

    2.配置文件添加mq的地址

    # RocketMQ NameServer address (host:port). Injected through
    # @Value("${rocketmq.namesrvaddr}") into MQProducer and MQPushConsumer,
    # which pass it to setNamesrvAddr(...) before starting.
    rocketmq.namesrvaddr=192.168.42.112:9876

    3.创建生产者MQProducer

    package com.example.rockectmqspringboot; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @Component public class MQProducer { private static final Logger LOGGER = LoggerFactory.getLogger(MQProducer.class); @Value("${rocketmq.namesrvaddr}") private String namesrvaddr; private final DefaultMQProducer producer = new DefaultMQProducer("TestProducer"); /** *初始化 */ @PostConstruct public void start(){ LOGGER.info("MQ启动生产者"); producer.setNamesrvAddr(namesrvaddr); try { producer.start(); } catch (MQClientException e) { LOGGER.info("启动生产者失败:{}-{}",e.getResponseCode(),e.getErrorMessage()); throw new RuntimeException(e.getErrorMessage(),e); } } /** * 发送消息 */ public void sendMessage(String data, String topic, String tags, String keys) { try { Message message = new Message(topic, tags, keys, data.getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { LOGGER.info("生产者发送消息:{}", sendResult); } @Override public void onException(Throwable throwable) { LOGGER.error(throwable.getMessage(), throwable); } }); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } } @PreDestroy public void stop() { if (producer != null) { producer.shutdown(); LOGGER.info("关闭生产者"); } } }

    4.创建消费者MQPushConsumer

    package com.example.rockectmqspringboot; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; @Component public class MQPushConsumer implements MessageListenerConcurrently { private static final Logger LOGGER = LoggerFactory.getLogger(MQPushConsumer.class); @Value("${rocketmq.namesrvaddr}") private String namesrvaddr; private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer"); /** *初始化 **/ @PostConstruct public void start(){ try { LOGGER.info("启动消费者"); consumer.setNamesrvAddr(namesrvaddr); //从头开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //集群消费模式 consumer.setMessageModel(MessageModel.CLUSTERING); //订阅主题 consumer.subscribe("TopicTest", "*"); //注册监听器 consumer.registerMessageListener(this); consumer.start(); } catch (MQClientException e) { LOGGER.error("消费者启动失败:{}",e.getErrorMessage()); throw new RuntimeException(e.getErrorMessage(),e); } } @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { int index=0; try { for (; index < list.size(); index++) { MessageExt msg = list.get(index); String msgBody = new String(msg.getBody()); LOGGER.info("消费者监听:queueId:" + 
msg.getQueueId() + ";message:" + msgBody); } } catch (Exception e) { LOGGER.error(e.getMessage(),e); } finally { //标记失败消息的位置 if (index < list.size()) { consumeConcurrentlyContext.setAckIndex(index + 1); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } @PreDestroy public void stop() { if (consumer != null) { consumer.shutdown(); LOGGER.info("关闭消费者"); } } }

    5.创建RocketController

    package com.example.rockectmqspringboot.controller;

    import com.example.rockectmqspringboot.MQProducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;

    /**
     * REST endpoint that hands an incoming payload to {@link MQProducer} for
     * publication on the {@code TopicTest} topic.
     */
    @RestController
    public class RocketController {

        protected static final Logger LOGGER = LoggerFactory.getLogger(RocketController.class);

        @Autowired
        private MQProducer producer;

        /**
         * Publishes {@code data} to {@code TopicTest} with the given tag.
         *
         * @param data message payload; a missing or empty value is rejected
         * @param tag  optional message tag
         * @return {@code "发送成功"} once the message has been handed to the producer,
         *         {@code "发送失败"} if the payload is missing or the hand-off throws
         */
        @RequestMapping(value = "/send")
        public String sendRocket(@RequestParam(required = false) String data,
                                 @RequestParam(required = false) String tag) {
            // 'data' is an optional request parameter. Without this guard a missing
            // payload failed inside the producer, which only logs the error, and this
            // endpoint still answered "发送成功".
            if (data == null || data.isEmpty()) {
                LOGGER.warn("rocket的消息为空,未发送");
                return "发送失败";
            }
            try {
                LOGGER.info("rocket的消息:{}", data);
                producer.sendMessage(data, "TopicTest", tag, null);
                return "发送成功";
            } catch (Exception e) {
                LOGGER.error("发送rocket异常:", e);
                return "发送失败";
            }
        }
    }

    6.执行 main 方法启动服务,查看控制台日志,确认生产者和消费者都已启动

    7.访问"http://localhost:8080/send?data=Fisher&tag=mq"发送消息

    8.查看控制台日志,确认消息已被成功消费

    Processed: 0.008, SQL: 9