告警频率限制方法

前言

当系统出现异常时,往往会在短时间内产生 大量重复告警,形成所谓的”告警风暴“。这不仅会对开发人员造成信息过载,还可能导致真正重要的告警被淹没在噪音中

为了解决这个问题,需要设计一套实用的告警限流机制 ,既要保证重要告警能够及时送达,又要避免无意义的重复通知

告警风暴问题分析

在线程池监控场景中,告警风暴通常出现在以下情况:

  • 队列积压告警 :当任务提交速度超过处理能力时,队列长度持续增长,可能每秒触发多次告警
  • 拒绝策略告警 :线程池达到最大容量后,每个被拒绝的任务都可能触发一次告警
  • 线程数异常告警 :活跃线程数超过阈值时,监控系统可能频繁发送通知

告警风暴会带来以下负面影响:

  • 信息过载 :运维人员被大量重复信息淹没,难以快速定位问题
  • 资源浪费 :频繁的网络请求消耗系统资源,影响正常业务
  • 告警疲劳 :过多无效告警导致运维人员对告警系统失去信任
  • 成本增加 :第三方通知服务(如钉钉、企业微信)按调用次数收费

解决方案核心思想是:在保证告警时效性的前提下,通过时间窗口限流算法控制同类型告警的发送频率

具体策略包括:

  • 按线程池 ID 和告警类型进行分组 限流
  • 基于时间窗口的 滑动限流算法
  • 可配置的 告警间隔时间

简单工厂模式在通知系统中的应用

如何支持多种不同的通知渠道(钉钉、企业微信、邮件等),同时保持代码的可维护性?—— 简单工厂模式

1. 通知服务接口设计

首先定义一个统一的通知服务接口,抽象出所有通知渠道的共同行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface NotifierService {
/**
* 发送线程池配置变更通知
*/
void sendChangeMessage(ThreadPoolConfigChangeDTO configChange);
/**
* 发送 Web 线程池配置变更通知
*/
void sendWebChangeMessage(WebThreadPoolConfigChangeDTO configChange);
/**
* 发送线程池报警通知
*/
void sendAlarmMessage(ThreadPoolAlarmNotifyDTO alarm);
}

2. 简单工厂实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class NotifierDispatcher implements NotifierService{
private static final Map<String,NotifierService> NOTIFIER_SERVICE_MAP = new HashMap<>();
static {
// 在简单工厂中注册不同的通知实现
NOTIFIER_SERVICE_MAP.put("DING", new DingTalkMessageService());
/*
后续可以轻松扩展其他通知渠道
NOTIFIER_SERVICE_MAP.put("WECHAT", new WeChatMessageService());
NOTIFIER_SERVICE_MAP.put("EMAIL", new EmailMessageService());
*/
}
@Override
public void sendChangeMessage(ThreadPoolConfigChangeDTO configChange) {
getNotifierService().ifPresent(service -> service.sendChangeMessage(configChange));
}
@Override
public void sendWebChangeMessage(WebThreadPoolConfigChangeDTO configChange) {
getNotifierService().ifPresent(service -> service.sendWebChangeMessage(configChange));
}
@Override
public void sendAlarmMessage(ThreadPoolAlarmNotifyDTO alarm) {
getNotifierService().ifPresent(service->{
// 频率检查
boolean allowSend = AlarmRateLimiter.allowAlarm(
alarm.getThreadPoolId(),
alarm.getAlarmType(),
alarm.getInterval()
);
// 满足频率发送告警
if (allowSend) {
service.sendAlarmMessage(alarm.resolve());
}
});
}
/**
* 根据配置获取对应的通知服务实现
* 简单工厂模式的核心方法
*/
private Optional<NotifierService> getNotifierService(){
// 包装入口,不为 null 用 Optional 包装成盒子
return Optional.ofNullable(BootstrapConfigProperties.getInstance().getNotifyPlatforms())
// 打开盒子,调用 getPlatform
.map(BootstrapConfigProperties.NotifyPlatformsConfig::getPlatform)
// 拿到字符串(如“钉钉”),去 Map 中找对象
.map(platform->NOTIFIER_SERVICE_MAP.get(platform));
}
}

4. 简单工厂模式的优势

通过简单工厂模式的应用,我们的通知系统具备了以下优势:

  • 封装创建逻辑 :客户端无需关心具体通知服务的创建过程
  • 可扩展性 :新增通知渠道只需实现 NotifierService 接口并注册到工厂映射中
  • 可配置性 :通过配置文件动态切换通知渠道,无需修改代码
  • 单一职责 :工厂类负责创建,具体实现类负责业务逻辑

