Spring MVC 异步请求处理 DeferredResult 的学习

导语

​ 之前在学习 Apollo 配置中心的时候,发现 Apollo 通过 Spring MVC DeferredResult 实现长轮询服务推送。当时仅仅简单了解了一下,如今决定深入学习一下 Spring MVC 如何实现的 DeferredResult,期望可以解决以下问题:

  1. 什么是长轮询?除了长轮询还有什么方式?如何选择?
  2. 什么是 DeferredResult?如何使用?
  3. DeferredResult 的实现原理是什么?
  4. 在实际工作中什么时候会用到?

背景

什么是 DeferredResult

DeferredResult 字面意思就是推迟结果,是在 Servlet 3.0 以后引入了异步请求之后,在 Spring 3.2 版本封装了一下支持了 Servlet 这个异步请求。DeferredResult 可以允许容器中的线程快速释放以便可以接受更多的请求提升吞吐量,让真正的业务逻辑在其他的工作线程中去完成。

DeferredResult 的使用场景

服务端和客户端数据传输的方式:推模型和拉模型

服务端和客户端即时数据传输主要有四种方式,它们分别是轮询、长轮询、长连接、WebSocket。它们大体可以分为两类,一种是在 HTTP 基础上实现的,包括轮询、长轮询和长连接;另一种不是在HTTP基础上实现是,即 WebSocket。

推模型-长连接

​ 客户端与服务端建立 TCP 长连接,当服务端配置数据有变动,立刻通过建立的长连接将数据推送给客户端。

优势
  • 长链接的优点是实时性,一旦数据变动,立即推送变更数据给客户端,而且对于客户端而言,这种方式更为简单,只建立连接接收数据,并不需要关心是否有数据变更这类逻辑的处理。
弊端
  • 长连接可能会因为网络问题,导致不可用,也就是俗称的假死。连接状态正常,但实际上已无法通信,所以要有的心跳机制 KeepAlive 来保证连接的可用性,才可以保证配置数据的成功推送。
  • 服务端需要维护数据状态及失败重试机制
  • 服务端难以平衡客户端的消费速率
案例:
  • Dubbo 当服务提供者地址列表有变更,之策中心将基于长连接推送变更数据给消费者
  • Eureka 集群节点之间互相通讯,如果心跳机制检测到 client 节点有变更,在修改本地注册表后想集群中其他 server 节点发起数据同步

推模型-Web Socket

​ WebSocket 是 Html5 定义的一个新协议,与传统的 http 协议不同,该协议可以实现服务器与客户端之间全双工通信。简单来说,首先需要在客户端和服务器端建立起一个连接,这部分需要 http。连接一旦建立,客户端和服务器端就处于平等的地位,可以相互发送数据,不存在请求和响应的区别。

优势
  • 实现了双向通信
弊端
  • 服务器端的逻辑非常复杂
  • 相较于 HTTP 协议兼容性较低

拉模型-轮询

​ 客户端主动的向服务端发请求拉取数据,常见的方式就是轮询,比如每3s向服务端请求一次配置数据。

优势
  • 实现简单,客户端根据自身需求及能力拉取数据
弊端
  • 拉取频率和时效性难以平衡,导致时效性较差
  • 效率低下,无效请求非常多
  • 频繁创建连接

拉模型-长轮询

​ 客户端发起请求后,服务端不会立即返回请求结果,而是将请求挂起等待一段时间,如果此段时间内服务端数据变更,立即响应客户端请求,若是一直无变化则等到指定的超时时间后响应请求,客户端重新发起长链接。

长轮询是由服务端控制响应客户端请求的返回时间,来减少客户端无效请求的一种优化手段。其实对于客户端来说与短轮询的使用并没有本质上的区别。

优势
  • 长轮询和短轮询比起来,明显减少了很多不必要的 http 请求次数,相比之下节约了资源。
弊端
  • 服务端将请求挂起仍然会浪费一定资源
案例:
  • 消息队列 RocketMQ 和 Kafka 都选择了拉模式(指的是 Comsumer 和 Broker 之间的交互。)

    详见:消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?

  • Eureka 注册中心客户端 Eureka Client 会拉取、更新和缓存 Eureka Server 中的信息。因此当所有的 Eureka Server 节点都宕掉,服务消费者依然可以使用缓存中的信息找到服务提供者,但是当服务有更改的时候会出现信息不一致。

  • Apollo 配置中心的发布配置推送变更消息

    Apollo 客户端会循环的向服务端发送长轮训 Http 请求,超时时间 60 秒 。当超时后返回客户端一个 Http Status 为 304 状态码的时候表明配置没有变更,客户端继续这个步骤重复发起请求。当有发布配置的时候,服务端会调用 DeferredResult.setResult 返回 200 状态码,然后轮训请求会立即返回(不会超时),客户端收到服务端的响应结果后,会发起向 Apollo 服务端请求获取变更后的配置信息。

如何选择推还是拉?

推拉模式的对比

具体的比较
1.Push 模式

  • 推模式是服务器端根据用户需要,由目的、按时将用户感兴趣的信息主动发送到用户的客户端
  • Push 模式的主要优点是
    • 对用户要求低,方便用户获取需要的信息
    • 及时性好,服务器端即使地向客户端推送更行的动态信息
  • Push 模式的缺点
    • 不能确保发送成功。Push模式采用广播方式,只有服务器端和客户端在同一个频道上,推模式才有效,用户才能接收到信息
    • 没有信息状态跟踪。Push模式采用开环控制技术,一个信息推送后的状态,比如客户端是否接收等,无从得知
    • 针对性较差。推送的信息可能并不能满足客户端的个性化需求。

2.Pull 模式

  • Pull 模式是客户端主动从服务器端获取信息

  • Pull 模式的主要优点是

    • 针对性强,能满足客户端的个性化需求
    • 信息传输量较小,网络中传输的知识客户端的请求和服务器端对该请求的响应
    • 服务器端的任务轻。服务器端只是被动接收查询,对客户端的查询请求做出响应
  • Pull 模式的缺点

    • 实时较差,针对于服务器端实时更新的信息,客户端难以获取实时信息
    • 对于客户端用户的要求较高,需要对服务器端具有一定的了解。

