Spring Cloud Gateway Source Code Analysis (3): Predicate

    Technology, 2023-07-01

    Contents

    RoutePredicateFactory 

    GatewayPredicate

    AfterRoutePredicateFactory

    RoutePredicateHandlerMapping 

    FilteringWebHandler

    handle


    RoutePredicateFactory 

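    The classes related to RoutePredicateFactory live in the org.springframework.cloud.gateway.handler.predicate package. When Spring Cloud Gateway creates a Route object, it uses a RoutePredicateFactory to create a Predicate object. That Predicate is assigned to the Route.predicate property and is used to match an incoming request to its Route.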

    public interface RoutePredicateFactory<C> extends ShortcutConfigurable, Configurable<C> {

        String PATTERN_KEY = "pattern";

        // useful for javadsl
        default Predicate<ServerWebExchange> apply(Consumer<C> consumer) {
            C config = newConfig(); // newConfig() is implemented by the concrete class
            consumer.accept(config);
            beforeApply(config);
            return apply(config);
        }

        default AsyncPredicate<ServerWebExchange> applyAsync(Consumer<C> consumer) {
            C config = newConfig();
            consumer.accept(config);
            beforeApply(config);
            return applyAsync(config);
        }

        default void beforeApply(C config) {
        }

        Predicate<ServerWebExchange> apply(C config);

        // remaining members are not shown in this excerpt
    }
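    The default apply(Consumer<C>) method always runs the same sequence: create a config, let the caller populate it, call beforeApply, then build the predicate. The following is a minimal plain-Java sketch of that flow, with no Spring dependency. MiniPredicateFactory, PrefixFactory, and its Config are hypothetical names used only for illustration, and the predicate tests a String path rather than a ServerWebExchange.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for RoutePredicateFactory<C>.
interface MiniPredicateFactory<C> {
    C newConfig();                          // supplied by the implementing class

    default void beforeApply(C config) { }  // optional hook, no-op by default

    Predicate<String> apply(C config);      // builds the predicate from a config

    // Same sequence as RoutePredicateFactory#apply(Consumer<C>).
    default Predicate<String> apply(Consumer<C> consumer) {
        C config = newConfig();
        consumer.accept(config);
        beforeApply(config);
        return apply(config);
    }
}

// Hypothetical factory: matches paths that start with a configured prefix.
final class PrefixFactory implements MiniPredicateFactory<PrefixFactory.Config> {
    static final class Config {
        String prefix = "/";
    }

    @Override
    public Config newConfig() {
        return new Config();
    }

    @Override
    public Predicate<String> apply(Config config) {
        return path -> path.startsWith(config.prefix);
    }
}

final class FactoryDemo {
    public static void main(String[] args) {
        // The lambda plays the role of the Java DSL callback that fills in the config.
        Predicate<String> p = new PrefixFactory().apply(c -> c.prefix = "/api");
        System.out.println(p.test("/api/users")); // true
        System.out.println(p.test("/health"));    // false
    }
}
```

    The caller never constructs the Config itself; it only mutates the instance the factory hands over, which is what makes the Consumer-based overload convenient for a fluent DSL.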

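    For the concrete implementations, see Spring Cloud Gateway Introduction (1). Only one implementation is analyzed here.

    Most factories return an anonymous implementation of the GatewayPredicate interface.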

    GatewayPredicate

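    GatewayPredicate extends Predicate<ServerWebExchange> and overrides three default methods, and, negate, and or, which return AndGatewayPredicate, NegateGatewayPredicate, and OrGatewayPredicate respectively.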

    public interface GatewayPredicate extends Predicate<ServerWebExchange> {

        @Override
        default Predicate<ServerWebExchange> and(Predicate<? super ServerWebExchange> other) {
            return new AndGatewayPredicate(this, wrapIfNeeded(other));
        }

        @Override
        default Predicate<ServerWebExchange> negate() {
            return new NegateGatewayPredicate(this);
        }

        @Override
        default Predicate<ServerWebExchange> or(Predicate<? super ServerWebExchange> other) {
            return new OrGatewayPredicate(this, wrapIfNeeded(other));
        }

    }
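    To illustrate why an interface might override and, or, and negate to return named classes instead of the anonymous lambdas that java.util.function.Predicate produces by default, here is a small plain-Java sketch. NamedPredicate and its nested And, Or, and Not classes are hypothetical analogues over String input, not the Gateway classes; the readable toString is one benefit assumed here for demonstration.

```java
import java.util.function.Predicate;

// Hypothetical, simplified analogue of GatewayPredicate over String input.
interface NamedPredicate extends Predicate<String> {

    @Override
    default Predicate<String> and(Predicate<? super String> other) {
        return new And(this, other);
    }

    @Override
    default Predicate<String> or(Predicate<? super String> other) {
        return new Or(this, other);
    }

    @Override
    default Predicate<String> negate() {
        return new Not(this);
    }

    // Wraps a plain predicate and gives it a printable name.
    static NamedPredicate of(String name, Predicate<String> delegate) {
        return new NamedPredicate() {
            @Override public boolean test(String s) { return delegate.test(s); }
            @Override public String toString() { return name; }
        };
    }

    final class And implements NamedPredicate {
        private final Predicate<? super String> left;
        private final Predicate<? super String> right;
        And(Predicate<? super String> left, Predicate<? super String> right) {
            this.left = left;
            this.right = right;
        }
        @Override public boolean test(String s) { return left.test(s) && right.test(s); }
        @Override public String toString() { return "(" + left + " && " + right + ")"; }
    }

    final class Or implements NamedPredicate {
        private final Predicate<? super String> left;
        private final Predicate<? super String> right;
        Or(Predicate<? super String> left, Predicate<? super String> right) {
            this.left = left;
            this.right = right;
        }
        @Override public boolean test(String s) { return left.test(s) || right.test(s); }
        @Override public String toString() { return "(" + left + " || " + right + ")"; }
    }

    final class Not implements NamedPredicate {
        private final Predicate<? super String> inner;
        Not(Predicate<? super String> inner) { this.inner = inner; }
        @Override public boolean test(String s) { return !inner.test(s); }
        @Override public String toString() { return "!" + inner; }
    }
}

final class ComposeDemo {
    public static void main(String[] args) {
        NamedPredicate api = NamedPredicate.of("startsWith(/api)", s -> s.startsWith("/api"));
        NamedPredicate json = NamedPredicate.of("endsWith(.json)", s -> s.endsWith(".json"));
        Predicate<String> combined = api.and(json.negate());
        System.out.println(combined);                     // (startsWith(/api) && !endsWith(.json))
        System.out.println(combined.test("/api/users"));  // true
    }
}
```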

     

    AfterRoutePredicateFactory

    Configuration:

    spring:
      cloud:
        gateway:
          routes:
          # =====================================
          - id: after_route
            uri: http://example.org
            predicates:
            - After=2017-01-20T17:42:47.789-07:00[America/Denver]

    Source code:

    The configuration class:

    public static class Config {

        @NotNull
        private ZonedDateTime datetime;

        public ZonedDateTime getDatetime() {
            return datetime;
        }

        public void setDatetime(ZonedDateTime datetime) {
            this.datetime = datetime;
        }

    }

    The apply method:

    @Override
    public Predicate<ServerWebExchange> apply(Config config) {
        return new GatewayPredicate() {
            @Override
            public boolean test(ServerWebExchange serverWebExchange) {
                final ZonedDateTime now = ZonedDateTime.now();
                // true when the current time is after the configured datetime
                return now.isAfter(config.getDatetime());
            }

            @Override
            public String toString() {
                return String.format("After: %s", config.getDatetime());
            }
        };
    }
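    The comparison itself can be tried outside the Gateway. The sketch below parses the datetime from the configuration above and applies the same isAfter check. AfterSketch and the injected Supplier for the current time are assumptions added for testability; the Gateway code calls ZonedDateTime.now() directly.

```java
import java.time.ZonedDateTime;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class AfterSketch {

    // Returns a predicate that ignores its input and passes once "now" is after the cutoff.
    // The clock is read on every test() call, not once when the predicate is built.
    static <T> Predicate<T> after(ZonedDateTime cutoff, Supplier<ZonedDateTime> now) {
        return ignored -> now.get().isAfter(cutoff);
    }

    public static void main(String[] args) {
        ZonedDateTime cutoff =
                ZonedDateTime.parse("2017-01-20T17:42:47.789-07:00[America/Denver]");

        // Fixed clocks make the behaviour easy to see.
        Predicate<Object> justAfter = after(cutoff, () -> cutoff.plusSeconds(1));
        Predicate<Object> justBefore = after(cutoff, () -> cutoff.minusSeconds(1));
        System.out.println(justAfter.test("request"));  // true
        System.out.println(justBefore.test("request")); // false

        // With the real clock, mirroring the Gateway behaviour.
        Predicate<Object> live = after(cutoff, ZonedDateTime::now);
        System.out.println(live.test("request"));
    }
}
```

    isAfter compares instants, so the time zone of the current time does not need to match the zone in the configuration.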

    RoutePredicateHandlerMapping 

    A request is processed by the Gateway as follows (see Spring Cloud Gateway Introduction (1)):

    1. org.springframework.web.reactive.DispatcherHandler: receives the request and matches a HandlerMapping; here it matches RoutePredicateHandlerMapping.
    2. RoutePredicateHandlerMapping: receives the request and matches a Route.
    3. FilteringWebHandler: obtains the GatewayFilter array of the Route and creates a GatewayFilterChain to process the request.

    RoutePredicateHandlerMapping matches a Route and returns the FilteringWebHandler that handles that Route.

    RoutePredicateHandlerMapping extends org.springframework.web.reactive.handler.AbstractHandlerMapping and overrides the getHandlerInternal method.

    public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {

        private final FilteringWebHandler webHandler;

        private final RouteLocator routeLocator;

        private final Integer managementPort;

        private final ManagementPortType managementPortType;

        public RoutePredicateHandlerMapping(FilteringWebHandler webHandler,
                RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties,
                Environment environment) {
            this.webHandler = webHandler;
            this.routeLocator = routeLocator;
            this.managementPort = getPortProperty(environment, "management.server.");
            this.managementPortType = getManagementPortType(environment);
            setOrder(1);
            setCorsConfigurations(globalCorsProperties.getCorsConfigurations());
        }

        @Override
        protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
            // don't handle requests on management port if set and different than server port
            if (this.managementPortType == DIFFERENT && this.managementPort != null
                    && exchange.getRequest().getURI().getPort() == this.managementPort) {
                return Mono.empty();
            }
            exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

            return lookupRoute(exchange)
                    // .log("route-predicate-handler-mapping", Level.FINER) //name this
                    .flatMap((Function<Route, Mono<?>>) r -> {
                        exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                        }
                        exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                        return Mono.just(webHandler);
                    }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                        exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                        if (logger.isTraceEnabled()) {
                            logger.trace("No RouteDefinition found for ["
                                    + getExchangeDesc(exchange) + "]");
                        }
                    })));
        }

        protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
            return this.routeLocator.getRoutes()
                    .concatMap(route -> Mono.just(route).filterWhen(r -> {
                        // add the current route we are testing
                        exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                        return r.getPredicate().apply(exchange);
                    })
                            // instead of immediately stopping main flux due to error, log and
                            // swallow it
                            .doOnError(e -> logger.error(
                                    "Error applying predicate for route: " + route.getId(), e))
                            .onErrorResume(e -> Mono.empty()))
                    // .defaultIfEmpty() put a static Route not found
                    // or .switchIfEmpty()
                    // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                    .next()
                    // TODO: error handling
                    .map(route -> {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Route matched: " + route.getId());
                        }
                        validateRoute(route, exchange);
                        return route;
                    });
        }

    }
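    The matching rule in lookupRoute can be summarized as: test routes in order, treat a predicate that throws as a non-match after logging it, and take the first route that matches. The following synchronous plain-Java sketch shows that rule only; the real code is reactive (concatMap, filterWhen, next). RouteLookupSketch and its Route class are hypothetical, and the request is reduced to a path string.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class RouteLookupSketch {

    // Hypothetical minimal route: an id plus a predicate over the request path.
    static final class Route {
        final String id;
        final Predicate<String> predicate;

        Route(String id, Predicate<String> predicate) {
            this.id = id;
            this.predicate = predicate;
        }
    }

    // Returns the first route whose predicate accepts the path.
    // A predicate that throws is logged and skipped instead of aborting the lookup.
    static Optional<Route> lookupRoute(List<Route> routes, String path) {
        for (Route route : routes) {
            try {
                if (route.predicate.test(path)) {
                    return Optional.of(route);
                }
            } catch (RuntimeException e) {
                System.err.println("Error applying predicate for route: " + route.id + ": " + e);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Route> routes = List.of(
                new Route("broken", p -> { throw new IllegalStateException("boom"); }),
                new Route("api", p -> p.startsWith("/api")),
                new Route("fallback", p -> true));

        System.out.println(lookupRoute(routes, "/api/users").map(r -> r.id).orElse("none")); // api
        System.out.println(lookupRoute(routes, "/index.html").map(r -> r.id).orElse("none")); // fallback
    }
}
```

    Because the first match wins, the order in which the RouteLocator emits routes decides between overlapping predicates.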

    FilteringWebHandler

    SimpleHandlerAdapter#handle(ServerWebExchange, Object) calls FilteringWebHandler#handle(ServerWebExchange) to process the request.

    FilteringWebHandler processes the request by building a GatewayFilterChain for the Route that the request was matched to.

    public class FilteringWebHandler implements WebHandler {

        protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class);

        private final List<GatewayFilter> globalFilters;

        public FilteringWebHandler(List<GlobalFilter> globalFilters) {
            this.globalFilters = loadFilters(globalFilters);
        }

    }

    handle

    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        // Get the filters configured on the matched route.
        List<GatewayFilter> gatewayFilters = route.getFilters();

        // Merge global and route filters, then sort them.
        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        combined.addAll(gatewayFilters);
        // TODO: needed or cached?
        AnnotationAwareOrderComparator.sort(combined);

        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: " + combined);
        }

        // Build the chain and start filtering.
        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }
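    The merge-and-sort step can be sketched without Spring as follows. FilterOrderSketch, NamedFilter, and the filter names and order values are hypothetical; the sketch assumes a plain ascending sort on an integer order, where a lower value runs earlier, as a stand-in for AnnotationAwareOrderComparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class FilterOrderSketch {

    // Hypothetical filter descriptor: only a name and an order value.
    static final class NamedFilter {
        final String name;
        final int order;

        NamedFilter(String name, int order) {
            this.name = name;
            this.order = order;
        }

        @Override
        public String toString() {
            return name + "(" + order + ")";
        }
    }

    // Copies the global filters, appends the route filters, and sorts by ascending order.
    // List.sort is stable, so filters with equal order keep their relative position.
    static List<NamedFilter> combine(List<NamedFilter> global, List<NamedFilter> routeFilters) {
        List<NamedFilter> combined = new ArrayList<>(global);
        combined.addAll(routeFilters);
        combined.sort(Comparator.comparingInt((NamedFilter f) -> f.order));
        return combined;
    }

    public static void main(String[] args) {
        List<NamedFilter> global = List.of(
                new NamedFilter("routing", 100),
                new NamedFilter("metrics", 0));
        List<NamedFilter> route = List.of(
                new NamedFilter("addHeader", 1),
                new NamedFilter("stripPrefix", 2));

        System.out.println(combine(global, route));
        // [metrics(0), addHeader(1), stripPrefix(2), routing(100)]
    }
}
```

    The copy into a new list matters: the global filter list is shared across requests, so sorting it in place together with route filters would leak one route's filters into another.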

     

    private static class DefaultGatewayFilterChain implements GatewayFilterChain {

        // the index and filters fields and the constructors are not shown in this excerpt

        @Override
        public Mono<Void> filter(ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < filters.size()) {
                    GatewayFilter filter = filters.get(this.index);
                    DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
                            this.index + 1);
                    return filter.filter(exchange, chain);
                }
                else {
                    return Mono.empty(); // complete
                }
            });
        }

    }
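    The index-based chain above is an instance of the chain-of-responsibility pattern: each filter receives a new chain object that points at the next index, and decides whether to continue. A synchronous plain-Java sketch of the same mechanism follows; the real implementation returns Mono<Void> and defers execution. ChainSketch, Filter, Chain, and the named helper are hypothetical, and a list of strings stands in for the exchange.

```java
import java.util.ArrayList;
import java.util.List;

final class ChainSketch {

    // Hypothetical filter contract: do some work, then optionally continue the chain.
    interface Filter {
        void filter(List<String> log, Chain chain);
    }

    // Immutable view of "the filters from position index onward".
    static final class Chain {
        private final List<Filter> filters;
        private final int index;

        Chain(List<Filter> filters) {
            this(filters, 0);
        }

        private Chain(List<Filter> filters, int index) {
            this.filters = filters;
            this.index = index;
        }

        void filter(List<String> log) {
            if (index < filters.size()) {
                Filter current = filters.get(index);
                // Hand the current filter a chain that starts at the next position.
                current.filter(log, new Chain(filters, index + 1));
            }
            // Otherwise the end of the chain is reached and there is nothing left to do.
        }
    }

    // A filter that records when it runs before and after the rest of the chain.
    static Filter named(String name) {
        return (log, chain) -> {
            log.add(name + " pre");
            chain.filter(log);
            log.add(name + " post");
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        new Chain(List.of(named("a"), named("b"))).filter(log);
        System.out.println(log); // [a pre, b pre, b post, a post]
    }
}
```

    A filter that does not call chain.filter short-circuits the request, because no later filter is ever reached.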

     