时间窗口限流算法设计

在众多限流算法中,我们选择了固定时间窗口算法 作为告警限流的核心机制。虽然这种算法在窗口边界可能存在 突发流量问题,但对于告警场景来说,其简单性和可预测性更为重要

固定时间窗口限流的核心思想是:在固定的时间窗口内,同一类型的告警最多只能发送一次

具体实现逻辑:

  1. 为每个”线程池ID + 告警类型“的组合维护一个时间戳记录
  2. 当新告警到达时,检查距离上次发送是否超过配置的时间间隔
  3. 如果超过间隔时间,允许发送并更新时间戳;否则拒绝发送

告警限流机制实现

巧妙之处在于使用了 ConcurrentHashMap.compute() 方法,它提供了 原子性 的”检查-更新“操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class AlarmRateLimiter {
private static final Map<String, Long> ALARM_RECORD = new ConcurrentHashMap<>();
/**
* 检查是否允许发送报警
*/
public static boolean allowAlarm(String threadPoolId, String alarmType, int intervalMinutes) {
String key = buildKey(threadPoolId, alarmType);
long currentTime = System.currentTimeMillis();
// compute 方法会原子性地执行 "计算新值并替换旧值" 的逻辑,避免多线程下的竞态条件
return ALARM_RECORD.compute(key, (k, lastTime) -> {
if (lastTime == null || (currentTime - lastTime) > intervalMinutes * 60 * 1000L) {
return currentTime; // 更新时间为当前时间
}
return lastTime; // 保持原时间
}) == currentTime; // 返回值等于当前时间说明允许发送
}

private static String buildKey(String threadPoolId, String alarmType) {
return threadPoolId + "|" + alarmType;
}
}

内存优化(可忽略)

在长期运行的系统中,ALARM_RECORD 可能会积累历史记录。虽然在告警场景下数据量通常不大,但我们仍可以考虑以下优化策略:

1
2
3
4
5
6
7
8
9
10
11
// 定期清理过期记录
private static final ScheduledExecutorService CLEANER =
Executors.newSingleThreadScheduledExecutor();

static {
// 每小时清理一次超过24小时的记录
CLEANER.scheduleAtFixedRate(() -> {
long expireTime = System.currentTimeMillis() - 24 * 60 * 60 * 1000L;
ALARM_RECORD.entrySet().removeIf(entry -> entry.getValue() < expireTime);
}, 1, 1, TimeUnit.HOURS);
}

函数编程优化无效告警性能

1. 当前问题

功能肯定是没有问题,但是还存在可优化空间,比如提个问题:线程池的很多状态 API(如 getActiveCount())是 有锁的,但 只要满足了告警条件,即使告警被拦截了,还是会获取所有线程池的全量数据,造成不必要的资源浪费

我们希望 只有在真正需要发送告警的那一刻,才去调用相关 API,避免不必要的锁竞争和内存分配

2. Supplier 函数延迟构建

延迟构建告警对象(Supplier模式)SupplierJDK8 推出的内置函数式接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class ThreadPoolAlarmNotifyDTO {
// ......
@ToString.Exclude
private transient Supplier<ThreadPoolAlarmNotifyDTO> supplier;

public ThreadPoolAlarmNotifyDTO resolve() {
return supplier != null ? supplier.get() : this;
}
}

为了防止 supplier 被序列化或者 toString 打印,我们通过 transient 关键字和 @ToString.Exclude 注解进行排除

在告警检查中,先初始化必须的几个字段,然后将剩余字段通过 Supplier 函数进行延迟加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
   // 1. 构建告警 DTO
ThreadPoolAlarmNotifyDTO alarm = ThreadPoolAlarmNotifyDTO.builder()
.alarmType(alarmType)
.threadPoolId(threadPoolId)
// 告警间隔,用于告警合并或抑制
.interval(properties.getNotify().getInterval())
.build();

// 2. 使用 Supplier 延迟加载告警详情,只有在发送告警时才会执行
alarm.setSupplier(() -> {
try {
// 获取当前服务器IP地址,用于定位告警来源
alarm.setIdentify(InetAddress.getLocalHost().getHostAddress());
} catch (UnknownHostException e) {
log.warn("Error in obtaining HostAddress for alarm", e);
}
// alarm......
return alarm;
});