线程池参数的并发安全刷新

动态线程池的核心能力之一,就是运行时可以自动感知配置变化并热更新,无需重启服务

配置前置验证

1. 判断是否有线程池配置

首先检查远程配置是否包含线程池设置,如果为空,则直接返回,跳过后续处理

2. 判断线程池配置是否发生变化

我们逐个遍历线程池 ID,加synchronized锁进行对比,根据线程池 ID 获取 注册中心 中的线程池实例,对比新旧配置

ps:参数中的阻塞队列,除了要校验容量,还要校验类型

线程池配置变更

如果检测到变化,调用线程池刷新方法应用新的配置:

1. 核心 / 最大线程数

线程数更新的原则是:先最大后核心,如果新核心线程数大于当前最大线程数,必须先调大 maximumPoolSize,否则 JDK 会抛 IllegalArgumentException。之所以会抛出异常,是因为 JDK17 线程池底层在设置核心线程数时做了 参数限制校验

1
2
3
4
5
6
7
if ( 远程核心 > 本地最大 ) {
   executor.setMaximumPoolSize( 远程最大 );
   executor.setCorePoolSize( 远程核心 );
} else {
   executor.setCorePoolSize( 远程核心 );
   executor.setMaximumPoolSize( 远程最大 );
}

2. 拒绝策略

借助 枚举工厂 方法把字符串策略名转换为真正的 RejectedExecutionHandler 实例,保证可插拔

3. 队列容量动态扩容

只有当队列实例 实现了可扩容接口 时才可以修改容量,避免 LinkedBlockingQueueJDK 原生队列不支持容量变化导致的风险

1
2
3
4
5
6
7
8
// --- 5. 更新队列容量 ---
// JDK 原生 LinkedBlockingQueue 容量是 final 的,无法修改。
// 这里必须使用自定义的可扩容队列 ResizableCapacityLinkedBlockingQueue 才能生效。
if (isQueueCapacityChanged(originalProperties, remoteProperties, executor)) {
BlockingQueue<Runnable> queue = executor.getQueue();
ResizableCapacityLinkedBlockingQueue<?> resizableQueue = (ResizableCapacityLinkedBlockingQueue<?>) queue;
resizableQueue.setCapacity(remoteProperties.getQueueCapacity());
}

4. 刷新元数据、发送通知、打印审计日志

线程池运行时参数变更后,还有一些后置逻辑需要处理:

  1. 把最新配置写回注册表,保证后续再读取时就是新的值✅

  2. 然后会把“旧值 → 新值”的映射封装成 DTO,通过钉钉、企业微信、邮件等渠道推送给开发 / 运维,做到即时可见📋

  3. 为实现日志留痕,会通过 log.info 统一打印所有关键字段的 “旧值->新值” 📋

如何保障线程池配置刷新的并发安全?

在动态线程池的配置刷新过程中,我们需要支持多个线程同时触发配置变更(比如配置中心推送受网络影响重复推送、多用户手动调用重复、定时校验等),但必须保证同一个线程池的参数刷新是串行、原子、安全的

在处理并发安全问题时,我考虑了两种方案:

  1. 对整个方法加锁 :实现最简单,能快速解决线程安全问题❌
  2. 仅对指定的线程池ID加锁 :粒度更细,性能更优✔️

1. 使用 synchronized (id)

1
2
3
synchronized (threadPoolId) {
// do refresh
}

存在的问题:

  • 如果 threadPoolId 是从对象字段获取的(例如 .getThreadPoolId()),多个对象即使返回相同内容,也可能是不同的String实例
  • JVM 会对不同的引用分配不同的锁 → 锁不生效 ,并发冲突依然会发生

2. 使用 ConcurrentHashMap<String, ReentrantLock>

1
2
3
4
5
6
7
8
private static final ConcurrentMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();
ReentrantLock lock = lockMap.computeIfAbsent(threadPoolId, k -> new ReentrantLock());
lock.lock();
try {
// do refresh
} finally {
lock.unlock();
}

存在的问题:

  • 需要维护锁生命周期,比如什么时候释放锁内存?
  • 对于中小项目或轻量组件来说有点“”。

3. 使用 .intern() 基于内容值构建锁

1
2
3
4
String threadPoolId = remoteProperties.getThreadPoolId();
synchronized (threadPoolId.intern()) {
// do refresh
}

优势:

  • 任何内容相同的字符串,调用 .intern() 后都会返回同一个对象引用
  • 不依赖外部锁表,零依赖、线程安全
  • 锁粒度线程池 ID 为单位,天然支持并发刷新多个线程池

3.1 .intern() 原理

.intern()Java 提供的字符串常量池机制 的一部分

  • 它会将当前字符串加入 JVM 的字符串池(String Pool)中
  • 如果字符串池中已有相同内容的字符串,则直接返回那一份对象引用
  • 这就确保了同内容字符串→同引用对象 ,从而让 synchronized 生效