回调函数方式防线程池任务丢失

传统线程池关闭的痛点分析

1. 任务丢失风险

首先是时间竞争问题,JVM 关闭和任务执行之间存在时间竞争,任务可能来不及完成就被强制终止了。其次是资源浪费,已经投入的计算资源和业务逻辑处理可能前功尽弃。最严重的是数据一致性问题,对于涉及数据跑批等任务,强制中断可能导致数据不一致

2. 手动管理的复杂性

Spring 应用中,正确管理线程池的生命周期 需要开发者手动处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
public class TaskService {
private ThreadPoolExecutor executor;
@PostConstruct
public void init() {
executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
}
@PreDestroy
public void destroy() {
// 开发者需要记住手动关闭
if (executor != null && !executor.isShutdown()) {
executor.shutdown();
try {
// 需要手动实现等待逻辑
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
System.err.println("线程池无法正常关闭");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}

每个使用线程池的组件都需要编写类似的销毁逻辑,产生大量样板代码

开发者很容易忘记添加 @PreDestroy 方法,导致资源泄漏

3. 缺乏必要的监控和日志记录

除此之外,传统线程池在关闭过程中缺乏必要的监控和日志记录:

1
2
3
4
5
6
7
// 传统方式缺乏关闭过程的可观测性
executor.shutdown();
// 无法知道:
// - 关闭过程是否顺利?
// - 有多少任务被中断?
// - 等待了多长时间?
// - 是否存在资源泄漏?

优雅关闭机制设计

ps:等待机制方面,选择了 awaitTermination() 而不是简单的 Thread.sleep()。这是因为 awaitTermination() 能够准确感知所有任务的完成状态,在任务提前完成时立即返回,不浪费等待时间,还能正确处理中断信号

shutdown() 方法重写实现

1. 核心实现代码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void shutdown() {
// 1. 防重复关闭检查
if (isShutdown()) {
return;
}
// 2. 调用父类的 shutdown 方法,停止接收新任务
super.shutdown();
// 3. 如果未配置等待时间,直接返回
if (this.awaitTerminationMillis <= 0) {
return;
}
log.info("Before shutting down ExecutorService {}", threadPoolId);
try {
// 4. 等待所有任务完成,最多等待 awaitTerminationMillis 毫秒
boolean isTerminated = this.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS);
// 5. 记录 成功关闭日志 / 超时警告日志
if (!isTerminated) {
log.warn("Timed out while waiting for executor {} to terminate.", threadPoolId);
} else {
log.info("ExecutorService {} has been shutdown.", threadPoolId);
}
} catch (InterruptedException ex) {
log.warn("Interrupted while waiting for executor {} to terminate.", threadPoolId);
// 6. 重要:恢复线程的中断状态
Thread.currentThread().interrupt();
}
}

2. 实现细节深度分析

  1. 防重复关闭机制
1
2
3
if (isShutdown()) {
return;
}

通过 isShutdown() 检查,确保关闭逻辑只执行一次,避免重复等待和日志记录

  1. 渐进式关闭策略
1
2
3
4
// 第一步:停止接收新任务
super.shutdown();
// 第二步:等待现有任务完成
boolean isTerminated = this.awaitTermination(this.awaitTerminationMillis,TimeUnit.MILLISECONDS);

首先是立即响应,super.shutdown() 会立即停止接收新任务,避免关闭过程中任务继续堆积。然后是优雅等待,awaitTermination() 给现有任务充足的时间完成执行。最后是可控超时,通过 awaitTerminationMillis 参数控制最大等待时间,避免无限等待

3. 配置参数设计

推荐配置值:

快速任务 处理线程池(如缓存更新、日志记录): 5秒

中等耗时任务 线程池(如文件处理、邮件发送): 30秒

长时间任务 线程池(如数据导入、报表生成): 2分钟

Spring Bean 生命周期集成

1. Spring Bean 销毁机制概述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 1. Spring 提供的 Bean 销毁机制
public interface DisposableBean {
void destroy() throws Exception;
}

// 2. 或者使用注解方式
@PreDestroy
public void cleanup() {
// 清理逻辑
}

// 3. 或者在 @Bean 注解中指定
@Bean(destroyMethod = "shutdown")
public ThreadPoolExecutor createExecutor() {
return new ThreadPoolExecutor(...);
}

2. ysyThread 的自动销毁实现

Spring 框架有一个重要特性:自动销毁方法推断。当 Spring 容器关闭时,它会自动查找 Bean 中名为 closeshutdownstop 等的方法,并在 Bean 销毁时自动调用

由于 YsyThreadExecutor 继承自 ThreadPoolExecutor,而 ThreadPoolExecutor 本身就有 shutdown() 方法,所以当 Spring 容器关闭时:

  1. Spring 会自动检测到 YsyThreadExecutor Beanshutdown() 方法

  2. 在容器关闭过程中自动调用这个方法

  3. 触发 ysyThread 的优雅关闭逻辑

传统 vs 优雅关闭