diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java index 322d9734..1e8be0cc 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobNotifySceneEnum.java @@ -11,7 +11,14 @@ import lombok.Getter; @Getter public enum JobNotifySceneEnum { - JOB_TASK_ERROR(1, "JOB任务执行失败", NodeTypeEnum.SERVER); + /********************************Job****************************************/ + JOB_TASK_ERROR(1, "JOB任务执行失败", NodeTypeEnum.SERVER), + + + /********************************Workflow****************************************/ + WORKFLOW_TASK_ERROR(100, "Workflow任务执行失败", NodeTypeEnum.SERVER), + WORKFLOW_TASK_CALLBACK_ERROR(101, "回调节点任务执行失败", NodeTypeEnum.SERVER) + ; /** * 通知场景 diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/AlarmInfoConverter.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/AlarmInfoConverter.java index 73e27066..d892940e 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/AlarmInfoConverter.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/AlarmInfoConverter.java @@ -5,14 +5,13 @@ import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; +import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo; import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO; import com.aizuda.snailjob.template.datasource.persistence.po.JobNotifyConfig; import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; -import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; -import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; +import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Mappings; @@ -63,4 +62,6 @@ public interface AlarmInfoConverter { List toJobAlarmInfos(List jobBatchResponse); + List toWorkflowAlarmInfos(List jobBatchResponse); + } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java index 0496eefa..bc82c177 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java @@ -26,10 +26,13 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; +import org.springframework.scheduling.TaskScheduler; import org.springframework.util.CollectionUtils; +import java.time.Duration; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -40,60 +43,64 @@ import java.util.stream.Collectors; * @since 2.5.0 */ @Slf4j -public abstract class AbstractAlarm implements ApplicationListener, Runnable, +public abstract class AbstractAlarm implements ApplicationListener, + Runnable, Lifecycle { + @Autowired + @Qualifier("alarmExecutorService") + protected TaskScheduler taskScheduler; @Autowired protected AccessTemplate accessTemplate; @Autowired protected NotifyRecipientMapper recipientMapper; - @Override public void run() { - while (!Thread.currentThread().isInterrupted()) { - try { - // 从队列获取数据 - List alarmInfos = poll(); - if (CollectionUtils.isEmpty(alarmInfos)) { - continue; - } - - // 获取所有的命名空间 - Set namespaceIds = new HashSet<>(); - // 获取所有的组名称 - Set groupNames = new HashSet<>(); - // 获取所有的场景名称 - Set sceneNames = new HashSet<>(); - - // 转换AlarmDTO 为了下面循环发送使用 - Map, List> waitSendAlarmInfos = convertAlarmDTO( - alarmInfos, namespaceIds, groupNames, sceneNames); - - // 批量获取通知配置 - Map, List> notifyConfig = obtainNotifyConfig(namespaceIds, groupNames, sceneNames); - - // 循环发送消息 - waitSendAlarmInfos.forEach((key, list) -> { - List notifyConfigsList = notifyConfig.getOrDefault(key, Lists.newArrayList()); - for (A alarmDTO : list) { - sendAlarm(notifyConfigsList, alarmDTO); - } - }); - } catch (InterruptedException e) { - SnailJobLog.LOCAL.info("retry task fail dead letter alarm stop"); - Thread.currentThread().interrupt(); - } catch (Exception e) { - SnailJobLog.LOCAL.error("RetryTaskFailDeadLetterAlarmListener queue poll Exception", e); + try { + // 从队列获取数据 + List alarmInfos = poll(); + if (CollectionUtils.isEmpty(alarmInfos)) { + return; } + + // 获取所有的命名空间 + Set namespaceIds = new HashSet<>(); + // 获取所有的组名称 + Set groupNames = new HashSet<>(); + // 获取所有的场景名称 + Set businessIds = new HashSet<>(); + + // 转换AlarmDTO 为了下面循环发送使用 + Map, List> waitSendAlarmInfos = convertAlarmDTO( + alarmInfos, namespaceIds, groupNames, businessIds); + + // 批量获取通知配置 + Map, List> notifyConfig = obtainNotifyConfig(namespaceIds, + groupNames, businessIds); + + // 循环发送消息 + waitSendAlarmInfos.forEach((key, list) -> { + List notifyConfigsList = notifyConfig.getOrDefault(key, Lists.newArrayList()); + for (A alarmDTO : list) { + sendAlarm(notifyConfigsList, alarmDTO); + } + }); + } catch (InterruptedException e) { + SnailJobLog.LOCAL.info("retry task fail dead letter alarm stop"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + SnailJobLog.LOCAL.error("RetryTaskFailDeadLetterAlarmListener queue poll Exception", e); } + } - protected Map, List> obtainNotifyConfig(Set namespaceIds, Set groupNames, Set businessIds) { + protected Map, List> obtainNotifyConfig(Set namespaceIds, + Set groupNames, Set businessIds) { // 批量获取所需的通知配置 - List notifyConfigs = accessTemplate.getNotifyConfigAccess().list( + List notifyConfigs = accessTemplate.getNotifyConfigAccess().list( new LambdaQueryWrapper() .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) .in(NotifyConfig::getSystemTaskType, getSystemTaskType()) @@ -119,7 +126,7 @@ public abstract class AbstractAlarm notifyRecipients = recipientMapper.selectBatchIds(recipientIds); Map recipientMap = notifyRecipients.stream() - .collect(Collectors.toMap(NotifyRecipient::getId, i->i)); + .collect(Collectors.toMap(NotifyRecipient::getId, i -> i)); if (CollectionUtils.isEmpty(recipientIds)) { return Maps.newHashMap(); @@ -150,29 +157,23 @@ public abstract class AbstractAlarm getSystemTaskType(); - protected abstract Map, List> convertAlarmDTO(List alarmData, Set namespaceIds, Set groupNames, Set sceneNames); - + protected abstract Map, List> convertAlarmDTO(List alarmData, + Set namespaceIds, Set groupNames, Set sceneNames); protected abstract List poll() throws InterruptedException; protected abstract AlarmContext buildAlarmContext(A alarmDTO, NotifyConfigInfo notifyConfig); - private Thread thread; - @Override public void start() { - thread = new Thread(this); - thread.start(); startLog(); + taskScheduler.scheduleAtFixedRate(this, Duration.parse("PT1S")); } protected abstract void startLog(); @Override public void close() { - if (Objects.nonNull(thread)) { - thread.interrupt(); - } } protected void sendAlarm(List notifyConfigsList, A alarmDTO) { @@ -180,7 +181,7 @@ public abstract class AbstractAlarm= alarmDTO.getCount()) { - continue; - } + if (notifyConfig.getNotifyThreshold() >= alarmDTO.getCount()) { + continue; + } } for (final RecipientInfo recipientInfo : notifyConfig.getRecipientInfos()) { @@ -199,9 +200,9 @@ public abstract class AbstractAlarm alarmType = SnailJobAlarmFactory.getAlarmType( + Alarm alarm = SnailJobAlarmFactory.getAlarmType( recipientInfo.getNotifyType()); - alarmType.asyncSendMessage(context); + alarm.asyncSendMessage(context); } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java new file mode 100644 index 00000000..8970f472 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractWorkflowAlarm.java @@ -0,0 +1,33 @@ +package com.aizuda.snailjob.server.common.alarm; + +import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo; +import com.aizuda.snailjob.server.common.triple.ImmutableTriple; +import com.aizuda.snailjob.server.common.triple.Triple; +import org.springframework.context.ApplicationEvent; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * @author xiaowoniu + * @date 2023-12-03 10:19:19 + * @since 2.5.0 + */ +public abstract class AbstractWorkflowAlarm extends AbstractAlarm { + + @Override + protected Map, List> convertAlarmDTO(List alarmInfos, Set namespaceIds, Set groupNames, Set jobIds) { + + return alarmInfos.stream().collect(Collectors.groupingBy(i -> { + String namespaceId = i.getNamespaceId(); + String groupName = i.getGroupName(); + String jobId = String.valueOf(i.getWorkflowId()); + namespaceIds.add(namespaceId); + groupNames.add(groupName); + jobIds.add(jobId); + return ImmutableTriple.of(namespaceId, groupName, jobId); + })); + } +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java index 57a7358c..b9640231 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java @@ -24,4 +24,12 @@ public class SnailJobServerCommonAutoConfiguration { scheduler.setThreadNamePrefix("snail-job-scheduled-thread-"); return scheduler; } + + @Bean + public TaskScheduler alarmExecutorService() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(4); + scheduler.setThreadNamePrefix("snail-job-alarm-thread-"); + return scheduler; + } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/JobAlarmInfo.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/JobAlarmInfo.java index cafffd3a..6abe3665 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/JobAlarmInfo.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/JobAlarmInfo.java @@ -13,17 +13,6 @@ import lombok.EqualsAndHashCode; public class JobAlarmInfo extends AlarmInfo { private Long id; - - /** - * 命名空间 - */ - private String namespaceId; - - /** - * 组名称 - */ - private String groupName; - /** * 名称 */ diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/WorkflowAlarmInfo.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/WorkflowAlarmInfo.java new file mode 100644 index 00000000..59a3fa21 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/WorkflowAlarmInfo.java @@ -0,0 +1,27 @@ +package com.aizuda.snailjob.server.common.dto; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @author opensnail + * @date 2024-05-05 + * @since sj_1.0.0 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class WorkflowAlarmInfo extends AlarmInfo { + + private Long id; + + /** + * 名称 + */ + private String workflowName; + + /** + * 任务信息id + */ + private Long workflowId; + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java index 96a59553..15a48a1b 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support; import com.aizuda.snailjob.client.model.request.DispatchJobRequest; import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest; +import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.job.task.dto.BaseDTO; import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO; @@ -10,6 +11,7 @@ import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.snailjob.server.job.task.dto.RealStopTaskInstanceDTO; import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext; +import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext; import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext; @@ -23,13 +25,6 @@ import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; -import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext; -import com.aizuda.snailjob.server.job.task.support.callback.ClientCallbackContext; -import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext; -import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext; -import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; -import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext; -import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Mappings; @@ -145,4 +140,6 @@ public interface JobTaskConverter { JobLogMessage toJobLogMessage(JobLogMessage jobLogMessage); + List toJobAlarmInfos(List events); + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/event/JobTaskFailAlarmEvent.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/event/JobTaskFailAlarmEvent.java similarity index 86% rename from snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/event/JobTaskFailAlarmEvent.java rename to snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/event/JobTaskFailAlarmEvent.java index c1dca08e..0396256e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/event/JobTaskFailAlarmEvent.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/event/JobTaskFailAlarmEvent.java @@ -1,4 +1,4 @@ -package com.aizuda.snailjob.server.job.task.support.event; +package com.aizuda.snailjob.server.job.task.support.alarm.event; import lombok.Getter; import org.springframework.context.ApplicationEvent; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/event/WorkflowTaskFailAlarmEvent.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/event/WorkflowTaskFailAlarmEvent.java new file mode 100644 index 00000000..225e07f5 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/event/WorkflowTaskFailAlarmEvent.java @@ -0,0 +1,23 @@ +package com.aizuda.snailjob.server.job.task.support.alarm.event; + +import lombok.Getter; +import org.springframework.context.ApplicationEvent; + +/** + * workflow任务失败事件 + * + * @author: opensnail + * @date : 2023-12-02 21:40 + * @since sj_1.0.0 + */ +@Getter +public class WorkflowTaskFailAlarmEvent extends ApplicationEvent { + + private final Long workflowTaskBatchId; + + public WorkflowTaskFailAlarmEvent(Long workflowTaskBatchId) { + super(workflowTaskBatchId); + this.workflowTaskBatchId = workflowTaskBatchId; + } + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/listener/JobTaskFailAlarmListener.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/listener/JobTaskFailAlarmListener.java similarity index 74% rename from snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/listener/JobTaskFailAlarmListener.java rename to snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/listener/JobTaskFailAlarmListener.java index c9d2ddb1..62dd8763 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/listener/JobTaskFailAlarmListener.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/listener/JobTaskFailAlarmListener.java @@ -1,31 +1,28 @@ -package com.aizuda.snailjob.server.job.task.support.listener; +package com.aizuda.snailjob.server.job.task.support.alarm.listener; import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum; -import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.core.util.EnvironmentUtils; +import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.AlarmInfoConverter; import com.aizuda.snailjob.server.common.alarm.AbstractJobAlarm; import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; -import com.aizuda.snailjob.server.common.triple.Triple; import com.aizuda.snailjob.server.common.util.DateUtils; -import com.aizuda.snailjob.server.job.task.support.event.JobTaskFailAlarmEvent; +import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.Objects; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** @@ -38,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue; @Component @RequiredArgsConstructor public class JobTaskFailAlarmListener extends AbstractJobAlarm { + private final JobTaskBatchMapper jobTaskBatchMapper; /** @@ -45,24 +43,29 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm queue = new LinkedBlockingQueue<>(1000); - private static String jobTaskFailTextMessagesFormatter = - "{}环境 Job任务执行失败 \n" + - "> 空间ID:{} \n" + - "> 组名称:{} \n" + - "> 任务名称:{} \n" + - "> 执行器名称:{} \n" + - "> 方法参数:{} \n" + - "> 时间:{} \n"; + private static final String MESSAGES_FORMATTER = """ + {}环境 Job任务执行失败\s + > 空间ID:{} \s + > 组名称:{} \s + > 任务名称:{} \s + > 执行器名称:{} \s + > 方法参数:{} \s + > 时间:{}; + """; @Override protected List poll() throws InterruptedException { // 无数据时阻塞线程 - Long jobTaskBatchId = queue.take(); + Long jobTaskBatchId = queue.poll(100, TimeUnit.MILLISECONDS); + if (Objects.isNull(jobTaskBatchId)) { + return Lists.newArrayList(); + } + // 拉取200条 List jobTaskBatchIds = Lists.newArrayList(jobTaskBatchId); queue.drainTo(jobTaskBatchIds, 200); QueryWrapper wrapper = new QueryWrapper() - .in("a.id", jobTaskBatchIds).eq("a.deleted", 0); + .in("a.id", jobTaskBatchIds).eq("a.deleted", 0); List jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(wrapper); return AlarmInfoConverter.INSTANCE.toJobAlarmInfos(jobTaskBatchList); } @@ -71,7 +74,7 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm { + + private final WorkflowTaskBatchMapper workflowTaskBatchMapper; + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(1000); + private static final String MESSAGES_FORMATTER = """ + {}环境 Workflow任务执行失败\s + > 空间ID:{} \s + > 组名称:{} \s + > 工作流名称:{} \s + > 时间:{}; + """; + + @Override + protected List poll() throws InterruptedException { + // 无数据时阻塞线程 + Long workflowTaskBatchId = queue.poll(100, TimeUnit.MILLISECONDS); + if (Objects.isNull(workflowTaskBatchId)) { + return Lists.newArrayList(); + } + + // 拉取200条 + List jobTaskBatchIds = Lists.newArrayList(workflowTaskBatchId); + queue.drainTo(jobTaskBatchIds, 200); + List workflowTaskBatches = workflowTaskBatchMapper.selectBatchIds(jobTaskBatchIds); + return AlarmInfoConverter.INSTANCE.toWorkflowAlarmInfos(workflowTaskBatches); + } + + @Override + protected AlarmContext buildAlarmContext(WorkflowAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) { + // 预警 + return AlarmContext.build() + .text(MESSAGES_FORMATTER, + EnvironmentUtils.getActiveProfile(), + alarmDTO.getNamespaceId(), + alarmDTO.getGroupName(), + alarmDTO.getWorkflowName(), + DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN)) + .title("{}环境 Workflow任务执行失败", EnvironmentUtils.getActiveProfile()); + } + + @Override + protected void startLog() { + SnailJobLog.LOCAL.info("WorkflowTaskFailAlarmListener started"); + } + + @Override + protected int getNotifyScene() { + return JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene(); + } + + @Override + protected List getSystemTaskType() { + return Lists.newArrayList(SyetemTaskTypeEnum.WORKFLOW); + } + + @Override + public void onApplicationEvent(WorkflowTaskFailAlarmEvent event) { + if (!queue.offer(event.getWorkflowTaskBatchId())) { + SnailJobLog.LOCAL.warn("Workflow任务执行失败告警队列已满"); + } + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 012764e4..5f8d1fd8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -23,7 +23,7 @@ import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache; -import com.aizuda.snailjob.server.job.task.support.event.JobTaskFailAlarmEvent; +import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext; import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext; @@ -39,11 +39,6 @@ import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; -import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext; -import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory; -import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext; -import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator; -import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index a5ce9c9d..d5009cd3 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.dispatch; import akka.actor.AbstractActor; import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.FailStrategyEnum; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; @@ -14,6 +15,8 @@ import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.support.WorkflowExecutor; import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter; +import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; +import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.cache.MutableGraphCache; import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext; import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorFactory; @@ -74,7 +77,7 @@ public class WorkflowExecutorActor extends AbstractActor { SnailJobLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e); handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason()); - // TODO 发送通知 + SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(taskExecute.getTaskBatchId())); } finally { getContext().stop(getSelf()); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index 447893db..b047f00a 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -6,7 +6,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; -import com.aizuda.snailjob.server.job.task.support.event.JobTaskFailAlarmEvent; +import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java index 566e7f50..4f70e833 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java @@ -56,7 +56,7 @@ public class RetryTaskFailDeadLetterAlarmListener extends @Override protected List poll() throws InterruptedException { - List allRetryDeadLetterList = queue.poll(5, TimeUnit.SECONDS); + List allRetryDeadLetterList = queue.poll(100, TimeUnit.MILLISECONDS); if (CollectionUtils.isEmpty(allRetryDeadLetterList)) { return Lists.newArrayList(); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java index 755b384d..ff66b714 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java @@ -19,6 +19,7 @@ import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * 重试任务失败数量超过阈值监听器 @@ -49,7 +50,7 @@ public class RetryTaskFailMoreThresholdAlarmListener extends @Override protected List poll() throws InterruptedException { // 无数据时阻塞线程 - RetryTask retryTask = queue.take(); + RetryTask retryTask = queue.poll(100, TimeUnit.MILLISECONDS); // 拉取100条 List lists = Lists.newArrayList(retryTask);