不知道小伙伴们是否经历过当年风靡一时的游戏《地下城与勇士》,曾记得当年一听说周年庆,网吧满满的都挂这款游戏。不过总有人流泪的是网络连接中断,可谓是欲哭无泪。当年企鹅就是为了限流,因为我们的程序,它处理请求的能力是有限的,一旦请求多到超出它的处理极限就会崩溃。限流是保证系统高可用的重要手段! 由于互联网公司的流量巨大,系统上线会做一个流量峰值的评估,尤其是像各种促销活动,为了保证系统不被巨大的流量压垮,会在系统流量到达一定阈值时,拒绝掉一部分流量。(所以这么多年也是找到答案了。哭)
一、计数器 在JAVA内部也可以通过原子类计数器AtomicInteger、Semaphore信号量来做简单的限流。
package cn.enjoyedu.ch3; import java.util.concurrent.atomic.AtomicInteger; /** * java计数器实现限流 */ public class Counter { // 限流的个数 private int maxCount = 10; // 指定的时间内 private long interval = 60; // 原子类计数器 private AtomicInteger atomicInteger = new AtomicInteger(0); // 起始时间 private long startTime = System.currentTimeMillis(); public boolean limit(int maxCount, int interval) { atomicInteger.addAndGet(1); if (atomicInteger.get() == 1) { startTime = System.currentTimeMillis(); atomicInteger.addAndGet(1); return true; } // 超过了间隔时间,直接重新开始计数 if (System.currentTimeMillis() - startTime > interval * 1000) { startTime = System.currentTimeMillis(); atomicInteger.set(1); return true; } // 还在间隔时间内,check有没有超过限流的个数 if (atomicInteger.get() > maxCount) { return false; } return true; } }二、漏桶算法 漏桶算法思路很简单,我们把水比作是请求,漏桶比作是系统处理能力极限,水先进入到漏桶里,漏桶里的水按一定速率流出,当流出的速率小于流入的速率时,由于漏桶容量有限,后续进入的水直接溢出(拒绝请求),以此实现限流。(紧参考不是重点)
三、网关层限流 限流常在网关这一层做,比如Nginx、Openresty、kong、zuul、Spring Cloud Gateway等,而像spring cloud - gateway网关限流底层实现原理,就是基于Redis + Lua,通过内置Lua限流脚本的方式。
我通过自定义注解、aop、Redis + Lua 实现限流,步骤会比较详细,请有耐心。 1、环境准备 springboot 项目创建: 2、准备maven依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>21.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies>3、配置application.properties文件中提前搭建好的 redis 服务地址和端口 4、配置RedisTemplate实例
package com.example.current.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import java.io.Serializable; /** * 配置RedisTemplate实例 **/ @Configuration public class RedisHelper { @Bean public RedisTemplate<String, Serializable> limitRedisTemplate(LettuceConnectionFactory redisConnectionFactory) { RedisTemplate<String, Serializable> template = new RedisTemplate<>(); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.setConnectionFactory(redisConnectionFactory); return template; } }限流类型枚举类
package com.example.current.enums; /**** * 限流类型 */ public enum LtType { /** * 自定义key */ KEY, /** * 请求者的IP */ IP; }5、自定义注解 然后自定义一个@Current注解,注解类型为ElementType.METHOD即作用于方法上。
package com.example.current.annotation; import com.example.current.enums.LtType; import java.lang.annotation.*; /** * 自定义限流注解 * **/ @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface Current { /** * 名字 */ String names() default ""; /** * key */ String keys() default ""; /** * Key的前缀 */ String prefixs() default ""; /** * 给定的时间范围 单位(秒) */ int period(); /** * 一定时间内最多访问次数 */ int counts(); /** * 限流的类型(用户自定义key 或者 请求ip) */ LtType LtType() default LtType.KEY; }6、切面代码实现
package com.example.current.config; import com.example.current.annotation.Current; import com.example.current.enums.LtType; import com.google.common.collect.ImmutableList; import io.lettuce.core.Limit; import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import javax.servlet.http.HttpServletRequest; import java.io.Serializable; import java.lang.reflect.Method; /** * * 限流切面实现 * */ @Aspect @Configuration public class CurrentAspect { private static final Logger logger = LoggerFactory.getLogger(CurrentAspect.class); private static final String UNKNOWN = "unknown"; private final RedisTemplate<String, Serializable> RedisTemplate; @Autowired public CurrentAspect(RedisTemplate<String, Serializable> limitRedisTemplate) { this.RedisTemplate = limitRedisTemplate; } /** * 配置切面 * */ public Object interceptor(ProceedingJoinPoint pjp) { MethodSignature signature = (MethodSignature) pjp.getSignature(); Method method = signature.getMethod(); Current CurrentAnnotation = method.getAnnotation(Current.class); LtType ltTypeType = CurrentAnnotation.LtType(); String name = ltTypeType.name(); String key; int currentPeriod = CurrentAnnotation.period(); int currentCount = CurrentAnnotation.counts(); /** * 根据限流类型获取不同的key ,如果不传我们会以方法名作为key */ switch (ltTypeType) { case IP: key = getIpAddress(); break; case KEY: key = CurrentAnnotation.keys(); break; default: key = StringUtils.upperCase(method.getName()); } ImmutableList<String> keys = ImmutableList.of(StringUtils.join(CurrentAnnotation.prefixs(), key)); try { String lua = sendLuaScript(); RedisScript<Number> redisScript = new DefaultRedisScript<>(lua, Number.class); Number count = RedisTemplate.execute(redisScript, keys, currentCount, currentPeriod); logger.info("Access try count is {} for name={} and key = {}", count, name, key); if (count != null && count.intValue() <= currentCount) { return pjp.proceed(); } else { throw new RuntimeException("不好意思,你进黑名单了"); } } catch (Throwable e) { if (e instanceof RuntimeException) { throw new RuntimeException(e.getLocalizedMessage()); } throw new RuntimeException("服务器异常"); } } /*** * 编写 redis Lua 限流脚本 * @return */ private String sendLuaScript() { StringBuilder lua = new StringBuilder(); lua.append("local c"); lua.append("c = redis.call('get',KEYS[1])"); // 调用不超过最大值,则直接返回 lua.append("if c and tonumber(c) > tonumber(ARGV[1]) then"); lua.append("return c;"); lua.append("end"); // 执行计算器自加 lua.append("c = redis.call('incr',KEYS[1])"); lua.append("if tonumber(c) == 1 then"); // 从第一次调用开始限流,设置对应键值的过期 lua.append("redis.call('expire',KEYS[1],ARGV[2])"); lua.append("end"); lua.append("return c;"); return lua.toString(); } /*** * 获取id地址 * @return ip */ private String getIpAddress() { HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); String ip = request.getHeader("x-forwarded-for"); if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("Proxy-Client-IP"); } if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("WL-Proxy-Client-IP"); } if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getRemoteAddr(); } return ip; } }7、控制层实现 将@Current注解作用在需要进行限流的接口方法上,下边我们给方法设置@Current注解,在10秒内只允许放行2个请求,直观一点用AtomicInteger计数。
package com.example.current.controller; import com.example.current.annotation.Current; import com.example.current.enums.LtType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.atomic.AtomicInteger; /** * 限流控制 */ @RestController public class CurrentController { //模拟两个计数 private static final AtomicInteger ATOMIC__1 = new AtomicInteger(); private static final AtomicInteger ATOMIC__2 = new AtomicInteger(); @Current(keys = "currentTest1", period = 10, counts= 3) @GetMapping("/currentTest1") public int testLimiter1() { return ATOMIC__1.incrementAndGet(); } @Current(keys = "customer_limit_test", period = 10, counts = 2, LtType = LtType.KEY) @GetMapping("/currentTest2") public int testLimiter2() { return ATOMIC__2.incrementAndGet(); } }8、测试
测试「预期」:连续请求2次均可以成功,第3次请求被拒绝。接下来看一下是不是我们预期的效果
可以看到第三次请求时,应用直接拒绝了请求,说明我们的 Springboot + aop + lua 限流方案搭建成功。 总结 好好学习,好好搬砖!