feat(sj_1.3.0-beta1):

1、新增重试场景告警通知配置
2、新增工作流任务告警通知配置
3、新增重试任务失败告警
This commit is contained in:
wodeyangzipingpingwuqi 2024-12-13 17:54:50 +08:00
parent da85ee21d2
commit 01e8518a23
15 changed files with 352 additions and 24 deletions

View File

@ -16,7 +16,7 @@ public enum JobNotifySceneEnum {
/********************************Job****************************************/ /********************************Job****************************************/
JOB_TASK_ERROR(1, "JOB任务执行失败", NodeTypeEnum.SERVER), JOB_TASK_ERROR(1, "JOB任务执行失败", NodeTypeEnum.SERVER),
JOB_CLIENT_ERROR(2, "客户端执行失败", NodeTypeEnum.CLIENT), JOB_CLIENT_ERROR(2, "客户端执行失败", NodeTypeEnum.CLIENT),
JOB_NO_CLIENT_NODES_ERROR(3, "没有可执行的客户端节点", NodeTypeEnum.CLIENT), JOB_NO_CLIENT_NODES_ERROR(3, "没有可执行的客户端节点", NodeTypeEnum.SERVER),
/********************************Workflow****************************************/ /********************************Workflow****************************************/
WORKFLOW_TASK_ERROR(100, "Workflow任务执行失败", NodeTypeEnum.SERVER); WORKFLOW_TASK_ERROR(100, "Workflow任务执行失败", NodeTypeEnum.SERVER);

View File

@ -26,7 +26,7 @@ public enum RetryNotifySceneEnum {
RETRY_TASK_ENTER_DEAD_LETTER(6, "任务重试失败进入死信队列", NodeTypeEnum.SERVER), RETRY_TASK_ENTER_DEAD_LETTER(6, "任务重试失败进入死信队列", NodeTypeEnum.SERVER),
RETRY_NO_CLIENT_NODES_ERROR(7, "没有可执行的客户端节点", NodeTypeEnum.CLIENT); RETRY_NO_CLIENT_NODES_ERROR(7, "任务重试失败(没有可执行的客户端节点", NodeTypeEnum.SERVER);
/** /**
* 通知场景 * 通知场景

View File

@ -42,5 +42,4 @@ public class RetryTask extends CreateUpdateDt {
private Integer retryStatus; private Integer retryStatus;
private Integer taskType; private Integer taskType;
} }

View File

@ -93,11 +93,17 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
} }
protected Map<Triple<String, String, Set<Long>>, List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds, protected Map<Triple<String, String, Set<Long>>, List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds,
Set<String> groupNames, Set<Long> notifyIds) { Set<String> groupNames,
Set<Long> notifyIds) {
if (CollUtil.isEmpty(notifyIds)) {
return Maps.newHashMap();
}
// 批量获取所需的通知配置 // 批量获取所需的通知配置
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list( List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
new LambdaQueryWrapper<NotifyConfig>() new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(NotifyConfig::getNotifyScene, getNotifyScene())
.in(NotifyConfig::getSystemTaskType, StreamUtils.toList(getSystemTaskType(), SyetemTaskTypeEnum::getType)) .in(NotifyConfig::getSystemTaskType, StreamUtils.toList(getSystemTaskType(), SyetemTaskTypeEnum::getType))
.in(NotifyConfig::getNamespaceId, namespaceIds) .in(NotifyConfig::getNamespaceId, namespaceIds)
.in(NotifyConfig::getGroupName, groupNames) .in(NotifyConfig::getGroupName, groupNames)

View File

@ -18,12 +18,12 @@ import java.util.*;
public abstract class AbstractJobAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, JobAlarmInfo> { public abstract class AbstractJobAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, JobAlarmInfo> {
@Override @Override
protected Map<Triple<String, String, Set<Long>>, List<JobAlarmInfo>> convertAlarmDTO(List<JobAlarmInfo> alarmInfos, Set<String> namespaceIds, Set<String> groupNames, Set<Long> notifyIds) { protected Map<Triple<String, String, Set<Long>>, List<JobAlarmInfo>> convertAlarmDTO(List<JobAlarmInfo> jobAlarmInfoList, Set<String> namespaceIds, Set<String> groupNames, Set<Long> notifyIds) {
return StreamUtils.groupByKey(alarmInfos, alarmInfo -> { return StreamUtils.groupByKey(jobAlarmInfoList, jobAlarmInfo -> {
String namespaceId = alarmInfo.getNamespaceId(); String namespaceId = jobAlarmInfo.getNamespaceId();
String groupName = alarmInfo.getGroupName(); String groupName = jobAlarmInfo.getGroupName();
HashSet<Long> notifyIdsSet = StrUtil.isBlank(alarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(alarmInfo.getNotifyIds(), Long.class)); HashSet<Long> notifyIdsSet = StrUtil.isBlank(jobAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(jobAlarmInfo.getNotifyIds(), Long.class));
namespaceIds.add(namespaceId); namespaceIds.add(namespaceId);
groupNames.add(groupName); groupNames.add(groupName);

View File

@ -6,9 +6,13 @@ import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.triple.Triple; import com.aizuda.snailjob.server.common.triple.Triple;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
import java.util.*; import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* @author xiaowoniu * @author xiaowoniu
@ -22,12 +26,15 @@ public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends Abs
return StreamUtils.groupByKey(retryAlarmInfoList, retryAlarmInfo -> { return StreamUtils.groupByKey(retryAlarmInfoList, retryAlarmInfo -> {
String namespaceId = retryAlarmInfo.getNamespaceId(); String namespaceId = retryAlarmInfo.getNamespaceId();
String groupName = retryAlarmInfo.getGroupName(); String groupName = retryAlarmInfo.getGroupName();
HashSet<Long> notifyIdsSet = StrUtil.isBlank(retryAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retryAlarmInfo.getNotifyIds(), Long.class));
// 重试任务查询场景告警通知
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryAlarmInfo.getGroupName(), retryAlarmInfo.getSceneName(), retryAlarmInfo.getNamespaceId());
HashSet<Long> retrySceneConfigNotifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class));
namespaceIds.add(namespaceId); namespaceIds.add(namespaceId);
groupNames.add(groupName); groupNames.add(groupName);
notifyIds.addAll(notifyIdsSet); notifyIds.addAll(retrySceneConfigNotifyIds);
return ImmutableTriple.of(namespaceId, groupName, notifyIdsSet); return ImmutableTriple.of(namespaceId, groupName, retrySceneConfigNotifyIds);
}); });
} }
} }

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.common.alarm;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo; import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.triple.Triple; import com.aizuda.snailjob.server.common.triple.Triple;
@ -18,9 +19,9 @@ import java.util.stream.Collectors;
public abstract class AbstractWorkflowAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, WorkflowAlarmInfo> { public abstract class AbstractWorkflowAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, WorkflowAlarmInfo> {
@Override @Override
protected Map<Triple<String, String, Set<Long>>, List<WorkflowAlarmInfo>> convertAlarmDTO(List<WorkflowAlarmInfo> alarmInfos, Set<String> namespaceIds, Set<String> groupNames, Set<Long> notifyIds) { protected Map<Triple<String, String, Set<Long>>, List<WorkflowAlarmInfo>> convertAlarmDTO(List<WorkflowAlarmInfo> workflowAlarmInfoList, Set<String> namespaceIds, Set<String> groupNames, Set<Long> notifyIds) {
return alarmInfos.stream().collect(Collectors.groupingBy(workflowAlarmInfo -> { return StreamUtils.groupByKey(workflowAlarmInfoList, workflowAlarmInfo -> {
String namespaceId = workflowAlarmInfo.getNamespaceId(); String namespaceId = workflowAlarmInfo.getNamespaceId();
String groupName = workflowAlarmInfo.getGroupName(); String groupName = workflowAlarmInfo.getGroupName();
HashSet<Long> notifyIdsSet = StrUtil.isBlank(workflowAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(workflowAlarmInfo.getNotifyIds(), Long.class)); HashSet<Long> notifyIdsSet = StrUtil.isBlank(workflowAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(workflowAlarmInfo.getNotifyIds(), Long.class));
@ -29,6 +30,6 @@ public abstract class AbstractWorkflowAlarm<E extends ApplicationEvent> extends
groupNames.add(groupName); groupNames.add(groupName);
notifyIds.addAll(notifyIdsSet); notifyIds.addAll(notifyIdsSet);
return ImmutableTriple.of(namespaceId, groupName, notifyIdsSet); return ImmutableTriple.of(namespaceId, groupName, notifyIdsSet);
})); });
} }
} }

View File

@ -0,0 +1,24 @@
package com.aizuda.snailjob.server.job.task.support.alarm.event;
import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
 * Application event published when a job task batch fails because there is
 * no client node available to execute it.
 *
 * <p>The payload is fixed at construction time and exposed through the
 * Lombok-generated getter.
 *
 * @author: zuoJunLin
 * @date : 2023-12-02 21:40
 * @since 2.5.0
 */
@Getter
public class JobTaskFailNodeAlarmEvent extends ApplicationEvent {

    /** Failure details carried by this event; also used as the event source. */
    private final JobTaskFailAlarmEventDTO jobTaskFailAlarmEventDTO;

    /**
     * Creates the event.
     *
     * @param jobTaskFailAlarmEventDTO failure details; passed to
     *                                 {@link ApplicationEvent} as the event source
     */
    public JobTaskFailNodeAlarmEvent(JobTaskFailAlarmEventDTO jobTaskFailAlarmEventDTO) {
        super(jobTaskFailAlarmEventDTO);
        this.jobTaskFailAlarmEventDTO = jobTaskFailAlarmEventDTO;
    }
}

View File

@ -0,0 +1,122 @@
package com.aizuda.snailjob.server.job.task.support.alarm.listener;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.alarm.AbstractJobAlarm;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailNodeAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* JOB任务执行失败告警没有可执行的客户端节点
*
* @author: zuoJunLin
* @date : 2023-12-02 21:40
* @since 2.5.0
*/
@Component
@RequiredArgsConstructor
public class JobTaskFailNodeAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmEvent> {
private final JobTaskBatchMapper jobTaskBatchMapper;
/**
* job任务失败数据
*/
private final LinkedBlockingQueue<JobTaskFailAlarmEventDTO> queue = new LinkedBlockingQueue<>(1000);
private static final String MESSAGES_FORMATTER = """
<font face=微软雅黑 color=#ff0000 size=4>{}环境 Job任务执行失败</font>\s
> 空间ID:{} \s
> 组名称:{} \s
> 任务名称:{} \s
> 执行器名称:{} \s
> 失败原因:{} \s
> 方法参数:{} \s
> 时间:{};
""";
@Override
protected List<JobAlarmInfo> poll() throws InterruptedException {
// 无数据时阻塞线程
JobTaskFailAlarmEventDTO jobTaskFailAlarmEventDTO = queue.poll(100, TimeUnit.MILLISECONDS);
if (Objects.isNull(jobTaskFailAlarmEventDTO)) {
return Lists.newArrayList();
}
// 拉取200条
List<Long> jobTaskBatchIds = Lists.newArrayList(jobTaskFailAlarmEventDTO.getJobTaskBatchId());
queue.drainTo(Collections.singleton(jobTaskBatchIds), 200);
QueryWrapper<JobTaskBatch> wrapper = new QueryWrapper<JobTaskBatch>()
.in("batch.id", jobTaskBatchIds)
.eq("batch.deleted", 0);
List<JobBatchResponseDO> jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(wrapper);
List<JobAlarmInfo> jobAlarmInfos = AlarmInfoConverter.INSTANCE.toJobAlarmInfos(jobTaskBatchList);
jobAlarmInfos.stream().forEach(i -> i.setReason(jobTaskFailAlarmEventDTO.getReason()));
return jobAlarmInfos;
}
@Override
protected AlarmContext buildAlarmContext(JobAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) {
String desc = StrUtil.isNotBlank(alarmDTO.getReason()) ? alarmDTO.getReason() : JobOperationReasonEnum.getByReason(alarmDTO.getOperationReason()).getDesc();
// 预警
return AlarmContext.build()
.text(MESSAGES_FORMATTER,
EnvironmentUtils.getActiveProfile(),
alarmDTO.getNamespaceId(),
alarmDTO.getGroupName(),
alarmDTO.getJobName(),
alarmDTO.getExecutorInfo(),
desc,
alarmDTO.getArgsStr(),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN))
.title("{}环境 JOB任务失败", EnvironmentUtils.getActiveProfile());
}
@Override
protected void startLog() {
SnailJobLog.LOCAL.info("JobTaskFailNodeAlarmListener started");
}
@Override
protected int getNotifyScene() {
return JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getNotifyScene();
}
@Override
protected List<SyetemTaskTypeEnum> getSystemTaskType() {
return Lists.newArrayList(SyetemTaskTypeEnum.JOB);
}
@Override
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
public void doOnApplicationEvent(JobTaskFailAlarmEvent jobTaskFailAlarmEvent) {
if (!queue.offer(jobTaskFailAlarmEvent.getJobTaskFailAlarmEventDTO())) {
SnailJobLog.LOCAL.warn("JOB任务执行失败告警队列已满没有可执行的客户端节点");
}
}
}

View File

@ -6,10 +6,7 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.*;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.akka.ActorGenerator;
@ -23,6 +20,7 @@ import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailNodeAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext; import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory; import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
@ -133,7 +131,9 @@ public class JobExecutorActor extends AbstractActor {
// 无客户端节点-告警通知 // 无客户端节点-告警通知
if (JobTaskBatchStatusEnum.CANCEL.getStatus() == taskStatus && JobOperationReasonEnum.NOT_CLIENT.getReason() == operationReason) { if (JobTaskBatchStatusEnum.CANCEL.getStatus() == taskStatus && JobOperationReasonEnum.NOT_CLIENT.getReason() == operationReason) {
SnailSpringContext.getContext().publishEvent( SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(taskExecute.getTaskBatchId()).build())); new JobTaskFailNodeAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(taskExecute.getTaskBatchId())
.reason(JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getDesc()).build()));
} }
// 更新状态 // 更新状态

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.batch;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
@ -14,7 +15,7 @@ import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailNodeAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler; import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask;
@ -86,7 +87,9 @@ public class JobTaskBatchGenerator {
// 无客户端节点-告警通知 // 无客户端节点-告警通知
if (JobTaskBatchStatusEnum.CANCEL.getStatus() == jobTaskBatch.getTaskBatchStatus() && JobOperationReasonEnum.NOT_CLIENT.getReason() == jobTaskBatch.getOperationReason()) { if (JobTaskBatchStatusEnum.CANCEL.getStatus() == jobTaskBatch.getTaskBatchStatus() && JobOperationReasonEnum.NOT_CLIENT.getReason() == jobTaskBatch.getOperationReason()) {
SnailSpringContext.getContext().publishEvent( SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(jobTaskBatch.getId()).build())); new JobTaskFailNodeAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(jobTaskBatch.getId())
.reason(JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getDesc()).build()));
} }
// 非待处理状态无需进入时间轮中 // 非待处理状态无需进入时间轮中