从兼容性角度考虑,短轮询>长轮询>长连接SSE>WebSocket;

从性能方面考虑,WebSocket>长连接SSE>长轮询>短轮询。

当前业务中可能使用到的场景

业务逻辑耗时长

比如当前业务中,目标计算过程比较耗时,不能及时给页面响应结果。

实时获取数据状态变化

比如支付状态、订单状态、机器状态的变更等

Spring 框架对异步的支持

前置了解:

Web 容器对 HTTP 请求的处理

当一个 HTTP 请求到达 Tomcat,Tomcat 将会从线程池中取出线程,然后按照如下流程处理请求:

  • 将请求信息解析为 HttpServletRequest
  • 分发到具体 Servlet 处理相应的业务
  • 通过 HttpServletResponse 将响应结果返回给等待客户端

整体流程如下所示:

这是我们日常最常用同步请求模型,所有动作都交给同一个 Tomcat 线程处理,所有动作处理完成,线程才会被释放回线程池。

想象一下如果业务需要较长时间处理,那么这个 Tomcat 线程其实一直在被占用,随着请求越来越多,可用 I/O 线程越来越少,直到被耗尽。这时后续请求只能等待空闲 Tomcat 线程,这将会加长了请求执行时间。

如果客户端不关心返回业务结果,这时我们可以自定义线程池,将请求任务提交给线程池,然后立刻返回。

也可以使用 Spring Async 任务

servlet 3.0 支持异步

异步 Servelt 执行请求流程:

  • 将请求信息解析为 HttpServletRequest
  • 分发到具体 Servlet 处理,将业务提交给自定义业务线程池,请求立刻返回,Tomcat 线程立刻被释放
  • 当业务线程将任务执行结束,将会将结果转交给 Tomcat 线程
  • 通过 HttpServletResponse 将响应结果返回给等待客户端

引入异步 Servelt3 整体流程如下:

使用异步 Servelt,Tomcat 线程仅仅处理请求解析动作,所有耗时较长的业务操作全部交给业务线程池,所以相比同步请求, Tomcat 线程可以处理更多请求。

虽然我们将业务处理交给业务线程池异步处理,但是对于客户端来讲,其还在同步等待响应结果

而且异步请求不一定会获得更快响应时间,相反可能由于引入了更多线程,增加线程上下文切换时间。

虽然没有降低响应时间,但是通过请求异步化带来其他明显优点

  • 可以处理更高并发连接数,提高系统整体吞吐量
  • 请求解析与业务处理完全分离,职责单一
  • 自定义业务线程池,我们可以更容易对其监控,降级等处理
  • 可以根据不同业务,自定义不同线程池,相互隔离,不用互相影响

所以具体使用过程,我们还需要进行的相应的压测,观察响应时间以及吞吐量等其他指标,综合选择。

spring MVC 对 异步的支持

Servlet3 API ,无法使用 SpringMVC 为我们提供的特性,我们需要自己处理响应信息,处理方式相对繁琐。

SpringMVC 3.2 基于 Servelt3 引入异步请求处理方式,我们可以跟使用同步请求一样,方便使用异步请求。

SpringMVC 提供有两种异步方式,只要将 Controller 方法返回值修改下述类即可:

  • DeferredResult
  • Callable
spring MVC 的 DeferredResult 和 callable
DeferredResult

DeferredResult 是 SpringMVC 3.2 之后引入新的类,只要让请求方法返回 DeferredResult,就可以快速使用异步请求,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@RequestMapping(value = "/deferredResult", method = RequestMethod.GET)
public DeferredResult<ResponseMsg<String>> deferredResult() {
// 设置超时时间
DeferredResult<ResponseMsg<String>> deferredResult = new DeferredResult<>(10000L);
// 异步线程处理结束,将会执行该回调方法
deferredResult.onCompletion(() -> {
log.info("异步线程处理结束");
});
// 如果异步线程执行时间超过设置超时时间,将会执行该回调方法
deferredResult.onTimeout(() -> {
log.info("异步线程超时");
// 设置返回结果
deferredResult.setErrorResult("timeout error");
});
deferredResult.onError(throwable -> {
log.error("异常", throwable);
// 设置返回结果
deferredResult.setErrorResult("other error");
});
executorService.submit(() -> {
try {
deferredResult.setResult(taskService.getResult());
// 设置返回结果
} catch (Exception e) {
e.printStackTrace();
// 若异步方法内部异常
deferredResult.setErrorResult("error");
}
});
log.info("servlet 线程处理结束");
return deferredResult;
}

创建 DeferredResult 实例时可以传入特定超时时间。另外我们可以设置默认超时时间:

1
2
# 异步请求超时时间
spring.mvc.async.request-timeout=2000

如果异步程序执行完成,可以调用 DeferredResult#setResult返回响应结果。此时若有设置 DeferredResult#onCompletion 回调方法,将会触发该回调方法。

同时还有其他异常回调方法:

  • DeferredResult#onTimeout 异步线程执行超时回调

  • DeferredResult#onError 异步线程异常回调

Callable

Spring 另外还提供一种异步请求使用方式,直接使用 JDK Callable。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12

@RequestMapping(value = "/callable",method = RequestMethod.GET)
public Callable<ResponseMsg<String>> getResult(){

log.info("接收请求,开始处理...");

Callable<ResponseMsg<String>> result = (()-> taskService.getResult());

log.info("接收任务线程完成并退出");

return result;
}

默认情况会使用 SimpleAsyncTaskExecutor 执行异步请求,每次调用执行都将会新建线程。由于这种方式不复用线程,生产不推荐使用这种方式,所以我们需要使用线程池代替。

我们可以使用如下方式自定义线程池:

