diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java index 26b59ffbc..902d6e1f3 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java @@ -64,22 +64,21 @@ public abstract class AbstractAlarm notifyScene = new HashSet<>(); - // 通知配置 - Set notifyIds = new HashSet<>(); // 转换AlarmDTO 为了下面循环发送使用 - Map, List> waitSendAlarmInfos = convertAlarmDTO(alarmInfos, notifyScene); - waitSendAlarmInfos.keySet().stream().map(i -> notifyIds.addAll(i)).collect(Collectors.toSet()); + Map> waitSendAlarmInfos = convertAlarmDTO(alarmInfos, notifyScene); + Set notifyIds = waitSendAlarmInfos.keySet(); // 批量获取通知配置 - Map, List> notifyConfig = obtainNotifyConfig(notifyScene, notifyIds); + Map notifyConfigMap = obtainNotifyConfig(notifyScene, notifyIds); // 循环发送消息 waitSendAlarmInfos.forEach((key, list) -> { - List notifyConfigsList = notifyConfig.getOrDefault(key, Lists.newArrayList()); - for (A alarmDTO : list) { - sendAlarm(notifyConfigsList, alarmDTO); - } + Optional.ofNullable(notifyConfigMap.get(key)).ifPresent(notifyConfig -> { + for (A alarmDTO : list) { + sendAlarm(notifyConfig, alarmDTO); + } + }); }); } catch (InterruptedException e) { SnailJobLog.LOCAL.info("retry task fail dead letter alarm stop"); @@ -89,12 +88,12 @@ public abstract class AbstractAlarm, List> obtainNotifyConfig(Set notifyScene, - Set notifyIds) { - - if (CollUtil.isEmpty(notifyIds)) { + protected Map obtainNotifyConfig(Set notifyScene, + Set notifyIds) { + if (CollUtil.isEmpty(notifyIds) || CollUtil.isEmpty(notifyScene)) { return Maps.newHashMap(); } + // 批量获取所需的通知配置 List notifyConfigs = accessTemplate.getNotifyConfigAccess().list( new LambdaQueryWrapper() @@ -111,7 +110,7 @@ public abstract class AbstractAlarm JsonUtil.parseList(config.getRecipientIds(), Long.class).stream()) .collect(Collectors.toSet()); - List notifyRecipients = recipientMapper.selectBatchIds(recipientIds); + List notifyRecipients = recipientMapper.selectByIds(recipientIds); Map recipientMap = StreamUtils.toIdentityMap(notifyRecipients, NotifyRecipient::getId); if (CollUtil.isEmpty(recipientIds)) { @@ -133,16 +132,13 @@ public abstract class AbstractAlarm, List> notifyConfigInfo = new HashMap<>(); - for (Long notifyId : notifyIds) { - notifyConfigInfo.put(Collections.singleton(notifyId), notifyConfigInfos); - } - return notifyConfigInfo; + + return StreamUtils.toIdentityMap(notifyConfigInfos, NotifyConfigInfo::getId); } protected abstract List getSystemTaskType(); - protected abstract Map, List> convertAlarmDTO(List alarmData, Set notifyScene); + protected abstract Map> convertAlarmDTO(List alarmData, Set notifyScene); protected abstract List poll() throws InterruptedException; @@ -160,34 +156,33 @@ public abstract class AbstractAlarm notifyConfigsList, A alarmDTO) { - for (final NotifyConfigInfo notifyConfig : notifyConfigsList) { - if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) { - // 限流 - RateLimiter rateLimiter = getRateLimiter(String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold()); - // 每秒发送rateLimiterThreshold个告警 - if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) { - continue; - } - } - - // 重试通知阈值 - if (Objects.nonNull(alarmDTO.getCount()) - && Objects.nonNull(notifyConfig.getNotifyThreshold()) - && alarmDTO.getCount() < notifyConfig.getNotifyThreshold()) { - continue; - } - - for (final RecipientInfo recipientInfo : notifyConfig.getRecipientInfos()) { - if (Objects.isNull(recipientInfo)) { - continue; - } - AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig); - context.setNotifyAttribute(recipientInfo.getNotifyAttribute()); - Alarm alarm = SnailJobAlarmFactory.getAlarmType(recipientInfo.getNotifyType()); - alarm.asyncSendMessage(context); + protected void sendAlarm(NotifyConfigInfo notifyConfig, A alarmDTO) { + if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) { + // 限流 + RateLimiter rateLimiter = getRateLimiter(String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold()); + // 每秒发送rateLimiterThreshold个告警 + if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) { + return; } } + + // 重试通知阈值 + if (Objects.nonNull(alarmDTO.getCount()) + && Objects.nonNull(notifyConfig.getNotifyThreshold()) + && alarmDTO.getCount() < notifyConfig.getNotifyThreshold()) { + return; + } + + for (final RecipientInfo recipientInfo : notifyConfig.getRecipientInfos()) { + if (Objects.isNull(recipientInfo)) { + continue; + } + AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig); + context.setNotifyAttribute(recipientInfo.getNotifyAttribute()); + Alarm alarm = SnailJobAlarmFactory.getAlarmType(recipientInfo.getNotifyType()); + alarm.asyncSendMessage(context); + } + } protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) { diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java index 2f3d02902..bc2438e23 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.common.alarm; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.server.common.AlarmInfoConverter; import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO; @@ -27,16 +28,15 @@ public abstract class AbstractJobAlarm extends Abstr private JobTaskBatchMapper jobTaskBatchMapper; @Override - protected Map, List> convertAlarmDTO(List jobAlarmInfoList, Set notifyScene) { + protected Map> convertAlarmDTO(List jobAlarmInfoList, Set notifyScene) { - Map, List> jobAlarmInfoMap = new HashMap<>(); - jobAlarmInfoList.stream().forEach(i -> notifyScene.add(i.getNotifyScene())); - - Map jobAlarmInfoGroupMap = jobAlarmInfoList.stream().collect(Collectors.toMap(i -> i.getId(), Function.identity())); + Map> jobAlarmInfoMap = new HashMap<>(); + jobAlarmInfoList.forEach(i -> notifyScene.add(i.getNotifyScene())); + Map jobAlarmInfoGroupMap = StreamUtils.toIdentityMap(jobAlarmInfoList, JobAlarmInfo::getId); // 查询数据库 QueryWrapper wrapper = new QueryWrapper() - .in("batch.id", jobAlarmInfoList.stream().map(i -> i.getId()).collect(Collectors.toSet())) + .in("batch.id", StreamUtils.toSet(jobAlarmInfoList, JobAlarmInfo::getId)) .eq("batch.deleted", 0); List jobBatchResponseDOList = jobTaskBatchMapper.selectJobBatchListByIds(wrapper); @@ -47,7 +47,10 @@ public abstract class AbstractJobAlarm extends Abstr JobAlarmInfo jobAlarmInfo = AlarmInfoConverter.INSTANCE.toJobAlarmInfo(jobBatchResponseDO); JobAlarmInfo alarmInfo = jobAlarmInfoGroupMap.get(jobBatchResponseDO.getId()); jobAlarmInfo.setReason(alarmInfo.getReason()); - jobAlarmInfoMap.put(Collections.singleton(jobNotifyId), Lists.newArrayList(jobAlarmInfo)); + + List jobAlarmInfos = jobAlarmInfoMap.getOrDefault(jobNotifyId, Lists.newArrayList()); + jobAlarmInfos.add(jobAlarmInfo); + jobAlarmInfoMap.put(jobNotifyId, jobAlarmInfos); } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java index c4c1f87ee..37dc9f2ef 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java @@ -20,9 +20,9 @@ import java.util.stream.Collectors; public abstract class AbstractRetryAlarm extends AbstractAlarm { @Override - protected Map, List> convertAlarmDTO(List retryAlarmInfoList, Set notifyScene) { + protected Map> convertAlarmDTO(List retryAlarmInfoList, Set notifyScene) { - Map, List> retryAlarmInfoMap = new HashMap<>(); + Map> retryAlarmInfoMap = new HashMap<>(); // 重试任务查询场景告警通知 Set groupNames = new HashSet<>(), sceneNames = new HashSet<>(), namespaceIds = new HashSet<>(); for (RetryAlarmInfo retryAlarmInfo : retryAlarmInfoList) { @@ -33,10 +33,10 @@ public abstract class AbstractRetryAlarm extends Abs } // 按组名、场景名、命名空间分组 - Map, RetrySceneConfig> retrySceneConfigMap = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneNameList( - groupNames, - sceneNames, - namespaceIds).stream().collect(Collectors.toMap(i -> ImmutableTriple.of( + Map, RetrySceneConfig> retrySceneConfigMap = accessTemplate.getSceneConfigAccess() + .getSceneConfigByGroupNameAndSceneNameList( + groupNames, sceneNames, namespaceIds) + .stream().collect(Collectors.toMap(i -> ImmutableTriple.of( i.getGroupName(), i.getSceneName(), i.getNamespaceId()), @@ -51,8 +51,9 @@ public abstract class AbstractRetryAlarm extends Abs Set retryNotifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class)); for (Long retryNotifyId : retryNotifyIds) { - - retryAlarmInfoMap.put(Collections.singleton(retryNotifyId), Lists.newArrayList(retryAlarmInfo)); + List retryAlarmInfos = retryAlarmInfoMap.getOrDefault(retryNotifyId, Lists.newArrayList()); + retryAlarmInfos.add(retryAlarmInfo); + retryAlarmInfoMap.put(retryNotifyId, retryAlarmInfos); } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java index 7040b35c4..30da6d8e4 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java @@ -27,10 +27,10 @@ public abstract class AbstractWorkflowAlarm extends private WorkflowTaskBatchMapper workflowTaskBatchMapper; @Override - protected Map, List> convertAlarmDTO(List workflowAlarmInfoList, Set notifyScene) { + protected Map> convertAlarmDTO(List workflowAlarmInfoList, Set notifyScene) { - Map, List> workflowAlarmInfoMap = new HashMap<>(); - workflowAlarmInfoList.stream().forEach(i -> notifyScene.add(i.getNotifyScene())); + Map> workflowAlarmInfoMap = new HashMap<>(); + workflowAlarmInfoList.forEach(i -> notifyScene.add(i.getNotifyScene())); Map workflowAlarmInfoGroupMap = workflowAlarmInfoList.stream().collect(Collectors.toMap(i -> i.getId(), Function.identity())); @@ -46,7 +46,10 @@ public abstract class AbstractWorkflowAlarm extends WorkflowAlarmInfo workflowAlarmInfo = AlarmInfoConverter.INSTANCE.toWorkflowAlarmInfo(workflowBatchResponseDO); WorkflowAlarmInfo alarmInfo = workflowAlarmInfoGroupMap.get(workflowAlarmInfo.getId()); workflowAlarmInfo.setReason(alarmInfo.getReason()); - workflowAlarmInfoMap.put(Collections.singleton(workflowNotifyId), Lists.newArrayList(workflowAlarmInfo)); + + List workflowAlarmInfos = workflowAlarmInfoMap.getOrDefault(workflowNotifyId, Lists.newArrayList()); + workflowAlarmInfos.add(workflowAlarmInfo); + workflowAlarmInfoMap.put(workflowNotifyId ,workflowAlarmInfos); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java index 2359242e8..0f23412ea 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java @@ -161,7 +161,7 @@ public class WorkflowBatchHandler { SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(WorkflowTaskFailAlarmEventDTO.builder() .workflowTaskBatchId(workflowTaskBatchId) .notifyScene(JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene()) - .reason("只要叶子节点不是无需处理的都是失败") + .reason("任务执行失败 jobTaskBatchId:" + jobTaskBatch.getId()) .build())); } }