上一篇文章主要讲了EnableAsync
注解是如何创建aop并生效的,这一篇讲springboot
是如何处理被拦截的方法的;
这里看AsyncExecutionInterceptor
类中的invoke
方法,源码如下:
public Object invoke(final MethodInvocation invocation) throws Throwable {
//1 获取拦截的方法
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
//2 根据被拦截的方法来选取执行异步任务的执行器
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
//3 构建任务(添加异常的处理方式)
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//4 执行构建的任务并返回任务执行结果
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
该类的determineAsyncExecutor
方法会返回一个AsyncTaskExecutor
,也就是返回一个执行异步任务的线程池;
这里来看看上一个类中的determineAsyncExecutor
方法源码:
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
// 1根据方法选取`executors`缓存中的执行器
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
//2 获取方法上面的Async注解的value值
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
//3 根据获取的value值查找对应的执行器
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
//4 默认执行器
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
//5 类型转换
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
// 6 放入线程私有内存
this.executors.put(method, executor);
}
//7
return executor;
}
executors
缓存中的执行器Async
注解的value
,具体的源码在AnnotationAsyncExecutionInterceptor
类中value
值,则从beanFactory
中获取该value
值的bean
(根据注解中的值选取Executor
执行器)targetExecutor
为executor
executor
存入缓存executor
该方法是返回参数方法上的Async
注解的value
值(executor
的bean
的name
),该方法的源码在AnnotationAsyncExecutionInterceptor
类中:
protected String getExecutorQualifier(Method method) {
// Maintainer‘s note: changes made here should also be made in
// AnnotationAsyncExecutionAspect#getExecutorQualifier
//1
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
//2
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
//3
return (async != null ? async.value() : null);
}
Async
注解信息null
,则根据方法的类来获取Async
注解信息Async
的注解值在2.0中的第四步调用了defaultExecutor,获取默认的Executor:
targetExecutor = this.defaultExecutor.get();
那么这个defaultExecutor的值是什么时候初始化的呢???
来看看AsyncExecutionAspectSupport的构造方法:
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
}
先讲解这个方法:
这里是在给defaultExecutor
属性和exceptionHandler
属性赋值;
那么为什么要看这个构造方法呢???
接下来返回到AsyncAnnotationAdvisor类的buildAdvice方法(参考上一篇文章:springboot异步线程(三)源码解析(一)),最终调用了上方的构造方法
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
我们可以看见调用AnnotationAsyncExecutionInterceptor
构造方法的参数为null
;
接下来看看AsyncExecutionAspectSupport
类的构造方法中给defaultExecutor
属性赋值时的另一个方法getDefaultExecutor
方法,该方法就是获取BeanFactory
中的Executor
,这里选关键代码:
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
// Search for TaskExecutor bean... not plain Executor since that would
// match with ScheduledExecutorService as well, which is unusable for
// our purposes here. TaskExecutor is more clearly designed for it.
// 根据TaskExecutor类别来获取Executor
return beanFactory.getBean(TaskExecutor.class);
// 根据Executor类别且beanName为taskExecutor值来获取Executor
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
该方法被子类AsyncExecutionInterceptor重写:
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
该方法是执行上边1.0的第四步:将拦截的方法封装任务并交由默认执行器执行,最后返回结果;
该方法的源码在AsyncExecutionAspectSupport类,源码如下:
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
这里主要写一下异步任务选择执行器的大概流程吧:
在我的文章:springboot异步线程(二) 中有讲到:springboot2.1之后的版本与springboot2.1之前的版本的区别,这里也就不细说了;
我还是我,我在努力,Take your time。
原文:https://www.cnblogs.com/guoyuchuan/p/13296474.html