1
2
3
4
5
6
7
8
@Bean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME)
public AsyncTaskExecutor executor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("test-");
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setMaxPoolSize(20);
return threadPoolTaskExecutor;
}

注意 Bean 名称一定要是 applicationTaskExecutor,若不一致, Spring 将不会使用自定义线程池。

或者可以直接使用 SpringBoot 配置文件方式配置代替:

1
2
3
4
5
6
7
# 核心线程数
spring.task.execution.pool.core-size=10
# 最大线程数
spring.task.execution.pool.max-size=20
# 线程名前缀
spring.task.execution.thread-name-prefix=test
# 还有另外一些配置,读者们可以自行配置

这种方式异步请求的超时时间只能通过配置文件方式配置。

1
spring.mvc.async.request-timeout=10000

如果需要为单独请求的配置特定的超时时间,我们需要使用 WebAsyncTask 包装 Callable

1
2
3
4
5
6
7
8
9
10
11
12
@RequestMapping("/webAsyncTask")
public WebAsyncTask<ResponseMsg<String>> webAsyncTask() {
log.info("接收请求,开始处理...webAsyncTask");
Callable<ResponseMsg<String>> callable= () -> {
ResponseMsg<String> result = taskService.getResult();
log.info("异步方法结束 webAsyncTask");
return result;
};
// 单位 ms
WebAsyncTask<ResponseMsg<String>> webAsyncTask=new WebAsyncTask<>(10000,callable);
return webAsyncTask;
}

实现原理探究

​ Spring MVC 在调用处理器方法后会对返回值做处理,如果发现返回值为异步请求类型,则不会立即响应客户端,而是直接将请求挂起,中断当前请求的处理流程,直到异步任务超时或者被设置了结果值才响应客户端。这样处理请求的线程将被释放,可以用于处理新的请求,从而提高服务端的吞吐量。在这个过程中,异步请求的核心处理流程大致可以分为两步:

  • 请求的异步处理
  • 请求的再次分发

请求的异步处理

  1. 以下将从 DispatcherServlet#doDispatch() 开始分析。这个方法是请求处理的主干逻辑,主要注意以下几点:
    1. WebAsyncUtils.getAsyncManager() 根据 HttpServletRequest 创建或者从缓存中获取 WebAsyncManager 对象
    2. ha.handle() 进入处理器方法被调用的逻辑
    3. asyncManager.isConcurrentHandlingStarted() 判断是否开始了异步处理,是的话直接 return,不进入响应处理的流程,也就是说异步请求第一次处理在此就中断了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
HttpServletRequest processedRequest = request;
HandlerExecutionChain mappedHandler = null;
boolean multipartRequestParsed = false;
//根据 HttpServletRequest 创建或者从缓存中获取 WebAsyncManager 对象
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);

try {
ModelAndView mv = null;
Exception dispatchException = null;

try {
processedRequest = checkMultipart(request);
multipartRequestParsed = (processedRequest != request);

// Determine handler for the current request.
mappedHandler = getHandler(processedRequest);
if (mappedHandler == null) {
noHandlerFound(processedRequest, response);
return;
}

// Determine handler adapter for the current request.
HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler());

// Process last-modified header, if supported by the handler.
String method = request.getMethod();
boolean isGet = HttpMethod.GET.matches(method);
if (isGet || HttpMethod.HEAD.matches(method)) {
long lastModified = ha.getLastModified(request, mappedHandler.getHandler());
if (new ServletWebRequest(request, response).checkNotModified(lastModified) && isGet) {
return;
}
}

if (!mappedHandler.applyPreHandle(processedRequest, response)) {
return;
}

// Actually invoke the handler.进入处理器方法被调用的逻辑
mv = ha.handle(processedRequest, response, mappedHandler.getHandler());
//判断是否开始了异步处理,是的话直接 return,不进入响应处理的流程,也就是说异步请求第一次处理在此就中断了
if (asyncManager.isConcurrentHandlingStarted()) {
return;
}

applyDefaultViewName(processedRequest, mv);
mappedHandler.applyPostHandle(processedRequest, response, mv);
}
catch (Exception ex) {
dispatchException = ex;
}
catch (Throwable err) {
// As of 4.3, we're processing Errors thrown from handler methods as well,
// making them available for @ExceptionHandler methods and other scenarios.
dispatchException = new NestedServletException("Handler dispatch failed", err);
}
processDispatchResult(processedRequest, response, mappedHandler, mv, dispatchException);
}
catch (Exception ex) {
triggerAfterCompletion(processedRequest, response, mappedHandler, ex);
}
catch (Throwable err) {
triggerAfterCompletion(processedRequest, response, mappedHandler,
new NestedServletException("Handler processing failed", err));
}
finally {
if (asyncManager.isConcurrentHandlingStarted()) {
// Instead of postHandle and afterCompletion
if (mappedHandler != null) {
mappedHandler.applyAfterConcurrentHandlingStarted(processedRequest, response);
}
}
else {
// Clean up any resources used by a multipart request.
if (multipartRequestParsed) {
cleanupMultipart(processedRequest);
}
}
}
}
  1. WebAsyncUtils#getAsyncManager()方法如下,可以看到主要逻辑是从 ServletRequest 对象的属性中获取缓存的 WebAsyncManager 对象或者直接创建新的 WebAsyncManager 对象并缓存。此处需要注意,之所以需要缓存WebAsyncManager 对象,是因为异步请求将会被二次分发处理,且后续流程都将用到WebAsyncManager 对象,需要保证请求的相关信息一致
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* The name attribute containing the {@link WebAsyncManager}.
*/
public static final String WEB_ASYNC_MANAGER_ATTRIBUTE =
WebAsyncManager.class.getName() + ".WEB_ASYNC_MANAGER";


