core 模块设计-01

1
2
3
4
5
6
7
8
9
10
11
12
13
14
.
├── core # 动态线程池核心模块包,实现动态线程池相关基础类定义
├── dashboard-dev # 前端页面方便查看和调试
├── example # 动态线程池示例包,演示线程池动态参数变更、监控和告警等功能
│   ├── apollo-example
│   └── nacos-cloud-example
├── spring-base # 动态线程池基础模块包,包含Spring扫描动态线程池、是否启用以及Banner打印等
└── starter # 动态线程池配置中心组件包,实现线程池结合Spring框架和配置中心动态刷新
  ├── adapter # 动态线程池适配层,比如对接 Web 容器 Tomcat 线程池等
  │   └── web-spring-boot-starter # Web 容器线程池组件库
  ├── apollo-spring-boot-starter # Apollo 配置中心动态监控线程池组件库
  ├── common-spring-boot-starter # 配置中心公共监听等逻辑抽象组件库
  ├── dashboard-dev-spring-boot-starter # 控制台 API 组件库
  └── nacos-cloud-spring-boot-starter # Nacos 配置中心动态监控线程池组件库

与第三方框架无关的通用逻辑,统一抽象封装在了 ysythread-core 模块中

该模块主要承担以下核心职责:

  • 定义动态线程池的 基础抽象类
  • 执行线程池运行时的 告警扫描逻辑
  • 提供线程池运行状态的 指标采集能力
  • 支持 监听配置中心变更,并 动态调整线程池参数及触发告警通知

关键类

1. YsyThreadExecutor

问题:为什么要单独定义一个线程池基类,而不是直接使用原生的ThreadPoolExecutor呢?

原因:

  1. 原生线程池并没有线程池 ID 或名称的概念,在实际业务中,我们往往需要对线程池进行运行时变更、指标监控、告警通知等操作。而缺乏唯一标识,会让我们在面对多个线程池时难以准确定位、识别和管理
  2. ThreadPoolExecutor 本身并不排斥继承,反而在设计上为继承扩展预留了几个关键的扩展点方法

YsyThreadExecutor继承于ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public YsyThreadExecutor(
@NonNull String threadPoolId,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler,
long awaitTerminationMills) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// 通过动态代理设置拒绝策略执行次数
setRejectedExecutionHandler(handler);
// 设置动态线程池扩展属性:线程池 ID 标识
this.threadPoolId = threadPoolId;
// 设置等待终止时间,单位毫秒
this.awaitTerminationMills = awaitTerminationMills;
}
  • setRejectedExecutionHandler(handler)中重写rejectedExecution方法

2.ThreadPoolExecutorProperties

为了更好地支持动态配置和统一管理,通过 threadPoolId 将这些“外围参数”与线程池进行关联,形成一套完整的配置体系,便于在运行时查看、存储、变更和追踪

外围参数【队列容量、是否允许核心线程超时、通知配置、报警配置】

3. ThreadPoolExecutorHolder

线程池实例对象线程池参数配置对象 本质上是两个独立的实体。但在实际使用中,往往需要同时操作这两部分信息 ,例如在执行动态变更、状态监控或告警处理时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Data
@AllArgsConstructor
public class ThreadPoolExecutorHolder {
/**
* 线程池唯一标识
*/
private String threadPoolId;
/**
* 线程池
*/
private ThreadPoolExecutor executor;
/**
* 线程池属性参数
*/
private ThreadPoolExecutorProperties executorProperties;
}

4. YsyThreadRegistry

因为单个项目中往往不止一个动态线程池实例 。因此,我们需要一个统一的线程池管理容器 来进行集中存储和管理

  • 这个注册中心的设计非常简洁,主要提供了以下 3 类核心能力:

    • 新增线程池实例

    • 根据线程池ID获取指定实例

    • 获取当前所有线程池实例

