Spring MVC 异步请求处理 DeferredResult 的学习 导语 之前在学习 Apollo 配置中心的时候,发现 Apollo 通过 Spring MVC DeferredResult
实现长轮询服务推送。当时仅仅简单了解了一下,如今决定深入学习一下 Spring MVC 如何实现的 DeferredResult,期望可以解决以下问题:
什么是长轮询?除了长轮询还有什么方式?如何选择?
什么是 DeferredResult
?如何使用?
DeferredResult
的实现原理是什么?
在实际工作中什么时候会用到?
背景 什么是 DeferredResult DeferredResult
字面意思就是推迟结果,是在 Servlet 3.0 以后引入了异步请求之后,在 Spring 3.2 版本封装了一下支持了 Servlet 这个异步请求。DeferredResult
可以允许容器中的线程快速释放以便可以接受更多的请求提升吞吐量,让真正的业务逻辑在其他的工作线程中去完成。
DeferredResult 的使用场景 服务端和客户端数据传输的方式:推模型和拉模型
服务端和客户端即时数据传输主要有四种方式,它们分别是轮询、长轮询、长连接、WebSocket。它们大体可以分为两类,一种是在 HTTP 基础上实现的,包括轮询、长轮询和长连接;另一种不是在HTTP基础上实现是,即 WebSocket。
推模型-长连接 客户端与服务端建立 TCP 长连接,当服务端配置数据有变动,立刻通过建立的长连接将数据推送给客户端。
优势
长链接的优点是实时性,一旦数据变动,立即推送变更数据给客户端,而且对于客户端而言,这种方式更为简单,只建立连接接收数据,并不需要关心是否有数据变更这类逻辑的处理。
弊端
长连接可能会因为网络问题,导致不可用,也就是俗称的假死。连接状态正常,但实际上已无法通信,所以要有的心跳机制 KeepAlive 来保证连接的可用性,才可以保证配置数据的成功推送。
服务端需要维护数据状态及失败重试机制
服务端难以平衡客户端的消费速率
案例:
Dubbo 当服务提供者地址列表有变更,之策中心将基于长连接推送变更数据给消费者
Eureka 集群节点之间互相通讯,如果心跳机制检测到 client 节点有变更,在修改本地注册表后想集群中其他 server 节点发起数据同步
推模型-Web Socket WebSocket 是 Html5 定义的一个新协议,与传统的 http 协议不同,该协议可以实现服务器与客户端之间全双工通信。简单来说,首先需要在客户端和服务器端建立起一个连接,这部分需要 http。连接一旦建立,客户端和服务器端就处于平等的地位,可以相互发送数据,不存在请求和响应的区别。
优势
弊端
服务器端的逻辑非常复杂
相较于 HTTP 协议兼容性较低
拉模型-轮询 客户端主动的向服务端发请求拉取数据,常见的方式就是轮询,比如每3s向服务端请求一次配置数据。
优势
弊端
拉取频率和时效性难以平衡,导致时效性较差
效率低下,无效请求非常多
频繁创建连接
拉模型-长轮询 客户端发起请求后,服务端不会立即返回请求结果,而是将请求挂起等待一段时间,如果此段时间内服务端数据变更,立即响应客户端请求,若是一直无变化则等到指定的超时时间后响应请求,客户端重新发起长链接。
长轮询是由服务端控制响应客户端请求的返回时间,来减少客户端无效请求的一种优化手段。其实对于客户端来说与短轮询的使用并没有本质上的区别。
优势
长轮询和短轮询比起来,明显减少了很多不必要的 http 请求次数,相比之下节约了资源。
弊端
案例:
消息队列 RocketMQ 和 Kafka 都选择了拉模式(指的是 Comsumer 和 Broker 之间的交互 。)
详见:消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?
Eureka 注册中心客户端 Eureka Client 会拉取、更新和缓存 Eureka Server 中的信息。因此当所有的 Eureka Server 节点都宕掉,服务消费者依然可以使用缓存中的信息找到服务提供者,但是当服务有更改的时候会出现信息不一致。
Apollo 配置中心的发布配置推送变更消息
Apollo 客户端会循环的向服务端发送长轮训 Http 请求,超时时间 60 秒 。当超时后返回客户端一个 Http Status 为 304 状态码的时候表明配置没有变更,客户端继续这个步骤重复发起请求。当有发布配置的时候,服务端会调用 DeferredResult.setResult 返回 200 状态码,然后轮训请求会立即返回(不会超时),客户端收到服务端的响应结果后,会发起向 Apollo 服务端请求获取变更后的配置信息。
如何选择推还是拉? 推拉模式的对比 :
具体的比较 1.Push 模式
推模式是服务器端根据用户需要,由目的、按时将用户感兴趣的信息主动发送到用户的客户端
Push 模式的主要优点是
对用户要求低,方便用户获取需要的信息
及时性好,服务器端即使地向客户端推送更行的动态信息
Push 模式的缺点
不能确保发送成功。Push模式采用广播方式,只有服务器端和客户端在同一个频道上,推模式才有效,用户才能接收到信息
没有信息状态跟踪。Push模式采用开环控制技术,一个信息推送后的状态,比如客户端是否接收等,无从得知
针对性较差。推送的信息可能并不能满足客户端的个性化需求。
2.Pull 模式
Pull 模式是客户端主动从服务器端获取信息
Pull 模式的主要优点是
针对性强,能满足客户端的个性化需求
信息传输量较小,网络中传输的知识客户端的请求和服务器端对该请求的响应
服务器端的任务轻。服务器端只是被动接收查询,对客户端的查询请求做出响应
Pull 模式的缺点
实时较差,针对于服务器端实时更新的信息,客户端难以获取实时信息
对于客户端用户的要求较高,需要对服务器端具有一定的了解。
从兼容性角度考虑,短轮询>长轮询>长连接SSE>WebSocket;
从性能方面考虑,WebSocket>长连接SSE>长轮询>短轮询。
当前业务中可能使用到的场景 业务逻辑耗时长 比如当前业务中,目标计算过程比较耗时,不能及时给页面响应结果。
实时获取数据状态变化 比如支付状态、订单状态、机器状态的变更等
Spring 框架对异步的支持 前置了解: Web 容器对 HTTP 请求的处理 当一个 HTTP 请求到达 Tomcat,Tomcat 将会从线程池中取出线程,然后按照如下流程处理请求:
将请求信息解析为 HttpServletRequest
分发到具体 Servlet 处理相应的业务
通过 HttpServletResponse
将响应结果返回给等待客户端
整体流程如下所示:
这是我们日常最常用同步请求模型,所有动作都交给同一个 Tomcat 线程处理,所有动作处理完成,线程才会被释放回线程池。
想象一下如果业务需要较长时间处理,那么这个 Tomcat 线程其实一直在被占用,随着请求越来越多,可用 I/O 线程越来越少,直到被耗尽。这时后续请求只能等待空闲 Tomcat 线程,这将会加长了请求执行时间。
如果客户端不关心返回业务结果,这时我们可以自定义线程池,将请求任务提交给线程池,然后立刻返回。
也可以使用 Spring Async 任务
servlet 3.0 支持异步 异步 Servelt 执行请求流程:
将请求信息解析为 HttpServletRequest
分发到具体 Servlet
处理,将业务提交给自定义业务线程池,请求立刻返回,Tomcat 线程立刻被释放
当业务线程将任务执行结束,将会将结果转交给 Tomcat 线程
通过 HttpServletResponse
将响应结果返回给等待客户端
引入异步 Servelt3 整体流程如下:
使用异步 Servelt,Tomcat 线程仅仅处理请求解析动作,所有耗时较长的业务操作全部交给业务线程池,所以相比同步请求, Tomcat 线程可以处理更多请求。
虽然我们将业务处理交给业务线程池异步处理,但是对于客户端来讲,其还在同步等待响应结果 。
而且异步请求不一定会获得更快响应时间 ,相反可能由于引入了更多线程,增加线程上下文切换时间。
虽然没有降低响应时间,但是通过请求异步化带来其他明显优点 :
可以处理更高并发连接数,提高系统整体吞吐量
请求解析与业务处理完全分离,职责单一
自定义业务线程池,我们可以更容易对其监控,降级等处理
可以根据不同业务,自定义不同线程池,相互隔离,不用互相影响
所以具体使用过程,我们还需要进行的相应的压测,观察响应时间以及吞吐量等其他指标,综合选择。
spring MVC 对 异步的支持 Servlet3 API ,无法使用 SpringMVC 为我们提供的特性,我们需要自己处理响应信息,处理方式相对繁琐。
SpringMVC 3.2 基于 Servelt3 引入异步请求处理方式,我们可以跟使用同步请求一样,方便使用异步请求。
SpringMVC 提供有两种异步方式,只要将 Controller
方法返回值修改下述类即可:
spring MVC 的 DeferredResult 和 callable DeferredResult DeferredResult
是 SpringMVC 3.2 之后引入新的类,只要让请求方法返回 DeferredResult
,就可以快速使用异步请求,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @RequestMapping(value = "/deferredResult", method = RequestMethod.GET) public DeferredResult<ResponseMsg<String>> deferredResult() { DeferredResult<ResponseMsg<String>> deferredResult = new DeferredResult<>(10000L ); deferredResult.onCompletion(() -> { log.info("异步线程处理结束" ); }); deferredResult.onTimeout(() -> { log.info("异步线程超时" ); deferredResult.setErrorResult("timeout error" ); }); deferredResult.onError(throwable -> { log.error("异常" , throwable); deferredResult.setErrorResult("other error" ); }); executorService.submit(() -> { try { deferredResult.setResult(taskService.getResult()); } catch (Exception e) { e.printStackTrace(); deferredResult.setErrorResult("error" ); } }); log.info("servlet 线程处理结束" ); return deferredResult; }
创建 DeferredResult
实例时可以传入特定超时时间。另外我们可以设置默认超时时间:
1 2 # 异步请求超时时间 spring.mvc.async.request-timeout=2000
如果异步程序执行完成,可以调用 DeferredResult#setResult
返回响应结果。此时若有设置 DeferredResult#onCompletion
回调方法,将会触发该回调方法。
同时还有其他异常回调方法:
Callable Spring 另外还提供一种异步请求使用方式,直接使用 JDK Callable
。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 @RequestMapping(value = "/callable",method = RequestMethod.GET) public Callable<ResponseMsg<String>> getResult(){ log.info("接收请求,开始处理..." ); Callable<ResponseMsg<String>> result = (()-> taskService.getResult()); log.info("接收任务线程完成并退出" ); return result; }
默认情况会使用 SimpleAsyncTaskExecutor
执行异步请求,每次调用执行都将会新建线程。由于这种方式不复用线程,生产不推荐 使用这种方式,所以我们需要使用线程池代替。
我们可以使用如下方式自定义线程池:
1 2 3 4 5 6 7 8 @Bean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME) public AsyncTaskExecutor executor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setThreadNamePrefix("test-"); threadPoolTaskExecutor.setCorePoolSize(10); threadPoolTaskExecutor.setMaxPoolSize(20); return threadPoolTaskExecutor; }
注意 Bean 名称一定要是 applicationTaskExecutor
,若不一致, Spring 将不会使用自定义线程池。
或者可以直接使用 SpringBoot 配置文件方式配置代替:
1 2 3 4 5 6 7 # 核心线程数 spring.task.execution.pool.core-size=10 # 最大线程数 spring.task.execution.pool.max-size=20 # 线程名前缀 spring.task.execution.thread-name-prefix=test # 还有另外一些配置,读者们可以自行配置
这种方式异步请求的超时时间只能通过配置文件方式配置。
1 spring.mvc.async.request-timeout=10000
如果需要为单独请求的配置特定的超时时间,我们需要使用 WebAsyncTask
包装 Callable
。
1 2 3 4 5 6 7 8 9 10 11 12 @RequestMapping("/webAsyncTask") public WebAsyncTask<ResponseMsg<String>> webAsyncTask() { log.info("接收请求,开始处理...webAsyncTask"); Callable<ResponseMsg<String>> callable= () -> { ResponseMsg<String> result = taskService.getResult(); log.info("异步方法结束 webAsyncTask"); return result; }; // 单位 ms WebAsyncTask<ResponseMsg<String>> webAsyncTask=new WebAsyncTask<>(10000,callable); return webAsyncTask; }
实现原理探究 Spring MVC 在调用处理器方法后会对返回值做处理,如果发现返回值为异步请求类型,则不会立即响应客户端,而是直接将请求挂起,中断当前请求的处理流程,直到异步任务超时或者被设置了结果值才响应客户端。这样处理请求的线程将被释放,可以用于处理新的请求,从而提高服务端的吞吐量。在这个过程中,异步请求的核心处理流程大致可以分为两步:
请求的异步处理
以下将从 DispatcherServlet#doDispatch()
开始分析。这个方法是请求处理的主干逻辑,主要注意以下几点:
WebAsyncUtils.getAsyncManager()
根据 HttpServletRequest
创建或者从缓存中获取 WebAsyncManager
对象
ha.handle()
进入处理器方法被调用的逻辑
asyncManager.isConcurrentHandlingStarted()
判断是否开始了异步处理,是的话直接 return,不进入响应处理的流程,也就是说异步请求第一次处理在此就中断了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 protected void doDispatch (HttpServletRequest request, HttpServletResponse response) throws Exception { HttpServletRequest processedRequest = request; HandlerExecutionChain mappedHandler = null ; boolean multipartRequestParsed = false ; WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request); try { ModelAndView mv = null ; Exception dispatchException = null ; try { processedRequest = checkMultipart(request); multipartRequestParsed = (processedRequest != request); mappedHandler = getHandler(processedRequest); if (mappedHandler == null ) { noHandlerFound(processedRequest, response); return ; } HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler()); String method = request.getMethod(); boolean isGet = HttpMethod.GET.matches(method); if (isGet || HttpMethod.HEAD.matches(method)) { long lastModified = ha.getLastModified(request, mappedHandler.getHandler()); if (new ServletWebRequest(request, response).checkNotModified(lastModified) && isGet) { return ; } } if (!mappedHandler.applyPreHandle(processedRequest, response)) { return ; } mv = ha.handle(processedRequest, response, mappedHandler.getHandler()); if (asyncManager.isConcurrentHandlingStarted()) { return ; } applyDefaultViewName(processedRequest, mv); mappedHandler.applyPostHandle(processedRequest, response, mv); } catch (Exception ex) { dispatchException = ex; } catch (Throwable err) { dispatchException = new NestedServletException("Handler dispatch failed" , err); } processDispatchResult(processedRequest, response, mappedHandler, mv, dispatchException); } catch (Exception ex) { triggerAfterCompletion(processedRequest, response, mappedHandler, ex); } catch (Throwable err) { triggerAfterCompletion(processedRequest, response, mappedHandler, new NestedServletException("Handler processing failed" , err)); } finally { if (asyncManager.isConcurrentHandlingStarted()) { if (mappedHandler != null ) { mappedHandler.applyAfterConcurrentHandlingStarted(processedRequest, response); } } else { if (multipartRequestParsed) { cleanupMultipart(processedRequest); } } } }
WebAsyncUtils#getAsyncManager()
方法如下,可以看到主要逻辑是从 ServletRequest
对象的属性中获取缓存的 WebAsyncManager
对象或者直接创建新的 WebAsyncManager
对象并缓存。此处需要注意,之所以需要缓存WebAsyncManager
对象,是因为异步请求将会被二次分发处理,且后续流程都将用到WebAsyncManager
对象,需要保证请求的相关信息一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static final String WEB_ASYNC_MANAGER_ATTRIBUTE = WebAsyncManager.class.getName() + ".WEB_ASYNC_MANAGER" ; public static WebAsyncManager getAsyncManager (ServletRequest servletRequest) { WebAsyncManager asyncManager = null ; Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE); if (asyncManagerAttr instanceof WebAsyncManager) { asyncManager = (WebAsyncManager) asyncManagerAttr; } if (asyncManager == null ) { asyncManager = new WebAsyncManager(); servletRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager); } return asyncManager; }
回到步骤1 ha.handle()
的调用,此处首先调用到 AbstractHandlerMethodAdapter#handle()
方法,接着调用子类实现 RequestMappingHandlerAdapter#handleInternal()
方法,可以看到核心逻辑其实是执行 RequestMappingHandlerAdapter#invokeHandlerMethod()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Override protected ModelAndView handleInternal (HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception { ModelAndView mav; checkRequest(request); if (this .synchronizeOnSession) { HttpSession session = request.getSession(false ); if (session != null ) { Object mutex = WebUtils.getSessionMutex(session); synchronized (mutex) { mav = invokeHandlerMethod(request, response, handlerMethod); } } else { mav = invokeHandlerMethod(request, response, handlerMethod); } } else { mav = invokeHandlerMethod(request, response, handlerMethod); } if (!response.containsHeader(HEADER_CACHE_CONTROL)) { if (getSessionAttributesHandler(handlerMethod).hasSessionAttributes()) { applyCacheSeconds(response, this .cacheSecondsForSessionAttributeHandlers); } else { prepareResponse(response); } } return mav; }
RequestMappingHandlerAdapter#invokeHandlerMethod()
方法非常重要,此处是处理器方法被执行的触发点,主要注意以下几点:
调用RequestMappingHandlerAdapter#createInvocableHandlerMethod()
创建处理器的适配对象 ServletInvocableHandlerMethod
WebAsyncUtils.createAsyncWebRequest()
新建异步请求对象 StandardServletAsyncWebRequest
从 ServletRequest 请求对象的缓存中获取 AsyncManager
对象,并对其进行配置
asyncManager.hasConcurrentResult()
判断异步请求是否已经有结果了,此处逻辑将在异步请求二次分发处理时进入
invocableMethod.invokeAndHandle()
实际调用处理器方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 @Nullable protected ModelAndView invokeHandlerMethod (HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception { ServletWebRequest webRequest = new ServletWebRequest(request, response); try { WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod); ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory); ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod); if (this .argumentResolvers != null ) { invocableMethod.setHandlerMethodArgumentResolvers(this .argumentResolvers); } if (this .returnValueHandlers != null ) { invocableMethod.setHandlerMethodReturnValueHandlers(this .returnValueHandlers); } invocableMethod.setDataBinderFactory(binderFactory); invocableMethod.setParameterNameDiscoverer(this .parameterNameDiscoverer); ModelAndViewContainer mavContainer = new ModelAndViewContainer(); mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request)); modelFactory.initModel(webRequest, mavContainer, invocableMethod); mavContainer.setIgnoreDefaultModelOnRedirect(this .ignoreDefaultModelOnRedirect); AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response); asyncWebRequest.setTimeout(this .asyncRequestTimeout); WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request); asyncManager.setTaskExecutor(this .taskExecutor); asyncManager.setAsyncWebRequest(asyncWebRequest); asyncManager.registerCallableInterceptors(this .callableInterceptors); asyncManager.registerDeferredResultInterceptors(this .deferredResultInterceptors); if (asyncManager.hasConcurrentResult()) { Object result = asyncManager.getConcurrentResult(); mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0 ]; asyncManager.clearConcurrentResult(); LogFormatUtils.traceDebug(logger, traceOn -> { String formatted = LogFormatUtils.formatValue(result, !traceOn); return "Resume with async result [" + formatted + "]" ; }); invocableMethod = invocableMethod.wrapConcurrentResult(result); } invocableMethod.invokeAndHandle(webRequest, mavContainer); if (asyncManager.isConcurrentHandlingStarted()) { return null ; } return getModelAndView(mavContainer, modelFactory, webRequest); } finally { webRequest.requestCompleted(); } }
ServletInvocableHandlerMethod#invokeAndHandle()
的重要处理步骤如下:
ServletInvocableHandlerMethod#invokeForRequest()
反射调用处理器方法,获得方法返回值
this.returnValueHandlers.handleReturnValue()
使用返回值处理器处理返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public void invokeAndHandle (ServletWebRequest webRequest, ModelAndViewContainer mavContainer, Object... providedArgs) throws Exception { Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs); setResponseStatus(webRequest); if (returnValue == null ) { if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) { disableContentCachingIfNecessary(webRequest); mavContainer.setRequestHandled(true ); return ; } } else if (StringUtils.hasText(getResponseStatusReason())) { mavContainer.setRequestHandled(true ); return ; } mavContainer.setRequestHandled(false ); Assert.state(this .returnValueHandlers != null , "No return value handlers" ); try { this .returnValueHandlers.handleReturnValue( returnValue, getReturnValueType(returnValue), mavContainer, webRequest); } catch (Exception ex) { if (logger.isTraceEnabled()) { logger.trace(formatErrorForReturnValue(returnValue), ex); } throw ex; } }
this.returnValueHandlers.handleReturnValue()
实际是调用到返回值处理器的聚合类 HandlerMethodReturnValueHandlerComposite#handleReturnValue()
方法,其主要逻辑如下:
调用 selectHandler()
方法从聚合类对象内部的返回值处理器列表中查找到能够处理当前返回值的处理器,判断依据为 HandlerMethodReturnValueHandler#supportsReturnType()
的返回值
handler.handleReturnValue()
调用返回值处理器进行返回值的处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Override public void handleReturnValue (@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception { HandlerMethodReturnValueHandler handler = selectHandler(returnValue, returnType); if (handler == null ) { throw new IllegalArgumentException("Unknown return value type: " + returnType.getParameterType().getName()); } handler.handleReturnValue(returnValue, returnType, mavContainer, webRequest); } @Nullable private HandlerMethodReturnValueHandler selectHandler (@Nullable Object value, MethodParameter returnType) { boolean isAsyncValue = isAsyncReturnValue(value, returnType); for (HandlerMethodReturnValueHandler handler : this .returnValueHandlers) { if (isAsyncValue && !(handler instanceof AsyncHandlerMethodReturnValueHandler)) { continue ; } if (handler.supportsReturnType(returnType)) { return handler; } } return null ; }
对于类型为 DeferredResult 的返回值,对应的处理器为 DeferredResultMethodReturnValueHandler
,则DeferredResultMethodReturnValueHandler#handleReturnValue()
方法将被调用,可以看到这里主要的处理步骤如下:
对于类型为 ListenableFuture
或者 CompletionStage
的返回值,需要将其包装为 DeferredResult
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing()
开始进行异步的 DeferredResult
处理,这一步实际调用到 AsyncManager#startDeferredResultProcessing()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 @Override public void handleReturnValue (@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception { if (returnValue == null ) { mavContainer.setRequestHandled(true ); return ; } DeferredResult<?> result; if (returnValue instanceof DeferredResult) { result = (DeferredResult<?>) returnValue; } else if (returnValue instanceof ListenableFuture) { result = adaptListenableFuture((ListenableFuture<?>) returnValue); } else if (returnValue instanceof CompletionStage) { result = adaptCompletionStage((CompletionStage<?>) returnValue); } else { throw new IllegalStateException("Unexpected return value type: " + returnValue); } WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer); } private DeferredResult<Object> adaptListenableFuture (ListenableFuture<?> future) { DeferredResult<Object> result = new DeferredResult<>(); future.addCallback(new ListenableFutureCallback<Object>() { @Override public void onSuccess (@Nullable Object value) { result.setResult(value); } @Override public void onFailure (Throwable ex) { result.setErrorResult(ex); } }); return result; } private DeferredResult<Object> adaptCompletionStage (CompletionStage<?> future) { DeferredResult<Object> result = new DeferredResult<>(); future.handle((BiFunction<Object, Throwable, Object>) (value, ex) -> { if (ex != null ) { if (ex instanceof CompletionException && ex.getCause() != null ) { ex = ex.getCause(); } result.setErrorResult(ex); } else { result.setResult(value); } return null ; }); return result; }
WebAsyncManager#startDeferredResultProcessing()
方法是 DeferredResult
处理的重点,此处关键步骤如下:
首先将 DeferredResult
设置的超时时间取出来,设置到 AsyncWebRequest
对象中
创建 DeferredResult
的拦截器执行链对象 DeferredResultInterceptorChain
,将 DeferredResult 中设置的各个回调方法保存起来
设置 AsyncWebRequest
对象的各个处理器,包括超时处理器,异常处理器等
调用 WebAsyncManager#startAsyncProcessing()
开始进行请求异步处理,这个步骤在上层比较简短,就是调用this.asyncWebRequest.startAsync()
,实际调用到 StandardServletAsyncWebRequest#startAsync()
方法
deferredResult.setResultHandler()
设置 DeferredResult
的结果处理器,需注意此处传入的是 Lambda 函数
从 第 4 步开始实际进入到了 Tomcat 服务器代码(本文 9 - 16)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public void startDeferredResultProcessing ( final DeferredResult<?> deferredResult, Object... processingContext) throws Exception { Assert.notNull(deferredResult, "DeferredResult must not be null" ); Assert.state(this .asyncWebRequest != null , "AsyncWebRequest must not be null" ); Long timeout = deferredResult.getTimeoutValue(); if (timeout != null ) { this .asyncWebRequest.setTimeout(timeout); } List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>(); interceptors.add(deferredResult.getInterceptor()); interceptors.addAll(this .deferredResultInterceptors.values()); interceptors.add(timeoutDeferredResultInterceptor); final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors); this .asyncWebRequest.addTimeoutHandler(() -> { try { interceptorChain.triggerAfterTimeout(this .asyncWebRequest, deferredResult); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } }); this .asyncWebRequest.addErrorHandler(ex -> { if (!this .errorHandlingInProgress) { try { if (!interceptorChain.triggerAfterError(this .asyncWebRequest, deferredResult, ex)) { return ; } deferredResult.setErrorResult(ex); } catch (Throwable interceptorEx) { setConcurrentResultAndDispatch(interceptorEx); } } }); this .asyncWebRequest.addCompletionHandler(() -> interceptorChain.triggerAfterCompletion(this .asyncWebRequest, deferredResult)); interceptorChain.applyBeforeConcurrentHandling(this .asyncWebRequest, deferredResult); startAsyncProcessing(processingContext); try { interceptorChain.applyPreProcess(this .asyncWebRequest, deferredResult); deferredResult.setResultHandler(result -> { result = interceptorChain.applyPostProcess(this .asyncWebRequest, deferredResult, result); setConcurrentResultAndDispatch(result); }); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } } private void startAsyncProcessing (Object[] processingContext) { synchronized (WebAsyncManager.this ) { this .concurrentResult = RESULT_NONE; this .concurrentResultContext = processingContext; this .errorHandlingInProgress = false ; } this .asyncWebRequest.startAsync(); if (logger.isDebugEnabled()) { logger.debug("Started async request" ); } }
StandardServletAsyncWebRequest#startAsync()
方法主要的处理分为两步:
getRequest().startAsync(getRequest(), getResponse())
创建 AsyncContext
对象,此处实际进入到了 Tomcat 服务器代码,最终调用到连接器层的 org.apache.catalina.connector.Request#startAsync()
方法
this.asyncContext.addListener(this)
将当前 StandardServletAsyncWebRequest
对象作为监听器添加到 AsyncContext 对象中,并将上层的超时时间设置到 AsyncContext
内部
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public void startAsync () { Assert.state(getRequest().isAsyncSupported(), "Async support must be enabled on a servlet and for all filters involved " + "in async request processing. This is done in Java code using the Servlet API " + "or by adding \"<async-supported>true</async-supported>\" to servlet and " + "filter declarations in web.xml." ); Assert.state(!isAsyncComplete(), "Async processing has already completed" ); if (isAsyncStarted()) { return ; } this .asyncContext = getRequest().startAsync(getRequest(), getResponse()); this .asyncContext.addListener(this ); if (this .timeout != null ) { this .asyncContext.setTimeout(this .timeout); } }
org.apache.catalina.connector.Request#startAsync()
方法简单明了,主要的处理如下:
新建 AsyncContextImpl
对象,并使用该对象调用 AsyncContextImpl#setStarted()
通知服务器底层当前请求为开始异步处理状态
使用AsyncContextImpl
对象调用 AsyncContextImpl#setTimeout()
将连接层默认的超时时间作为异步请求的超时时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public AsyncContext startAsync (ServletRequest request, ServletResponse response) { if (!isAsyncSupported()) { IllegalStateException ise = new IllegalStateException(sm.getString("request.asyncNotSupported" )); log.warn(sm.getString("coyoteRequest.noAsync" , StringUtils.join(getNonAsyncClassNames())), ise); throw ise; } if (asyncContext == null ) { asyncContext = new AsyncContextImpl(this ); } asyncContext.setStarted(getContext(), request, response, request==getRequest() && response==getResponse().getResponse()); asyncContext.setTimeout(getConnector().getAsyncTimeout()); return asyncContext; }
AsyncContextImpl#setStarted()
方法将保存异步请求的上下文,并负责与底层服务器进行交互,重要的处理如下:
this.request.getCoyoteRequest().action()
将当前请求开始异步的状态通知到底层,并将自身作为监听器注册到服务器层,此处的请求对象实际为org.apache.coyote.Request
AsyncContextImpl
作为服务器底层与上层交互的中间层,会将底层请求的状态变化通知到上层,体现在代码中也就是遍历内部的监听器列表,一一回调监听器的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public void setStarted (Context context, ServletRequest request, ServletResponse response, boolean originalRequestResponse) { synchronized (asyncContextLock) { this .request.getCoyoteRequest().action( ActionCode.ASYNC_START, this ); this .context = context; context.incrementInProgressAsyncCount(); this .servletRequest = request; this .servletResponse = response; this .hasOriginalRequestAndResponse = originalRequestResponse; this .event = new AsyncEvent(this , request, response); List<AsyncListenerWrapper> listenersCopy = new ArrayList<>(listeners); listeners.clear(); if (log.isDebugEnabled()) { log.debug(sm.getString("asyncContextImpl.fireOnStartAsync" )); } for (AsyncListenerWrapper listener : listenersCopy) { try { listener.fireOnStartAsync(event); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.warn(sm.getString("asyncContextImpl.onStartAsyncError" , listener.getClass().getName()), t); } } } }
org.apache.coyote.Request#action()
方法其实只是负责分发动作事件,最终的处理还是在 AbstractProcessor#action()
中
1 2 3 4 5 6 7 8 9 public void action (ActionCode actionCode, Object param) { if (hook != null ) { if (param == null ) { hook.action(actionCode, this ); } else { hook.action(actionCode, param); } } }
AbstractProcessor#action()
对于 ActionCode.ASYNC_START
的处理如下,可以看到实际是调用了异步状态机的 AsyncStateMachine#asyncStart()
方法
此处需要注意,Tomcat 会根据每一个请求的协议为其创建一个 AbstractProcessor
实现类,这个实现类负责这个请求全部生命周期的处理,这也就是AbstractProcessor
内部保存 AsyncStateMachine
状态机的原因
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public final void action (ActionCode actionCode, Object param) { switch (actionCode) { ...... case ASYNC_START: { asyncStateMachine.asyncStart((AsyncContextCallback) param); break ; } case ASYNC_COMPLETE: { clearDispatches(); if (asyncStateMachine.asyncComplete()) { processSocketEvent(SocketEvent.OPEN_READ, true ); } break ; } case ASYNC_DISPATCH: { if (asyncStateMachine.asyncDispatch()) { processSocketEvent(SocketEvent.OPEN_READ, true ); } break ; } case ASYNC_DISPATCHED: { asyncStateMachine.asyncDispatched(); break ; } case ASYNC_ERROR: { asyncStateMachine.asyncError(); break ; } ...... } }
AsyncStateMachine#asyncStart()
方法实际保存了异步开始时间,并进行状态更新,此处可以看到其将步骤11提到的 AsyncContextImpl
保存了下来,后续将用于回调上层
AsyncStateMachine
这个类实际维护了一个异步请求的状态流转,伴随着这个请求的整个生命周期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 synchronized void asyncStart (AsyncContextCallback asyncCtxt) { if (state == AsyncState.DISPATCHED) { generation.incrementAndGet(); updateState(AsyncState.STARTING); this .asyncCtxt = asyncCtxt; lastAsyncStart = System.currentTimeMillis(); } else { throw new IllegalStateException( sm.getString("asyncStateMachine.invalidAsyncState" , "asyncStart()" , state)); } }
此时回到步骤 9,创建完 AsyncContextImpl
对象,还会为其注册监听器,设置超时时间,方法源码如下:
至此我们可以理出一条异步请求完成时的传播链,自底向上的传播方向为 AsyncStateMatch
–> AsyncContextImpl
–> StandardServletAsyncWebRequest
–> DeferredResult
可以看到超时时间在 AsyncContextImpl
内部保存了一份,也通过 org.apache.coyote.Request
设置到了底层,此处不再赘述方法堆栈,直接看AbstractProcessor#action()
对于 ActionCode.ASYNC_SETTIMEOUT
的处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public void addListener (AsyncListener listener) { check(); AsyncListenerWrapper wrapper = new AsyncListenerWrapper(); wrapper.setListener(listener); listeners.add(wrapper); } @Override public void setTimeout (long timeout) { check(); this .timeout = timeout; request.getCoyoteRequest().action(ActionCode.ASYNC_SETTIMEOUT, Long.valueOf(timeout)); }
AbstractProcessor#action()
对于 ActionCode.ASYNC_SETTIMEOUT
的处理如下,可以看到实际只是作了保存操作而已
至此超时时间已经设置到底层,那么 Tomcat 是如何做到异步请求超时返回呢?答案如下:
Tomcat 不断轮询 socket,当轮询到可读写的 socket 时会创建 SocketProcessorBase
异步任务扔到线程池中执行 socket 读写任务。上文提到过 AbstractProcessor
实现类负责一个请求的全部生命周期处理,在处理 socket 的过程中,连接处理器 ConnectionHandler
如果发现当前请求是异步请求,则会将负责处理这个请求的 Processor 添加到等待队列中 Tomcat 服务器启动时会开启一个周期线程池,处理等待队列中的 Processor,检查其是否超时。超时则触发处理 Socket 超时事件,最终通过 CoyoteAdapter
适配器将超时回调到上层。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public final void action (ActionCode actionCode, Object param) { switch (actionCode) { ...... case ASYNC_SETTIMEOUT: { if (param == null ) { return ; } long timeout = ((Long) param).longValue(); setAsyncTimeout(timeout); break ; } ...... } } public void setAsyncTimeout (long timeout) { asyncTimeout = timeout; }
此时回到步骤 8 第 5 点 ,DeferredResult#setResultHandler()
的处理如下,可以看到这个步骤主要目的是保存 DeferredResultHandler
,此时 DeferredResult
内部还没有有效的结果,所以不会调用 DeferredResultHandler#handleResult()
处理结果,至此请求的异步处理结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public final void setResultHandler (DeferredResultHandler resultHandler) { Assert.notNull(resultHandler, "DeferredResultHandler is required" ); if (this .expired) { return ; } Object resultToHandle; synchronized (this ) { if (this .expired) { return ; } resultToHandle = this .result; if (resultToHandle == RESULT_NONE) { this .resultHandler = resultHandler; return ; } } try { resultHandler.handleResult(resultToHandle); } catch (Throwable ex) { logger.debug("Failed to process async result" , ex); } }
请求的再次分发
当 DeferredResult#setResult()
方法被调用时将会对客户端请求进行响应,请求的再次分发也是由此触发
DeferredResult#setResult()
方法只是个入口,可以看到当设置了结果之后,将调用 DeferredResultHandler#handleResult()
处理结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public boolean setResult (T result) { return setResultInternal(result); } private boolean setResultInternal (Object result) { if (isSetOrExpired()) { return false ; } DeferredResultHandler resultHandlerToUse; synchronized (this ) { if (isSetOrExpired()) { return false ; } this .result = result; resultHandlerToUse = this .resultHandler; if (resultHandlerToUse == null ) { return true ; } this .resultHandler = null ; } resultHandlerToUse.handleResult(result); return true ; }
DeferredResultHandler#handleResult()
的实现其实在 ??节步骤 8 提到过,实际在 WebAsyncManager#startDeferredResultProcessing()
中,如下所示,可以看到核心逻辑是调用 WebAsyncManager#setConcurrentResultAndDispatch()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void startDeferredResultProcessing ( final DeferredResult<?> deferredResult, Object... processingContext) throws Exception { ...... try { interceptorChain.applyPreProcess(this .asyncWebRequest, deferredResult); deferredResult.setResultHandler(result -> { result = interceptorChain.applyPostProcess(this .asyncWebRequest, deferredResult, result); setConcurrentResultAndDispatch(result); }); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } }
WebAsyncManager#setConcurrentResultAndDispatch()
方法的核心处理如下:
将 DeferredResult#setResult()
设置的结果保存在当前 WebAsyncManager
对象中
this.asyncWebRequest.dispatch()
将异步请求再次分发,请求分发的流程与本文相关较小,暂不做分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private void setConcurrentResultAndDispatch (Object result) { synchronized (WebAsyncManager.this ) { if (this .concurrentResult != RESULT_NONE) { return ; } this .concurrentResult = result; this .errorHandlingInProgress = (result instanceof Throwable); } if (this .asyncWebRequest.isAsyncComplete()) { if (logger.isDebugEnabled()) { logger.debug("Async result set but request already complete: " + formatRequestUri()); } return ; } if (logger.isDebugEnabled()) { boolean isError = result instanceof Throwable; logger.debug("Async " + (isError ? "error" : "result set" ) + ", dispatch to " + formatRequestUri()); } this .asyncWebRequest.dispatch(); }
再次分发的请求将重新进入 DispatcherServlet#doDispatch()
处理,其流程与2.1节步骤1-4基本相同,只不过这次请求处理将进入2.1节步骤4第4点逻辑,此时 WebAsyncManager
中已经有异步请求的结果了, invocableMethod.wrapConcurrentResult()
将会创建一个新的ConcurrentResultHandlerMethod
对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Nullable protected ModelAndView invokeHandlerMethod (HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception { ...... if (asyncManager.hasConcurrentResult()) { Object result = asyncManager.getConcurrentResult(); mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0 ]; asyncManager.clearConcurrentResult(); LogFormatUtils.traceDebug(logger, traceOn -> { String formatted = LogFormatUtils.formatValue(result, !traceOn); return "Resume with async result [" + formatted + "]" ; }); invocableMethod = invocableMethod.wrapConcurrentResult(result); } invocableMethod.invokeAndHandle(webRequest, mavContainer); if (asyncManager.isConcurrentHandlingStarted()) { return null ; } return getModelAndView(mavContainer, modelFactory, webRequest); } finally { webRequest.requestCompleted(); } }
ConcurrentResultHandlerMethod
的构造方法如下,可以看到这个对象把异步请求的结果包装到一个 Callable
对象中,并设置了其反射调用的 Method 为 call
,这一步其实就是把原本应该反射调用的处理器方法丢弃了,直接返回异步请求已经设置好的返回结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public ConcurrentResultHandlerMethod (final Object result, ConcurrentResultMethodParameter returnType) { super ((Callable<Object>) () -> { if (result instanceof Exception) { throw (Exception) result; } else if (result instanceof Throwable) { throw new NestedServletException("Async processing failed" , (Throwable) result); } return result; }, CALLABLE_METHOD); if (ServletInvocableHandlerMethod.this .returnValueHandlers != null ) { setHandlerMethodReturnValueHandlers(ServletInvocableHandlerMethod.this .returnValueHandlers); } this .returnType = returnType; }
继续本节步骤4流程,此时 invocableMethod.invokeAndHandle()
依然将调用到 ConcurrentResultHandlerMethod
父类ServletInvocableHandlerMethod#invokeAndHandle()
方法,此处的处理是异步请求响应的最后一步
需注意 ConcurrentResultHandlerMethod
内部保存的反射所需的 Method 对象已经变成了 Callable
的 call
方法,则此处invokeForRequest()
反射调用实际调用到 Callable#call()
方法,返回值已经是 DeferredResult
实际应该返回的数据了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public void invokeAndHandle (ServletWebRequest webRequest, ModelAndViewContainer mavContainer, Object... providedArgs) throws Exception { Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs); setResponseStatus(webRequest); if (returnValue == null ) { if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) { disableContentCachingIfNecessary(webRequest); mavContainer.setRequestHandled(true ); return ; } } else if (StringUtils.hasText(getResponseStatusReason())) { mavContainer.setRequestHandled(true ); return ; } mavContainer.setRequestHandled(false ); Assert.state(this .returnValueHandlers != null , "No return value handlers" ); try { this .returnValueHandlers.handleReturnValue( returnValue, getReturnValueType(returnValue), mavContainer, webRequest); } catch (Exception ex) { if (logger.isTraceEnabled()) { logger.trace(formatErrorForReturnValue(returnValue), ex); } throw ex; } }
对于普通类型的返回值,this.returnValueHandlers.handleReturnValue()
处理返回值时会选择对应的返回值处理器,而不会再次由 DeferredResultMethodReturnValueHandler
处理,至此,异步请求的结果将顺利响应给客户端,异步请求结束
总结 经过以上学习,现简答一下开文的问题:
什么是长轮询?除了长轮询还有什么方式?如何选择?
答:它是实现服务端和客户端即时数据传输的一种主要方式,其他还有轮询、长连接、WebSocket 等。
至于如何选择,需要结合自身业务场景和各个方式的特点进行判断
什么是 DeferredResult?如何使用?
答:DeferredResult
是 SpringMVC 3.2 之后引入新的类,只要让请求方法返回 DeferredResult
,就可以快速使用异步请求
DeferredResult 的实现原理是什么?
答:Spring MVC 在调用 Controller 方法后会对返回值做处理,如果发现返回值为异步请求类型(DeferredResult/Callable),则不会立即响应客户端,而是异步执行方法,并将请求挂起,中断当前请求的处理流程,直到异步任务超时或者被设置了结果值(DeferredResult 调用setResult 方法,callable执行完成自动发起)时,将该请求再次分发并且直接用执行结果作为返回值响应客户端。这样处理请求的线程将被释放,可以用于处理新的请求,从而提高服务端的吞吐量。
在实际工作中什么时候会用到?
答:
需要实时获取数据状态变化
业务执行时间较长,长时间占用服务器连接资源
在众多技术方案中没有”银弹“,总是需要根据自身业务灵活判断,而众多优秀的开源案例为我们提供了非常好的参考案例。并且这些案例都是经过大量生产实践后沉淀下来的宝贵经验。一些细微处的设计通常不会立刻应用在自身工作中,但是长期积累总会有用武之地。
参考资料 消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?
servlet 3异步请求
消息队列中的推拉模式
异步请求 DeferredResult 的原理