/**
* Obtain the {@link WebAsyncManager} for the current request, or if not
* found, create and associate it with the request.
*/
public static WebAsyncManager getAsyncManager(ServletRequest servletRequest) {
WebAsyncManager asyncManager = null;
Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);
if (asyncManagerAttr instanceof WebAsyncManager) {
asyncManager = (WebAsyncManager) asyncManagerAttr;
}
if (asyncManager == null) {
asyncManager = new WebAsyncManager();
servletRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager);
}
return asyncManager;
}
  1. 回到步骤1 ha.handle() 的调用,此处首先调用到 AbstractHandlerMethodAdapter#handle()方法,接着调用子类实现 RequestMappingHandlerAdapter#handleInternal() 方法,可以看到核心逻辑其实是执行 RequestMappingHandlerAdapter#invokeHandlerMethod() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
protected ModelAndView handleInternal(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

ModelAndView mav;
checkRequest(request);

// Execute invokeHandlerMethod in synchronized block if required.
if (this.synchronizeOnSession) {
HttpSession session = request.getSession(false);
if (session != null) {
Object mutex = WebUtils.getSessionMutex(session);
synchronized (mutex) {
mav = invokeHandlerMethod(request, response, handlerMethod);
}
}
else {
// No HttpSession available -> no mutex necessary
mav = invokeHandlerMethod(request, response, handlerMethod);
}
}
else {
// No synchronization on session demanded at all...
mav = invokeHandlerMethod(request, response, handlerMethod);
}

if (!response.containsHeader(HEADER_CACHE_CONTROL)) {
if (getSessionAttributesHandler(handlerMethod).hasSessionAttributes()) {
applyCacheSeconds(response, this.cacheSecondsForSessionAttributeHandlers);
}
else {
prepareResponse(response);
}
}

return mav;
}
  1. RequestMappingHandlerAdapter#invokeHandlerMethod() 方法非常重要,此处是处理器方法被执行的触发点,主要注意以下几点:
    1. 调用RequestMappingHandlerAdapter#createInvocableHandlerMethod() 创建处理器的适配对象 ServletInvocableHandlerMethod
    2. WebAsyncUtils.createAsyncWebRequest() 新建异步请求对象 StandardServletAsyncWebRequest
    3. 从 ServletRequest 请求对象的缓存中获取 AsyncManager 对象,并对其进行配置
    4. asyncManager.hasConcurrentResult() 判断异步请求是否已经有结果了,此处逻辑将在异步请求二次分发处理时进入
    5. invocableMethod.invokeAndHandle() 实际调用处理器方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Invoke the {@link RequestMapping} handler method preparing a {@link ModelAndView}
* if view resolution is required.
* @since 4.2
* @see #createInvocableHandlerMethod(HandlerMethod)
*/
@Nullable
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

ServletWebRequest webRequest = new ServletWebRequest(request, response);
try {
WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);

//创建处理器的适配对象 ServletInvocableHandlerMethod
ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
if (this.argumentResolvers != null) {
invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
}
if (this.returnValueHandlers != null) {
invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
}
invocableMethod.setDataBinderFactory(binderFactory);
invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);

ModelAndViewContainer mavContainer = new ModelAndViewContainer();
mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
modelFactory.initModel(webRequest, mavContainer, invocableMethod);
mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);

//新建异步请求对象 StandardServletAsyncWebRequest
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);

//(从 ServletRequest 请求对象的缓存中获取 AsyncManager 对象,并对其进行配置)
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);

//判断异步请求是否已经有结果了,此处逻辑将在异步请求二次分发处理时进入,暂且不表
if (asyncManager.hasConcurrentResult()) {
Object result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}

//实际调用处理器方法
invocableMethod.invokeAndHandle(webRequest, mavContainer);
if (asyncManager.isConcurrentHandlingStarted()) {
return null;
}

return getModelAndView(mavContainer, modelFactory, webRequest);
}
finally {
webRequest.requestCompleted();
}
}
  1. ServletInvocableHandlerMethod#invokeAndHandle()的重要处理步骤如下:
    1. ServletInvocableHandlerMethod#invokeForRequest()反射调用处理器方法,获得方法返回值
    2. this.returnValueHandlers.handleReturnValue() 使用返回值处理器处理返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Invoke the method and handle the return value through one of the
* configured {@link HandlerMethodReturnValueHandler HandlerMethodReturnValueHandlers}.
* @param webRequest the current request
* @param mavContainer the ModelAndViewContainer for this request
* @param providedArgs "given" arguments matched by type (not resolved)
*/
public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
Object... providedArgs) throws Exception {

//反射调用处理器方法,获得方法返回值
Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
setResponseStatus(webRequest);

if (returnValue == null) {
if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) {
disableContentCachingIfNecessary(webRequest);
mavContainer.setRequestHandled(true);
return;
}
}
else if (StringUtils.hasText(getResponseStatusReason())) {
mavContainer.setRequestHandled(true);
return;
}

mavContainer.setRequestHandled(false);
Assert.state(this.returnValueHandlers != null, "No return value handlers");
try {//使用返回值处理器处理返回值
this.returnValueHandlers.handleReturnValue(
returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
}
catch (Exception ex) {
if (logger.isTraceEnabled()) {
logger.trace(formatErrorForReturnValue(returnValue), ex);
}
throw ex;
}
}
  1. this.returnValueHandlers.handleReturnValue() 实际是调用到返回值处理器的聚合类 HandlerMethodReturnValueHandlerComposite#handleReturnValue() 方法,其主要逻辑如下:
    1. 调用 selectHandler() 方法从聚合类对象内部的返回值处理器列表中查找到能够处理当前返回值的处理器,判断依据为 HandlerMethodReturnValueHandler#supportsReturnType() 的返回值
    2. handler.handleReturnValue() 调用返回值处理器进行返回值的处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Iterate over registered {@link HandlerMethodReturnValueHandler HandlerMethodReturnValueHandlers} and invoke the one that supports it.
* @throws IllegalStateException if no suitable {@link HandlerMethodReturnValueHandler} is found.
*/
@Override
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

