Spring Cloud Learning Series: Gateway (3) Rate Limiting

    Technology  2025-10-11

    Contents

    Preface

    Simple Rate Limiting

    Source Code Analysis

    Extending Rate Limiting


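    Preface

    Every application has a peak number of requests per second (QPS) that it can handle, whether you look at all requests together or at each individual endpoint. The peak is usually estimated through load testing. Once traffic exceeds it, the application responds slowly and, in severe cases, can hang completely.

    How do we prevent this? By rate limiting the requests: at any given moment only traffic below the peak is let into the application, which improves the availability of the system.

    This chapter implements rate limiting on top of Spring Cloud Gateway.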

    Simple Rate Limiting

    1. Configure rate limiting in the gateway's application.yml file, as follows:

    server:
      port: 8099
    spring:
      application:
        name: gateway-frame
      cloud:
        gateway:
          discovery:
            locator:
              enabled: true
              # use lower-case service ids
              lower-case-service-id: true
          routes:
            - id: gateway-service
              # lb means the service is resolved from the registry and forwarded with load balancing
              uri: lb://gateway-service
              predicates:
                - Path=/service/**
              # rate limiting filter configuration
              filters:
                - name: RequestRateLimiter
                  args:
                    key-resolver: '#{@uriKeyResolver}'
                    redis-rate-limiter.replenishRate: 300
                    redis-rate-limiter.burstCapacity: 300
      redis:
        host: xxxxxxx
        port: 6379
        password: xxxxxx
        database: 0
    # service registry
    eureka:
      instance:
        prefer-ip-address: true
      client:
        service-url:
          defaultZone: http://xxxxxxx:8761/eureka/

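    What the rate limiting configuration consists of:

    1. The filter entry "- name: RequestRateLimiter" enables the ready-made RequestRateLimiterGatewayFilterFactory class to perform rate limiting.

    2. key-resolver: the implementation class that determines the rate limiting dimension. Limits can be keyed by IP, URI, device id, user id and so on. The uriKeyResolver used here refers to a class that limits by URI.

    3. redis-rate-limiter.burstCapacity: the capacity of the token bucket, that is, the maximum number of requests that can be admitted within a single second.

    4. redis-rate-limiter.replenishRate: the number of tokens added to the bucket every second, which is the sustained request rate.

    5. The rate limiter stores its counters in Redis, so the Redis connection settings must be configured as well.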
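The interplay of replenishRate and burstCapacity can be illustrated with a small in-memory token bucket. This is a minimal sketch of the general algorithm, not the Lua script that Gateway executes in Redis; the names TokenBucket, tryAcquire and burst are made up for illustration, and time is passed in explicitly so the behavior is easy to follow.

```java
/** In-memory token bucket using the same two parameters as the YAML above. */
class TokenBucket {
    private final long replenishRate;  // tokens added per second
    private final long burstCapacity;  // maximum number of tokens the bucket can hold
    private long tokens;               // tokens currently available
    private long lastRefillSecond;     // time of the last refill, in seconds

    TokenBucket(long replenishRate, long burstCapacity, long nowSecond) {
        this.replenishRate = replenishRate;
        this.burstCapacity = burstCapacity;
        this.tokens = burstCapacity;   // a new bucket starts full
        this.lastRefillSecond = nowSecond;
    }

    /** Tries to take `requested` tokens at `nowSecond`; returns true if the request may pass. */
    synchronized boolean tryAcquire(long nowSecond, long requested) {
        long elapsed = Math.max(0, nowSecond - lastRefillSecond);
        // Refill for the elapsed time, but never beyond the bucket capacity.
        tokens = Math.min(burstCapacity, tokens + elapsed * replenishRate);
        lastRefillSecond = nowSecond;
        if (tokens < requested) {
            return false;
        }
        tokens -= requested;
        return true;
    }
}

class TokenBucketDemo {
    /** Fires `count` single-token requests within the same second and counts how many pass. */
    static int burst(TokenBucket bucket, long nowSecond, int count) {
        int allowed = 0;
        for (int i = 0; i < count; i++) {
            if (bucket.tryAcquire(nowSecond, 1)) {
                allowed++;
            }
        }
        return allowed;
    }

    public static void main(String[] args) {
        TokenBucket bucket = new TokenBucket(2, 5, 0); // replenishRate=2, burstCapacity=5
        System.out.println(burst(bucket, 0, 10));  // full bucket: 5 requests pass
        System.out.println(burst(bucket, 1, 10));  // one second later: 2 tokens refilled
        System.out.println(burst(bucket, 10, 10)); // after a long idle period: capped at 5
    }
}
```

Setting both values to the same number, as in the configuration above, gives a steady rate with no extra burst allowance.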

    2. Write the key-resolver

    Rate limiting by URI:

    import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Mono;

    public class UriKeyResolver implements KeyResolver {

        // Use the request path as the rate limiting key
        @Override
        public Mono<String> resolve(ServerWebExchange exchange) {
            return Mono.just(exchange.getRequest().getURI().getPath());
        }
    }

    3. Register the bean

    @Component
    public class RateConfig {

        // The bean name must match the SpEL reference '#{@uriKeyResolver}' in application.yml
        @Bean
        public UriKeyResolver uriKeyResolver() {
            return new UriKeyResolver();
        }
    }

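    That completes simple rate limiting for the application. For testing, set the token bucket parameters to small values and then run a load test with JMeter.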
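If JMeter is not at hand, a quick smoke test can also be written with the JDK's own HTTP client (Java 11+). The sketch below is only an assumption-laden illustration: the URL combines port 8099 from the configuration above with a hypothetical endpoint /service/client/test, and the class and method names are invented. Requests rejected by RequestRateLimiter return HTTP 429 (Too Many Requests) unless the filter's status code has been changed.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

class RateLimitSmokeTest {

    /** Counts how often each HTTP status code occurs. */
    static Map<Integer, Integer> countByStatus(List<Integer> statuses) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int status : statuses) {
            counts.merge(status, 1, Integer::sum);
        }
        return counts;
    }

    /** Sends `total` concurrent GET requests and collects the status codes. */
    static List<Integer> fire(String url, int total) {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            pending.add(client.sendAsync(request, HttpResponse.BodyHandlers.discarding())
                    .thenApply(response -> response.statusCode())
                    .exceptionally(error -> -1)); // -1 marks a connection failure
        }
        List<Integer> statuses = new ArrayList<>();
        for (CompletableFuture<Integer> future : pending) {
            statuses.add(future.join());
        }
        return statuses;
    }

    public static void main(String[] args) {
        // Assumed URL: adjust host, port and path to your own gateway and route.
        String url = args.length > 0 ? args[0] : "http://localhost:8099/service/client/test";
        System.out.println(countByStatus(fire(url, 50)));
    }
}
```

With a small burstCapacity, the printed map should show a handful of successful responses and the rest as 429.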

    Source Code Analysis

    First, let's look at how Spring Cloud Gateway is initialized and how routes are executed. (Gateway Maven version: 2.2.3.RELEASE)

    1. Initialization of the configuration. Look at the META-INF/spring.factories file in the org.springframework.cloud:spring-cloud-gateway-core package (the // lines are annotations added for this article and are not part of the file):

    # Auto Configure
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    // validates the referenced dependencies
    org.springframework.cloud.gateway.config.GatewayClassPathWarningAutoConfiguration,\
    // core gateway configuration: routes and so on
    org.springframework.cloud.gateway.config.GatewayAutoConfiguration,\
    // circuit breaker configuration
    org.springframework.cloud.gateway.config.GatewayHystrixCircuitBreakerAutoConfiguration,\
    org.springframework.cloud.gateway.config.GatewayResilience4JCircuitBreakerAutoConfiguration,\
    // load balancing configuration
    org.springframework.cloud.gateway.config.GatewayLoadBalancerClientAutoConfiguration,\
    org.springframework.cloud.gateway.config.GatewayNoLoadBalancerClientAutoConfiguration,\
    org.springframework.cloud.gateway.config.GatewayMetricsAutoConfiguration,\
    // Redis rate limiting configuration
    org.springframework.cloud.gateway.config.GatewayRedisAutoConfiguration,\
    // service registry configuration
    org.springframework.cloud.gateway.discovery.GatewayDiscoveryClientAutoConfiguration,\
    org.springframework.cloud.gateway.config.SimpleUrlHandlerMappingGlobalCorsAutoConfiguration,\
    org.springframework.cloud.gateway.config.GatewayReactiveLoadBalancerClientAutoConfiguration

    org.springframework.boot.env.EnvironmentPostProcessor=\
    org.springframework.cloud.gateway.config.GatewayEnvironmentPostProcessor

    In this chapter we focus on the Redis rate limiting configuration class, org.springframework.cloud.gateway.config.GatewayRedisAutoConfiguration:

    class GatewayRedisAutoConfiguration {

        @Bean
        @SuppressWarnings("unchecked")
        public RedisScript redisRequestRateLimiterScript() {
            DefaultRedisScript redisScript = new DefaultRedisScript<>();
            redisScript.setScriptSource(new ResourceScriptSource(
                    new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
            redisScript.setResultType(List.class);
            return redisScript;
        }

        @Bean
        @ConditionalOnMissingBean
        public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
                @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
                ConfigurationService configurationService) {
            return new RedisRateLimiter(redisTemplate, redisScript, configurationService);
        }
    }

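    Two beans are initialized here.

    One is the RedisScript, which points to the Lua file that performs the rate limiting data operations in Redis.

    The other is RedisRateLimiter, which holds the rate limiting decision logic. It is annotated with @ConditionalOnMissingBean; without arguments, this condition matches on the return type of the method, so the default bean is created only when no other RedisRateLimiter bean has been defined. (Note: this is the hook for plugging in a custom rate limiter, and the dynamic rate limiting later in this article starts from here. A custom limiter that implements RateLimiter without extending RedisRateLimiter coexists with the default bean, which is why the custom bean later on is marked @Primary.)

    So when is RedisRateLimiter actually used? Let's go back to the "- name: RequestRateLimiter" entry in our configuration and the factory class behind it, RequestRateLimiterGatewayFilterFactory. This is the entry point of rate limiting: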

    @ConfigurationProperties("spring.cloud.gateway.filter.request-rate-limiter")
    public class RequestRateLimiterGatewayFilterFactory extends
            AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> {

        @SuppressWarnings("unchecked")
        @Override
        public GatewayFilter apply(Config config) {
            // Pick the configured KeyResolver (rate limiting dimension) and RateLimiter (limiting logic),
            // falling back to the defaults
            KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
            RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
            boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
            HttpStatusHolder emptyKeyStatus = HttpStatusHolder
                    .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

            return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY)
                    .flatMap(key -> {
                        if (EMPTY_KEY.equals(key)) {
                            if (denyEmpty) {
                                setResponseStatus(exchange, emptyKeyStatus);
                                return exchange.getResponse().setComplete();
                            }
                            return chain.filter(exchange);
                        }
                        String routeId = config.getRouteId();
                        if (routeId == null) {
                            Route route = exchange
                                    .getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                            routeId = route.getId();
                        }
                        // Perform the rate limiting check
                        return limiter.isAllowed(routeId, key).flatMap(response -> {
                            for (Map.Entry<String, String> header : response.getHeaders()
                                    .entrySet()) {
                                exchange.getResponse().getHeaders().add(header.getKey(),
                                        header.getValue());
                            }
                            if (response.isAllowed()) {
                                return chain.filter(exchange);
                            }
                            setResponseStatus(exchange, config.getStatusCode());
                            return exchange.getResponse().setComplete();
                        });
                    });
        }
    }

    This calls the isAllowed method of RedisRateLimiter to decide whether the request should be limited:

    @ConfigurationProperties("spring.cloud.gateway.redis-rate-limiter")
    public class RedisRateLimiter extends AbstractRateLimiter<RedisRateLimiter.Config>
            implements ApplicationContextAware {

        @Override
        @SuppressWarnings("unchecked")
        public Mono<Response> isAllowed(String routeId, String id) {
            if (!this.initialized.get()) {
                throw new IllegalStateException("RedisRateLimiter is not initialized");
            }
            // Load the rate limiting configuration for this route id
            Config routeConfig = loadConfiguration(routeId);
            // Configured number of tokens added per second
            int replenishRate = routeConfig.getReplenishRate();
            // Configured token bucket capacity
            int burstCapacity = routeConfig.getBurstCapacity();
            // How many tokens are requested per request?
            int requestedTokens = routeConfig.getRequestedTokens();
            try {
                // Keys under which the counters are stored in Redis
                List<String> keys = getKeys(id);
                // The arguments to the LUA script. time() returns unixtime in seconds.
                List<String> scriptArgs = Arrays.asList(replenishRate + "",
                        burstCapacity + "", Instant.now().getEpochSecond() + "",
                        requestedTokens + "");
                // allowed, tokens_left = redis.eval(SCRIPT, keys, args)
                Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,
                        scriptArgs);
                // .log("redisratelimiter", Level.FINER);
                return flux.onErrorResume(throwable -> {
                    if (log.isDebugEnabled()) {
                        log.debug("Error calling rate limiter lua", throwable);
                    }
                    return Flux.just(Arrays.asList(1L, -1L));
                }).reduce(new ArrayList<Long>(), (longs, l) -> {
                    longs.addAll(l);
                    return longs;
                }).map(results -> {
                    boolean allowed = results.get(0) == 1L;
                    Long tokensLeft = results.get(1);
                    Response response = new Response(allowed,
                            getHeaders(routeConfig, tokensLeft));
                    if (log.isDebugEnabled()) {
                        log.debug("response: " + response);
                    }
                    return response;
                });
            }
            catch (Exception e) {
                /*
                 * We don't want a hard dependency on Redis to allow traffic. Make sure to set
                 * an alert so you know if this is happening too much. Stripe's observed
                 * failure rate is 0.01%.
                 */
                log.error("Error determining if user allowed from redis", e);
            }
            return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
        }
    }
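Note the fail-open behavior in the code above: when the Redis call fails, the result falls back to [1, -1], meaning "allowed, tokens left unknown". The idea can be sketched in plain Java as follows. This is a simplified, blocking illustration and not the reactive code used by Gateway; FailOpenDemo, Decision and check are hypothetical names, and the Supplier stands in for the Redis script call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class FailOpenDemo {

    /** Outcome of one rate limiting check; tokensLeft is -1 when the store was unreachable. */
    static final class Decision {
        final boolean allowed;
        final long tokensLeft;

        Decision(boolean allowed, long tokensLeft) {
            this.allowed = allowed;
            this.tokensLeft = tokensLeft;
        }
    }

    /**
     * Runs the script, which is expected to return [allowed (1 or 0), tokensLeft].
     * If the call fails, fall back to [1, -1] so that traffic is not blocked
     * merely because the counter store is down.
     */
    static Decision check(Supplier<List<Long>> script) {
        List<Long> result;
        try {
            result = script.get();
        } catch (RuntimeException e) {
            result = Arrays.asList(1L, -1L);
        }
        return new Decision(result.get(0) == 1L, result.get(1));
    }

    public static void main(String[] args) {
        Decision rejected = check(() -> Arrays.asList(0L, 0L));
        System.out.println(rejected.allowed + " " + rejected.tokensLeft);

        Decision failedOpen = check(() -> { throw new IllegalStateException("redis down"); });
        System.out.println(failedOpen.allowed + " " + failedOpen.tokensLeft);
    }
}
```

The trade-off is that a Redis outage silently disables rate limiting, so it is worth alerting on the error log, as the comment in the original source suggests.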

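    Debugging shows what the routeId and id parameters contain: the id of the matched route and the key produced by the KeyResolver, which here is the request URI.

    The getKeys(id) method shows the format in which the counters are stored in Redis:

    static List<String> getKeys(String id) {
        String prefix = "request_rate_limiter.{" + id;
        String tokenKey = prefix + "}.tokens";
        String timestampKey = prefix + "}.timestamp";
        return Arrays.asList(tokenKey, timestampKey);
    }

    request_rate_limiter.{uri}.tokens       number of tokens currently available
    request_rate_limiter.{uri}.timestamp    time of the last request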
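The curly braces around the id are a Redis Cluster hash tag: when a key contains a non-empty {...} section, only that part is hashed to choose the slot, so both keys of one bucket land on the same node and the Lua script can access them together. The sketch below reproduces the key layout and extracts the hash tag; RateLimiterKeys and hashTag are illustrative names, and hashTag is a simplified reimplementation of the rule rather than Redis code.

```java
import java.util.Arrays;
import java.util.List;

class RateLimiterKeys {

    /** Same key layout as getKeys(id) in RedisRateLimiter. */
    static List<String> getKeys(String id) {
        String prefix = "request_rate_limiter.{" + id;
        return Arrays.asList(prefix + "}.tokens", prefix + "}.timestamp");
    }

    /**
     * Returns the part of the key that Redis Cluster hashes: the text between the
     * first '{' and the next '}' if that text is non-empty, otherwise the whole key.
     */
    static String hashTag(String key) {
        int open = key.indexOf('{');
        int close = key.indexOf('}', open + 1);
        return (open >= 0 && close > open + 1) ? key.substring(open + 1, close) : key;
    }

    public static void main(String[] args) {
        for (String key : getKeys("/service/client/test")) {
            System.out.println(key + " -> hash tag: " + hashTag(key));
        }
    }
}
```

Both keys print the same hash tag, /service/client/test.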

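    Extending Rate Limiting

    In real production systems the rate limiting parameters are not fixed once and for all. They are adjusted as the business changes, and they are usually stored in a database. However, once RequestRateLimiterGatewayFilterFactory has been initialized, the parameters cannot be changed at runtime, which greatly reduces the usefulness of rate limiting.

    To address this, we can implement the RateLimiter interface and provide our own isAllowed method, so that the limiter picks up refreshed configuration.

    1. Implement DynamicRedisRateLimiter, modeled on RedisRateLimiter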

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.cloud.gateway.filter.ratelimit.AbstractRateLimiter;
    import org.springframework.cloud.gateway.support.ConfigurationService;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
    import org.springframework.data.redis.core.script.RedisScript;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import java.time.Instant;
    import java.util.*;
    import java.util.concurrent.atomic.AtomicBoolean;

    /**
     * @ClassName DynamicRedisRateLimiter
     * @Description Dynamic rate limiting module
     * @Date 2020/6/20 19:02
     * @Version 1.0
     **/
    public class DynamicRedisRateLimiter extends AbstractRateLimiter<DynamicRedisRateConfig>
            implements ApplicationContextAware {

        private static final String CONFIGURATION_PROPERTY_NAME = "custom-redis-rate-limiter";
        private static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
        private static final String REMAINING_HEADER = "X-RateLimit-Remaining";
        private static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
        private static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";
        private static final String REQUESTED_TOKENS_HEADER = "X-RateLimit-Requested-Tokens";

        private final Log log = LogFactory.getLog(this.getClass());
        private final AtomicBoolean initialized = new AtomicBoolean(false);
        private ReactiveStringRedisTemplate redisTemplate;
        private RedisScript<List<Long>> script;
        private DynamicRedisRateConfig defaultConfig;
        private boolean includeHeaders = true;
        private String remainingHeader = REMAINING_HEADER;
        private String replenishRateHeader = REPLENISH_RATE_HEADER;
        private String burstCapacityHeader = BURST_CAPACITY_HEADER;
        private String requestedTokensHeader = REQUESTED_TOKENS_HEADER;

        public DynamicRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity,
                int defaultRequestedTokens) {
            this(new DynamicRedisRateConfig()
                    .setReplenishRate(defaultReplenishRate)
                    .setBurstCapacity(defaultBurstCapacity)
                    .setRequestedTokens(defaultRequestedTokens));
        }

        public DynamicRedisRateLimiter(DynamicRedisRateConfig defaultConfig) {
            super(DynamicRedisRateConfig.class, CONFIGURATION_PROPERTY_NAME, (ConfigurationService) null);
            this.defaultConfig = defaultConfig;
        }

        private static List<String> getKeys(String routeKey) {
            String prefix = "request_rate_limiter.{" + routeKey;
            String tokenKey = prefix + "}.tokens";
            String timestampKey = prefix + "}.timestamp";
            return Arrays.asList(tokenKey, timestampKey);
        }

        public boolean isIncludeHeaders() {
            return this.includeHeaders;
        }

        public void setIncludeHeaders(boolean includeHeaders) {
            this.includeHeaders = includeHeaders;
        }

        public String getRemainingHeader() {
            return this.remainingHeader;
        }

        public void setRemainingHeader(String remainingHeader) {
            this.remainingHeader = remainingHeader;
        }

        public String getReplenishRateHeader() {
            return this.replenishRateHeader;
        }

        public void setReplenishRateHeader(String replenishRateHeader) {
            this.replenishRateHeader = replenishRateHeader;
        }

        public String getBurstCapacityHeader() {
            return this.burstCapacityHeader;
        }

        public void setBurstCapacityHeader(String burstCapacityHeader) {
            this.burstCapacityHeader = burstCapacityHeader;
        }

        public String getRequestedTokensHeader() {
            return this.requestedTokensHeader;
        }

        public void setRequestedTokensHeader(String requestedTokensHeader) {
            this.requestedTokensHeader = requestedTokensHeader;
        }

        public DynamicRedisRateConfig getDefaultConfig() {
            return defaultConfig;
        }

        public void setDefaultConfig(DynamicRedisRateConfig defaultConfig) {
            this.defaultConfig = defaultConfig;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void setApplicationContext(ApplicationContext context) throws BeansException {
            if (this.initialized.compareAndSet(false, true)) {
                if (this.redisTemplate == null) {
                    this.redisTemplate = context.getBean(ReactiveStringRedisTemplate.class);
                }
                this.script = context.getBean(REDIS_SCRIPT_NAME, RedisScript.class);
                if (context.getBeanNamesForType(ConfigurationService.class).length > 0) {
                    this.setConfigurationService(context.getBean(ConfigurationService.class));
                }
            }
        }

        @Override
        public Mono<Response> isAllowed(String routeId, String id) {
            if (!this.initialized.get()) {
                throw new IllegalStateException("DynamicRedisRateLimiter is not initialized");
            }
            // Combine route id and URI into one unique counter key,
            // so that identical URIs of different applications do not collide
            String routeKey = routeId + id;
            // This is where the rate limiting configuration is fetched at request time
            DynamicRedisRateConfig routeConfig = this.loadConfiguration(routeKey);
            int replenishRate = routeConfig.getReplenishRate();
            int burstCapacity = routeConfig.getBurstCapacity();
            int requestedTokens = routeConfig.getRequestedTokens();

            try {
                List<String> keys = getKeys(routeKey);
                List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
                        Instant.now().getEpochSecond() + "", requestedTokens + "");
                Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
                return flux.onErrorResume(throwable -> {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Error calling rate limiter lua", throwable);
                    }
                    // Fail open: allow the request when Redis cannot be reached
                    return Flux.just(Arrays.asList(1L, -1L));
                }).reduce(new ArrayList<Long>(), (longs, l) -> {
                    longs.addAll(l);
                    return longs;
                }).map(results -> {
                    boolean allowed = results.get(0) == 1L;
                    Long tokensLeft = results.get(1);
                    Response response = new Response(allowed, this.getHeaders(routeConfig, tokensLeft));
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("response: " + response);
                    }
                    return response;
                });
            } catch (Exception e) {
                this.log.error("Error determining if user allowed from redis", e);
                return Mono.just(new Response(true, this.getHeaders(routeConfig, -1L)));
            }
        }

        private DynamicRedisRateConfig loadConfiguration(String routeKey) {
            // Look up the configuration for this routeKey in the rule map
            DynamicRedisRateConfig routeConfig = BizRateRule.BIZ_RATE_MAP.get(routeKey);
            if (routeConfig == null) {
                routeConfig = this.defaultConfig;
            }
            if (routeConfig == null) {
                throw new IllegalArgumentException("No Configuration found for routeKey " + routeKey);
            }
            return routeConfig;
        }

        private Map<String, String> getHeaders(DynamicRedisRateConfig config, Long tokensLeft) {
            Map<String, String> headers = new HashMap<>(8);
            if (this.isIncludeHeaders()) {
                headers.put(this.remainingHeader, tokensLeft.toString());
                headers.put(this.replenishRateHeader, String.valueOf(config.getReplenishRate()));
                headers.put(this.burstCapacityHeader, String.valueOf(config.getBurstCapacity()));
                headers.put(this.requestedTokensHeader, String.valueOf(config.getRequestedTokens()));
            }
            return headers;
        }
    }

    The key difference from RedisRateLimiter lies in the isAllowed() method: the counter key is built from the route id plus the URI, and this.loadConfiguration(routeKey) fetches the rate limiting configuration dynamically on every request:

    private DynamicRedisRateConfig loadConfiguration(String routeKey) {
        // Look up the configuration for this routeKey in the rule map
        DynamicRedisRateConfig routeConfig = BizRateRule.BIZ_RATE_MAP.get(routeKey);
        if (routeConfig == null) {
            routeConfig = this.defaultConfig;
        }
        if (routeConfig == null) {
            throw new IllegalArgumentException("No Configuration found for routeKey " + routeKey);
        } else {
            return routeConfig;
        }
    }

    2. Define the DynamicRedisRateConfig class

    import org.springframework.core.style.ToStringCreator;
    import org.springframework.validation.annotation.Validated;

    import javax.validation.constraints.Min;

    /**
     * @ClassName DynamicRedisRateConfig
     * @Description Configuration class
     * @Auth Sam.Chen
     * @Date 2020/6/20 19:10
     * @Version 1.0
     **/
    @Validated
    public class DynamicRedisRateConfig {

        // Average number of tokens added to the bucket per second
        @Min(1L)
        private int replenishRate;

        // Total capacity of the token bucket
        @Min(0L)
        private int burstCapacity = 1;

        // Number of tokens consumed by each request
        @Min(1L)
        private int requestedTokens = 1;

        public DynamicRedisRateConfig() {
        }

        public int getReplenishRate() {
            return this.replenishRate;
        }

        public DynamicRedisRateConfig setReplenishRate(int replenishRate) {
            this.replenishRate = replenishRate;
            return this;
        }

        public int getBurstCapacity() {
            return this.burstCapacity;
        }

        public DynamicRedisRateConfig setBurstCapacity(int burstCapacity) {
            this.burstCapacity = burstCapacity;
            return this;
        }

        public int getRequestedTokens() {
            return this.requestedTokens;
        }

        public DynamicRedisRateConfig setRequestedTokens(int requestedTokens) {
            this.requestedTokens = requestedTokens;
            return this;
        }

        @Override
        public String toString() {
            return (new ToStringCreator(this))
                    .append("replenishRate", this.replenishRate)
                    .append("burstCapacity", this.burstCapacity)
                    .append("requestedTokens", this.requestedTokens)
                    .toString();
        }
    }

    3. Define BizRateRule, the cache class that holds BizRateRule.BIZ_RATE_MAP

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /**
     * @ClassName BizRateRule
     * @Description Stores the business rate limiting rules
     * @Date 2020/6/20 20:28
     * @Version 1.0
     **/
    public class BizRateRule {

        // volatile: a newly swapped-in map becomes visible to request threads immediately
        public static volatile Map<String, DynamicRedisRateConfig> BIZ_RATE_MAP = new ConcurrentHashMap<>();

        /**
         * @Description Refresh the rule map. The new map is fully built first and then swapped in
         * with a single reference assignment, so readers never observe an empty or partial map.
         * @Date 2020/6/20 20:34
         **/
        public static void refreshBizRateMap(Map<String, DynamicRedisRateConfig> newMap) {
            BIZ_RATE_MAP = new ConcurrentHashMap<>(newMap);
        }
    }
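When rules are refreshed while requests are in flight, a reader should see either the old rule set or the new one, never an empty or half-filled map. One way to get that guarantee is to publish an immutable snapshot through an AtomicReference. The following is a minimal sketch of that idea only; RuleSnapshot, refresh and rateFor are hypothetical names, and a plain Integer rate stands in for DynamicRedisRateConfig to keep the example self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class RuleSnapshot {

    // Holds the current, immutable rule set; starts empty.
    private static final AtomicReference<Map<String, Integer>> RULES =
            new AtomicReference<>(Collections.emptyMap());

    /** Copies the new rules and publishes them with one atomic reference swap. */
    static void refresh(Map<String, Integer> newRules) {
        RULES.set(Collections.unmodifiableMap(new HashMap<>(newRules)));
    }

    /** Looks up the replenish rate for a route key, falling back to a default. */
    static int rateFor(String routeKey, int defaultRate) {
        return RULES.get().getOrDefault(routeKey, defaultRate);
    }
}
```

Because refresh copies its argument, later changes to the caller's map have no effect until refresh is called again, and lookups for unknown keys simply return the default.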

    4. Define the config class that initializes these classes:

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Primary;
    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;
    import java.util.HashMap;
    import java.util.Map;

    /**
     * @ClassName BizRateConfig
     * @Description Rate limiting configuration
     * @Date 2020/6/20 20:10
     * @Version 1.0
     **/
    @Component
    public class BizRateConfig {

        // The ":1" suffix supplies the default when the property is not set
        @Value("${rate.limiter.config.default.replenishRate:1}")
        private int defaultReplenishRate;

        @Value("${rate.limiter.config.default.burstCapacity:1}")
        private int defaultBurstCapacity;

        @Value("${rate.limiter.config.default.requestedTokens:1}")
        private int defaultRequestedTokens;

        // @Primary makes the gateway inject this limiter instead of the default RedisRateLimiter
        @Bean
        @Primary
        public DynamicRedisRateLimiter customRedisRateLimiter(DynamicRedisRateConfig defaultDynamicRedisRateConfig) {
            return new DynamicRedisRateLimiter(defaultDynamicRedisRateConfig);
        }

        @Bean
        public DynamicRedisRateConfig defaultCustomRedisRateConfig() {
            DynamicRedisRateConfig config = new DynamicRedisRateConfig();
            config.setReplenishRate(defaultReplenishRate);
            config.setBurstCapacity(defaultBurstCapacity);
            config.setRequestedTokens(defaultRequestedTokens);
            return config;
        }

        @Bean
        public UriKeyResolver uriKeyResolver() {
            return new UriKeyResolver();
        }

        @PostConstruct
        public void initBizData() {
            // Later this can be fed from a config center or a scheduled task that reads the database;
            // for now the rule is hard-coded
            Map<String, DynamicRedisRateConfig> map = new HashMap<>(4);
            DynamicRedisRateConfig config = new DynamicRedisRateConfig();
            config.setReplenishRate(1);
            config.setBurstCapacity(1);
            config.setRequestedTokens(1);
            // Key = route id + request path
            map.put("gateway-service/service/client/test", config);
            BizRateRule.refreshBizRateMap(map);
        }
    }
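The hard-coded rule in initBizData() is the natural place to plug in a periodic reload. The sketch below shows one possible shape for such a scheduled refresh using only the JDK. Everything in it is an assumption for illustration: RuleRefresher is an invented class, the Supplier stands in for whatever loads the rules (a database query, a config center client), and Integer rates replace DynamicRedisRateConfig so the example compiles on its own.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class RuleRefresher {

    private final Supplier<Map<String, Integer>> loader; // hypothetical rule source
    private volatile Map<String, Integer> rules = Collections.emptyMap();

    RuleRefresher(Supplier<Map<String, Integer>> loader) {
        this.loader = loader;
    }

    /** Reloads the rules once; if loading fails, the previous rules stay in effect. */
    void refreshOnce() {
        try {
            rules = Collections.unmodifiableMap(new HashMap<>(loader.get()));
        } catch (RuntimeException e) {
            System.err.println("Rule refresh failed, keeping previous rules: " + e);
        }
    }

    /** Returns the most recently loaded rule set. */
    Map<String, Integer> current() {
        return rules;
    }

    /** Starts a background task that reloads immediately and then every `periodSeconds`. */
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this::refreshOnce, 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

In the gateway, the loader result would be passed to BizRateRule.refreshBizRateMap instead of being kept in a local field, and the returned scheduler should be shut down when the application stops. Catching the exception inside refreshOnce matters because an uncaught exception would cancel all further runs of a scheduled task.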

    That wraps up the extended rate limiting. Go run it and try it out! Bye for now.

     