View File

@ -0,0 +1,18 @@
package com.aizuda.snailjob.server.retry.task.dto;
import lombok.Builder;
import lombok.Data;
/**
 * Payload carried by a retry-task failure alarm event.
 *
 * <p>Accessors and the builder are generated by Lombok.
 *
 * @author zhengweilin
 * @version 1.0.0
 * @date 2024/12/12
 */
@Builder
@Data
public class RetryTaskFailAlarmEventDTO {

    /** Id of the retry task the alarm refers to (presumably its primary key; verify against callers). */
    private Long retryTaskId;

    /** Free-text failure reason attached to the alarm. */
    private String reason;
}

View File

@ -58,12 +58,14 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing
RetryTask retryTask = retryContext.getRetryTask(); RetryTask retryTask = retryContext.getRetryTask();
SnailJobLog.LOCAL.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]", SnailJobLog.LOCAL.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]",
retryTask.getGroupName(), retryTask.getGroupName(),
retryTask.getUniqueId(), pair.getValue().toString()); retryTask.getUniqueId(),
pair.getValue().toString());
RetryLogMetaDTO retryLogMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retryTask); RetryLogMetaDTO retryLogMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retryTask);
retryLogMetaDTO.setTimestamp(DateUtils.toNowMilli()); retryLogMetaDTO.setTimestamp(DateUtils.toNowMilli());
SnailJobLog.REMOTE.error("触发条件不满足 原因: [{}] <|>{}<|>", pair.getValue().toString(), retryLogMetaDTO); SnailJobLog.REMOTE.error("触发条件不满足 原因: [{}] <|>{}<|>", pair.getValue().toString(), retryLogMetaDTO);
return false; return false;
} }