//调用 selectHandler() 方法从聚合类对象内部的返回值处理器列表中查找到能够处理当前返回值的处理器,
// 判断依据为 HandlerMethodReturnValueHandler#supportsReturnType() 的返回值
HandlerMethodReturnValueHandler handler = selectHandler(returnValue, returnType);
if (handler == null) {
throw new IllegalArgumentException("Unknown return value type: " + returnType.getParameterType().getName());
}
//调用返回值处理器进行返回值的处理
handler.handleReturnValue(returnValue, returnType, mavContainer, webRequest);
}

@Nullable
private HandlerMethodReturnValueHandler selectHandler(@Nullable Object value, MethodParameter returnType) {
boolean isAsyncValue = isAsyncReturnValue(value, returnType);
for (HandlerMethodReturnValueHandler handler : this.returnValueHandlers) {
if (isAsyncValue && !(handler instanceof AsyncHandlerMethodReturnValueHandler)) {
continue;
}
if (handler.supportsReturnType(returnType)) {
return handler;
}
}
return null;
}
  1. 对于类型为 DeferredResult 的返回值,对应的处理器为 DeferredResultMethodReturnValueHandler,则DeferredResultMethodReturnValueHandler#handleReturnValue() 方法将被调用,可以看到这里主要的处理步骤如下:
    1. 对于类型为 ListenableFuture 或者 CompletionStage 的返回值,需要将其包装为 DeferredResult
    2. WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing() 开始进行异步的 DeferredResult 处理,这一步实际调用到 AsyncManager#startDeferredResultProcessing() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Override
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}

DeferredResult<?> result;

if (returnValue instanceof DeferredResult) {
result = (DeferredResult<?>) returnValue;
}
//包装为 DeferredResult
else if (returnValue instanceof ListenableFuture) {
result = adaptListenableFuture((ListenableFuture<?>) returnValue);
}
//包装为 DeferredResult
else if (returnValue instanceof CompletionStage) {
result = adaptCompletionStage((CompletionStage<?>) returnValue);
}
else {
// Should not happen...
throw new IllegalStateException("Unexpected return value type: " + returnValue);
}
//开始进行异步的 DeferredResult 处理
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
}

private DeferredResult<Object> adaptListenableFuture(ListenableFuture<?> future) {
DeferredResult<Object> result = new DeferredResult<>();
future.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object value) {
result.setResult(value);
}
@Override
public void onFailure(Throwable ex) {
result.setErrorResult(ex);
}
});
return result;
}

private DeferredResult<Object> adaptCompletionStage(CompletionStage<?> future) {
DeferredResult<Object> result = new DeferredResult<>();
future.handle((BiFunction<Object, Throwable, Object>) (value, ex) -> {
if (ex != null) {
if (ex instanceof CompletionException && ex.getCause() != null) {
ex = ex.getCause();
}
result.setErrorResult(ex);
}
else {
result.setResult(value);
}
return null;
});
return result;
}
  1. WebAsyncManager#startDeferredResultProcessing() 方法是 DeferredResult 处理的重点,此处关键步骤如下:

    1. 首先将 DeferredResult 设置的超时时间取出来,设置到 AsyncWebRequest 对象中
    2. 创建 DeferredResult 的拦截器执行链对象 DeferredResultInterceptorChain,将 DeferredResult 中设置的各个回调方法保存起来
    3. 设置 AsyncWebRequest 对象的各个处理器,包括超时处理器,异常处理器等
    4. 调用 WebAsyncManager#startAsyncProcessing() 开始进行请求异步处理,这个步骤在上层比较简短,就是调用this.asyncWebRequest.startAsync(),实际调用到 StandardServletAsyncWebRequest#startAsync() 方法
    5. deferredResult.setResultHandler() 设置 DeferredResult 的结果处理器,需注意此处传入的是 Lambda 函数

    从 第 4 步开始实际进入到了 Tomcat 服务器代码(本文 9 - 16)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {

Assert.notNull(deferredResult, "DeferredResult must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
//首先将 DeferredResult 设置的超时时间取出来,设置到 AsyncWebRequest 对象中
Long timeout = deferredResult.getTimeoutValue();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
}

List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>();
interceptors.add(deferredResult.getInterceptor());
interceptors.addAll(this.deferredResultInterceptors.values());
interceptors.add(timeoutDeferredResultInterceptor);
//创建 DeferredResult 的拦截器执行链对象 DeferredResultInterceptorChain,将 DeferredResult 中设置的各个回调方法保存起来
final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
//设置 AsyncWebRequest 对象的各个处理器,包括超时处理器,异常处理器等
this.asyncWebRequest.addTimeoutHandler(() -> {
try {
interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
});

this.asyncWebRequest.addErrorHandler(ex -> {
if (!this.errorHandlingInProgress) {
try {
if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
return;
}
deferredResult.setErrorResult(ex);
}
catch (Throwable interceptorEx) {
setConcurrentResultAndDispatch(interceptorEx);
}
}
});

this.asyncWebRequest.addCompletionHandler(()
-> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult));

interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);

//调用 WebAsyncManager#startAsyncProcessing() 开始进行请求异步处理
startAsyncProcessing(processingContext);

try {
interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
//设置 DeferredResult 的结果处理器,需注意此处传入的是 Lambda 函数
deferredResult.setResultHandler(result -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
setConcurrentResultAndDispatch(result);
});
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
}

