本文主要介绍Spring事件流和@Async
异步线程池处理,以及@Async
默认线程池可能会导致的问题及解决方法。
事件流
Spring可以使用以观察者模式实现的事件流操作,将业务逻辑解耦,达到职责分离的效果
Spring事件流的详解
发布事件:
public class EmailService implements ApplicationEventPublisherAware {private ApplicationEventPublisher publisher;public void sendEmail(String address, String content) {publisher.publishEvent(new BlackListEvent(this, address, content));// send email...}
}
监听事件:
@EventListener(condition = "#blEvent.content == 'foo'")
public void processBlackListEvent(BlackListEvent blEvent) {// notify appropriate parties via notificationAddress...
}
注意在默认情况下,事件监听器会同步接收事件。这意味着
publishEvent()
方法将阻塞,直到所有侦听器都已完成对事件的处理为止。
@Async
用@Async
注解bean
的一个方法,就会让它在一个单独的线程中执行。换句话说,调用者不会等待被调用方法的完成
@Async
有两个限制:
- 它必须仅应用于
public
方法 - 自调用(从同一个类中调用异步方法)将不起作用
原因:该方法需要为
public
才可以被代理。而自调用是不生效的,因为它绕过了代理,直接调用了底层方法。
异步返回参数
可以通过将实际返回包装在Future
中,将@Async
应用于具有返回类型的方法
示例详见How To Do @Async in Spring
@Async
public Future<String> asyncMethodWithReturnType() {System.out.println("Execute method asynchronously - " + Thread.currentThread().getName());try {Thread.sleep(5000);return new AsyncResult<String>("hello world !!!!");} catch (InterruptedException e) {//}return null;
}
Spring 还提供了一个实现Future
的AsyncResult
类。我们可以使用它来跟踪异步方法执行的结果。
现在让我们调用上述方法并使用Future
对象检索异步过程的结果。
public void testAsyncAnnotationForMethodsWithReturnType()throws InterruptedException, ExecutionException {System.out.println("Invoking an asynchronous method. " + Thread.currentThread().getName());Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();while (true) {if (future.isDone()) {System.out.println("Result from asynchronous process - " + future.get());break;}System.out.println("Continue doing something else. ");Thread.sleep(1000);}
}
异步监听器
如果要特定的侦听器异步处理事件,只需重用常规@Async
支持:
@EventListener
@Async
public void processBlackListEvent(BlackListEvent event) {// BlackListEvent is processed in a separate thread
}
使用异步事件时,请注意以下限制:
- 如果事件监听器抛出
Exception
,它将不会传播给调用者,详见AsyncUncaughtExceptionHandler
- 此类事件监听器无法发送答复事件。如果您需要发送另一个事件作为处理结果,请注入
ApplicationEventPublisher
以手动发送事件。
@EventPublisher + @Async 阻塞
在@Async
注解在使用时,不指定线程池的名称,默认SimpleAsyncTaskExecutor
线程池。
默认的线程池配置为核心线程数为8,等待队列为无界队列,即当所有核心线程都在执行任务时,后面的任务会进入队列等待,若逻辑执行速度较慢会导致线程池阻塞,从而出现监听器抛弃和无响应的结果
spring默认线程池配置参数
org.springframework.boot.autoconfigure.task.TaskExecutionProperties
/*** Configuration properties for task execution.** @author Stephane Nicoll* @since 2.1.0*/
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {private final Pool pool = new Pool();/*** Prefix to use for the names of newly created threads.*/private String threadNamePrefix = "task-";public static class Pool {/*** Queue capacity. An unbounded capacity does not increase the pool and therefore* ignores the "max-size" property.*/private int queueCapacity = Integer.MAX_VALUE;/*** Core number of threads.*/private int coreSize = 8;/*** Maximum allowed number of threads. If tasks are filling up the queue, the pool* can expand up to that size to accommodate the load. Ignored if the queue is* unbounded.*/private int maxSize = Integer.MAX_VALUE;/*** Whether core threads are allowed to time out. This enables dynamic growing and* shrinking of the pool.*/private boolean allowCoreThreadTimeout = true;/*** Time limit for which threads may remain idle before being terminated.*/private Duration keepAlive = Duration.ofSeconds(60);//getter/setter}
}
自定义线程池
在@Async
注解中value参数使用自定义线程池,能让开发工程师更加明确线程池的运行规则,选取适合的线程策略,规避资源耗尽的风险
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize
,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
@Configuration
public class ThreadConfig {@Bean("msgThread")public ThreadPoolTaskExecutor getMsgSendTaskExecutor(){ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(10);taskExecutor.setMaxPoolSize(25);taskExecutor.setQueueCapacity(800);taskExecutor.setAllowCoreThreadTimeOut(false);taskExecutor.setAwaitTerminationSeconds(60);taskExecutor.setThreadNamePrefix("msg-thread-");taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());taskExecutor.initialize();return taskExecutor;}
}
监听事件异步处理
@EventListener(value = MsgEvent.class, condition = "#root.args[0].type == 0")
@Async("msgThread")
public void commonEvent(MsgEvent event) {//logic
}
@Async
使用自定义线程池的其他方式
参考资料:
- Spring事件流
- @Async优化
- How To Do @Async in Spring
- Spring使用@Async注解