1. 创建一个新的 Spring Boot 工程，并在 pom.xml 中引入以下依赖：
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherit dependency and plugin management from the Spring Boot parent POM. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/>
    </parent>

    <!-- Coordinates of this demo project. -->
    <groupId>com.example</groupId>
    <artifactId>rockectmq-springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rockectmq-springboot</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter: provides the REST controller support used by the /send endpoint. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- RocketMQ client; the version is declared explicitly here. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. 在配置文件中添加 RocketMQ NameServer 的地址：
# RocketMQ name server address; injected via @Value("${rocketmq.namesrvaddr}") into MQProducer and MQPushConsumer.
rocketmq.namesrvaddr=192.168.42.112:9876
3.创建生产者MQProducer
package com
.example
.rockectmqspringboot
;
import org
.apache
.rocketmq
.client
.exception
.MQClientException
;
import org
.apache
.rocketmq
.client
.producer
.DefaultMQProducer
;
import org
.apache
.rocketmq
.client
.producer
.SendCallback
;
import org
.apache
.rocketmq
.client
.producer
.SendResult
;
import org
.apache
.rocketmq
.common
.message
.Message
;
import org
.apache
.rocketmq
.remoting
.common
.RemotingHelper
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.beans
.factory
.annotation
.Value
;
import org
.springframework
.stereotype
.Component
;
import javax
.annotation
.PostConstruct
;
import javax
.annotation
.PreDestroy
;
/**
 * Spring-managed wrapper around a RocketMQ {@link DefaultMQProducer}.
 *
 * <p>The underlying producer (group {@code TestProducer}) is started once the bean has been
 * initialised and is shut down when the bean is destroyed.
 */
@Component
public class MQProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQProducer.class);

    /** Name server address, injected from the {@code rocketmq.namesrvaddr} property. */
    @Value("${rocketmq.namesrvaddr}")
    private String namesrvaddr;

    private final DefaultMQProducer producer = new DefaultMQProducer("TestProducer");

    /**
     * Points the producer at the configured name server and starts it.
     *
     * @throws RuntimeException wrapping the {@link MQClientException} if the producer cannot start
     */
    @PostConstruct
    public void start() {
        LOGGER.info("MQ启动生产者");
        producer.setNamesrvAddr(namesrvaddr);
        try {
            producer.start();
        } catch (MQClientException e) {
            // A failed start is an error, not an informational event; the trailing
            // throwable argument makes SLF4J print the stack trace as well.
            LOGGER.error("启动生产者失败:{}-{}", e.getResponseCode(), e.getErrorMessage(), e);
            throw new RuntimeException(e.getErrorMessage(), e);
        }
    }

    /**
     * Sends a message using the callback variant of {@code send}; the outcome is reported
     * only through the log.
     *
     * <p>This method never throws: a missing payload is logged as a warning and skipped, and
     * any failure while building or sending the message is logged as an error.
     *
     * @param data  message payload; when {@code null} nothing is sent
     * @param topic destination topic
     * @param tags  message tags, may be {@code null}
     * @param keys  message keys, may be {@code null}
     */
    public void sendMessage(String data, String topic, String tags, String keys) {
        if (data == null) {
            // Previously this surfaced as a caught NullPointerException with a null message.
            LOGGER.warn("消息内容为空,未发送:topic={},tags={}", topic, tags);
            return;
        }
        try {
            Message message =
                    new Message(topic, tags, keys, data.getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    LOGGER.info("生产者发送消息:{}", sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    LOGGER.error(throwable.getMessage(), throwable);
                }
            });
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            LOGGER.error(e.getMessage(), e);
        }
    }

    /** Shuts the producer down when the bean is destroyed. */
    @PreDestroy
    public void stop() {
        // The field is final and always initialised, so no null check is needed.
        producer.shutdown();
        LOGGER.info("关闭生产者");
    }
}
4.创建消费者MQPushConsumer
package com
.example
.rockectmqspringboot
;
import org
.apache
.rocketmq
.client
.consumer
.DefaultMQPushConsumer
;
import org
.apache
.rocketmq
.client
.consumer
.listener
.ConsumeConcurrentlyContext
;
import org
.apache
.rocketmq
.client
.consumer
.listener
.ConsumeConcurrentlyStatus
;
import org
.apache
.rocketmq
.client
.consumer
.listener
.MessageListenerConcurrently
;
import org
.apache
.rocketmq
.client
.exception
.MQClientException
;
import org
.apache
.rocketmq
.common
.consumer
.ConsumeFromWhere
;
import org
.apache
.rocketmq
.common
.message
.MessageExt
;
import org
.apache
.rocketmq
.common
.protocol
.heartbeat
.MessageModel
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.beans
.factory
.annotation
.Value
;
import org
.springframework
.stereotype
.Component
;
import javax
.annotation
.PostConstruct
;
import javax
.annotation
.PreDestroy
;
import java
.util
.List
;
/**
 * Spring-managed RocketMQ push consumer (group {@code TestConsumer}) that subscribes to
 * {@code TopicTest} and logs every message it receives.
 *
 * <p>The bean registers itself as the concurrent message listener.
 */