private void startAsyncProcessing(Object[] processingContext) {
synchronized (WebAsyncManager.this) {
this.concurrentResult = RESULT_NONE;
this.concurrentResultContext = processingContext;
this.errorHandlingInProgress = false;
}
this.asyncWebRequest.startAsync();

if (logger.isDebugEnabled()) {
logger.debug("Started async request");
}
}
  1. StandardServletAsyncWebRequest#startAsync() 方法主要的处理分为两步:
    1. getRequest().startAsync(getRequest(), getResponse()) 创建 AsyncContext 对象,此处实际进入到了 Tomcat 服务器代码,最终调用到连接器层的 org.apache.catalina.connector.Request#startAsync() 方法
    2. this.asyncContext.addListener(this) 将当前 StandardServletAsyncWebRequest 对象作为监听器添加到 AsyncContext 对象中,并将上层的超时时间设置到 AsyncContext 内部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void startAsync() {
Assert.state(getRequest().isAsyncSupported(),
"Async support must be enabled on a servlet and for all filters involved " +
"in async request processing. This is done in Java code using the Servlet API " +
"or by adding \"<async-supported>true</async-supported>\" to servlet and " +
"filter declarations in web.xml.");
Assert.state(!isAsyncComplete(), "Async processing has already completed");

if (isAsyncStarted()) {
return;
}
//创建 AsyncContext 对象,此处实际进入到了 Tomcat 服务器代码,
// 最终调用到连接器层的 org.apache.catalina.connector.Request#startAsync() 方法
this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
//将当前 StandardServletAsyncWebRequest 对象作为监听器添加到 AsyncContext 对象中,
// 并将上层的超时时间设置到 AsyncContext 内部
this.asyncContext.addListener(this);
if (this.timeout != null) {
this.asyncContext.setTimeout(this.timeout);
}
}
  1. org.apache.catalina.connector.Request#startAsync() 方法简单明了,主要的处理如下:
    1. 新建 AsyncContextImpl 对象,并使用该对象调用 AsyncContextImpl#setStarted() 通知服务器底层当前请求为开始异步处理状态
    2. 使用AsyncContextImpl 对象调用 AsyncContextImpl#setTimeout() 将连接层默认的超时时间作为异步请求的超时时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public AsyncContext startAsync(ServletRequest request,
ServletResponse response) {
if (!isAsyncSupported()) {
IllegalStateException ise =
new IllegalStateException(sm.getString("request.asyncNotSupported"));
log.warn(sm.getString("coyoteRequest.noAsync",
StringUtils.join(getNonAsyncClassNames())), ise);
throw ise;
}

if (asyncContext == null) {
asyncContext = new AsyncContextImpl(this);
}

asyncContext.setStarted(getContext(), request, response,
request==getRequest() && response==getResponse().getResponse());
asyncContext.setTimeout(getConnector().getAsyncTimeout());

return asyncContext;
}
  1. AsyncContextImpl#setStarted()方法将保存异步请求的上下文,并负责与底层服务器进行交互,重要的处理如下:
    1. this.request.getCoyoteRequest().action() 将当前请求开始异步的状态通知到底层,并将自身作为监听器注册到服务器层,此处的请求对象实际为org.apache.coyote.Request
    2. AsyncContextImpl 作为服务器底层与上层交互的中间层,会将底层请求的状态变化通知到上层,体现在代码中也就是遍历内部的监听器列表,一一回调监听器的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void setStarted(Context context, ServletRequest request,
ServletResponse response, boolean originalRequestResponse) {

synchronized (asyncContextLock) {
this.request.getCoyoteRequest().action(
ActionCode.ASYNC_START, this);

this.context = context;
context.incrementInProgressAsyncCount();
this.servletRequest = request;
this.servletResponse = response;
this.hasOriginalRequestAndResponse = originalRequestResponse;
this.event = new AsyncEvent(this, request, response);

List<AsyncListenerWrapper> listenersCopy = new ArrayList<>(listeners);
listeners.clear();
if (log.isDebugEnabled()) {
log.debug(sm.getString("asyncContextImpl.fireOnStartAsync"));
}
for (AsyncListenerWrapper listener : listenersCopy) {
try {
listener.fireOnStartAsync(event);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.warn(sm.getString("asyncContextImpl.onStartAsyncError",
listener.getClass().getName()), t);
}
}
}
}
  1. org.apache.coyote.Request#action() 方法其实只是负责分发动作事件,最终的处理还是在 AbstractProcessor#action()
1
2
3
4
5
6
7
8
9
public void action(ActionCode actionCode, Object param) {
if (hook != null) {
if (param == null) {
hook.action(actionCode, this);
} else {
hook.action(actionCode, param);
}
}
}
  1. AbstractProcessor#action() 对于 ActionCode.ASYNC_START 的处理如下,可以看到实际是调用了异步状态机的 AsyncStateMachine#asyncStart() 方法

此处需要注意,Tomcat 会根据每一个请求的协议为其创建一个 AbstractProcessor 实现类,这个实现类负责这个请求全部生命周期的处理,这也就是AbstractProcessor内部保存 AsyncStateMachine 状态机的原因

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public final void action(ActionCode actionCode, Object param) {
switch (actionCode) {

......


// Servlet 3.0 asynchronous support
case ASYNC_START: {
asyncStateMachine.asyncStart((AsyncContextCallback) param);
break;
}
case ASYNC_COMPLETE: {
clearDispatches();
if (asyncStateMachine.asyncComplete()) {
processSocketEvent(SocketEvent.OPEN_READ, true);
}
break;
}
case ASYNC_DISPATCH: {
if (asyncStateMachine.asyncDispatch()) {
processSocketEvent(SocketEvent.OPEN_READ, true);
}
break;
}
case ASYNC_DISPATCHED: {
asyncStateMachine.asyncDispatched();
break;
}
case ASYNC_ERROR: {
asyncStateMachine.asyncError();
break;
}

......

}
}
  1. AsyncStateMachine#asyncStart() 方法实际保存了异步开始时间,并进行状态更新,此处可以看到其将步骤11提到的 AsyncContextImpl 保存了下来,后续将用于回调上层

    AsyncStateMachine这个类实际维护了一个异步请求的状态流转,伴随着这个请求的整个生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