通过这一管理器,我们就可以以统一的方式对所有线程池进行注册、查询和维护

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class YsyThreadRegistry {
/**
* 线程池持有者缓存,key 为线程池唯一标识,value 为线程池包装类
*/
private static final Map<String, ThreadPoolExecutorHolder> HOLDER_MAP = new ConcurrentHashMap<>();
/**
* 注册线程池到管理器
*/
public static void putHolder(String threadPoolId, ThreadPoolExecutor executor, ThreadPoolExecutorProperties properties) {
ThreadPoolExecutorHolder executorHolder = new ThreadPoolExecutorHolder(threadPoolId, executor, properties);
HOLDER_MAP.put(threadPoolId, executorHolder);
}
/**
* 根据线程池 Id 获取对应线程池包装对象
*/
public static ThreadPoolExecutorHolder getHolder(String threadPoolId) {
return HOLDER_MAP.get(threadPoolId);
}
/**
* 获取所有线程池集合
*/
public static Collection<ThreadPoolExecutorHolder> getAllHolders() {
return HOLDER_MAP.values();
}
}

引入 Builder 设计模式

  • Lombok不是也提供了@Builder注解吗?为什么还要手动实现Builder模式呢?

  • Lombok 提供的 @Builder 注解确实很方便,它的最大优势在于省代码、上手快 ,非常适合用于构建简单的 Java 对象。但相比之下,手写的 Builder 模式则具有更强的灵活性和可控性 ,能够支持复杂构建逻辑、参数校验、默认值注入等扩展场景

  • 以我们当前的线程池构建器为例,如果单纯使用 Lombok@Builder,只能构造出一个静态对象,无法根据构建过程中的参数动态决定是创建普通线程池还是动态线程池 ,也无法在构建过程中插入定制化校验或初始化逻辑

可以根据dynamicPool参数决定实例化 普通线程池 还是 动态线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public ThreadPoolExecutor build() {
// 1. 【创建队列】
BlockingQueue<Runnable> blockingQueue = BlockingQueueTypeEnum.createBlockingQueue(workQueueType.getName(), workQueueCapacity);
// 2. 【确定拒绝策略】
RejectedExecutionHandler rejectedHandler = Optional.ofNullable(this.rejectedHandler)
.orElseGet(() -> new ThreadPoolExecutor.AbortPolicy());
// 3. 【参数校验】
Assert.notNull(threadFactory, "The thread factory cannot be null.");
ThreadPoolExecutor threadPoolExecutor;
// 4. 【实例化线程池】 (核心分支)
if (dynamicPool) {
// 分支 A: 如果标记为 dynamicPool = true
// 创建框架自定义的 YsyThreadExecutor。
// 这是一个继承自 JDK ThreadPoolExecutor 的增强类,通常包含了:
// - 线程池 ID (threadPoolId) 用于监控区分
// - 拒绝策略统计
// - 任务执行耗时统计
// - 优雅停机等待时间 (awaitTerminationMillis)
threadPoolExecutor = new YsyThreadExecutor(
threadPoolId,
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
blockingQueue,
threadFactory,
rejectedHandler,
awaitTerminationMillis
);
} else {
// 分支 B: 如果只是普通线程池
// 创建 JDK 原生的 ThreadPoolExecutor,不带任何额外的监控或增强功能。
threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
blockingQueue,
threadFactory,
rejectedHandler
);
}
// 5. 【设置额外参数】
threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
return threadPoolExecutor;
}

如何使用 build()

1
2
3
4
5
6
7
8
9
10
ThreadPoolExecutorBuilder.builder()
.threadPoolId("ysythread-1")
.corePoolSize(4)
.maximumPoolSize(6)
.keepAliveTime(9999L)
.workQueueType(BlockingQueueTypeEnum.SYNCHRONOUS_QUEUE)
.threadFactory("ysythread-1")
.rejectedHandler(new ThreadPoolExecutor.CallerRunsPolicy())
.dynamicPool() // 默认为 false,设置为动态线程池 true
.build();