从安装到使用202072亲测:springboot集成RocketMq4.5.2

    技术2022-07-21  72

    1.安装RocketMq

    使用 Docker 安装软件(不会Docker?先去学!!!贼好用!!!)

    docker pull foxiswho/rocketmq:server docker pull foxiswho/rocketmq:broker docker pull styletang/rocketmq-console-ng

    2.配置、启动Mq

    启动nameserver

    docker run -d -p 9876:9876 --name rmqserver foxiswho/rocketmq:server

    启动broker 注意: 1) broker中/etc/rocketmq下没有broker.conf 需要自己挂载文件 2) broker默认对外开放的ip为内网ip,需手动修改配置文件,添加brokerIP1 = xxx.xxx.xxx.xxx

    brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH * 1)解决方法:自己建个broker.conf文件将上面内容拷贝进去,上面为broker默认配置文件 * 2)解决方法:在配置文件最上面加上 brokerIP1 = xxx.xxx.xxx.xxx

    store为数据文件 docker run -d -p 10911:10911 -p 10909:10909 -v `pwd`/logs:/root/logs -v `pwd`/store:/root/store -v `pwd`/broker.conf:/etc/rocketmq/broker.conf --name rmqbroker --link rmqserver:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m" foxiswho/rocketmq:broker

    3.可视化工具

    docker run -d --name rmqconsole -p 8180:8080 --link rmqserver:namesrv\ -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876\ -Dcom.rocketmq.sendMessageWithVIPChannel=false"\ -t styletang/rocketmq-console-ng

    访问 ip:8180,及可进入下图页面,然后可切换中文界面

    4.SpringBoot 集成

    <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency> # RocketMQ YML配置 rocketmq: #nameserver集群地址用;隔开 name-server: 192.168.1.169:9876 #生产者参数配置 producer: group: my-group sendMessageTimeout: 300000 compress-message-body-threshold: 4096 max-message-size: 4194304 retry-times-when-send-async-failed: 0 retry-next-server: true retry-times-when-send-failed: 2
    生产者

    我是使用测试类启动的,请自行修改

    import com.zz.rocketmq.CloudRocketmqApplication; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * @Description: 消息生产者 * @Author: zhao * @Date: 2020-07-02 **/ @RunWith(SpringRunner.class) @SpringBootTest(classes = CloudRocketmqApplication.class) public class MqProduceTest { @Autowired private RocketMQTemplate rocketMQTemplate; private static String topic = "zz_topic"; /** * 发送同步消息 */ @Test public void syncSend() { //第一个参数是发送的目的地,一般是放topic,也可以放topic:tag;第二个参数是消息 SendResult sendResult = rocketMQTemplate.syncSend(topic, "Hello, sychronized message!"); System.out.println("同步消息的结果:" + sendResult); } /** * 发送异步消息 */ @Test public void asyncSend() { //第一个参数是发送的目的地,一般是放topic,也可以放topic:tag;第二个参数是消息; //第三个参数是异步消息发送结果的回调 rocketMQTemplate.asyncSend(topic, "Hello, asychronized message!", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("异步消息发送成功,发送结果:"+sendResult); } @Override public void onException(Throwable throwable) { System.out.println("异步消息发送失败,消息回调"); } }); } @Test public void sendMsg() { //发送oneway消息 //oneway消息就是只管发送,不管发送的结果如何 rocketMQTemplate.sendOneWay(topic,"Hello, oneway message!"); //发送带有tag的消息 SendResult tagResult1 = rocketMQTemplate.syncSend(topic + ":test1", "Hello, tags:test1 message!"); System.out.println("带有tag:test1的消息发送结果:"+tagResult1); rocketMQTemplate.convertAndSend(topic, "hell aaa"); } }
    消费者
    package com.zz.rocketmq; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * @Description: 消息消费者 * @Author: zhao * @Date: 2020-07-02 **/ @Service @RocketMQMessageListener(topic = "zz_topic",consumerGroup = "my_consumer") public class Consumer implements RocketMQListener<MessageExt> { /** * 注意: topic 要和发送者的一致,不然收不到 * MessageExt 限定数据类型,可改为RocketMQListener<String> * @param messageExt */ @Override public void onMessage(MessageExt messageExt) { System.out.println("收到了消息:"+new String(messageExt.getBody())); } }

    要注意编码规范哦,如:topic 常量应定义在配置类中或配置文件中,以便管理 ~对你有帮助的话点个赞呗。 ~有天津的小伙伴吗,一起交流下呀。

    Processed: 0.008, SQL: 9