Apollo 通过 Spring Mvc DeferredResult 实现长轮询服务推送

    技术2025-01-08  15

    DeferredResult 字面意思就是推迟结果,是在 Servlet 3.0 以后引入了异步请求之后,在 Spring 3.2 版本封装了一下支持了 Servlet 这个异步请求。DeferredResult 可以允许容器中的线程快速释放以便可以接受更多的请求提升吞吐量,让真正的业务逻辑在其他的工作线程中去完成。

    最近在看 Apollo 配置中心的实现原理,Apollo 的发布配置推送变更消息就是用 DeferredResult 实现的。Apollo 客户端会循环的向服务端发送长轮训 Http 请求,超时时间 60 秒 。当超时后返回客户端一个 Http Status 为 304 状态码的时候表明配置没有变更,客户端继续这个步骤重复发起请求。当有发布配置的时候,服务端会调用 DeferredResult.setResult 返回 200 状态码,然后轮训请求会立即返回(不会超时),客户端收到服务端的响应结果后,会发起向 Apollo 服务端请求获取变更后的配置信息。

    1、模拟 Apollo 通知客户端消息更新

    这里我们通过使用 Spring Boot 来简单的模拟一下 Apollo 是如何通过 Spring MVC DeferredResult 来实现长轮询服务推送的。

    1.1 Bootstrap.java

    Bootstrap 类做为 Spring Boot 启动的启动类,通过实现 WebMvcConfigurer 配置 Spring MVC 支持异步的线程池。设置了一个用来异步执行业务逻辑的工作线程池,设置了默认的超时时间是 60 秒。

    @SpringBootApplication public class Bootstrap implements WebMvcConfigurer { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @Bean public ThreadPoolTaskExecutor mvcTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setQueueCapacity(100); executor.setMaxPoolSize(25); return executor; } @Override public void configureAsyncSupport(AsyncSupportConfigurer configurer) { configurer.setTaskExecutor(mvcTaskExecutor()); configurer.setDefaultTimeout(60000L); } }

    1.2 GlobalExceptionHandler.java

    GlobalExceptionHandler 定义 Spring MVC 全局异常拦截类。当 Spring Mvc DeferredResult 处理请求的时候,如果超时没有响应结果就会抛出 AsyncRequestTimeoutException 异常。通过拦截此异常,返回 304 状态码告诉客户端当前命名空间的配置文件并没有更新。

    @ControllerAdvice class GlobalExceptionHandler { protected static final Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class); @ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304状态码 @ResponseBody @ExceptionHandler(AsyncRequestTimeoutException.class) //捕获特定异常 public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e, HttpServletRequest request) { logger.info("handleAsyncRequestTimeoutException"); } }

    1.3 测试 Controller 返回 DeferredResult

    @RestController public class ApolloController { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //guava中的Multimap,多值map,对map的增强,一个key可以保持多个value private Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedSetMultimap(HashMultimap.create()); //模拟长轮询 @RequestMapping(value = "/watch/{namespace}", method = RequestMethod.GET, produces = "text/html") public DeferredResult<String> watch(@PathVariable("namespace") String namespace) { logger.info("Request received"); DeferredResult<String> deferredResult = new DeferredResult<>(); //当deferredResult完成时(不论是超时还是异常还是正常完成),移除watchRequests中相应的watch key deferredResult.onCompletion(new Runnable() { @Override public void run() { System.out.println("remove key:" + namespace); watchRequests.remove(namespace, deferredResult); } }); watchRequests.put(namespace, deferredResult); logger.info("Servlet thread released"); return deferredResult; } //模拟发布namespace配置 @RequestMapping(value = "/publish/{namespace}", method = RequestMethod.GET, produces = "text/html") public Object publishConfig(@PathVariable("namespace") String namespace) { if (watchRequests.containsKey(namespace)) { Collection<DeferredResult<String>> deferredResults = watchRequests.get(namespace); Long time = System.currentTimeMillis(); //通知所有watch这个namespace变更的长轮训配置变更结果 for (DeferredResult<String> deferredResult : deferredResults) { deferredResult.setResult(namespace + " changed:" + time); } } return "success"; } }

    首先我们通过 Postman 请求地址 http://localhost:8080/watch/123 不做其它操作,请求会被挂起,等待 60 秒后,会拋出 AsyncRequestTimeoutException 异常,全局异常捕获会返回 302 状态码。这样就表示期间配置没有被更改过。

    然后我们再次通过 Postman 请求地址 http://localhost:8080/watch/123,在超时时间(60 s) 之前请求 http://localhost:8080/publish/123 表示配置变更。这时 watch 地址会立即返回,并且状态码为 200,这时客户端就根据状态码就知道配置变更了。

    使用用了一个MultiMap来存放所有轮训的请求,Key 对应的是 namespace ,value 对应的是所有 watch 这个 namespace 变更的异步请求DeferredResult,需要注意的是:在DeferredResult 完成的时候记得移除 MultiMap中 相应的 key ,避免内存溢出请求。采用这种长轮询的好处是,相比一直循环请求服务器,实例一多的话会对服务器产生很大的压力,http长轮询的方式会在服务器变更的时候主动推送给客户端,其他时间客户端是挂起请求的,这样同时满足了性能和实时性。

    2、Servlet 3.0 AsyncListener 异步支持

    在Servlet 3.0版本之前,Servlet线程需要一直阻塞,直到业务处理完毕才能再输出响应,最后才结束该Servlet线程。而有了异步处理特性,Servlet线程不再需要一直阻塞,在接收到请求之后,Servlet线程可以将耗时的操作委派给另一个线程来完成,自己在不生成响应的情况下返回至容器。针对业务处理较耗时的情况,这可以大幅度降低服务器的资源消耗,并且提高并发处理速度。

    在 Servlet 3.0 之后新增了一组接口 AsyncListener、AsyncContext 与 AsyncContextCallback 用于支持异步处理,下面我们就针对接口来分析一下 Sevlet 是如何实现异步支持的。

    AsyncListener.java

    public interface AsyncListener extends EventListener { // 异步处理完成执行 void onComplete(AsyncEvent event) throws IOException; // 异步处理超时执行 void onTimeout(AsyncEvent event) throws IOException; // 异步处理错误执行 void onError(AsyncEvent event) throws IOException; // 异步处理开始时执行 void onStartAsync(AsyncEvent event) throws IOException; }

    实现了 java.util.EventListener ,通过实现它来达到异步处理的目的。里面定义了 4 个方法,分别对应异步处理执行不同的状态后的回调执行方法。

    AsyncContext.java

    public interface AsyncContext { ServletRequest getRequest(); ServletResponse getResponse(); void dispatch(); void dispatch(String path); void dispatch(ServletContext context, String path); void complete(); void start(Runnable run); void addListener(AsyncListener listener); void addListener(AsyncListener listener, ServletRequest request, ServletResponse response); void setTimeout(long timeout); long getTimeout(); }

    实现AsyncContext接口的对象可以通过startAsync方法从ServletRequest获得。有了AsyncContext后,可以使用它通过complete()方法完成对请求的处理,也可以使用下面描述的其中的一个 dispatch 方法。

    下面的方法可以用来分发从 AsyncContext 的请求:

    dispatch(path) :这个方法接受一个字符串参数,该参数描述 ServletContext 作用域内的路径。此路径必须相对于 ServletContext 的根,并以’ / '开头。dispatch(servletContext, path):这个方法方法接受一个字符串参数,该参数描述指定的 ServletContext 作用域内的路径。此路径必须相对于指定的 ServletContext 的根,并以’ / '开头。dispatch():这个方法不带参数。它使用原始 URI 作为路径。如果 AsyncContext 是通过 startAsync(ServletRequest, ServletResponse)初始化的,并且传递的请求是 HttpServletRequest 的一个实例,那么分派到 HttpServletRequest. getrequesturi() 返回的 URI。否则,该分派将在容器最后一次分派请求时发送到该请求的 URI .

    在等待异步事件发生时,应用程序可以调用 AsyncContext 接口的一个分派方法。如果在 AsyncContext 上调用了complete(),则必须抛出 IllegalStateException。分派方法的所有变体都立即返回,并且不提交响应。

    公开给目标 servlet 的请求对象的路径元素必须反映 AsyncContext.dispatch 中指定的路径。

    AsyncContextCallback.java

    public interface AsyncContextCallback { public void fireOnComplete(); /** * Reports if the web application associated with this async request is * available. * * @return {@code true} if the associated web application is available, * otherwise {@code false} */ public boolean isAvailable(); }

    AsyncContextCallback 是一个回调函数,它包装了多个 AsyncListener,用于执行异步事件完成后调用 AsyncListener#onComplete 。

    注意:Servlet 3.0 异步处理的整个流程包括以下两个步骤:

    调用 ServletRequest#startAsync 开始 Servlet 里面的异步执行调用 AsyncContext#dispatch 完成整个异步处理

    3、DeferredResult 实现原理

    在 Spring Mvc 3.2 后就可以使用 Servlet 3.0 之后的异步处理,对于 Spring mvc 框架使用者,我们只需要返回值定义为 DeferredResult 就可以实现这个功能,可以参考章节 1 的例子。下面我们就来看一下 Spring mvc 是如何集成 Servlet 3.0 中的异步处理的。

    3.1 StandardServletAsyncWebRequest.java

    实现 javax.servlet.AsyncListener 接口,集成 Serlvet 3.0 异步处理。同时也实现了 Spring 自定义的接口 org.springframework.web.context.request.async.AsyncWebRequest 。这个接口抽象了 Servlet 3.0 javax.servlet.AsyncContext 里面的异步t处理的核心方法。

    AsyncWebRequest.java

    public interface AsyncWebRequest extends NativeWebRequest { // 设置超时时间 void setTimeout(@Nullable Long timeout); // 设置异步处理超时间处理类 void addTimeoutHandler(Runnable runnable); // 设置异步处理异常处理类 void addErrorHandler(Consumer<Throwable> exceptionHandler); // 设置异步处理完成处理类 void addCompletionHandler(Runnable runnable); // 开始异步处理 void startAsync(); // 是否异步处理开始 boolean isAsyncStarted(); // 分发请求 void dispatch(); // 是否异步处理完成 boolean isAsyncComplete(); }

    这其实也是 Spring 的一种编程思想,对外部接口进行接口抽象,当 Servlet 容器替换的时候,只需要实现另外一种 Servlet 容器就可以了,对于 Spring Mvc 内部调用异步处理的操作并没有修改。

    3.2 WebAsyncUtils.java

    这个是用于关联处理异步 Web 请求的工具类,从 Spring Mvc 3.2 版本开始就有这个类了。

    /** * Utility methods related to processing asynchronous web requests. * * @author Rossen Stoyanchev * @author Juergen Hoeller * @since 3.2 */ public abstract class WebAsyncUtils { public static final String WEB_ASYNC_MANAGER_ATTRIBUTE = WebAsyncManager.class.getName() + ".WEB_ASYNC_MANAGER"; public static WebAsyncManager getAsyncManager(ServletRequest servletRequest) { WebAsyncManager asyncManager = null; Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE); if (asyncManagerAttr instanceof WebAsyncManager) { asyncManager = (WebAsyncManager) asyncManagerAttr; } if (asyncManager == null) { asyncManager = new WebAsyncManager(); servletRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager); } return asyncManager; } public static WebAsyncManager getAsyncManager(WebRequest webRequest) { int scope = RequestAttributes.SCOPE_REQUEST; WebAsyncManager asyncManager = null; Object asyncManagerAttr = webRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, scope); if (asyncManagerAttr instanceof WebAsyncManager) { asyncManager = (WebAsyncManager) asyncManagerAttr; } if (asyncManager == null) { asyncManager = new WebAsyncManager(); webRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager, scope); } return asyncManager; } public static AsyncWebRequest createAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) { return new StandardServletAsyncWebRequest(request, response); } }

    它的里面有三个核心的方法:

    getAsyncManager(servletRequest):通过 ServletRequest (原生 Servlet 请求 API)获取到 WebAsyncManager ,它是用于 Web 异步管理的getAsyncManager(webRequest):通过 WebRequest (Spring Mvc 封装的 Servlet 请求接口)获取到 WebAsyncManager ,它是用于 Web 异步管理的createAsyncWebRequest(request, response) :创建 AsyncWebRequest 接口实例,也就是上面的 StandardServletAsyncWebRequest 对象,它实现了 Servlet 3.0 javax.servlet.AsyncContext ,可以对处理异步是对异步请求的一个包装。

    WebAsyncManager 这个类非常重要,我们后续的流程中来具体的分析它。

    3.3 Spring Mvc 处理异步流程

    在这里并不会完整的讲解 Spring Mvc 处理 Http 请求的完整请求,只会说明 Spring Mvc 在整个流程里面处理异步请求的差异点。如果对 Spring Mvc 处理 Http 的完整流程不太清楚可以观看博主之前写的 – Spring MVC DispatcherServlet。

    3.3.1 调用 Controller 之前添加 WebAsyncManager

    Spring MVC 处理每次 http 请求的时候都会调用以下方法为当前 ServletRequest 设置 WebAsyncManager。每个 WebAsyncManager 是和一次 ServletRequest 绑定的。

    WebAsyncUtils#getAsyncManager(javax.servlet.ServletRequest)

    servletRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager);

    它的调用入口是 RequestMappingHandlerAdapter#invokeHandlerMethod:

    RequestMappingHandlerAdapter#invokeHandlerMethod

    protected ModelAndView invokeHandlerMethod(HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception { ServletWebRequest webRequest = new ServletWebRequest(request, response); try { WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod); ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory); ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod); if (this.argumentResolvers != null) { invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers); } if (this.returnValueHandlers != null) { invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers); } invocableMethod.setDataBinderFactory(binderFactory); invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer); ModelAndViewContainer mavContainer = new ModelAndViewContainer(); mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request)); modelFactory.initModel(webRequest, mavContainer, invocableMethod); mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect); AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response); asyncWebRequest.setTimeout(this.asyncRequestTimeout); // 为当前请求绑定 WebAsyncManager WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request); asyncManager.setTaskExecutor(this.taskExecutor); asyncManager.setAsyncWebRequest(asyncWebRequest); asyncManager.registerCallableInterceptors(this.callableInterceptors); asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors); if (asyncManager.hasConcurrentResult()) { Object result = asyncManager.getConcurrentResult(); mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0]; asyncManager.clearConcurrentResult(); LogFormatUtils.traceDebug(logger, traceOn -> { String formatted = LogFormatUtils.formatValue(result, !traceOn); return "Resume with async result [" + formatted + "]"; }); invocableMethod = invocableMethod.wrapConcurrentResult(result); } invocableMethod.invokeAndHandle(webRequest, mavContainer); if (asyncManager.isConcurrentHandlingStarted()) { return null; } return getModelAndView(mavContainer, modelFactory, webRequest); } finally { webRequest.requestCompleted(); } }

    3.3.2 DeferredResult 返回值处理

    这个方法之后就是调用 URI 对应的 Spring Mvc Controller 里面的处理类,因为 Controller 里面的 RequestMapping 方法的返回值是 DeferredResult ,所以它对应的返回值处理类是 DeferredResultMethodReturnValueHandler ,在这个返回值处理类当中,它的核心代码是:

    DeferredResultMethodReturnValueHandler#handleReturnValue

    WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);

    通过 WebAsyncUtils#getAsyncManager(org.springframework.web.context.request.WebRequest),获取到之前在调用 Controller 之前设置的 WebAsyncManager 通过 WebAsyncManager#startDeferredResultProcessing 来进行异步处理。下面我们就来看一下 WebAsyncManager 是如何进行异步调用的。

    3.3.3 WebAsyncManager 处理异步请求

    上面已经说过,在 DeferredResult 的返回值处理器中会调用WebAsyncManager#startDeferredResultProcessing来进行异步调用,下面我们就来分析一下这个方法中是如何处理的。

    WebAsyncManager#startDeferredResultProcessing

    public void startDeferredResultProcessing( final DeferredResult<?> deferredResult, Object... processingContext) throws Exception { Assert.notNull(deferredResult, "DeferredResult must not be null"); Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); Long timeout = deferredResult.getTimeoutValue(); if (timeout != null) { this.asyncWebRequest.setTimeout(timeout); } List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>(); interceptors.add(deferredResult.getInterceptor()); interceptors.addAll(this.deferredResultInterceptors.values()); interceptors.add(timeoutDeferredResultInterceptor); final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors); this.asyncWebRequest.addTimeoutHandler(() -> { try { interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } }); this.asyncWebRequest.addErrorHandler(ex -> { try { if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) { return; } deferredResult.setErrorResult(ex); } catch (Throwable interceptorEx) { setConcurrentResultAndDispatch(interceptorEx); } }); this.asyncWebRequest.addCompletionHandler(() -> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult)); interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult); startAsyncProcessing(processingContext); try { interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult); deferredResult.setResultHandler(result -> { result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result); setConcurrentResultAndDispatch(result); }); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } }

    上面这个方法代码看着挺多其实做的事情如下:

    AsyncWebRequest.setTimeout(timeout):如果 DeferredResult 设置了超时时间,就设置异步调用的超时时间,一般使用 使用 AsyncSupportConfigurer 配置异步执行线程池与超时时间,如章节 1 的示例。AsyncWebRequest.addTimeoutHandler(runnable):添加一个超时时间处理类,默认使用 TimeoutDeferredResultProcessingInterceptor,它在异步调用超时时会抛出 AsyncRequestTimeoutException 异常。同时也会会调用 DeferredResultProcessingInterceptor#handleTimeout 链,默认为空。AsyncWebRequest.addErrorHandler(runnable):添加一个异步执行异常处理类,会调用 DeferredResultProcessingInterceptor#handleError 方法链,默认为空。AsyncWebRequest.addCompletionHandler(runnable):添加一个异步执行完成处理类DeferredResultProcessingInterceptor#afterCompletion 方法链,默认为空。调用 DeferredResultProcessingInterceptor#beforeConcurrentHandling方法链默认为空实现。开始异步执行 WebAsyncManager#startAsyncProcessing ,调用 AsyncWebRequest#startAsync 接口开始异步处理调用 DeferredResultInterceptorChain#applyPreProcess 方法链,默认为空。调用 DeferredResult#setResultHandler 方法,设置当 DeferredResult 异步设置返回值时的处理方法,核心方法。

    上面的方法很多,最核心的是两个方法。

    WebAsyncManager#startAsyncProcessing,调用 AsyncWebRequest#startAsync 接口开始异步处理,在这里他用调用 AsyncWebRequest 对象的实例 StandardServletAsyncWebRequest#startAsync,它即实现了 Spring 提供的异步请求处理接口 AsyncWebRequest 又实现了javax.servlet.AsyncListener,集成了 Servlet 3.0 的处理。它会把之前设置的回调方法添加到 javax.servlet.AsyncContext 当中。并且执行 ServletRequest#startAsync 开始 Servlet 里面的异步执行。调用 DeferredResult#setResultHandler 方法,设置当 DeferredResult 异步设置返回值时的处理方法,核心方法。它会调用 WebAsyncManager#setConcurrentResultAndDispatch ,然后会调用 StandardServletAsyncWebRequest#dispatch ,最终会调用到 AsyncContext.dispatch()。

    3.3.4 Spring Mvc 的实现方式

    以实现 Servlet 3.0 的 Spring 举例,异步操作其实是基于事件的处理。当调用 ServletRequest#startAsync 会有一个开始处理的事件,然后调用 AsyncContext.dispatch() 异步请求完成,会完成当前的请求。

    在 Controller 的 RequestMapping 方法中定义一个 DeferredResult 响应对象, Spring Mvc 的返回值处理器会调用 ServletRequest#startAsync 的方法,设置一系统的回调方法(不是重点),核心是以下代码:

    WebAsyncManager#startDeferredResultProcessing

    deferredResult.setResultHandler(result -> { result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result); setConcurrentResultAndDispatch(result); }); // 调用 AsyncContext.dispatch() 完成整个异步处理 private void setConcurrentResultAndDispatch(Object result) { synchronized (WebAsyncManager.this) { if (this.concurrentResult != RESULT_NONE) { return; } this.concurrentResult = result; } if (this.asyncWebRequest.isAsyncComplete()) { if (logger.isDebugEnabled()) { logger.debug("Async result set but request already complete: " + formatRequestUri()); } return; } if (logger.isDebugEnabled()) { boolean isError = result instanceof Throwable; logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri()); } this.asyncWebRequest.dispatch(); }

    所以当调用 DeferredResult.setResult 方法就会触发 AsyncContext.dispatch() 方法调用, Servlet 异步处理的回调,这时 Tomcat 就会响应结果,完成整个方法的调用。

    参考文章:

    springmvc - 通过DeferredResult实现长轮询服务端推送消息Servlet 3.0的AsyncListener接口<Servlet 规范 3.0 – Dispatching Requests>
    Processed: 0.010, SQL: 9