和服务提供者端的ExecuteLimitFilter相似,ActiveLimitFilter是消费者端的过滤器,显示的是客户端的并发数。
<!-- 限制com.foo.BarService的每个方法在每个客户端的并发执行数(或占用连接的请求数)不能超过10个 --> <dubbo:service interface="com.foo.BarService" actives="10" />或 <dubbo:reference interface="com.foo.BarService" actives="10" /> <!-- 限制com.foo.BarService的sayHello方法在每个客户端的并发执行数(或占用连接的请求数)不能超过10个 --> <dubbo:service interface="com.foo.BarService" > <dubbo:method name="sayHello" actives="10" /> </dubbo:service> 或 <dubbo:reference interface="com.foo.BarService" > <dubbo:method name="sayHello" actives="10" /> </dubbo:reference>如果<dubbo:service>和<dubbo:reference>都配置了actives,则<dubbo:reference>优先。如果设置了acties小于等于0,则不作并发限制。
ActiveLimitFilter的具体实现逻辑:
获取参数。获取方法名、最大并发数等参数,为下面逻辑做准备。如果达到限流阈值,和服务提供者端的逻辑并不一样,并不是直接抛出异常,而是先等待直到超时,因为请求是有timeout属性的。当并发数达到阈值时,会先加锁抢占当前解耦的RpcStatus对象,然后通过wait方法进行等待。此时会有两种结果: 第一种是某个Invoker在调用结束后,并发把计数器原子-1并处罚一个norify,会有一个在wait状态的线程被唤醒并继续执行逻辑。第二种是wait等待超时都没有被唤醒,此时直接抛出异常3. 如果满足调用阈值,则直接进行调用,成功或失败都会原子-1对应并发计数。最后会唤醒一个等待中的线程。
注意:这种方式限流是有问题的,在大并发场景下容易出现超时限流阈值的情况。例如:当10个线程同时获取当前并发数时,都发现还差1个计数达到阈值,此时10个线程都符合要求并往下执行,即超了9个限制。不过该问题在新版本中已经修复。
ConsumerContextFilter会和ContextFilter配合使用。因为在微服务环境中,有很多链式调用,如A->B->C->D。收到请求时,当前节点可以被看做一个服务提供者,由ContextFilter设置上下文。当发起请求到下一个服务时,当前服务变为一个消费者,由ConsumerContextFilter设置上下文。
其工作逻辑主要如下:
设置当前请求上下文,如Invoker信息、地址信息、端口信息等;服务调用。清除Response上下文,然后继续调用过滤器链的下一个节点;清除上下文信息。每次服务调动完成,都会把附件上下文清除,如隐式传参。DEprecatedFilter会检查所有调用,如果方法已经通过dubbo:parameter设置了deprecated=true,则会打印一段ERROR级别的日志。这个错误日志只会打印一次,判断是否打印过的key维度是:接口名+方法名。打印过的key都会加入一个Set中保存,后续就不会再打印了:
FutureFilter主要实现框架在调用前后出现异常时,触发调用胡勇配置的回调方法,如以下配置:
<!-- 用户编写的回调方法,里面又onreturn、onthrow、oninvoke几个方法 --> <bean id="callBack" class="com.test.CallBack" /> <!-- 在testMethod的调用前后出现异常时,分别调用回调类的方法 --> <dubbo:reference id="testService" interface="com.testService"> <dubbo:method name="testMetho" onreturn="callBack.onreturn" onthrow="callBack.onthrow" oninvoke="callBack.oninvoke" /> </dubbo:reference>调用前的回调实现很简单,由于整个逻辑是在过滤器链中执行的,FutureFilter在执行下一个节点的invoke方法前调用oninvoke回调方法就能实现调用前的回调。方法在服务引用初始化的时候就会配置文件中的回调方法保存到ConsumerMethodModel中,后续使用的时候,直接取出来就可以调用。不过需要注意的是,oninvke回调只对异步调用有效。
当调用有返回结果的时候,会执行FutureFilter#onResponse的逻辑。对于同步调用的方法,则直接判断返回的result是否有异常,有异常则同步调用ontrow回调方法,没有异常则同步调用onreturn回调方法。对于异步调用,会通过CompletableFuture的thenApply方法来执行onthrow或onreturn的回调。CompletableFuture是JDK8中的新特性。