core 模块设计-01
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| . ├── core # 动态线程池核心模块包,实现动态线程池相关基础类定义 ├── dashboard-dev # 前端页面方便查看和调试 ├── example # 动态线程池示例包,演示线程池动态参数变更、监控和告警等功能 │ ├── apollo-example │ └── nacos-cloud-example ├── spring-base # 动态线程池基础模块包,包含Spring扫描动态线程池、是否启用以及Banner打印等 └── starter # 动态线程池配置中心组件包,实现线程池结合Spring框架和配置中心动态刷新 ├── adapter # 动态线程池适配层,比如对接 Web 容器 Tomcat 线程池等 │ └── web-spring-boot-starter # Web 容器线程池组件库 ├── apollo-spring-boot-starter # Apollo 配置中心动态监控线程池组件库 ├── common-spring-boot-starter # 配置中心公共监听等逻辑抽象组件库 ├── dashboard-dev-spring-boot-starter # 控制台 API 组件库 └── nacos-cloud-spring-boot-starter # Nacos 配置中心动态监控线程池组件库
|
与第三方框架无关的通用逻辑,统一抽象封装在了 ysythread-core 模块中
该模块主要承担以下核心职责:
- 定义动态线程池的 基础抽象类
- 执行线程池运行时的 告警扫描逻辑
- 提供线程池运行状态的 指标采集能力
- 支持 监听配置中心变更,并 动态调整线程池参数及触发告警通知
关键类
1. YsyThreadExecutor
问题:为什么要单独定义一个线程池基类,而不是直接使用原生的ThreadPoolExecutor呢?
原因:
- 原生线程池并没有线程池 ID 或名称的概念,在实际业务中,我们往往需要对线程池进行运行时变更、指标监控、告警通知等操作。而缺乏唯一标识,会让我们在面对多个线程池时难以准确定位、识别和管理
ThreadPoolExecutor 本身并不排斥继承,反而在设计上为继承扩展预留了几个关键的扩展点方法
让YsyThreadExecutor继承于ThreadPoolExecutor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public YsyThreadExecutor( @NonNull String threadPoolId, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long awaitTerminationMills) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); setRejectedExecutionHandler(handler); this.threadPoolId = threadPoolId; this.awaitTerminationMills = awaitTerminationMills; }
|
- 在
setRejectedExecutionHandler(handler)中重写rejectedExecution方法
2.ThreadPoolExecutorProperties
为了更好地支持动态配置和统一管理,通过 threadPoolId 将这些“外围参数”与线程池进行关联,形成一套完整的配置体系,便于在运行时查看、存储、变更和追踪
外围参数【队列容量、是否允许核心线程超时、通知配置、报警配置】
3. ThreadPoolExecutorHolder
线程池实例对象 和 线程池参数配置对象 本质上是两个独立的实体。但在实际使用中,往往需要同时操作这两部分信息 ,例如在执行动态变更、状态监控或告警处理时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Data @AllArgsConstructor public class ThreadPoolExecutorHolder {
private String threadPoolId;
private ThreadPoolExecutor executor;
private ThreadPoolExecutorProperties executorProperties; }
|
4. YsyThreadRegistry
因为单个项目中往往不止一个动态线程池实例 。因此,我们需要一个统一的线程池管理容器 来进行集中存储和管理
通过这一管理器,我们就可以以统一的方式对所有线程池进行注册、查询和维护
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class YsyThreadRegistry {
private static final Map<String, ThreadPoolExecutorHolder> HOLDER_MAP = new ConcurrentHashMap<>();
public static void putHolder(String threadPoolId, ThreadPoolExecutor executor, ThreadPoolExecutorProperties properties) { ThreadPoolExecutorHolder executorHolder = new ThreadPoolExecutorHolder(threadPoolId, executor, properties); HOLDER_MAP.put(threadPoolId, executorHolder); }
public static ThreadPoolExecutorHolder getHolder(String threadPoolId) { return HOLDER_MAP.get(threadPoolId); }
public static Collection<ThreadPoolExecutorHolder> getAllHolders() { return HOLDER_MAP.values(); } }
|
引入 Builder 设计模式
Lombok不是也提供了@Builder注解吗?为什么还要手动实现Builder模式呢?
Lombok 提供的 @Builder 注解确实很方便,它的最大优势在于省代码、上手快 ,非常适合用于构建简单的 Java 对象。但相比之下,手写的 Builder 模式则具有更强的灵活性和可控性 ,能够支持复杂构建逻辑、参数校验、默认值注入等扩展场景
以我们当前的线程池构建器为例,如果单纯使用 Lombok 的 @Builder,只能构造出一个静态对象,无法根据构建过程中的参数动态决定是创建普通线程池还是动态线程池 ,也无法在构建过程中插入定制化校验或初始化逻辑
可以根据dynamicPool参数决定实例化 普通线程池 还是 动态线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public ThreadPoolExecutor build() { BlockingQueue<Runnable> blockingQueue = BlockingQueueTypeEnum.createBlockingQueue(workQueueType.getName(), workQueueCapacity); RejectedExecutionHandler rejectedHandler = Optional.ofNullable(this.rejectedHandler) .orElseGet(() -> new ThreadPoolExecutor.AbortPolicy()); Assert.notNull(threadFactory, "The thread factory cannot be null."); ThreadPoolExecutor threadPoolExecutor; if (dynamicPool) { threadPoolExecutor = new YsyThreadExecutor( threadPoolId, corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, blockingQueue, threadFactory, rejectedHandler, awaitTerminationMillis ); } else { threadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, blockingQueue, threadFactory, rejectedHandler ); } threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeOut); return threadPoolExecutor; }
|
如何使用 build()
1 2 3 4 5 6 7 8 9 10
| ThreadPoolExecutorBuilder.builder() .threadPoolId("ysythread-1") .corePoolSize(4) .maximumPoolSize(6) .keepAliveTime(9999L) .workQueueType(BlockingQueueTypeEnum.SYNCHRONOUS_QUEUE) .threadFactory("ysythread-1") .rejectedHandler(new ThreadPoolExecutor.CallerRunsPolicy()) .dynamicPool() .build();
|