@Component
public class MQPushConsumer implements MessageListenerConcurrently {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQPushConsumer.class);

    /** Name server address, injected from the {@code rocketmq.namesrvaddr} property. */
    @Value("${rocketmq.namesrvaddr}")
    private String namesrvaddr;

    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");

    /**
     * Configures the consumer (name server, start offset, clustering model, subscription,
     * listener) and starts it.
     *
     * @throws RuntimeException wrapping the {@link MQClientException} if the consumer cannot start
     */
    @PostConstruct
    public void start() {
        try {
            LOGGER.info("启动消费者");
            consumer.setNamesrvAddr(namesrvaddr);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.subscribe("TopicTest", "*");
            consumer.registerMessageListener(this);
            consumer.start();
        } catch (MQClientException e) {
            LOGGER.error("消费者启动失败:{}", e.getErrorMessage());
            throw new RuntimeException(e.getErrorMessage(), e);
        }
    }

    /**
     * Logs each message of the batch in order.
     *
     * <p>If processing fails part-way through, the ack index is set to the last message that
     * was handled successfully so that the failing message and everything after it are
     * redelivered rather than silently acknowledged.
     *
     * @param list                       batch of messages delivered by the broker
     * @param consumeConcurrentlyContext context used to report how much of the batch succeeded
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}; partial failure is
     *         expressed through the ack index
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        int index = 0;
        try {
            for (; index < list.size(); index++) {
                MessageExt msg = list.get(index);
                // Decode explicitly as UTF-8 (the charset the producer encodes with) instead of
                // relying on the platform default charset.
                String msgBody =
                        new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                LOGGER.info("消费者监听:queueId:{};message:{}", msg.getQueueId(), msgBody);
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            // RocketMQ treats ackIndex as the index of the LAST successfully consumed message
            // and resends everything after it. The message at `index` failed, so the last
            // good one is index - 1 (-1 means the whole batch is retried).
            consumeConcurrentlyContext.setAckIndex(index - 1);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /** Shuts the consumer down when the bean is destroyed. */
    @PreDestroy
    public void stop() {
        // The field is final and always initialised, so no null check is needed.
        consumer.shutdown();
        LOGGER.info("关闭消费者");
    }
}
5.创建RocketController
package com
.example
.rockectmqspringboot
.controller
;
import com
.example
.rockectmqspringboot
.MQProducer
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.web
.bind
.annotation
.RequestMapping
;
import org
.springframework
.web
.bind
.annotation
.RequestParam
;
import org
.springframework
.web
.bind
.annotation
.RestController
;
/**
 * REST endpoint that publishes a message to the {@code TopicTest} topic through
 * {@link MQProducer}.
 */
@RestController
public class RocketController {
    protected static final Logger LOGGER = LoggerFactory.getLogger(RocketController.class);

    @Autowired
    private MQProducer producer;

    /**
     * Hands {@code data} to the producer for delivery to {@code TopicTest}.
     *
     * <p>{@code MQProducer.sendMessage} reports delivery problems through its own logging rather
     * than by throwing, so "发送成功" means the message was handed to the producer, not that the
     * broker has confirmed it.
     *
     * @param data message payload; the request is rejected when it is missing
     * @param tag  optional message tag
     * @return "发送成功" when the message was handed to the producer, otherwise "发送失败"
     */
    @RequestMapping(value = "/send")
    public String sendRocket(@RequestParam(required = false) String data,
                             @RequestParam(required = false) String tag) {
        if (data == null) {
            // Without a payload nothing can be sent; do not report success.
            LOGGER.warn("rocket的消息为空,未发送");
            return "发送失败";
        }
        try {
            LOGGER.info("rocket的消息:{}", data);
            producer.sendMessage(data, "TopicTest", tag, null);
            return "发送成功";
        } catch (Exception e) {
            LOGGER.error("发送rocket异常:", e);
            return "发送失败";
        }
    }
}
6. 执行 main 方法启动服务，查看日志输出，确认生产者和消费者都已启动
7.访问"http://localhost:8080/send?data=Fisher&tag=mq"发送消息
8. 查看日志输出，确认消息已被成功消费