9.集成消息中间件ActiveMQ

    技术2025-08-07  9

    1.ActiveMQ简介 MQ全称为(Message Queue),是一个消息的接收和转发容器,用于消息推送。ActiveMQ是Apache提供的一个开源的消息系统,完全采用Java来实现,很好地支持了J2EE提出的JMS(Java Message Service)规范。

    2.数据准备 新建一个名为mydb的数据库,在mydb中新建一个名为mood的数据表,具体SQL语句如下。

    create database if not exists mydb character set = utf8; create table mood ( id int(11) not null auto_increment, user_id varchar(255) default null, content varchar(255) default null, publish_time datetime(3) default null, primary key (id) );

    3.引入依赖 新建一个SpringBoot工程,并在pom.xml文件中添加集成ActiveMQ所需要的dependency。

    <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.2</version> <scope>provided</scope> </dependency>

    4.添加配置 在application.properties文件中添加如下配置信息。

    ############################################################ # # MySQL配置 # ############################################################ ### 连接信息 spring.datasource.url = jdbc:mysql://localhost:3306/mydb ### 用户名 spring.datasource.username = root ### 密码 spring.datasource.password = admin123 ### 驱动 spring.datasource.driver-class-name = com.mysql.cj.jdbc.Driver ############################################################ # # MyBatis配置 # ############################################################ # ##po类存放目录 mybatis.type-aliases-package = com.leichuangkj.activemq.dao.po ### mapper(.xml)资源文件存放路径 mybatis.mapper-locations = classpath:mybatis/mapper/*.xml ############################################################ # # ActiveMQ配置 # ############################################################ spring.activemq.broker-url = tcp://localhost:61616 spring.activemq.in-memory = true spring.activemq.pool.enabled = false spring.activemq.packages.trust-all = true spring.activemq.packages.trust-all:ObjectMessage的使用机制是不安全的,ActiveMQ自5.12.2之后,强制Consumer端声明一份可信任的包列表,只有当ObjectMessage中的Object在可信任包内,才能被提取出来。该配置表示信任所有的包。

    5.生产者开发 在项目目录“/src/main/java/com/leichuangkj/activemq”下新建“/producer”目录,并在producer目录下新建MoodProducer类,具体代码如下。

    @Service public class MoodProducer { @Resource private JmsMessagingTemplate jmsMessagingTemplate; public void sendMessage(Destination destination, final Mood mood){ jmsMessagingTemplate.convertAndSend(destination, mood); } } JmsMessagingTemplate:是对JmsTemplate的封装,用来发消息的工具类,参数destination是发送到的队列,另一参数可以是String类型的message,也可以是自定义的po对象。

    6.dao层开发 首先在项目目录“/src/main/java/com/leichuangkj/activemq”下新建“/dao/po”目录,并在po目录下新建Mood实体类,具体代码如下。

    @Data @NoArgsConstructor @AllArgsConstructor @ToString public class Mood implements Serializable{ private Integer id; private String userId; private String content; private Date publishTime; }

    然后在dao目录下新建mapper目录,并在mapper目录下新建MoodMapper接口,具体代码如下。

    public interface MoodMapper { int insertSelective(Mood record); }

    最后在resource目录下新建“mybatis/mapper”目录,并在mapper目录下新建MoodMapper.xml文件,具体代码如下。

    <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.leichuangkj.activemq.dao.mapper.MoodMapper"> <insert id="insertSelective" parameterType="com.leichuangkj.activemq.dao.po.Mood"> insert into mood <trim prefix="(" suffix=")" suffixOverrides=","> <if test="id != null"> id, </if> <if test="userId != null"> user_id, </if> <if test="content != null"> content, </if> <if test="publishTime != null"> publish_time, </if> </trim> <trim prefix="values (" suffix=")" suffixOverrides=","> <if test="id != null"> #{id,jdbcType=INTEGER}, </if> <if test="userId != null"> #{userId,jdbcType=VARCHAR}, </if> <if test="content != null"> #{content,jdbcType=VARCHAR}, </if> <if test="publishTime != null"> now(), </if> </trim> </insert> </mapper>

    7.service层开发 在项目目录“/src/main/java/com/leichuangkj/activemq”下新建service目录,并在service目录下新建IMood接口,具体代码如下。

    public interface IMood { void save(Mood mood); String asynSave(Mood mood); }

    然后在service目录下新建impl目录,并在impl目录下新建MoodImpl实现类,具体代码如下。

    @Service public class MoodImpl implements IMood{ private static Destination destination = new ActiveMQQueue("steven.queue"); @Autowired MoodProducer moodProducer; @Autowired MoodMapper moodMapper; @Override public void save(Mood mood) { moodMapper.insertSelective(mood); } @Override public String asynSave(Mood mood) { moodProducer.sendMessage(destination,mood); return "成功!"; } }

    8.消费者开发 在项目目录“/src/main/java/com/leichuangkj/activemq”下新建“/consumer”目录,并在consumer目录下新建MoodConsumer类,具体代码如下。

    @Component public class MoodConsumer { @Autowired MoodImpl moodImpl; @JmsListener(destination = "steven.queue") public void receiveQueue(Mood mood){ moodImpl.save(mood); } } @JmsListener:使用JmsListener配置消费者监听的队列steven.queue,其中mood是接收到的消息对象。

    9.controller层开发 在项目目录“/src/main/java/com/leichuangkj/activemq”下新建controller目录,并在controller目录下新建MoodController类,具体代码如下。

    @Controller @RequestMapping("/mood") public class MoodController { @Autowired MoodImpl moodImpl; @RequestMapping(value = "/saveMood",method = RequestMethod.GET) @ResponseBody public String saveMood(){ Mood mood = new Mood(); mood.setId(1); mood.setUserId("21026"); mood.setContent("hello world!"); mood.setPublishTime(new Date()); String msg = moodImpl.asynSave(mood); return "发表说说:" + msg; } }

    10.启动项目 在启动类ActivemqApplication上添加注解“@MapperScan(basePackages = “com.leichuangkj.activemq.dao.mapper”)”,然后启动项目。

    @MapperScan(basePackages = "com.leichuangkj.activemq.dao.mapper") @SpringBootApplication public class ActivemqApplication { public static void main(String[] args) { SpringApplication.run(ActivemqApplication.class, args); } }

    11.测试 启动项目,然后在postman中请求“http://localhost:8080/mood/saveMood”,可以看到返回“发表说说:成功!”的信息,在数据库中也可以看到发表的说说记录,测试结果如下图所示。 在浏览器中访问“http://127.0.0.1:8161/admin/xml/queues.jsp”查看steven.queue队列的消费情况,具体如下图所示。

    12.工程目录结构

    Processed: 0.012, SQL: 9