feat:(1.3.0-beta1): 优化告警

This commit is contained in:
opensnail 2024-12-24 22:24:43 +08:00
parent 7844946f63
commit b50a2402ab
5 changed files with 68 additions and 66 deletions

View File

@ -64,22 +64,21 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
// 通知场景
Set<Integer> notifyScene = new HashSet<>();
// 通知配置
Set<Long> notifyIds = new HashSet<>();
// 转换AlarmDTO 为了下面循环发送使用
Map<Set<Long>, List<A>> waitSendAlarmInfos = convertAlarmDTO(alarmInfos, notifyScene);
waitSendAlarmInfos.keySet().stream().map(i -> notifyIds.addAll(i)).collect(Collectors.toSet());
Map<Long, List<A>> waitSendAlarmInfos = convertAlarmDTO(alarmInfos, notifyScene);
Set<Long> notifyIds = waitSendAlarmInfos.keySet();
// 批量获取通知配置
Map<Set<Long>, List<NotifyConfigInfo>> notifyConfig = obtainNotifyConfig(notifyScene, notifyIds);
Map<Long, NotifyConfigInfo> notifyConfigMap = obtainNotifyConfig(notifyScene, notifyIds);
// 循环发送消息
waitSendAlarmInfos.forEach((key, list) -> {
List<NotifyConfigInfo> notifyConfigsList = notifyConfig.getOrDefault(key, Lists.newArrayList());
for (A alarmDTO : list) {
sendAlarm(notifyConfigsList, alarmDTO);
}
Optional.ofNullable(notifyConfigMap.get(key)).ifPresent(notifyConfig -> {
for (A alarmDTO : list) {
sendAlarm(notifyConfig, alarmDTO);
}
});
});
} catch (InterruptedException e) {
SnailJobLog.LOCAL.info("retry task fail dead letter alarm stop");
@ -89,12 +88,12 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
}
}
protected Map<Set<Long>, List<NotifyConfigInfo>> obtainNotifyConfig(Set<Integer> notifyScene,
Set<Long> notifyIds) {
if (CollUtil.isEmpty(notifyIds)) {
protected Map<Long, NotifyConfigInfo> obtainNotifyConfig(Set<Integer> notifyScene,
Set<Long> notifyIds) {
if (CollUtil.isEmpty(notifyIds) || CollUtil.isEmpty(notifyScene)) {
return Maps.newHashMap();
}
// 批量获取所需的通知配置
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
new LambdaQueryWrapper<NotifyConfig>()
@ -111,7 +110,7 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
.flatMap(config -> JsonUtil.parseList(config.getRecipientIds(), Long.class).stream())
.collect(Collectors.toSet());
List<NotifyRecipient> notifyRecipients = recipientMapper.selectBatchIds(recipientIds);
List<NotifyRecipient> notifyRecipients = recipientMapper.selectByIds(recipientIds);
Map<Long, NotifyRecipient> recipientMap = StreamUtils.toIdentityMap(notifyRecipients, NotifyRecipient::getId);
if (CollUtil.isEmpty(recipientIds)) {
@ -133,16 +132,13 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
});
notifyConfigInfo.setRecipientInfos(recipients);
}
Map<Set<Long>, List<NotifyConfigInfo>> notifyConfigInfo = new HashMap<>();
for (Long notifyId : notifyIds) {
notifyConfigInfo.put(Collections.singleton(notifyId), notifyConfigInfos);
}
return notifyConfigInfo;
return StreamUtils.toIdentityMap(notifyConfigInfos, NotifyConfigInfo::getId);
}
protected abstract List<SyetemTaskTypeEnum> getSystemTaskType();
protected abstract Map<Set<Long>, List<A>> convertAlarmDTO(List<A> alarmData, Set<Integer> notifyScene);
protected abstract Map<Long, List<A>> convertAlarmDTO(List<A> alarmData, Set<Integer> notifyScene);
protected abstract List<A> poll() throws InterruptedException;
@ -160,34 +156,33 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
public void close() {
}
protected void sendAlarm(List<NotifyConfigInfo> notifyConfigsList, A alarmDTO) {
for (final NotifyConfigInfo notifyConfig : notifyConfigsList) {
if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) {
// 限流
RateLimiter rateLimiter = getRateLimiter(String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold());
// 每秒发送rateLimiterThreshold个告警
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
continue;
}
}
// 重试通知阈值
if (Objects.nonNull(alarmDTO.getCount())
&& Objects.nonNull(notifyConfig.getNotifyThreshold())
&& alarmDTO.getCount() < notifyConfig.getNotifyThreshold()) {
continue;
}
for (final RecipientInfo recipientInfo : notifyConfig.getRecipientInfos()) {
if (Objects.isNull(recipientInfo)) {
continue;
}
AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig);
context.setNotifyAttribute(recipientInfo.getNotifyAttribute());
Alarm<AlarmContext> alarm = SnailJobAlarmFactory.getAlarmType(recipientInfo.getNotifyType());
alarm.asyncSendMessage(context);
protected void sendAlarm(NotifyConfigInfo notifyConfig, A alarmDTO) {
if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) {
// 限流
RateLimiter rateLimiter = getRateLimiter(String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold());
// 每秒发送rateLimiterThreshold个告警
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
return;
}
}
// 重试通知阈值
if (Objects.nonNull(alarmDTO.getCount())
&& Objects.nonNull(notifyConfig.getNotifyThreshold())
&& alarmDTO.getCount() < notifyConfig.getNotifyThreshold()) {
return;
}
for (final RecipientInfo recipientInfo : notifyConfig.getRecipientInfos()) {
if (Objects.isNull(recipientInfo)) {
continue;
}
AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig);
context.setNotifyAttribute(recipientInfo.getNotifyAttribute());
Alarm<AlarmContext> alarm = SnailJobAlarmFactory.getAlarmType(recipientInfo.getNotifyType());
alarm.asyncSendMessage(context);
}
}
protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) {

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.common.alarm;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
@ -27,16 +28,15 @@ public abstract class AbstractJobAlarm<E extends ApplicationEvent> extends Abstr
private JobTaskBatchMapper jobTaskBatchMapper;
@Override
protected Map<Set<Long>, List<JobAlarmInfo>> convertAlarmDTO(List<JobAlarmInfo> jobAlarmInfoList, Set<Integer> notifyScene) {
protected Map<Long, List<JobAlarmInfo>> convertAlarmDTO(List<JobAlarmInfo> jobAlarmInfoList, Set<Integer> notifyScene) {
Map<Set<Long>, List<JobAlarmInfo>> jobAlarmInfoMap = new HashMap<>();
jobAlarmInfoList.stream().forEach(i -> notifyScene.add(i.getNotifyScene()));
Map<Long, JobAlarmInfo> jobAlarmInfoGroupMap = jobAlarmInfoList.stream().collect(Collectors.toMap(i -> i.getId(), Function.identity()));
Map<Long, List<JobAlarmInfo>> jobAlarmInfoMap = new HashMap<>();
jobAlarmInfoList.forEach(i -> notifyScene.add(i.getNotifyScene()));
Map<Long, JobAlarmInfo> jobAlarmInfoGroupMap = StreamUtils.toIdentityMap(jobAlarmInfoList, JobAlarmInfo::getId);
// 查询数据库
QueryWrapper<JobTaskBatch> wrapper = new QueryWrapper<JobTaskBatch>()
.in("batch.id", jobAlarmInfoList.stream().map(i -> i.getId()).collect(Collectors.toSet()))
.in("batch.id", StreamUtils.toSet(jobAlarmInfoList, JobAlarmInfo::getId))
.eq("batch.deleted", 0);
List<JobBatchResponseDO> jobBatchResponseDOList = jobTaskBatchMapper.selectJobBatchListByIds(wrapper);
@ -47,7 +47,10 @@ public abstract class AbstractJobAlarm<E extends ApplicationEvent> extends Abstr
JobAlarmInfo jobAlarmInfo = AlarmInfoConverter.INSTANCE.toJobAlarmInfo(jobBatchResponseDO);
JobAlarmInfo alarmInfo = jobAlarmInfoGroupMap.get(jobBatchResponseDO.getId());
jobAlarmInfo.setReason(alarmInfo.getReason());
jobAlarmInfoMap.put(Collections.singleton(jobNotifyId), Lists.newArrayList(jobAlarmInfo));
List<JobAlarmInfo> jobAlarmInfos = jobAlarmInfoMap.getOrDefault(jobNotifyId, Lists.newArrayList());
jobAlarmInfos.add(jobAlarmInfo);
jobAlarmInfoMap.put(jobNotifyId, jobAlarmInfos);
}
}

View File

@ -20,9 +20,9 @@ import java.util.stream.Collectors;
public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, RetryAlarmInfo> {
@Override
protected Map<Set<Long>, List<RetryAlarmInfo>> convertAlarmDTO(List<RetryAlarmInfo> retryAlarmInfoList, Set<Integer> notifyScene) {
protected Map<Long, List<RetryAlarmInfo>> convertAlarmDTO(List<RetryAlarmInfo> retryAlarmInfoList, Set<Integer> notifyScene) {
Map<Set<Long>, List<RetryAlarmInfo>> retryAlarmInfoMap = new HashMap<>();
Map<Long, List<RetryAlarmInfo>> retryAlarmInfoMap = new HashMap<>();
// 重试任务查询场景告警通知
Set<String> groupNames = new HashSet<>(), sceneNames = new HashSet<>(), namespaceIds = new HashSet<>();
for (RetryAlarmInfo retryAlarmInfo : retryAlarmInfoList) {
@ -33,10 +33,10 @@ public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends Abs
}
// 按组名场景名命名空间分组
Map<ImmutableTriple<String, String, String>, RetrySceneConfig> retrySceneConfigMap = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneNameList(
groupNames,
sceneNames,
namespaceIds).stream().collect(Collectors.toMap(i -> ImmutableTriple.of(
Map<ImmutableTriple<String, String, String>, RetrySceneConfig> retrySceneConfigMap = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneNameList(
groupNames, sceneNames, namespaceIds)
.stream().collect(Collectors.toMap(i -> ImmutableTriple.of(
i.getGroupName(),
i.getSceneName(),
i.getNamespaceId()),
@ -51,8 +51,9 @@ public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends Abs
Set<Long> retryNotifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class));
for (Long retryNotifyId : retryNotifyIds) {
retryAlarmInfoMap.put(Collections.singleton(retryNotifyId), Lists.newArrayList(retryAlarmInfo));
List<RetryAlarmInfo> retryAlarmInfos = retryAlarmInfoMap.getOrDefault(retryNotifyId, Lists.newArrayList());
retryAlarmInfos.add(retryAlarmInfo);
retryAlarmInfoMap.put(retryNotifyId, retryAlarmInfos);
}
}

View File

@ -27,10 +27,10 @@ public abstract class AbstractWorkflowAlarm<E extends ApplicationEvent> extends
private WorkflowTaskBatchMapper workflowTaskBatchMapper;
@Override
protected Map<Set<Long>, List<WorkflowAlarmInfo>> convertAlarmDTO(List<WorkflowAlarmInfo> workflowAlarmInfoList, Set<Integer> notifyScene) {
protected Map<Long, List<WorkflowAlarmInfo>> convertAlarmDTO(List<WorkflowAlarmInfo> workflowAlarmInfoList, Set<Integer> notifyScene) {
Map<Set<Long>, List<WorkflowAlarmInfo>> workflowAlarmInfoMap = new HashMap<>();
workflowAlarmInfoList.stream().forEach(i -> notifyScene.add(i.getNotifyScene()));
Map<Long, List<WorkflowAlarmInfo>> workflowAlarmInfoMap = new HashMap<>();
workflowAlarmInfoList.forEach(i -> notifyScene.add(i.getNotifyScene()));
Map<Long, WorkflowAlarmInfo> workflowAlarmInfoGroupMap = workflowAlarmInfoList.stream().collect(Collectors.toMap(i -> i.getId(), Function.identity()));
@ -46,7 +46,10 @@ public abstract class AbstractWorkflowAlarm<E extends ApplicationEvent> extends
WorkflowAlarmInfo workflowAlarmInfo = AlarmInfoConverter.INSTANCE.toWorkflowAlarmInfo(workflowBatchResponseDO);
WorkflowAlarmInfo alarmInfo = workflowAlarmInfoGroupMap.get(workflowAlarmInfo.getId());
workflowAlarmInfo.setReason(alarmInfo.getReason());
workflowAlarmInfoMap.put(Collections.singleton(workflowNotifyId), Lists.newArrayList(workflowAlarmInfo));
List<WorkflowAlarmInfo> workflowAlarmInfos = workflowAlarmInfoMap.getOrDefault(workflowNotifyId, Lists.newArrayList());
workflowAlarmInfos.add(workflowAlarmInfo);
workflowAlarmInfoMap.put(workflowNotifyId ,workflowAlarmInfos);
}
}

View File

@ -161,7 +161,7 @@ public class WorkflowBatchHandler {
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(WorkflowTaskFailAlarmEventDTO.builder()
.workflowTaskBatchId(workflowTaskBatchId)
.notifyScene(JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene())
.reason("只要叶子节点不是无需处理的都是失败")
.reason("任务执行失败 jobTaskBatchId:" + jobTaskBatch.getId())
.build()));
}
}