From 11c57a96860919e468e0b6414a4b0fe3b19d9b56 Mon Sep 17 00:00:00 2001 From: wodeyangzipingpingwuqi Date: Sat, 7 Dec 2024 17:28:07 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.3.0-beta1):=201=E3=80=81=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E5=91=8A=E8=AD=A6?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=202=E3=80=81=E6=96=B0=E5=A2=9E=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E4=BB=BB=E5=8A=A1=E6=97=A0=E8=8A=82=E7=82=B9=E5=91=8A?= =?UTF-8?q?=E8=AD=A6=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executor/JobExecutorFutureCallback.java | 5 +- .../common/core/enums/JobNotifySceneEnum.java | 1 + .../core/enums/RetryNotifySceneEnum.java | 3 +- .../dataobject/JobBatchResponseDO.java | 1 + .../datasource/persistence/po/Job.java | 5 ++ .../persistence/po/NotifyConfig.java | 5 ++ .../template/mapper/JobTaskBatchMapper.xml | 3 +- .../server/common/alarm/AbstractAlarm.java | 27 +++--- .../server/common/alarm/AbstractJobAlarm.java | 16 ++-- .../common/alarm/AbstractRetryAlarm.java | 27 +++--- .../common/alarm/AbstractWorkflowAlarm.java | 21 ++--- .../convert/JobResponseVOConverter.java | 15 +++- .../server/common/dto/JobAlarmInfo.java | 5 ++ .../server/common/dto/RetryAlarmInfo.java | 3 + .../server/common/dto/WorkflowAlarmInfo.java | 5 ++ .../server/common/vo/JobResponseVO.java | 6 ++ .../support/dispatch/JobExecutorActor.java | 3 +- .../batch/JobTaskBatchGenerator.java | 7 +- .../controller/NotifyConfigController.java | 7 ++ .../web/model/request/JobRequestVO.java | 7 ++ .../model/request/NotifyConfigRequestVO.java | 6 +- .../web/service/NotifyConfigService.java | 3 + .../web/service/convert/JobConverter.java | 34 +++++++- .../web/service/impl/JobServiceImpl.java | 4 +- .../service/impl/NotifyConfigServiceImpl.java | 84 +++---------------- 25 files changed, 167 insertions(+), 136 deletions(-) diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java index 922c5443c..f6b91067d 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java @@ -78,8 +78,7 @@ public class JobExecutorFutureCallback implements FutureCallback initLogContext(); // 上报执行成功 - SnailJobLog.REMOTE.info("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(), - JsonUtil.toJsonString(result)); + SnailJobLog.REMOTE.info("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(), JsonUtil.toJsonString(result)); if (Objects.isNull(result)) { result = ExecuteResult.success(); @@ -197,7 +196,5 @@ public class JobExecutorFutureCallback implements FutureCallback } catch (Exception e1) { SnailJobLog.LOCAL.error("Client failed to send component exception alert.", e1); } - } - } diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java index e7e5fcf9a..8fa95adf4 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java @@ -16,6 +16,7 @@ public enum JobNotifySceneEnum { /********************************Job****************************************/ JOB_TASK_ERROR(1, "JOB任务执行失败", NodeTypeEnum.SERVER), JOB_CLIENT_ERROR(2, "客户端执行失败", NodeTypeEnum.CLIENT), + JOB_NO_CLIENT_NODES_ERROR(3, "没有可执行的客户端节点", NodeTypeEnum.CLIENT), /********************************Workflow****************************************/ WORKFLOW_TASK_ERROR(100, "Workflow任务执行失败", NodeTypeEnum.SERVER); diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryNotifySceneEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryNotifySceneEnum.java index 71a9bbb83..a5736423e 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryNotifySceneEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryNotifySceneEnum.java @@ -24,8 +24,9 @@ public enum RetryNotifySceneEnum { RETRY_TASK_REACH_THRESHOLD(5, "任务重试次数超过阈值", NodeTypeEnum.SERVER), - RETRY_TASK_ENTER_DEAD_LETTER(6, "任务重试失败进入死信队列", NodeTypeEnum.SERVER); + RETRY_TASK_ENTER_DEAD_LETTER(6, "任务重试失败进入死信队列", NodeTypeEnum.SERVER), + RETRY_NO_CLIENT_NODES_ERROR(7, "没有可执行的客户端节点", NodeTypeEnum.CLIENT); /** * 通知场景 diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/JobBatchResponseDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/JobBatchResponseDO.java index 152652ca0..a8ac7c11b 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/JobBatchResponseDO.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/JobBatchResponseDO.java @@ -76,5 +76,6 @@ public class JobBatchResponseDO { private String argsStr; + private String notifyIds; } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Job.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Job.java index d0eb1c908..805901e27 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Job.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Job.java @@ -142,4 +142,9 @@ public class Job extends CreateUpdateDt { */ private Integer deleted; + /** + * 通知告警场景配置id列表 + */ + private String notifyIds; + } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java index 51e66a57a..8cc4e3e86 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java @@ -34,6 +34,11 @@ public class NotifyConfig extends CreateUpdateDt { private Integer notifyThreshold; + /** + * 通知告警场景名 + */ + private String notifyName; + private Integer notifyScene; private Integer rateLimiterStatus; diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/JobTaskBatchMapper.xml b/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/JobTaskBatchMapper.xml index ccd87fd5b..fee0ec4e6 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/JobTaskBatchMapper.xml +++ b/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/JobTaskBatchMapper.xml @@ -59,7 +59,8 @@ job.block_strategy, job.trigger_type, job.executor_info, - job.args_str + job.args_str, + job.notify_ids FROM sj_job_task_batch batch JOIN sj_job job ON batch.job_id = job.id ${ew.customSqlSegment} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java index c85edf358..fd7290c78 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java @@ -1,4 +1,4 @@ -package com.aizuda.snailjob.server.common.alarm; +package com.aizuda.snailjob.client.common.annotation; import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.common.core.alarm.Alarm; @@ -69,15 +69,13 @@ public abstract class AbstractAlarm groupNames = new HashSet<>(); // 获取所有的场景名称 - Set businessIds = new HashSet<>(); + Set notifyIds = new HashSet<>(); // 转换AlarmDTO 为了下面循环发送使用 - Map, List> waitSendAlarmInfos = convertAlarmDTO( - alarmInfos, namespaceIds, groupNames, businessIds); + Map>, List> waitSendAlarmInfos = convertAlarmDTO(alarmInfos, namespaceIds, groupNames, notifyIds); // 批量获取通知配置 - Map, List> notifyConfig = obtainNotifyConfig(namespaceIds, - groupNames, businessIds); + Map>, List> notifyConfig = obtainNotifyConfig(namespaceIds, groupNames, notifyIds); // 循环发送消息 waitSendAlarmInfos.forEach((key, list) -> { @@ -95,18 +93,17 @@ public abstract class AbstractAlarm, List> obtainNotifyConfig(Set namespaceIds, - Set groupNames, Set businessIds) { + protected Map>, List> obtainNotifyConfig(Set namespaceIds, + Set groupNames, Set notifyIds) { // 批量获取所需的通知配置 List notifyConfigs = accessTemplate.getNotifyConfigAccess().list( new LambdaQueryWrapper() .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) .in(NotifyConfig::getSystemTaskType, StreamUtils.toList(getSystemTaskType(), SyetemTaskTypeEnum::getType)) - .eq(NotifyConfig::getNotifyScene, getNotifyScene()) .in(NotifyConfig::getNamespaceId, namespaceIds) .in(NotifyConfig::getGroupName, groupNames) - .in(NotifyConfig::getBusinessId, businessIds) + .in(NotifyConfig::getId, notifyIds) ); if (CollUtil.isEmpty(notifyConfigs)) { return Maps.newHashMap(); @@ -139,17 +136,15 @@ public abstract class AbstractAlarm getSystemTaskType(); - protected abstract Map, List> convertAlarmDTO(List alarmData, - Set namespaceIds, Set groupNames, Set sceneNames); + protected abstract Map>, List> convertAlarmDTO(List alarmData, + Set namespaceIds, Set groupNames, Set notifyIds); protected abstract List poll() throws InterruptedException; @@ -212,5 +207,3 @@ public abstract class AbstractAlarm extends AbstractAlarm { @Override - protected Map, List> convertAlarmDTO(List alarmInfos, Set namespaceIds, Set groupNames, Set jobIds) { + protected Map>, List> convertAlarmDTO(List alarmInfos, Set namespaceIds, Set groupNames, Set notifyIds) { + return StreamUtils.groupByKey(alarmInfos, alarmInfo -> { String namespaceId = alarmInfo.getNamespaceId(); String groupName = alarmInfo.getGroupName(); - String jobId = String.valueOf(alarmInfo.getJobId()); + HashSet notifyIdsSet = Objects.isNull(alarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(alarmInfo.getNotifyIds(), Long.class)); + namespaceIds.add(namespaceId); groupNames.add(groupName); - jobIds.add(jobId); - return ImmutableTriple.of(namespaceId, groupName, jobId); + notifyIds.addAll(notifyIdsSet); + return ImmutableTriple.of(namespaceId, groupName, notifyIdsSet); }); } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java index af9be5207..dd9bd1eee 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java @@ -1,14 +1,14 @@ package com.aizuda.snailjob.server.common.alarm; +import com.aizuda.snailjob.client.common.annotation.AbstractAlarm; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.Triple; import org.springframework.context.ApplicationEvent; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; /** * @author xiaowoniu @@ -17,24 +17,21 @@ import java.util.Set; */ public abstract class AbstractRetryAlarm extends AbstractAlarm { @Override - protected Map, List> convertAlarmDTO( - List alarmDataList, + protected Map>, List> convertAlarmDTO( + List retryAlarmInfoList, Set namespaceIds, Set groupNames, - Set sceneNames) { + Set notifyIds) { - return StreamUtils.groupByKey(alarmDataList, alarmData -> { - - String namespaceId = alarmData.getNamespaceId(); - String groupName = alarmData.getGroupName(); - String sceneName = alarmData.getSceneName(); + return StreamUtils.groupByKey(retryAlarmInfoList, retryAlarmInfo -> { + String namespaceId = retryAlarmInfo.getNamespaceId(); + String groupName = retryAlarmInfo.getGroupName(); + HashSet notifyIdsSet = Objects.isNull(retryAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retryAlarmInfo.getNotifyIds(), Long.class)); namespaceIds.add(namespaceId); groupNames.add(groupName); - sceneNames.add(sceneName); - - return ImmutableTriple.of(namespaceId, groupName, sceneName); + notifyIds.addAll(notifyIdsSet); + return ImmutableTriple.of(namespaceId, groupName, notifyIdsSet); }); } - } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java index 8970f472f..a00886099 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java @@ -1,13 +1,13 @@ package com.aizuda.snailjob.server.common.alarm; +import com.aizuda.snailjob.client.common.annotation.AbstractAlarm; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo; import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.Triple; import org.springframework.context.ApplicationEvent; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; /** @@ -18,16 +18,17 @@ import java.util.stream.Collectors; public abstract class AbstractWorkflowAlarm extends AbstractAlarm { @Override - protected Map, List> convertAlarmDTO(List alarmInfos, Set namespaceIds, Set groupNames, Set jobIds) { + protected Map>, List> convertAlarmDTO(List alarmInfos, Set namespaceIds, Set groupNames, Set notifyIds) { + + return alarmInfos.stream().collect(Collectors.groupingBy(workflowAlarmInfo -> { + String namespaceId = workflowAlarmInfo.getNamespaceId(); + String groupName = workflowAlarmInfo.getGroupName(); + HashSet notifyIdsSet = Objects.isNull(workflowAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(workflowAlarmInfo.getNotifyIds(), Long.class)); - return alarmInfos.stream().collect(Collectors.groupingBy(i -> { - String namespaceId = i.getNamespaceId(); - String groupName = i.getGroupName(); - String jobId = String.valueOf(i.getWorkflowId()); namespaceIds.add(namespaceId); groupNames.add(groupName); - jobIds.add(jobId); - return ImmutableTriple.of(namespaceId, groupName, jobId); + notifyIds.addAll(notifyIdsSet); + return ImmutableTriple.of(namespaceId, groupName, notifyIdsSet); })); } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobResponseVOConverter.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobResponseVOConverter.java index 16c2e1d02..c1d3f60ee 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobResponseVOConverter.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobResponseVOConverter.java @@ -1,5 +1,7 @@ package com.aizuda.snailjob.server.common.convert; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.common.vo.JobResponseVO; import com.aizuda.snailjob.template.datasource.persistence.po.Job; @@ -9,8 +11,10 @@ import org.mapstruct.Mappings; import org.mapstruct.factory.Mappers; import java.time.LocalDateTime; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; /** * @author opensnail @@ -28,7 +32,8 @@ public interface JobResponseVOConverter { List convertList(List jobs); @Mappings({ - @Mapping(target = "nextTriggerAt", expression = "java(JobResponseVOConverter.toLocalDateTime(job.getNextTriggerAt()))") + @Mapping(target = "nextTriggerAt", expression = "java(JobResponseVOConverter.toLocalDateTime(job.getNextTriggerAt()))"), + @Mapping(target = "notifyIds", expression = "java(JobResponseVOConverter.toJobNotifyIds(job.getNotifyIds()))") }) JobResponseVO convert(Job job); @@ -39,4 +44,12 @@ public interface JobResponseVOConverter { return DateUtils.toLocalDateTime(nextTriggerAt); } + + static Set toJobNotifyIds(String notifyIds) { + if (StrUtil.isBlank(notifyIds)) { + return new HashSet<>(); + } + + return new HashSet<>(JsonUtil.parseList(notifyIds, Long.class)); + } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/JobAlarmInfo.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/JobAlarmInfo.java index ad87dc149..98f50062a 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/JobAlarmInfo.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/JobAlarmInfo.java @@ -37,4 +37,9 @@ public class JobAlarmInfo extends AlarmInfo { */ private Integer operationReason; + /** + * 通知告警场景 + */ + private String notifyIds; + } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/RetryAlarmInfo.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/RetryAlarmInfo.java index 329df2fa4..8b3261359 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/RetryAlarmInfo.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/RetryAlarmInfo.java @@ -33,4 +33,7 @@ public class RetryAlarmInfo extends AlarmInfo { private Integer retryCount; private LocalDateTime createDt; + + private String notifyIds; + } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/WorkflowAlarmInfo.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/WorkflowAlarmInfo.java index d92bf359e..017286496 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/WorkflowAlarmInfo.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/WorkflowAlarmInfo.java @@ -29,4 +29,9 @@ public class WorkflowAlarmInfo extends AlarmInfo { */ private Integer operationReason; + /** + * 通知告警场景 + */ + private String notifyIds; + } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobResponseVO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobResponseVO.java index 368a277db..2a8e2af14 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobResponseVO.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobResponseVO.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.common.vo; import lombok.Data; import java.time.LocalDateTime; +import java.util.Set; /** * @author opensnail @@ -129,4 +130,9 @@ public class JobResponseVO { */ private Integer deleted; + /** + * 通知告警场景 + */ + private Set notifyIds; + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index d29c68539..8693a6dc8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -116,8 +116,7 @@ public class JobExecutorActor extends AbstractActor { if (Objects.isNull(job)) { taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason(); - } else if (CollUtil.isEmpty(CacheRegisterTable.getServerNodeSet(job.getGroupName(), - job.getNamespaceId()))) { + } else if (CollUtil.isEmpty(CacheRegisterTable.getServerNodeSet(job.getGroupName(), job.getNamespaceId()))) { taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); operationReason = JobOperationReasonEnum.NOT_CLIENT.getReason(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index f0d06bb42..e5e9439f0 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.batch; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; +import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; @@ -12,6 +13,7 @@ import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler; import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask; @@ -64,7 +66,6 @@ public class JobTaskBatchGenerator { CollUtil.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); jobTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason()); - } else { // 生成一个新的任务 jobTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus())); @@ -79,7 +80,11 @@ public class JobTaskBatchGenerator { .eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId()) .eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId()) ); + } + // 无执行的节点-告警通知 + if (JobTaskBatchStatusEnum.CANCEL.getStatus() == jobTaskBatch.getTaskBatchStatus()) { + SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(jobTaskBatch.getId())); } // 非待处理状态无需进入时间轮中 diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/NotifyConfigController.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/NotifyConfigController.java index 7631df832..83b58f1cc 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/NotifyConfigController.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/NotifyConfigController.java @@ -6,6 +6,7 @@ import com.aizuda.snailjob.server.web.model.request.NotifyConfigQueryVO; import com.aizuda.snailjob.server.web.model.request.NotifyConfigRequestVO; import com.aizuda.snailjob.server.web.model.response.NotifyConfigResponseVO; import com.aizuda.snailjob.server.web.service.NotifyConfigService; +import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; import jakarta.validation.constraints.NotEmpty; import lombok.RequiredArgsConstructor; import org.springframework.validation.annotation.Validated; @@ -32,6 +33,12 @@ public class NotifyConfigController { return notifyConfigService.getNotifyConfigList(queryVO); } + @LoginRequired + @GetMapping("/all/{systemTaskType}") + public List getNotifyConfigBySystemTaskTypeList(@PathVariable("systemTaskType") Integer systemTaskType) { + return notifyConfigService.getNotifyConfigBySystemTaskTypeList(systemTaskType); + } + @LoginRequired @GetMapping("{id}") public NotifyConfigResponseVO getNotifyConfigDetail(@PathVariable("id") Long id) { diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobRequestVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobRequestVO.java index 4c6365668..e2d89d95d 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobRequestVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobRequestVO.java @@ -9,6 +9,8 @@ import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Pattern; import lombok.Data; +import java.util.Set; + /** * @author opensnail * @date 2023-10-11 22:37:55 @@ -123,4 +125,9 @@ public class JobRequestVO { */ private String description; + /** + * 通知告警场景配置id列表 + */ + private Set notifyIds; + } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigRequestVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigRequestVO.java index 17be206b3..4ef1bcba4 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigRequestVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigRequestVO.java @@ -25,7 +25,8 @@ public class NotifyConfigRequestVO { /** * 业务id (scene_name或job_id或workflow_id) */ - @NotBlank(message = "业务id不能为空") + //@NotBlank(message = "业务id不能为空") + @Deprecated private String businessId; /** @@ -37,6 +38,9 @@ public class NotifyConfigRequestVO { @NotNull(message = "通知状态不能为空") private Integer notifyStatus; + @NotNull(message = "通知告警场景名不能为空") + private String notifyName; + @NotEmpty(message = "通知人列表") private Set recipientIds; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/NotifyConfigService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/NotifyConfigService.java index 451e51715..c5055a5e7 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/NotifyConfigService.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/NotifyConfigService.java @@ -4,6 +4,7 @@ import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.NotifyConfigQueryVO; import com.aizuda.snailjob.server.web.model.request.NotifyConfigRequestVO; import com.aizuda.snailjob.server.web.model.response.NotifyConfigResponseVO; +import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; import java.util.List; import java.util.Set; @@ -16,6 +17,8 @@ public interface NotifyConfigService { PageResult> getNotifyConfigList(NotifyConfigQueryVO queryVO); + List getNotifyConfigBySystemTaskTypeList(Integer systemTaskType); + Boolean saveNotify(NotifyConfigRequestVO requestVO); Boolean updateNotify(NotifyConfigRequestVO requestVO); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/JobConverter.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/JobConverter.java index bb6e05f0f..c721d0134 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/JobConverter.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/JobConverter.java @@ -1,11 +1,18 @@ package com.aizuda.snailjob.server.web.service.convert; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.web.model.request.JobRequestVO; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Mappings; import org.mapstruct.factory.Mappers; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** * @author: opensnail @@ -17,8 +24,31 @@ public interface JobConverter { JobConverter INSTANCE = Mappers.getMapper(JobConverter.class); - Job convert(JobRequestVO jobRequestVO); - List convertList(List jobs); + @Mappings({ + @Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))") + }) + JobRequestVO convert(Job job); + + @Mappings({ + @Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIdsStr(jobRequestVO.getNotifyIds()))") + }) + Job convert(JobRequestVO jobRequestVO); + + static Set toNotifyIds(String notifyIds) { + if (StrUtil.isBlank(notifyIds)) { + return new HashSet<>(); + } + + return new HashSet<>(JsonUtil.parseList(notifyIds, Long.class)); + } + + static String toNotifyIdsStr(Set notifyIds) { + if (CollUtil.isEmpty(notifyIds)) { + return null; + } + + return JsonUtil.toJsonString(notifyIds); + } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobServiceImpl.java index bc6083d63..90e4c5809 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobServiceImpl.java @@ -152,6 +152,7 @@ public class JobServiceImpl implements JobService { // 判断常驻任务 Job updateJob = JobConverter.INSTANCE.convert(jobRequestVO); + updateJob.setNotifyIds(JsonUtil.toJsonString(jobRequestVO.getNotifyIds())); updateJob.setResident(isResident(jobRequestVO)); updateJob.setNamespaceId(job.getNamespaceId()); @@ -270,8 +271,7 @@ public class JobServiceImpl implements JobService { new LambdaQueryWrapper() .eq(Job::getNamespaceId, namespaceId) .eq(StrUtil.isNotBlank(exportJobVO.getGroupName()), Job::getGroupName, exportJobVO.getGroupName()) - .likeRight(StrUtil.isNotBlank(exportJobVO.getJobName()), Job::getJobName, - StrUtil.trim(exportJobVO.getJobName())) + .likeRight(StrUtil.isNotBlank(exportJobVO.getJobName()), Job::getJobName, StrUtil.trim(exportJobVO.getJobName())) .eq(Objects.nonNull(exportJobVO.getJobStatus()), Job::getJobStatus, exportJobVO.getJobStatus()) .in(CollUtil.isNotEmpty(exportJobVO.getJobIds()), Job::getId, exportJobVO.getJobIds()) .eq(Job::getDeleted, StatusEnum.NO.getStatus()) diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/NotifyConfigServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/NotifyConfigServiceImpl.java index 50057c2c7..3be861ba0 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/NotifyConfigServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/NotifyConfigServiceImpl.java @@ -4,8 +4,6 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.aizuda.snailjob.common.core.util.StreamUtils; -import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.NotifyConfigQueryVO; @@ -19,25 +17,17 @@ import com.aizuda.snailjob.server.web.service.handler.SyncConfigHandler; import com.aizuda.snailjob.server.web.util.UserSessionUtils; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.access.ConfigAccess; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; -import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient; -import com.aizuda.snailjob.template.datasource.persistence.po.Workflow; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import lombok.RequiredArgsConstructor; -import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Service; import java.time.LocalDateTime; -import java.util.*; -import java.util.stream.Collectors; +import java.util.List; +import java.util.Set; /** * @author: opensnail @@ -48,9 +38,6 @@ import java.util.stream.Collectors; public class NotifyConfigServiceImpl implements NotifyConfigService { private final AccessTemplate accessTemplate; - private final NotifyRecipientMapper notifyRecipientMapper; - private final JobMapper jobMapper; - private final WorkflowMapper workflowMapper; @Override public PageResult> getNotifyConfigList(NotifyConfigQueryVO queryVO) { @@ -74,66 +61,19 @@ public class NotifyConfigServiceImpl implements NotifyConfigService { List notifyConfigResponseVOS = NotifyConfigResponseVOConverter.INSTANCE.convertList( notifyConfigs); - Map recipientNameMap = getRecipientNameMap(notifyConfigResponseVOS); - Map jobNameMap = getJobNameMap(notifyConfigResponseVOS); - Map workflowNameMap = getWorkflowNameMap(notifyConfigResponseVOS); - for (final NotifyConfigResponseVO notifyConfigResponseVO : notifyConfigResponseVOS) { - notifyConfigResponseVO.setRecipientNames(StreamUtils.toSet(notifyConfigResponseVO.getRecipientIds(), - recipientId -> recipientNameMap.getOrDefault(recipientId, StrUtil.EMPTY))); - - if (Objects.equals(notifyConfigResponseVO.getSystemTaskType(), SyetemTaskTypeEnum.RETRY.getType()) || - Objects.equals(notifyConfigResponseVO.getSystemTaskType(), SyetemTaskTypeEnum.CALLBACK.getType())) { - notifyConfigResponseVO.setBusinessName(notifyConfigResponseVO.getBusinessId()); - } else if (Objects.equals(notifyConfigResponseVO.getSystemTaskType(), SyetemTaskTypeEnum.JOB.getType())) { - notifyConfigResponseVO.setBusinessName( - jobNameMap.get(Long.parseLong(notifyConfigResponseVO.getBusinessId()))); - } else if (Objects.equals(notifyConfigResponseVO.getSystemTaskType(), - SyetemTaskTypeEnum.WORKFLOW.getType())) { - notifyConfigResponseVO.setBusinessName( - workflowNameMap.get(Long.parseLong(notifyConfigResponseVO.getBusinessId()))); - } - } - return new PageResult<>(pageDTO, notifyConfigResponseVOS); } - private Map getWorkflowNameMap(final List notifyConfigResponseVOS) { - Set workflowIds = notifyConfigResponseVOS.stream().filter(responseVO -> - responseVO.getSystemTaskType().equals(SyetemTaskTypeEnum.WORKFLOW.getType())) - .map(responseVO -> Long.parseLong(responseVO.getBusinessId())) - .collect(Collectors.toSet()); - if (CollUtil.isNotEmpty(workflowIds)) { - List workflows = workflowMapper.selectBatchIds(workflowIds); - return StreamUtils.toMap(workflows, Workflow::getId, Workflow::getWorkflowName); - } - - return new HashMap<>(); - } - - private Map getJobNameMap(final List notifyConfigResponseVOS) { - Set jobIds = notifyConfigResponseVOS.stream().filter(responseVO -> - responseVO.getSystemTaskType().equals(SyetemTaskTypeEnum.JOB.getType())) - .map(responseVO -> Long.parseLong(responseVO.getBusinessId())) - .collect(Collectors.toSet()); - if (CollUtil.isNotEmpty(jobIds)) { - List jobs = jobMapper.selectBatchIds(jobIds); - return StreamUtils.toMap(jobs, Job::getId, Job::getJobName); - } - - return new HashMap<>(); - } - - @NotNull - private Map getRecipientNameMap(final List notifyConfigResponseVOS) { - Set recipientIds = StreamUtils.toSetByFlatMap(notifyConfigResponseVOS, - NotifyConfigResponseVO::getRecipientIds, Collection::stream); - - if (CollUtil.isEmpty(recipientIds)) { - return Maps.newHashMap(); - } - - List notifyRecipients = notifyRecipientMapper.selectBatchIds(recipientIds); - return StreamUtils.toMap(notifyRecipients, NotifyRecipient::getId, NotifyRecipient::getRecipientName); + @Override + public List getNotifyConfigBySystemTaskTypeList(Integer systemTaskType) { + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + List notifyConfigList = accessTemplate.getNotifyConfigAccess().list(new LambdaQueryWrapper() + .select(NotifyConfig::getId, NotifyConfig::getNotifyName) + .eq(NotifyConfig::getNamespaceId, namespaceId) + .eq(NotifyConfig::getSystemTaskType, systemTaskType) + .orderByDesc(NotifyConfig::getId) + ); + return notifyConfigList; } @Override