View File

@ -0,0 +1,26 @@
package com.aizuda.snailjob.server.retry.task.support.event;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
import java.util.List;
/**
 * Application event signalling that a retry task failed.
 *
 * <p>The payload is fixed at construction time and exposed through the
 * Lombok-generated getter.
 *
 * @author: zhengweilin
 * @date : 2024-12-13 16:57
 * @since 1.3.0
 */
@Getter
public class RetryTaskFailAlarmEvent extends ApplicationEvent {

    /** Failure details carried by this event; also used as the event source. */
    private final RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO;

    /**
     * Creates the event.
     *
     * @param retryTaskFailAlarmEventDTO failure details; passed to
     *                                   {@link ApplicationEvent} as the event source
     */
    public RetryTaskFailAlarmEvent(RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO) {
        super(retryTaskFailAlarmEventDTO);
        this.retryTaskFailAlarmEventDTO = retryTaskFailAlarmEventDTO;
    }
}

View File

@ -0,0 +1,120 @@
package com.aizuda.snailjob.server.retry.task.support.listener;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.alarm.AbstractRetryAlarm;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Listener for retry-task failure events.
 *
 * <p>Events are buffered in a bounded in-memory queue by
 * {@link #doOnApplicationEvent}. The conversion of queued events into
 * {@link RetryAlarmInfo} instances is not implemented yet, so {@link #poll()}
 * currently yields no alarms.
 *
 * @author: zhengweilin
 * @date : 2024-12-13 16:57
 * @since 1.3.0
 */
@Component
@Slf4j
public class RetryTaskFailAlarmListener extends
        AbstractRetryAlarm<RetryTaskFailAlarmEvent> implements Runnable, Lifecycle {

    /**
     * Pending retry-task failure events. Bounded to 1000 entries; events offered
     * while the queue is full are dropped.
     */
    private final LinkedBlockingQueue<RetryTaskFailAlarmEventDTO> queue = new LinkedBlockingQueue<>(1000);

    /** Alarm body template; the {} placeholders are filled in by {@link #buildAlarmContext}. */
    private static final String RETRY_TASK_FAIL_TEXT_MESSAGES_FORMATTER =
            "<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务执行失败</font> \n" +
                    "> 空间ID:{} \n" +
                    "> 组名称:{} \n" +
                    "> 执行器名称:{} \n" +
                    "> 场景名称:{} \n" +
                    "> 业务数据:{} \n" +
                    "> 时间:{} \n";

    /**
     * @return a single-element list containing {@link SyetemTaskTypeEnum#RETRY}
     */
    @Override
    protected List<SyetemTaskTypeEnum> getSystemTaskType() {
        return Lists.newArrayList(SyetemTaskTypeEnum.RETRY);
    }

    /**
     * Takes the next queued failure event.
     *
     * @return always an empty list for now (never {@code null}); see the TODO in the body
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    @Override
    protected List<RetryAlarmInfo> poll() throws InterruptedException {
        // Wait up to 100 ms for an event so an idle queue does not spin.
        RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO = queue.poll(100, TimeUnit.MILLISECONDS);
        if (Objects.isNull(retryTaskFailAlarmEventDTO)) {
            return Lists.newArrayList();
        }

        // TODO: drain up to 200 further events into a mutable list, load the
        //  matching retry tasks by id, convert them to RetryAlarmInfo and copy the
        //  event's reason onto each alarm. Until that exists the event cannot be
        //  turned into an alarm, so make the drop visible and return an empty
        //  list rather than null.
        SnailJobLog.LOCAL.warn("Retry task fail alarm conversion is not implemented, event dropped. retryTaskId:[{}]",
                retryTaskFailAlarmEventDTO.getRetryTaskId());
        return Lists.newArrayList();
    }

    /**
     * Queues the payload of a retry-task failure event for alarm processing.
     *
     * @param retryTaskFailAlarmEvent the published event
     */
    @Override
    @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
    public void doOnApplicationEvent(RetryTaskFailAlarmEvent retryTaskFailAlarmEvent) {
        // offer() never blocks: when the queue is full the event is dropped and logged.
        if (!queue.offer(retryTaskFailAlarmEvent.getRetryTaskFailAlarmEventDTO())) {
            SnailJobLog.LOCAL.warn("任务重试失败告警队列已满");
        }
    }

    /**
     * Builds the alarm message for one failed retry task.
     *
     * @param retryAlarmInfo the alarm data to render
     * @param notifyConfig   the notify configuration the alarm is sent for (not used here)
     * @return the alarm context holding the rendered text and title
     */
    @Override
    protected AlarmContext buildAlarmContext(final RetryAlarmInfo retryAlarmInfo, final NotifyConfigInfo notifyConfig) {
        return AlarmContext.build().text(RETRY_TASK_FAIL_TEXT_MESSAGES_FORMATTER,
                        EnvironmentUtils.getActiveProfile(),
                        retryAlarmInfo.getNamespaceId(),
                        retryAlarmInfo.getGroupName(),
                        retryAlarmInfo.getExecutorName(),
                        retryAlarmInfo.getSceneName(),
                        retryAlarmInfo.getArgsStr(),
                        DateUtils.format(retryAlarmInfo.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
                .title("组:[{}] 场景:[{}] 环境重试任务失败",
                        retryAlarmInfo.getGroupName(), retryAlarmInfo.getSceneName());
    }

    @Override
    protected void startLog() {
        SnailJobLog.LOCAL.info("RetryTaskFailAlarmListener started");
    }

    /**
     * @return the notify scene code of {@link RetryNotifySceneEnum#RETRY_NO_CLIENT_NODES_ERROR}
     */
    @Override
    protected int getNotifyScene() {
        return RetryNotifySceneEnum.RETRY_NO_CLIENT_NODES_ERROR.getNotifyScene();
    }
}