@Activate注解上可以设置group属性,从而设定某些过滤器只有在服务提供者端才生效。
1. AccessLogFilter的使用:
AccessLogFilter是一个日志过滤器,如果想记录服务每一次的请求日志,则可以开启这个过滤器。虽然AccessLogFilter有@Activate注解,默认会被激活,但还是需要手动配置来开启日志的打印。有两种方式来配置开启AccessLogFilter:
<!-- ①将日志输出到应用本身的log中 --> <dubbo:protocol accessog="true" /> <!-- 只是某个服务提供者或消费者打印log --> <dubbo:provider accesslog="default" /> <!-- ②将日志输出到指定的文件 --> <dubbo:protocol accesslog="custom-access.log" />将日志输出到应用本身的日志组件(如log4j、logback等)时,可以配置accesslog的值为true或default。如果想输出到指定路径的文件,则可以直接设置accesslog的值作为文件路径。
2. AccessLogFilter的实现:
主要分为构造方法和invoke方法 两大逻辑。在AccessLogFilter的构造方法中加锁并初始化一个定时线程池ScheduledThreadPool。该线程池只有在指定了输出的log文件时才会用到,ScheduledThreadPool中的线程会定时把队列中的日志数据写入文件。在构造方法中主要是初始化线程池,而打印日志的逻辑主要在invoker方法中,其逻辑如下:
获取参数。获取上下问、接口名、版本、分组信息等参数,用于日志的构建;构建日志字符串。根据步骤1中的数据开始组装日志,最终会得到一个日志字符串;日志打印。如果用户配置了使用应用本身的日志组件,则直接通过封装的LoggerFactory打印日志;如果用户配置了日志要输出到自定义的文件中,则会把日志加入一个ConcurrentMap中暂存,key是自定义的accesslog值(如accesslog="custom-access.log"),value就是对应的日志集合。后续等待定时线程不断遍历整个Map,把日志写入对应的文件。这里有两个问题需要注意:首先由于Set集合是无序的,因此日志输出到文件也是无序的;其次由于是异步刷盘,突然宕机会导致一小部分日志丢失。
ExecuteLimitFilter用于限制每个服务中每个方法的最大并发数,有接口级别和方法级别的配置方式。
<!-- 每个方法的并发执行数(或占用线程池线程数)不能超过10个 --> <dubbo:service interface="com.foo.BarService" executes="10" /> <!-- 限制com.foo.BarService的sayHello方法的并发执行数(或占用线程池线程数)不能超过10个 --> <dubbo:service interface="com.foo.BarService"> <dubbo:method name="sayHello" executes="10" /> </dubbo:service>如果不设置,则默认不做限制;如果设置了小于等于0的数值,那么也不会做任何限制。
其实现原理是:在框架中使用一个ConcurrentMap缓存了并发数的计数器。为每个请求URL生成一个IdentityString,并以此为key;再以每个IdentityString生成一个RpcStatus对象,并以此为value。RpcStatus对象用于记录对应的并发数。在过滤器中,会以try-catch-finally的形式调用过滤器链的下一个节点。因此,在开始调用之前,会通过URL获得RpcStatus对象,把对象中的并发计数器原子+1,在finally中再将原子-1。只要在计数器+1的回收,发现当前计数比设置的最大并发数大时,就会抛出异常,提示已经超过最大并发数,请求就会被终止并直接返回。
ClassLoaderFilter主要的工作是:切换当前工作线程的累加器到接口的类加载器,以便和接口的类加载器上下文一起工作。
ContextFilter主要记录每个请求的调用上下文。每个调用都有可能产生很多中间临时信息,我们不可能要求在每个接口上都加一个上下文的参数,然后一路往下传。通常做法都是放在ThreadLocal中,作为一个全局参数,当前线程中的任何一个地方都可以直接读写上下文信息。
ContextFilter就是统一在过滤器中处理请求的上下文信息,它为每个请求维护一个RpcContext对象,该对象中维护两个InternalThreadLocal(它是优化过的ThreadLocal),分别记录local和server的上下文。每次收到或发起RPC调用的时候,上下文信息都会发生该改变。
例如:A调用B,B调用C。当A调用B且B还未调用C时,RpcContext中保存A调用B的上下文;当B开始调用C的时候,RpcContext中保存B调用C的上下文。发起调用的上下文是由CnsumerContextFIlter实现的,这个是消费者端的过滤器。
ContextFilter的主要逻辑如下:
清除异步属性。防止异步属性传到过滤器链的下一个环节;设置当前请求的上下文,如Invoker信息、地址信息、端口信息等。如果前面的过滤器已经对上下文设置了一些附件信息(attachments是一个Map,里面可以保存各种key-value数据),则和Invoker的附件信息合并;调用过滤器链的下一个节点;清除上下文信息。对于异步调用的场景,即使是同一个线程,处理不同的请求也会创建一个新的RpcContext对象。因此调用完成后,需要清理对应的上下文信息。它的关注点不在于捕获异常,而是为了找到那些返回的自定义异常,但异常类可能不存在与消费者端,从而防止消费者序列化失败。对于所有没有声明Unchecked的方法抛出的异常,ExceptionFilter会把未引入的异常包装到RuntimeException中,并把遗产原因字符串化后返回。因此,ExceptionFilter的逻辑都在onResponse方法中。
ExceptionFilter过滤器会打印出ERROR级别的错误日志,但并不会处理泛化调用,即Invoker的接口是GenericService。
onResponse方法中的逻辑:
判断是否是泛化调用。如果是泛化调用则直接不处理了;直接抛出一些异常。如果异常是Java自带异常,并且是必须显示使用try-catch来捕获的异常,则直接抛出。如果异常在Invoker的签名中出现,则直接抛出异常,并打印error log。如果异常类和接口在容一个jar包中,则直接抛出异常;如果是JDK中的异常,则直接抛出;如果是Dubbo中定义的异常,则直接抛出。处理在步骤2中无法处理的异常。把异常转化为字符串,并包装成一个RuntimeException放入RpcResult中返回。TimeoutFilter主要是日志类型的过滤器,它会记录每个Invoker的调用时间,如果超过了接口设置的timeout值,则会打印一条警告日志,并不会干扰业务的正常运行:
在Dubbo中,如果某些服务提供者不想让消费者绕过注册中心直连自己,则可以使用令牌验证。总体的工作原理是,服务提供者在发布自己的服务时会生成令牌,与服务一起注册到注册中心。消费者必须通过注册中心才能获取有令牌的服务提供者的URL。TokenFilter是在服务提供者端生效的过滤器,它的工作就是对请求的令牌做校验。
开启令牌校验的配置方式:
<!-- 全局设置开启令牌验证 --> <!-- 随机Token令牌,使用UUID生成 --> <dubbo:provider interface="com.foo.BarService" token="true" /> <!-- 固定Token令牌,相当于密码 --> <dubbo:provider interface="com.foo.BarService" token="123456" /> <!-- 服务级别的设置 --> <dubbo:service interface="com.foo.BarService" token="true" /> <dubbo:service interface="com.foo.BarService" token="true" /> <!-- 协议级别设置 --> <dubbo:protocol name="dubbo" token="true" /> <dubbo:protocol nae="dubbo" token="123456" />整个令牌的工作流程:
消费者从注册中心获取提供者包含令牌的URL。消费者RPC调用设置令牌。具体是在RpcInvocation的构造方法中,把服务提供者的令牌设置到附件中一起请求服务提供者。服务提供者认证令牌。TokenFilter的工作原理很简单,收到请求后,首先检车这个暴露出去的服务是否有令牌信息。如果有,则获取请求中的令牌,如果和接口的令牌匹配,则通过认证,否则认证失败并抛出异常。
TpsLimitFilter主要用于服务提供者的限流。我们会发现在org.apache.dubbo.rpc.Filter这个SPI配置文件中,并没有TpsLimitFilter的配置,因此如果需要使用,则用户要自己添加对应的配置。
TpsLimitFilter的限流是基于令牌的,即一个事件段内只分配N个令牌,每个请求过来都会消耗一个令牌,耗完即止,后面再来的请求都会被拒绝。限流对象的维度支持分组、版本和接口级别,默认通过interface + group + version作为唯一标识来判断是否超过最大值。
<!-- 每次发放1000个令牌 --> <dubbo:parameter key="tps" value="1000" /> <!-- 令牌刷新的间隔是1秒,如果不配置,则默认是60秒 --> <dubbo:parameter key="tps.interval" value="1000" />具体的实现逻辑主要关注DefaultTPSLimiter#isAllowable,会用这个方法判断是否触发限流,如果不触发就直接通过了。
isAllowable方法的逻辑如下:
获取URL中的参数,包含每次发放的令牌数、令牌刷新时间间隔。如果设置了每次发放的令牌则开始限流校验。DefaultTPSLimiter内部用一个ConcurrentMap缓存每个接口的令牌数,key是interface + group + version,value是一个StatItem对象,它包装了令牌刷新时间间隔、每次发放的令牌数等属性。首先判断上次发放令牌的时间点到现在是否超过时间间隔了,如果超过了就重新发放令牌,之前没用完的不会叠加,而是直接覆盖。然后,通过CAS的方式-1令牌,减掉后令牌数如果小于0则会触发限流。