synchronized void asyncStart(AsyncContextCallback asyncCtxt) {
if (state == AsyncState.DISPATCHED) {
generation.incrementAndGet();
updateState(AsyncState.STARTING);
// Note: In this instance, caller is responsible for calling
// asyncCtxt.incrementInProgressAsyncCount() as that allows simpler
// error handling.
this.asyncCtxt = asyncCtxt;
lastAsyncStart = System.currentTimeMillis();
} else {
throw new IllegalStateException(
sm.getString("asyncStateMachine.invalidAsyncState",
"asyncStart()", state));
}
}
  1. 此时回到步骤 9,创建完 AsyncContextImpl 对象,还会为其注册监听器,设置超时时间,方法源码如下:

  2. 至此我们可以理出一条异步请求完成时的传播链,自底向上的传播方向为 AsyncStateMatch –> AsyncContextImpl –> StandardServletAsyncWebRequest –> DeferredResult

  3. 可以看到超时时间在 AsyncContextImpl 内部保存了一份,也通过 org.apache.coyote.Request 设置到了底层,此处不再赘述方法堆栈,直接看AbstractProcessor#action() 对于 ActionCode.ASYNC_SETTIMEOUT 的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void addListener(AsyncListener listener) {
check();
AsyncListenerWrapper wrapper = new AsyncListenerWrapper();
wrapper.setListener(listener);
listeners.add(wrapper);
}



@Override
public void setTimeout(long timeout) {
check();
this.timeout = timeout;
request.getCoyoteRequest().action(ActionCode.ASYNC_SETTIMEOUT,
Long.valueOf(timeout));
}
  1. AbstractProcessor#action() 对于 ActionCode.ASYNC_SETTIMEOUT 的处理如下,可以看到实际只是作了保存操作而已

    至此超时时间已经设置到底层,那么 Tomcat 是如何做到异步请求超时返回呢?答案如下:

    Tomcat 不断轮询 socket,当轮询到可读写的 socket 时会创建 SocketProcessorBase 异步任务扔到线程池中执行 socket 读写任务。上文提到过 AbstractProcessor 实现类负责一个请求的全部生命周期处理,在处理 socket 的过程中,连接处理器 ConnectionHandler 如果发现当前请求是异步请求,则会将负责处理这个请求的 Processor 添加到等待队列中
    Tomcat 服务器启动时会开启一个周期线程池,处理等待队列中的 Processor,检查其是否超时。超时则触发处理 Socket 超时事件,最终通过 CoyoteAdapter 适配器将超时回调到上层。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void action(ActionCode actionCode, Object param) {
switch (actionCode) {

......

case ASYNC_SETTIMEOUT: {
if (param == null) {
return;
}
long timeout = ((Long) param).longValue();
setAsyncTimeout(timeout);
break;
}


......

}
}

public void setAsyncTimeout(long timeout) {
asyncTimeout = timeout;
}
  1. 此时回到步骤 8 第 5 点DeferredResult#setResultHandler() 的处理如下,可以看到这个步骤主要目的是保存 DeferredResultHandler,此时 DeferredResult 内部还没有有效的结果,所以不会调用 DeferredResultHandler#handleResult() 处理结果,至此请求的异步处理结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void setResultHandler(DeferredResultHandler resultHandler) {
Assert.notNull(resultHandler, "DeferredResultHandler is required");
// Immediate expiration check outside of the result lock
if (this.expired) {
return;
}
Object resultToHandle;
synchronized (this) {
// Got the lock in the meantime: double-check expiration status
if (this.expired) {
return;
}
resultToHandle = this.result;
if (resultToHandle == RESULT_NONE) {
// No result yet: store handler for processing once it comes in
this.resultHandler = resultHandler;
return;
}
}
// If we get here, we need to process an existing result object immediately.
// The decision is made within the result lock; just the handle call outside
// of it, avoiding any deadlock potential with Servlet container locks.
try {
resultHandler.handleResult(resultToHandle);
}
catch (Throwable ex) {
logger.debug("Failed to process async result", ex);
}
}

请求的再次分发

  1. DeferredResult#setResult() 方法被调用时将会对客户端请求进行响应,请求的再次分发也是由此触发
    1. DeferredResult#setResult() 方法只是个入口,可以看到当设置了结果之后,将调用 DeferredResultHandler#handleResult() 处理结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Set the value for the DeferredResult and handle it.
* @param result the value to set
* @return {@code true} if the result was set and passed on for handling;
* {@code false} if the result was already set or the async request expired
* @see #isSetOrExpired()
*/
public boolean setResult(T result) {
return setResultInternal(result);
}

private boolean setResultInternal(Object result) {
// Immediate expiration check outside of the result lock
if (isSetOrExpired()) {
return false;
}
DeferredResultHandler resultHandlerToUse;
synchronized (this) {
// Got the lock in the meantime: double-check expiration status
if (isSetOrExpired()) {
return false;
}
// At this point, we got a new result to process
this.result = result;
resultHandlerToUse = this.resultHandler;
if (resultHandlerToUse == null) {
// No result handler set yet -> let the setResultHandler implementation
// pick up the result object and invoke the result handler for it.
return true;
}
// Result handler available -> let's clear the stored reference since
// we don't need it anymore.
this.resultHandler = null;
}
// If we get here, we need to process an existing result object immediately.
// The decision is made within the result lock; just the handle call outside
// of it, avoiding any deadlock potential with Servlet container locks.
resultHandlerToUse.handleResult(result);
return true;
}
  1. DeferredResultHandler#handleResult() 的实现其实在 ??节步骤 8 提到过,实际在 WebAsyncManager#startDeferredResultProcessing() 中,如下所示,可以看到核心逻辑是调用 WebAsyncManager#setConcurrentResultAndDispatch() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {

......

