MQ中间件通常有ActiveMQ、RabbitMQ、Kafka等等。除此之外,Redis也可以用作基于“发布/订阅”模型的消息推送,不过Redis实现的是一种简单的消息队列,不仅在可靠性方面比不上其他专业的消息中间件,而且Redis的消息推送也不支持Topic分组、点对点模型的消息队列。
如果我们已经在项目中使用Redis作数据缓存,同时我们的消息推送数量也不大,对可靠性要求也不是特别高,那么我们就可以使用Redis来实现消息队列了。
今天我们就基于redis 来实现一个发布-订阅的消息队列
(1) 在pom.xml文件中添加相关依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.mscloudmesh</groupId> <artifactId>springboot-redis-mq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot-redis-mq</name> <description>springboot-redis-mq</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>(2)、添加redis配置类信息
package com.mscloudmesh.topic.config; import com.mscloudmesh.topic.receive.Receiver; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import java.util.concurrent.CountDownLatch; @Configuration public class RedisConfigMQ { private final String DEFAULT_TOPIC="myTopic"; @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic(DEFAULT_TOPIC)); return container; } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean Receiver receiver(CountDownLatch latch) { return new Receiver(latch); } @Bean CountDownLatch latch() { return new CountDownLatch(1); } }(3)、消息接收者实现
import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; public class Receiver { private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class); private CountDownLatch latch; @Autowired public Receiver(CountDownLatch latch) { this.latch = latch; } public void receiveMessage(String message) { LOGGER.info("接收消息: <" + message + ">"); latch.countDown(); } }(4)、添加一个消息发送者
package com.mscloudmesh.topic.controller; import com.mscloudmesh.topic.sender.SenderMQ; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class Sender { @Autowired private RedisTemplate<String, Object> redisTemplate; @GetMapping("/send") public String send(String msg) { try { redisTemplate.convertAndSend("myTopic", msg); return "消息发送成功!"; }catch (Exception e){ e.printStackTrace(); return "消息发送失败!"; } } } (5)、主程序入口 package com.mscloudmesh; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession; @SpringBootApplication public class RedisApplication { public static void main(String[] args) { SpringApplication.run(RedisApplication.class, args); } }(6)、添加application.yml配置文件
server: port: 8080 spring: application: name: springboot-redis-mq redis: host: 127.0.0.1 port: 6379 database: 0 password: jedis: pool: max-active: 200 max-wait: -1 max-idle: 10 min-idle: 0 timeout: 1000最后,开始测试一下 http://localhost:8080/send?msg=hello redis 控制台输出:
至此全部完成。。。