谢谢参考:https://blog.csdn.net/CNZYYH/article/details/85696674
一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式。利用redis这两种场景的消息队列都能够实现。 定义:
生产者消费者模式 :生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。如下图所示: 由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。
在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部(left)和尾部(right)添加新的元素。在插入时,如果该键并不存在,Redis将为该键创建一个新的链表。与此相反,如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是4294967295。 从元素插入和删除的效率视角来看,如果我们是在链表的两头插入或删除元素,这将会是非常高效的操作,即使链表中已经存储了百万条记录,该操作也可以在常量时间内完成。然而需要说明的是,如果元素插入或删除操作是作用于链表中间,那将会是非常低效的。相信对于有良好数据结构基础的开发者而言,这一点并不难理解。
Redis List的主要操作为lpush/lpop/rpush/rpop四种,分别代表从头部和尾部的push/pop,除此之外List还提供了两种pop操作的阻塞版本blpop/brpop,用于阻塞获取一个对象。
Redis通常都被用做一个处理各种后台工作或消息任务的消息服务器。 一个简单的队列模式就是:生产者把消息放入一个列表中,等待消息的消费者用 RPOP 命令(用轮询方式), 或者用 BRPOP 命令(如果客户端使用阻塞操作会更好)来得到这个消息。
以下列举SpringBoot集成redis的JedisCluster和RedisTemplate
引入依赖到pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>application.yml增加redis集群配置
spring: redis: password: clusters: 10.10.1.238:7000,10.10.1.238:7001,10.10.1.238:7002,10.10.1.238:7003,10.10.1.238:7004,10.10.1.238:7005RedisConfig配置
package com.jzlife.servantapplet.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.cache.interceptor.KeyGenerator; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.cache.RedisCacheConfiguration; import org.springframework.data.redis.cache.RedisCacheManager; import org.springframework.data.redis.cache.RedisCacheWriter; import org.springframework.data.redis.connection.RedisClusterConfiguration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisNode; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPoolConfig; import java.lang.reflect.Field; import java.time.Duration; @Configuration //必须加,使配置生效 @EnableCaching public class RedisConfig extends CachingConfigurerSupport { private static final Logger lg = LoggerFactory.getLogger(RedisConfig.class); /**redis密码**/ @Value("${spring.redis.password}") public String password; @Value("${spring.redis.clusters}") public String cluster; @Bean public KeyGenerator keyGenerator() { return (target, method, params) -> { StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()); sb.append(method.getName()); for (Object obj : params) { sb.append(obj.toString()); } lg.info("自动生成Redis Key -> [{}]", sb.toString()); return sb.toString(); }; } /** * @Description: 使用@Cacheable注解的时候会将返回的对象缓存起来,默认缓存的值是二进制的, * 为此我们自定义序列化配置,改成JSON格式的 */ @Bean public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) { RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory()); RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig() .entryTtl(Duration.ofHours(1))// 设置缓存有效期一小时 .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getValueSerializer())); return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration); } @Bean(name="redistemp") public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory jedisConnectionFactory ) { //设置序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>(); redisTemplate.setConnectionFactory(jedisConnectionFactory); RedisSerializer stringSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringSerializer); // key序列化 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // value序列化 redisTemplate.setHashKeySerializer(stringSerializer); // Hash key序列化 redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); // Hash value序列化 redisTemplate.afterPropertiesSet(); return redisTemplate; } public Object getFieldValueByObject (Object object , String targetFieldName) throws Exception { // 获取该对象的Class Class objClass = object.getClass(); // 获取所有的属性数组 Field[] fields = objClass.getDeclaredFields(); for (Field field:fields) { // 属性名称 field.setAccessible(true); String currentFieldName = field.getName(); if(currentFieldName.equals(targetFieldName)){ return field.get(object); // 通过反射拿到该属性在此对象中的值(也可能是个对象) } } return null; } /** * 通过反射获取JedisCluster * @param factory * @return */ @Bean public JedisCluster redisCluster(RedisConnectionFactory factory){ Object object =null; try { object= getFieldValueByObject(factory,"cluster"); } catch (Exception e) { e.printStackTrace(); } return (JedisCluster)object; } @Bean(name="factory") public RedisConnectionFactory factory(RedisClusterConfiguration clusterConfig){ JedisConnectionFactory redisConnectionFactory=new JedisConnectionFactory(clusterConfig); String redisPassword = password; redisConnectionFactory.setPassword(redisPassword); redisConnectionFactory.setPoolConfig(createJedisPoolConfig()); redisConnectionFactory.setTimeout(30000); redisConnectionFactory.setUsePool(true); return redisConnectionFactory; } @Bean(name="clusterConfig") public RedisClusterConfiguration clusterConfig(){ RedisClusterConfiguration config = new RedisClusterConfiguration(); String[] nodes = cluster.split(","); lg.info("redis--clusterConfig---"+cluster); for(String node : nodes){ String[] host = node.split(":"); RedisNode redis = new RedisNode(host[0], Integer.parseInt(host[1])); config.addClusterNode(redis); } return config; } public JedisPoolConfig createJedisPoolConfig(){ JedisPoolConfig config = new JedisPoolConfig(); //连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true config.setBlockWhenExhausted(false); //设置的逐出策略类名, 默认DefaultEvictionPolicy(当连接超过最大空闲时间,或连接数超过最大空闲连接数) config.setEvictionPolicyClassName("org.apache.commons.pool2.impl.DefaultEvictionPolicy"); //是否启用pool的jmx管理功能, 默认true config.setJmxEnabled(true); //MBean ObjectName = new ObjectName("org.apache.commons.pool2:type=GenericObjectPool,name=" + "pool" + i); 默 认为"pool", JMX不熟,具体不知道是干啥的...默认就好. config.setJmxNamePrefix("pool"); //是否启用后进先出, 默认true config.setLifo(true); //最大空闲连接数, 默认8个 config.setMaxIdle(20);//2000 //最大连接数, 默认8个 config.setMaxTotal(5000); //获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1 config.setMaxWaitMillis(10000); //逐出连接的最小空闲时间 默认1800000毫秒(30分钟) config.setMinEvictableIdleTimeMillis(180000);//3分钟 //最小空闲连接数, 默认0 config.setMinIdle(0); //每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3 config.setNumTestsPerEvictionRun(3); //对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略) config.setSoftMinEvictableIdleTimeMillis(1800000); //在获取连接的时候检查有效性, 默认false config.setTestOnBorrow(false); //在空闲时检查有效性, 默认false config.setTestWhileIdle(false); //逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1 config.setTimeBetweenEvictionRunsMillis(-1); return config; } }RedisService.java辅助类
package com.example.myframe.redis.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.*; import org.springframework.stereotype.Service; import java.io.Serializable; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @Service("myRedisService") public class RedisService { @Autowired private RedisTemplate redisTemplate; /** * 写入缓存 * @param key * @param value * @return */ public boolean set(final String key, Object value) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.set(key, value); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 写入缓存设置时效时间 * @param key * @param value * @return */ public boolean set(final String key, Object value, Long expireTime) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.set(key, value); redisTemplate.expire(key, expireTime, TimeUnit.SECONDS); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 批量删除对应的value * @param keys */ public void remove(final String... keys) { for (String key : keys) { remove(key); } } /** * 批量删除key * @param pattern */ public void removePattern(final String pattern) { Set<Serializable> keys = redisTemplate.keys(pattern); if (keys.size() > 0){ redisTemplate.delete(keys); } } /** * 删除对应的value * @param key */ public void remove(final String key) { if (exists(key)) { redisTemplate.delete(key); } } /** * 判断缓存中是否有对应的value * @param key * @return */ public boolean exists(final String key) { return redisTemplate.hasKey(key); } /** * 读取缓存 * @param key * @return */ public Object get(final String key) { Object result = null; ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); result = operations.get(key); return result; } /** * 哈希 添加 * @param key * @param hashKey * @param value */ public void hmSet(String key, Object hashKey, Object value){ HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); hash.put(key,hashKey,value); } /** * 哈希 获取哈希的key集合 * @param key * @return */ public Set<Object> hmKeys(String key){ HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); return hash.keys(key); } /** * 哈希 删除哈希的key * @param key * @param hashKey */ public void hmDelete(String key,String hashKey){ HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); hash.delete(key, hashKey); } /** * 哈希获取数据 * @param key * @param hashKey * @return */ public Object hmGet(String key, Object hashKey){ HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); return hash.get(key,hashKey); } /** * 获取所有key值 * @param key * @return */ public Set<Object> hmKeySet(String key){ HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); return hash.keys(key); } /** * 获取所有key值 * @param key * @return */ public void hmRemove(String key, Object hashKey){ HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); hash.delete(key, hashKey); } /** * 列表添加 * @param k * @param v */ public void lPush(String k,Object v){ ListOperations<String, Object> list = redisTemplate.opsForList(); list.rightPush(k,v); } /** * 列表获取 * @param k * @param l * @param l1 * @return */ public List<Object> lRange(String k, long l, long l1){ ListOperations<String, Object> list = redisTemplate.opsForList(); return list.range(k,l,l1); } /** * 集合添加 * @param key * @param value */ public void add(String key,Object value){ SetOperations<String, Object> set = redisTemplate.opsForSet(); set.add(key,value); } /** * 集合获取 * @param key * @return */ public Set<Object> setMembers(String key){ SetOperations<String, Object> set = redisTemplate.opsForSet(); return set.members(key); } /** * 集合长度 * @param key * @return */ public Long setSize(String key){ SetOperations<String, Object> set = redisTemplate.opsForSet(); return set.size(key); } /** * 集合获取 * @param key * @param count * @return */ public Set<Object> setMembers(String key, int count){ SetOperations<String, Object> set = redisTemplate.opsForSet(); return set.distinctRandomMembers(key, count); } /** * 删除集合数据 * @param key * @param value */ public void remove(String key, Object value){ SetOperations<String, Object> set = redisTemplate.opsForSet(); set.remove(key, value); } /** * 有序集合添加 * @param key * @param value * @param scoure */ public void zAdd(String key,Object value,double scoure){ ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); zset.add(key,value,scoure); } /** * 有序集合获取 * @param key * @param scoure * @param scoure1 * @return */ public Set<Object> rangeByScore(String key,double scoure,double scoure1){ ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); return zset.rangeByScore(key, scoure, scoure1); } /** * 消息队列实现 * @param channel * @param message */ public void convertAndSend(String channel, Object message){ redisTemplate.convertAndSend(channel, message); } /** * 数列添加 * @param key * @param value */ public void addList(String key,Object value){ ListOperations<String, Object> list = redisTemplate.opsForList(); list.rightPush(key, value); } /** * 数列获取 * @param key * @return */ public List<Object> getList(String key){ ListOperations<String, Object> list = redisTemplate.opsForList(); return list.range(key, 0, list.size(key)); } /** * 左弹出数列 * @param key * @return */ public Object popList(String key) { ListOperations<String, Object> list = redisTemplate.opsForList(); return list.leftPop(key); } public Long increment(String k, Long l) { return redisTemplate.opsForValue().increment(k, l); } }TestRedisController.java
package com.example.myframe.redis; import com.example.myframe.redis.service.RedisService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import redis.clients.jedis.JedisCluster; import java.util.List; @Component public class RedisUtil { @Autowired JedisCluster jedisCluster; @Autowired private RedisService redisService; /** * 生产者 */ public void dealShengChangZhe() { for (int i = 0; i < 10; i++) { jedisCluster.rpush("ceshi", "value1_" + i); } } /** * 消费者 */ public void dealXiaoFeiZhe() { while (true) { //阻塞式brpop,List中无数据时阻塞,参数0表示一直阻塞下去,直到List出现数据 List<String> listingList = jedisCluster.blpop(0, "ceshi"); System.out.println("线程取数据:{}" + listingList.get(1)); } } /** * 发布订阅模式 */ public void dealFaBuDingYue() { redisService.convertAndSend("dealFaBuDingYue", "我是来发布信息的"); } }redis消息队列监听信息
package com.example.myframe.redis.msg; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; @Configuration public class RedisMsgListener { @Autowired Receiver receiver; /** * redis消息队列监听信息 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("dealFaBuDingYue")); return container; } /** * 监听方法 * * @return */ @Bean(name = "listenerAdapter") MessageListenerAdapter listenerAdapter() { // 回调数据处理方法 return new MessageListenerAdapter(receiver, "dealJt"); } } package com.example.myframe.redis.msg; import com.example.myframe.redis.service.RedisService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; @Service("receiver") public class Receiver{ @Autowired private RedisService redisService; @Autowired private StringRedisTemplate redisTemplate; /** * 清除外部广告位本地缓存 * @param message */ public void dealJt(String message){ System.out.println("我是用来监听信息的"); System.out.println(message); } /** * 清除外部广告位本地缓存 * @param message */ public void dealJt1(String message){ System.out.println("我是用来监听信息的1"); System.out.println(message); } }