try {
interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
deferredResult.setResultHandler(result -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
//此处
setConcurrentResultAndDispatch(result);
});
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
}
  1. WebAsyncManager#setConcurrentResultAndDispatch() 方法的核心处理如下:
    1. DeferredResult#setResult() 设置的结果保存在当前 WebAsyncManager 对象中
    2. this.asyncWebRequest.dispatch() 将异步请求再次分发,请求分发的流程与本文相关较小,暂不做分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void setConcurrentResultAndDispatch(Object result) {
synchronized (WebAsyncManager.this) {
if (this.concurrentResult != RESULT_NONE) {
return;
}
//将 DeferredResult#setResult() 设置的结果保存在当前 WebAsyncManager 对象中
this.concurrentResult = result;
this.errorHandlingInProgress = (result instanceof Throwable);
}

if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async result set but request already complete: " + formatRequestUri());
}
return;
}

if (logger.isDebugEnabled()) {
boolean isError = result instanceof Throwable;
logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri());
}
//将异步请求再次分发
this.asyncWebRequest.dispatch();
}
  1. 再次分发的请求将重新进入 DispatcherServlet#doDispatch() 处理,其流程与2.1节步骤1-4基本相同,只不过这次请求处理将进入2.1节步骤4第4点逻辑,此时 WebAsyncManager中已经有异步请求的结果了, invocableMethod.wrapConcurrentResult() 将会创建一个新的ConcurrentResultHandlerMethod 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Nullable
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

......

//判断异步请求是否已经有结果了,此处逻辑将在异步请求二次分发处理时进入,暂且不表
if (asyncManager.hasConcurrentResult()) {
//此时 asyncManager 中已经有异步请求的结果了
Object result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
//创建一个新的 ConcurrentResultHandlerMethod 对象
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}

//实际调用处理器方法
invocableMethod.invokeAndHandle(webRequest, mavContainer);
if (asyncManager.isConcurrentHandlingStarted()) {
return null;
}

return getModelAndView(mavContainer, modelFactory, webRequest);
}
finally {
webRequest.requestCompleted();
}
}
  1. ConcurrentResultHandlerMethod 的构造方法如下,可以看到这个对象把异步请求的结果包装到一个 Callable 对象中,并设置了其反射调用的 Method 为 call,这一步其实就是把原本应该反射调用的处理器方法丢弃了,直接返回异步请求已经设置好的返回结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType) {
super((Callable<Object>) () -> {
if (result instanceof Exception) {
throw (Exception) result;
}
else if (result instanceof Throwable) {
throw new NestedServletException("Async processing failed", (Throwable) result);
}
//将异步请求的结果包装到一个 Callable 对象中
return result;
//反射调用的 Method 为 call
}, CALLABLE_METHOD);

if (ServletInvocableHandlerMethod.this.returnValueHandlers != null) {
setHandlerMethodReturnValueHandlers(ServletInvocableHandlerMethod.this.returnValueHandlers);
}
this.returnType = returnType;
}
  1. 继续本节步骤4流程,此时 invocableMethod.invokeAndHandle() 依然将调用到 ConcurrentResultHandlerMethod 父类ServletInvocableHandlerMethod#invokeAndHandle() 方法,此处的处理是异步请求响应的最后一步

    需注意 ConcurrentResultHandlerMethod 内部保存的反射所需的 Method 对象已经变成了 Callable
    call 方法,则此处invokeForRequest()反射调用实际调用到 Callable#call() 方法,返回值已经是 DeferredResult 实际应该返回的数据了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
Object... providedArgs) throws Exception {

//反射调用处理器方法,获得方法返回值
Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
setResponseStatus(webRequest);

if (returnValue == null) {
if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) {
disableContentCachingIfNecessary(webRequest);
mavContainer.setRequestHandled(true);
return;
}
}
else if (StringUtils.hasText(getResponseStatusReason())) {
mavContainer.setRequestHandled(true);
return;
}

mavContainer.setRequestHandled(false);
Assert.state(this.returnValueHandlers != null, "No return value handlers");
try {//使用返回值处理器处理返回值
this.returnValueHandlers.handleReturnValue(
returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
}
catch (Exception ex) {
if (logger.isTraceEnabled()) {
logger.trace(formatErrorForReturnValue(returnValue), ex);
}
throw ex;
}
}
  1. 对于普通类型的返回值,this.returnValueHandlers.handleReturnValue() 处理返回值时会选择对应的返回值处理器,而不会再次由 DeferredResultMethodReturnValueHandler 处理,至此,异步请求的结果将顺利响应给客户端,异步请求结束

总结

​ 经过以上学习,现简答一下开文的问题:

  1. 什么是长轮询?除了长轮询还有什么方式?如何选择?

    答:它是实现服务端和客户端即时数据传输的一种主要方式,其他还有轮询、长连接、WebSocket 等。

    至于如何选择,需要结合自身业务场景和各个方式的特点进行判断

  2. 什么是 DeferredResult?如何使用?

    答:DeferredResult 是 SpringMVC 3.2 之后引入新的类,只要让请求方法返回 DeferredResult,就可以快速使用异步请求

  3. DeferredResult 的实现原理是什么?

    答:Spring MVC 在调用 Controller 方法后会对返回值做处理,如果发现返回值为异步请求类型(DeferredResult/Callable),则不会立即响应客户端,而是异步执行方法,并将请求挂起,中断当前请求的处理流程,直到异步任务超时或者被设置了结果值(DeferredResult 调用setResult 方法,callable执行完成自动发起)时,将该请求再次分发并且直接用执行结果作为返回值响应客户端。这样处理请求的线程将被释放,可以用于处理新的请求,从而提高服务端的吞吐量。

  4. 在实际工作中什么时候会用到?

    答:

    1. 需要实时获取数据状态变化
    2. 业务执行时间较长,长时间占用服务器连接资源

​ 在众多技术方案中没有”银弹“,总是需要根据自身业务灵活判断,而众多优秀的开源案例为我们提供了非常好的参考案例。并且这些案例都是经过大量生产实践后沉淀下来的宝贵经验。一些细微处的设计通常不会立刻应用在自身工作中,但是长期积累总会有用武之地。

参考资料

消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?

servlet 3异步请求

消息队列中的推拉模式

异步请求 DeferredResult 的原理