diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java index 8fa95adf4..4758c1d78 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java @@ -16,7 +16,7 @@ public enum JobNotifySceneEnum { /********************************Job****************************************/ JOB_TASK_ERROR(1, "JOB任务执行失败", NodeTypeEnum.SERVER), JOB_CLIENT_ERROR(2, "客户端执行失败", NodeTypeEnum.CLIENT), - JOB_NO_CLIENT_NODES_ERROR(3, "没有可执行的客户端节点", NodeTypeEnum.CLIENT), + JOB_NO_CLIENT_NODES_ERROR(3, "没有可执行的客户端节点", NodeTypeEnum.SERVER), /********************************Workflow****************************************/ WORKFLOW_TASK_ERROR(100, "Workflow任务执行失败", NodeTypeEnum.SERVER); diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryNotifySceneEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryNotifySceneEnum.java index a5736423e..5d049f763 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryNotifySceneEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/RetryNotifySceneEnum.java @@ -26,7 +26,7 @@ public enum RetryNotifySceneEnum { RETRY_TASK_ENTER_DEAD_LETTER(6, "任务重试失败进入死信队列", NodeTypeEnum.SERVER), - RETRY_NO_CLIENT_NODES_ERROR(7, "没有可执行的客户端节点", NodeTypeEnum.CLIENT); + RETRY_NO_CLIENT_NODES_ERROR(7, "任务重试失败(没有可执行的客户端节点)", NodeTypeEnum.SERVER); /** * 通知场景 diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/RetryTask.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/RetryTask.java index 010fca488..45a65945e 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/RetryTask.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/RetryTask.java @@ -42,5 +42,4 @@ public class RetryTask extends CreateUpdateDt { private Integer retryStatus; private Integer taskType; - } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java index 3901669e0..69d9dfeca 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java @@ -93,11 +93,17 @@ public abstract class AbstractAlarm>, List> obtainNotifyConfig(Set namespaceIds, - Set groupNames, Set notifyIds) { + Set groupNames, + Set notifyIds) { + + if (CollUtil.isEmpty(notifyIds)) { + return Maps.newHashMap(); + } // 批量获取所需的通知配置 List notifyConfigs = accessTemplate.getNotifyConfigAccess().list( new LambdaQueryWrapper() .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) + .eq(NotifyConfig::getNotifyScene, getNotifyScene()) .in(NotifyConfig::getSystemTaskType, StreamUtils.toList(getSystemTaskType(), SyetemTaskTypeEnum::getType)) .in(NotifyConfig::getNamespaceId, namespaceIds) .in(NotifyConfig::getGroupName, groupNames) diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java index 8ddcc8ed2..afbf01a51 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java @@ -18,12 +18,12 @@ import java.util.*; public abstract class AbstractJobAlarm extends AbstractAlarm { @Override - protected Map>, List> convertAlarmDTO(List alarmInfos, Set namespaceIds, Set groupNames, Set notifyIds) { + protected Map>, List> convertAlarmDTO(List jobAlarmInfoList, Set namespaceIds, Set groupNames, Set notifyIds) { - return StreamUtils.groupByKey(alarmInfos, alarmInfo -> { - String namespaceId = alarmInfo.getNamespaceId(); - String groupName = alarmInfo.getGroupName(); - HashSet notifyIdsSet = StrUtil.isBlank(alarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(alarmInfo.getNotifyIds(), Long.class)); + return StreamUtils.groupByKey(jobAlarmInfoList, jobAlarmInfo -> { + String namespaceId = jobAlarmInfo.getNamespaceId(); + String groupName = jobAlarmInfo.getGroupName(); + HashSet notifyIdsSet = StrUtil.isBlank(jobAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(jobAlarmInfo.getNotifyIds(), Long.class)); namespaceIds.add(namespaceId); groupNames.add(groupName); diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java index 4101934b3..840c70e76 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java @@ -6,9 +6,13 @@ import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.Triple; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import org.springframework.context.ApplicationEvent; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * @author xiaowoniu @@ -22,12 +26,15 @@ public abstract class AbstractRetryAlarm extends Abs return StreamUtils.groupByKey(retryAlarmInfoList, retryAlarmInfo -> { String namespaceId = retryAlarmInfo.getNamespaceId(); String groupName = retryAlarmInfo.getGroupName(); - HashSet notifyIdsSet = StrUtil.isBlank(retryAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retryAlarmInfo.getNotifyIds(), Long.class)); + + // 重试任务查询场景告警通知 + RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryAlarmInfo.getGroupName(), retryAlarmInfo.getSceneName(), retryAlarmInfo.getNamespaceId()); + HashSet retrySceneConfigNotifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class)); namespaceIds.add(namespaceId); groupNames.add(groupName); - notifyIds.addAll(notifyIdsSet); - return ImmutableTriple.of(namespaceId, groupName, notifyIdsSet); + notifyIds.addAll(retrySceneConfigNotifyIds); + return ImmutableTriple.of(namespaceId, groupName, retrySceneConfigNotifyIds); }); } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java index 63b2bd7a5..d30939d95 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.common.alarm; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo; import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.Triple; @@ -18,9 +19,9 @@ import java.util.stream.Collectors; public abstract class AbstractWorkflowAlarm extends AbstractAlarm { @Override - protected Map>, List> convertAlarmDTO(List alarmInfos, Set namespaceIds, Set groupNames, Set notifyIds) { + protected Map>, List> convertAlarmDTO(List workflowAlarmInfoList, Set namespaceIds, Set groupNames, Set notifyIds) { - return alarmInfos.stream().collect(Collectors.groupingBy(workflowAlarmInfo -> { + return StreamUtils.groupByKey(workflowAlarmInfoList, workflowAlarmInfo -> { String namespaceId = workflowAlarmInfo.getNamespaceId(); String groupName = workflowAlarmInfo.getGroupName(); HashSet notifyIdsSet = StrUtil.isBlank(workflowAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(workflowAlarmInfo.getNotifyIds(), Long.class)); @@ -29,6 +30,6 @@ public abstract class AbstractWorkflowAlarm extends groupNames.add(groupName); notifyIds.addAll(notifyIdsSet); return ImmutableTriple.of(namespaceId, groupName, notifyIdsSet); - })); + }); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/event/JobTaskFailNodeAlarmEvent.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/event/JobTaskFailNodeAlarmEvent.java new file mode 100644 index 000000000..6cfe07f8c --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/event/JobTaskFailNodeAlarmEvent.java @@ -0,0 +1,24 @@ +package com.aizuda.snailjob.server.job.task.support.alarm.event; + +import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO; +import lombok.Getter; +import org.springframework.context.ApplicationEvent; + +/** + * job任务失败事件(没有可执行的客户端节点) + * + * @author: zuoJunLin + * @date : 2023-12-02 21:40 + * @since 2.5.0 + */ +@Getter +public class JobTaskFailNodeAlarmEvent extends ApplicationEvent { + + private JobTaskFailAlarmEventDTO jobTaskFailAlarmEventDTO; + + public JobTaskFailNodeAlarmEvent(JobTaskFailAlarmEventDTO jobTaskFailAlarmEventDTO) { + super(jobTaskFailAlarmEventDTO); + this.jobTaskFailAlarmEventDTO = jobTaskFailAlarmEventDTO; + } + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/listener/JobTaskFailNodeAlarmListener.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/listener/JobTaskFailNodeAlarmListener.java new file mode 100644 index 000000000..dcba44c07 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/listener/JobTaskFailNodeAlarmListener.java @@ -0,0 +1,122 @@ +package com.aizuda.snailjob.server.job.task.support.alarm.listener; + +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.alarm.AlarmContext; +import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum; +import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; +import com.aizuda.snailjob.common.core.util.EnvironmentUtils; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.AlarmInfoConverter; +import com.aizuda.snailjob.server.common.alarm.AbstractJobAlarm; +import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; +import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; +import com.aizuda.snailjob.server.common.util.DateUtils; +import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO; +import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + + +/** + * JOB任务执行失败告警(没有可执行的客户端节点) + * + * @author: zuoJunLin + * @date : 2023-12-02 21:40 + * @since 2.5.0 + */ +@Component +@RequiredArgsConstructor +public class JobTaskFailNodeAlarmListener extends AbstractJobAlarm { + + private final JobTaskBatchMapper jobTaskBatchMapper; + + /** + * job任务失败数据 + */ + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(1000); + + private static final String MESSAGES_FORMATTER = """ + {}环境 Job任务执行失败\s + > 空间ID:{} \s + > 组名称:{} \s + > 任务名称:{} \s + > 执行器名称:{} \s + > 失败原因:{} \s + > 方法参数:{} \s + > 时间:{}; + """; + + @Override + protected List poll() throws InterruptedException { + // 无数据时阻塞线程 + JobTaskFailAlarmEventDTO jobTaskFailAlarmEventDTO = queue.poll(100, TimeUnit.MILLISECONDS); + if (Objects.isNull(jobTaskFailAlarmEventDTO)) { + return Lists.newArrayList(); + } + + // 拉取200条 + List jobTaskBatchIds = Lists.newArrayList(jobTaskFailAlarmEventDTO.getJobTaskBatchId()); + queue.drainTo(Collections.singleton(jobTaskBatchIds), 200); + QueryWrapper wrapper = new QueryWrapper() + .in("batch.id", jobTaskBatchIds) + .eq("batch.deleted", 0); + List jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(wrapper); + List jobAlarmInfos = AlarmInfoConverter.INSTANCE.toJobAlarmInfos(jobTaskBatchList); + jobAlarmInfos.stream().forEach(i -> i.setReason(jobTaskFailAlarmEventDTO.getReason())); + return jobAlarmInfos; + } + + @Override + protected AlarmContext buildAlarmContext(JobAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) { + String desc = StrUtil.isNotBlank(alarmDTO.getReason()) ? alarmDTO.getReason() : JobOperationReasonEnum.getByReason(alarmDTO.getOperationReason()).getDesc(); + // 预警 + return AlarmContext.build() + .text(MESSAGES_FORMATTER, + EnvironmentUtils.getActiveProfile(), + alarmDTO.getNamespaceId(), + alarmDTO.getGroupName(), + alarmDTO.getJobName(), + alarmDTO.getExecutorInfo(), + desc, + alarmDTO.getArgsStr(), + DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN)) + .title("{}环境 JOB任务失败", EnvironmentUtils.getActiveProfile()); + } + + @Override + protected void startLog() { + SnailJobLog.LOCAL.info("JobTaskFailNodeAlarmListener started"); + } + + @Override + protected int getNotifyScene() { + return JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getNotifyScene(); + } + + @Override + protected List getSystemTaskType() { + return Lists.newArrayList(SyetemTaskTypeEnum.JOB); + } + + @Override + @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION) + public void doOnApplicationEvent(JobTaskFailAlarmEvent jobTaskFailAlarmEvent) { + if (!queue.offer(jobTaskFailAlarmEvent.getJobTaskFailAlarmEventDTO())) { + SnailJobLog.LOCAL.warn("JOB任务执行失败告警队列已满(没有可执行的客户端节点)"); + } + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index b492efff4..24e561dec 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -6,10 +6,7 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.context.SnailSpringContext; -import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; -import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; -import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; -import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.enums.*; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; @@ -23,6 +20,7 @@ import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; +import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailNodeAlarmEvent; import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext; import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext; @@ -133,7 +131,9 @@ public class JobExecutorActor extends AbstractActor { // 无客户端节点-告警通知 if (JobTaskBatchStatusEnum.CANCEL.getStatus() == taskStatus && JobOperationReasonEnum.NOT_CLIENT.getReason() == operationReason) { SnailSpringContext.getContext().publishEvent( - new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(taskExecute.getTaskBatchId()).build())); + new JobTaskFailNodeAlarmEvent(JobTaskFailAlarmEventDTO.builder() + .jobTaskBatchId(taskExecute.getTaskBatchId()) + .reason(JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getDesc()).build())); } // 更新状态 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index 3314128ec..4985c5e17 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.batch; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.common.core.context.SnailSpringContext; +import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; @@ -14,7 +15,7 @@ import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; -import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; +import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailNodeAlarmEvent; import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler; import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask; @@ -86,7 +87,9 @@ public class JobTaskBatchGenerator { // 无客户端节点-告警通知 if (JobTaskBatchStatusEnum.CANCEL.getStatus() == jobTaskBatch.getTaskBatchStatus() && JobOperationReasonEnum.NOT_CLIENT.getReason() == jobTaskBatch.getOperationReason()) { SnailSpringContext.getContext().publishEvent( - new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(jobTaskBatch.getId()).build())); + new JobTaskFailNodeAlarmEvent(JobTaskFailAlarmEventDTO.builder() + .jobTaskBatchId(jobTaskBatch.getId()) + .reason(JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getDesc()).build())); } // 非待处理状态无需进入时间轮中 diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskFailAlarmEventDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskFailAlarmEventDTO.java new file mode 100644 index 000000000..2d85ece87 --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskFailAlarmEventDTO.java @@ -0,0 +1,18 @@ +package com.aizuda.snailjob.server.retry.task.dto; + +import lombok.Builder; +import lombok.Data; + +/** + * @author zhengweilin + * @version 1.0.0 + * @date 2024/12/12 + */ +@Data +@Builder +public class RetryTaskFailAlarmEventDTO { + + private Long retryTaskId; + + private String reason; +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java index d9f55ace5..13768f3d5 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java @@ -58,12 +58,14 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing RetryTask retryTask = retryContext.getRetryTask(); SnailJobLog.LOCAL.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]", retryTask.getGroupName(), - retryTask.getUniqueId(), pair.getValue().toString()); + retryTask.getUniqueId(), + pair.getValue().toString()); RetryLogMetaDTO retryLogMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retryTask); retryLogMetaDTO.setTimestamp(DateUtils.toNowMilli()); SnailJobLog.REMOTE.error("触发条件不满足 原因: [{}] <|>{}<|>", pair.getValue().toString(), retryLogMetaDTO); + return false; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/event/RetryTaskFailAlarmEvent.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/event/RetryTaskFailAlarmEvent.java new file mode 100644 index 000000000..06954f0f4 --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/event/RetryTaskFailAlarmEvent.java @@ -0,0 +1,26 @@ +package com.aizuda.snailjob.server.retry.task.support.event; + +import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; +import lombok.Getter; +import org.springframework.context.ApplicationEvent; + +import java.util.List; + +/** + * 重试任务失败事件 + * + * @author: zhengweilin + * @date : 2024-12-13 16:57 + * @since 1.3.0 + */ +@Getter +public class RetryTaskFailAlarmEvent extends ApplicationEvent { + private RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO; + + public RetryTaskFailAlarmEvent(RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO) { + super(retryTaskFailAlarmEventDTO); + this.retryTaskFailAlarmEventDTO = retryTaskFailAlarmEventDTO; + } + +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailAlarmListener.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailAlarmListener.java new file mode 100644 index 000000000..cf781a9eb --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailAlarmListener.java @@ -0,0 +1,120 @@ +package com.aizuda.snailjob.server.retry.task.support.listener; + +import cn.hutool.core.collection.CollUtil; +import com.aizuda.snailjob.common.core.alarm.AlarmContext; +import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; +import com.aizuda.snailjob.common.core.util.EnvironmentUtils; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.AlarmInfoConverter; +import com.aizuda.snailjob.server.common.Lifecycle; +import com.aizuda.snailjob.server.common.alarm.AbstractRetryAlarm; +import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; +import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; +import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; +import com.aizuda.snailjob.server.common.util.DateUtils; +import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO; +import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent; +import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * 重试任务失败监听器 + * + * @author: zhengweilin + * @date : 2024-12-13 16:57 + * @since 1.3.0 + */ +@Component +@Slf4j +public class RetryTaskFailAlarmListener extends + AbstractRetryAlarm implements Runnable, Lifecycle { + + /** + * 死信告警数据 + */ + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(1000); + + private static final String retryTaskDeadTextMessagesFormatter = + "{}环境 重试任务执行失败 \n" + + "> 空间ID:{} \n" + + "> 组名称:{} \n" + + "> 执行器名称:{} \n" + + "> 场景名称:{} \n" + + "> 业务数据:{} \n" + + "> 时间:{} \n"; + + @Override + protected List getSystemTaskType() { + return Lists.newArrayList(SyetemTaskTypeEnum.RETRY); + } + + @Override + protected List poll() throws InterruptedException { + + RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO = queue.poll(100, TimeUnit.MILLISECONDS); + if (Objects.isNull(retryTaskFailAlarmEventDTO)) { + return Lists.newArrayList(); + } + + // 拉取200条 + /*List retryTaskIds = Lists.newArrayList(retryTaskFailAlarmEventDTO.getRetryTaskId()); + queue.drainTo(Collections.singleton(retryTaskIds), 200); + QueryWrapper wrapper = new QueryWrapper() + .in("batch.id", retryTaskIds) + .eq("batch.deleted", 0); + List jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(wrapper); + List jobAlarmInfos = AlarmInfoConverter.INSTANCE.retryTaskToAlarmInfo(jobTaskBatchList); + jobAlarmInfos.stream().forEach(i -> i.setReason(jobTaskFailAlarmEventDTO.getReason()));*/ + return null; + } + + @Override + @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION) + public void doOnApplicationEvent(RetryTaskFailAlarmEvent retryTaskFailAlarmEvent) { + if (!queue.offer(retryTaskFailAlarmEvent.getRetryTaskFailAlarmEventDTO())) { + SnailJobLog.LOCAL.warn("任务重试失败进入死信队列告警队列已满"); + } + } + + @Override + protected AlarmContext buildAlarmContext(final RetryAlarmInfo retryAlarmInfo, final NotifyConfigInfo notifyConfig) { + + // 预警 + return AlarmContext.build().text(retryTaskDeadTextMessagesFormatter, + EnvironmentUtils.getActiveProfile(), + retryAlarmInfo.getNamespaceId(), + retryAlarmInfo.getGroupName(), + retryAlarmInfo.getExecutorName(), + retryAlarmInfo.getSceneName(), + retryAlarmInfo.getArgsStr(), + DateUtils.format(retryAlarmInfo.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN)) + .title("组:[{}] 场景:[{}] 环境重试任务失败", + retryAlarmInfo.getGroupName(), retryAlarmInfo.getSceneName()); + } + + @Override + protected void startLog() { + SnailJobLog.LOCAL.info("RetryTaskFailAlarmListener started"); + } + + @Override + protected int getNotifyScene() { + return RetryNotifySceneEnum.RETRY_NO_CLIENT_NODES_ERROR.getNotifyScene(); + } +}