项目配置多Redis数据源(Redis分片)

    技术2023-09-27  79

    背景

      随着项目使用量的增长,前期为解决高峰请求设置大量缓存应对的方式对单一Redis的压力越来越大,经常收到大量报警,网络IO占用很大,因此打算进行Redis分片,将流量较均衡分散到多台Redis上。

    目标

    支持多Redis数据源;支持@Cacheable/@CachePut/@CacheEvict缓存注解;

    方案

      基于jedis的ShardedJedis扩展开发,因其可以根据Jedis j = getShard(key)将key较均匀的分散到各个实例中(具体选择过程可看代码详解,大致就是初始化的会根据实例数及各实例的权重,将每个实例分成权重值*160个虚拟节点并做成TreeMap存储,再加一个map2存储实例和配置的映射,每次getShard(key)先去虚拟节点map找到符合的虚拟节点对应的实例,再从map2获取实例的配置信息),我们所要做的工作就是进行封装,包括模版、管理器、缓存等(ShardedJedisTemplate、ShardedJedisCache、ShardedJedisCacheManager);

    详细代码

    首先是yml配置,配置遵循一个格式redis://xxxx@localhost:6379/9,9表示的db索引值 redis: jedisInfos: - host: redis://localhost:37379/0 connectionTimeout: 3000 pool: max-total: 10 # 连接池最大连接数 max-idle: 10 # 连接池中的最大空闲连接 min-idle: 5 # 连接池中的最小空闲连接 max-wait-millis: 2000 # 连接池最大阻塞等待时间(使用负值表示没有限制) jedis配置信息类,用于读取yml中的配置信息并注入bean(这边主要就是配置格式注意下,根据自己需求可以添加需要用的字段) import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import redis.clients.jedis.JedisPoolConfig; import java.util.List; /** * 多节点配置类 * jedis配置信息 */ @Configuration @ConfigurationProperties(prefix = "redis") public class ShardedJedisProperties { private List<JedisInfo> jedisInfos; private JedisPoolConfig pool; //ignore getter/setter public static class JedisInfo { private int connectionTimeout; //host被URI解析,模版格式为 "redis://xxxx@localhost:6379/9"; 9为db private String host; //ignore getter/setter } } jedis连接工厂的配置,这边主要就是用到上面的jedis的配置信息 import redis.clients.jedis.ShardedJedis; import redis.clients.jedis.ShardedJedisPool; /** * 连接工厂 */ public class ShardedJedisConnectionFactory { private ShardedJedisPool shardedJedisPool; /** * 获取连接 */ public ShardedJedis getConnection() { return shardedJedisPool.getResource(); } //ignore partial getter/setter } 自定义jedis模版(ShardedJedisTemplate),下面只提供部分代码,只是对所有的ShardedJedis做了一层简单封装获取工厂连接,(注意某些参数有时候是byte[]有时候是string,自行对key进行转换)。 import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.util.Assert; import redis.clients.jedis.*; import java.util.*; public class ShardedJedisTemplate<K, V> { private ShardedJedisConnectionFactory factory; private RedisSerializer keySerializer; private RedisSerializer valueSerializer; //ignore getter/setter /** * 设置单个值 * * @param key * @param value * @return */ public String set(String key, String value) { try (ShardedJedis shardedJedis = factory.getConnection()) { return shardedJedis.set(key, value); } } /** * 获取单个值 * * @param key * @return */ public String get(String key) { try (ShardedJedis shardedJedis = factory.getConnection()) { return shardedJedis.get(key); } } } 自定义cache,毕竟需要支持缓存注解 import org.apache.commons.lang3.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.Cache; import org.springframework.cache.support.SimpleValueWrapper; import org.springframework.data.redis.cache.DefaultRedisCachePrefix; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.SerializationException; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.util.Assert; import java.util.concurrent.Callable; public class ShardedJedisCache implements Cache { private static final Logger log = LoggerFactory.getLogger(ShardedJedisCache.class); private ShardedJedisTemplate shardedJedisTemplate; private String name; private RedisSerializer keySerializer; private RedisSerializer valueSerializer; private long defaultExpiration = 0; public ShardedJedisCache(String name, ShardedJedisTemplate shardedJedisTemplate) { this.name = name; this.shardedJedisTemplate = shardedJedisTemplate; this.keySerializer = shardedJedisTemplate.getKeySerializer() == null ? new StringRedisSerializer() : shardedJedisTemplate.getKeySerializer(); this.valueSerializer = shardedJedisTemplate.getValueSerializer() == null ? new GenericJackson2JsonRedisSerializer() : shardedJedisTemplate.getValueSerializer(); } @Override public ValueWrapper get(Object key) { // log.debug("======get from cache '{}' by key : {} ======", name, key); try { byte[] computeKey = computeKey(key); byte[] bs = shardedJedisTemplate.get(computeKey); return (bs == null ? null : new SimpleValueWrapper(deserializeValue(bs))); } catch (SerializationException e) { log.error("cache get error", e); return null; } } @Override public <T> T get(Object key, Class<T> type) { ValueWrapper wrapper = this.get(key); return wrapper == null ? null : (T) wrapper.get(); } @Override public <T> T get(Object key, Callable<T> callable) { ValueWrapper wrapper = this.get(key); return wrapper == null ? null : (T) wrapper.get(); } @Override public void put(Object key, Object value) { // log.debug("======put to cache '{}' with key : {} ======", name, key); byte[] k = computeKey(key); byte[] v = rawValue(value); shardedJedisTemplate.setAndExpire(k, v, defaultExpiration); } @Override public ValueWrapper putIfAbsent(Object key, Object value) { // log.debug("======put to cache '{}' with key : {} ======", name, key); byte[] k = computeKey(key); byte[] v = rawValue(value); String s = shardedJedisTemplate.setAndExpire(k, v, defaultExpiration); return (s == null ? null : new SimpleValueWrapper(s)); } @Override public void evict(Object key) { // log.info("======delete from cache '{}' by key : {} ======", name, key); shardedJedisTemplate.del(computeKey(key)); } @Override public void clear() { } /** * 加前缀和分隔符 默认: * 在cacheName和key之间加分隔符 * 有兴趣可以看看{@link DefaultRedisCachePrefix} * * @return * @paramkey */ private byte[] computeKey(Object key) { byte[] bytes = rawKey(key); return ArrayUtils.addAll((name.concat(":")).getBytes(), bytes); } /** * 设置Key失效时间 * * @param key */ private void expires(byte[] key) { shardedJedisTemplate.expire(key, defaultExpiration); } /** * key转字节 * * @param key * @return */ @SuppressWarnings("unchecked") private byte[] rawKey(Object key) { Assert.notNull(key, "non null key required"); if (keySerializer == null && key instanceof byte[]) { return (byte[]) key; } return keySerializer.serialize(key); } /** * value转字节 * * @param value * @return */ @SuppressWarnings("unchecked") private byte[] rawValue(Object value) { if (valueSerializer == null && value instanceof byte[]) { return (byte[]) value; } return valueSerializer.serialize(value); } @SuppressWarnings("unchecked") private Object deserializeValue(byte[] value) { if (valueSerializer == null) { return value; } return valueSerializer.deserialize(value); } 自定义jedis缓存管理器(ShardedJedisCacheManager),这样Spring就不会自行创建RedisCacheManager了,一次注解缓存就可生效了,其实注解都可以指定manager,即可以同时存在多个manager,但期望统一,没必要多个。 import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import java.util.*; /** * cacheManager实现类 */ public class ShardedJedisCacheManager implements CacheManager { private final ConcurrentMap<String, Cache> cacheMap = new ConcurrentHashMap<>(16); private volatile Set<String> cacheNames; private long defaultExpiration; private Map<String, Long> expires; private ShardedJedisTemplate<Object, Object> shardedJedisTemplate; public ShardedJedisCacheManager(ShardedJedisTemplate<Object, Object> shardedJedisTemplate) { this(shardedJedisTemplate, Collections.emptySet()); } public ShardedJedisCacheManager(ShardedJedisTemplate<Object, Object> shardedJedisTemplate, Set<String> cacheNames) { this.shardedJedisTemplate = shardedJedisTemplate; this.cacheNames = cacheNames; this.defaultExpiration = 0L; } @Override public Cache getCache(String name) { Cache cache = cacheMap.get(name); if (cache != null) { return cache; } synchronized (this.cacheMap) { cache = this.cacheMap.get(name); if (cache == null) { cache = createCache(name); } return cache; } } //可能需要一个预加载 问题不大 private Cache createCache(String cacheName) { ShardedJedisCache cache = new ShardedJedisCache(cacheName, shardedJedisTemplate); cache.setDefaultExpiration(this.computeExpiration(cacheName)); if (cacheNames == null || cacheNames.size() == 0) { cacheNames = new LinkedHashSet<>(); } cacheNames.add(cacheName); cacheMap.put(cacheName, cache); return cache; } @Override public Collection<String> getCacheNames() { return cacheNames; } //ignore partial getter/setter protected long computeExpiration(String name) { Long expiration = null; if (this.expires != null) { expiration = this.expires.get(name); } return expiration != null ? expiration : this.defaultExpiration; } } 配置类,将前面自定义的注册bean import com.google.common.collect.Lists; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import redis.clients.jedis.JedisShardInfo; import redis.clients.jedis.ShardedJedisPool; import java.util.List; /** * jedis连接池的基本配置 * * @author bbcui */ @Configuration public class ShardedJedisConfiguration { @Autowired private ShardedJedisProperties shardedJedisProperties; public static final String SHARDED_JEDIS_POOL = "shardedJedisPool"; public static final String SHARDed_REDIS_CONNECTION_FACTORY = "shardedRedisConnectionFactory"; public static final String SHARDed_REDIS_TEMPLATE = "shardedRedisTemplate"; @Bean(name = SHARDED_JEDIS_POOL) public ShardedJedisPool shardedJedisPool() { List<JedisShardInfo> jedisShardInfoList = Lists.newArrayList(); JedisShardInfo jedisShardInfo; for (ShardedJedisProperties.JedisInfo jedisInfo : shardedJedisProperties.getJedisInfos()) { jedisShardInfo = new JedisShardInfo(jedisInfo.getHost()); jedisShardInfo.setConnectionTimeout(jedisInfo.getConnectionTimeout()); jedisShardInfoList.add(jedisShardInfo); } return new ShardedJedisPool(shardedJedisProperties.getPool(), jedisShardInfoList); } @Bean(name = SHARDed_REDIS_CONNECTION_FACTORY) public ShardedJedisConnectionFactory shardRedisConnectionFactory(@Qualifier(SHARDED_JEDIS_POOL) ShardedJedisPool shardedJedisPool) { return new ShardedJedisConnectionFactory(shardedJedisPool); } @Bean(name = SHARDed_REDIS_TEMPLATE) public ShardedJedisTemplate shardRedisTemplate(@Qualifier(SHARDed_REDIS_CONNECTION_FACTORY) ShardedJedisConnectionFactory shardedJedisConnectionFactory) { ShardedJedisTemplate shardedJedisTemplate = new ShardedJedisTemplate(); shardedJedisTemplate.setFactory(shardedJedisConnectionFactory); shardedJedisTemplate.setKeySerializer(new StringRedisSerializer()); shardedJedisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); return shardedJedisTemplate; } } 再包装一层 提供一个统一的使用入口 (部分代码省略) @Configuration @EnableCaching @EnableAutoConfiguration public class CacheConfig extends CachingConfigurerSupport { /** * 拼接符 */ private static final String SPLICE = ":"; @Autowired private ShardedJedisTemplate shardedJedisTemplate; /** * 注入bean */ @Bean(name = “shardRedisCacheManager”) @Override public ShardedJedisCacheManager cacheManager() { // 自定义缓存名对应的过期时间 Map<String, Long> expires = ImmutableMap.<String, Long>builder() .put(CacheConfig.CacheNames.CACHE_30SECS, TimeUnit.SECONDS.toSeconds(30)) .put(CacheConfig.CacheNames.CACHE_5MINS, TimeUnit.MINUTES.toSeconds(5)) .build(); ShardedJedisCacheManager manager = new ShardedJedisCacheManager(shardedJedisTemplate); //默认30分钟 manager.setDefaultExpiration(TimeUnit.MINUTES.toSeconds(30)); manager.setExpires(expires); return manager; } @Bean @Override public KeyGenerator keyGenerator() { return (target, method, params) -> { StringBuilder sb = new StringBuilder("cache:"); sb.append(target.getClass().getName()); sb.append(method.getName()); for (Object obj : params) { if (obj != null) { sb.append(obj.toString()); } } return sb.toString(); }; } } import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.lianjia.kaoqin.saas.common.utils.DateUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cache.Cache; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.TimeUnit; /** * 额外的缓存操作工具 * 使用shardedJedis,分片分流缓存,支持springCache注解,对外的get、put等接口都用shardedJedis; */ @Component public class CacheUtil { private static ShardedJedisCacheManager shardedJedisCacheManager; private static ShardedJedisTemplate shardedJedisTemplate; private static RedisTemplate redisTemplate; private static DefaultRedisScript<Number> redisluaScript; /** * spring启动时,自动初始化redisCacheManager和redisTemplate * <p> * 使用时不再需要进行初始操作 */ @Autowired public CacheUtil(@Qualifier(value = CacheConfig.SHARD_REDIS_CACHE_MANAGER) ShardedJedisCacheManager shardedJedisCacheManager, @Qualifier(value = ShardedJedisConfiguration.SHARDed_REDIS_TEMPLATE) ShardedJedisTemplate shardedJedisTemplate, RedisTemplate redisTemplate, @Qualifier(value = "redisluaScript") DefaultRedisScript defaultRedisScript) { this.shardedJedisCacheManager = shardedJedisCacheManager; this.shardedJedisTemplate = shardedJedisTemplate; this.redisTemplate = redisTemplate; this.redisluaScript = defaultRedisScript; } /** * 缓存数据 */ public static void put(String cache, Object key, Object value) { shardedJedisCacheManager.getCache(cache).put(key, value); } /** * 删除缓存 */ public static void evict(String cache, Object key) { shardedJedisCacheManager.getCache(cache).evict(key); } /** * 查询缓存 */ public static <T> T get(String cache, Object key) { Object val = shardedJedisCacheManager.getCache(cache).get(key); if (val instanceof Cache.ValueWrapper) { return (T) ((Cache.ValueWrapper) val).get(); } else { return null; } } /** * 缓存数据 */ public static void put(Object key, Object value, long timeout, TimeUnit timeUnit) { shardedJedisTemplate.setex(key, (int) TimeUnit.SECONDS.convert(timeout, timeUnit), value); } }

    总结

      原理较简单,本质就是自行封装一些类,但是有部分缺陷,ShardedJedis命令较少一些,可能会觉得这么麻烦还不如直接用集群,反正都有一些命令不支持(DBA不让玩),自己业务内定义分流策略可能会更好,这是DBA说的,所以只能简单做个分片了。实践下来,基本能较均匀的分流,性能没有可见的影响(hash)。   虽然key的分布比较均衡,但不代表各个实例的使用率(比如一些指标,请求量、网络IO等)差不多,因为存在一些热点key。。。这部分需要综合考虑是否使用本地缓存来解决了,但是本地缓存的引入会使得系统更加复杂,如何保证数据的一致性又成了新的挑战,建议本地缓存必要用的话存储一些只读的或者更新频率极低的数据。   项目后来仍然保留了RedisTemplate用作一些特殊场景,比如执行一些限流脚本等。。。

    Processed: 0.010, SQL: 9