feat(sj_1.3.0-beta1):

1、新增重试场景告警通知配置
2、新增工作流任务告警通知配置
3、重试任务新增告警失败原因与通知场景
This commit is contained in:
wodeyangzipingpingwuqi 2024-12-24 18:00:07 +08:00
parent 8e7f5e1d85
commit 7844946f63
37 changed files with 410 additions and 181 deletions

View File

@ -37,6 +37,15 @@ public interface ConfigAccess<T> extends Access<T> {
*/
RetrySceneConfig getSceneConfigByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId);
/**
 * Batch-fetch scene configurations matching any combination of the given
 * group names, scene names and namespace ids.
 *
 * @param groupNames   group names to match
 * @param sceneNames   scene names to match
 * @param namespaceIds namespace ids to match
 * @return list of matching {@link RetrySceneConfig} entries; may contain
 *         configs for any cross-combination of the three sets, so callers
 *         should re-key the result (e.g. by group/scene/namespace) if an
 *         exact pairing is required
 */
List<RetrySceneConfig> getSceneConfigByGroupNameAndSceneNameList(Set<String> groupNames, Set<String> sceneNames, Set<String> namespaceIds);
/**
* 获取通知配置

View File

@ -63,6 +63,13 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
.eq(RetrySceneConfig::getSceneName, sceneName));
}
// Batch variant of getByGroupNameAndSceneName: one SELECT with three IN clauses.
// NOTE(review): the three IN clauses match the cross-product of the sets, so rows
// for unintended (group, scene, namespace) pairings can be returned — callers are
// expected to filter/re-key the result. Also, MyBatis-Plus renders an empty IN
// collection as invalid SQL; presumably callers never pass empty sets — TODO confirm.
protected List<RetrySceneConfig> getByGroupNameAndSceneNameList(Set<String> groupNames, Set<String> sceneNames, Set<String> namespaceIds) {
    return sceneConfigMapper.selectList(new LambdaQueryWrapper<RetrySceneConfig>()
            .in(RetrySceneConfig::getNamespaceId, namespaceIds)
            .in(RetrySceneConfig::getGroupName, groupNames)
            .in(RetrySceneConfig::getSceneName, sceneNames));
}
protected List<RetrySceneConfig> getSceneConfigs(String groupName) {
return sceneConfigMapper.selectList(new LambdaQueryWrapper<RetrySceneConfig>()
.eq(RetrySceneConfig::getGroupName, groupName));
@ -94,6 +101,11 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
return getByGroupNameAndSceneName(groupName, sceneName, namespaceId);
}
// Public entry point for the batch scene-config lookup; delegates directly to
// the protected query helper (no caching or extra filtering at this layer).
@Override
public List<RetrySceneConfig> getSceneConfigByGroupNameAndSceneNameList(Set<String> groupNames, Set<String> sceneNames, Set<String> namespaceIds) {
    return getByGroupNameAndSceneNameList(groupNames, sceneNames, namespaceIds);
}
@Override
public List<NotifyConfig> getNotifyListConfigByGroupName(String groupName, String namespaceId) {
return getNotifyConfigs(groupName, namespaceId);

View File

@ -53,4 +53,9 @@ public class WorkflowBatchResponseDO {
* 创建时间
*/
private LocalDateTime createDt;
/**
* 通知配置
*/
private String notifyIds;
}

View File

@ -15,7 +15,8 @@
<select id="selectWorkflowBatchList"
resultType="com.aizuda.snailjob.template.datasource.persistence.dataobject.WorkflowBatchResponseDO">
SELECT batch.*,
flow.workflow_name
flow.workflow_name,
flow.notify_ids
FROM sj_workflow_task_batch batch
JOIN sj_workflow flow ON batch.workflow_id = flow.id
${ew.customSqlSegment}

View File

@ -7,11 +7,9 @@ import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.RetryTaskFailAlarmEventDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.WorkflowBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
@ -31,22 +29,6 @@ public interface AlarmInfoConverter {
AlarmInfoConverter INSTANCE = Mappers.getMapper(AlarmInfoConverter.class);
@Mappings(
@Mapping(source = "retryCount", target = "count")
)
List<RetryAlarmInfo> retryTaskToAlarmInfo(List<RetryTaskFailAlarmEventDO> retryTaskFailAlarmEventDOList);
@Mappings(
@Mapping(source = "retryCount", target = "count")
)
RetryAlarmInfo retryTaskToAlarmInfo(RetryTaskFailAlarmEventDO retryTaskFailAlarmEventDO);
@Mappings({
@Mapping(source = "notifyScene", target = "notifyScene"),
@Mapping(source = "reason", target = "reason")
})
RetryTaskFailAlarmEventDO toRetryTaskFailAlarmEventDTO(RetryTask retryTask, String reason, Integer notifyScene);
List<RetryAlarmInfo> deadLetterToAlarmInfo(List<RetryDeadLetter> retryDeadLetters);
List<NotifyConfigInfo> retryToNotifyConfigInfos(List<NotifyConfig> notifyConfigs);
@ -64,8 +46,8 @@ public interface AlarmInfoConverter {
return new HashSet<>(JsonUtil.parseList(notifyRecipientIdsStr, Long.class));
}
List<JobAlarmInfo> toJobAlarmInfos(List<JobBatchResponseDO> jobBatchResponse);
JobAlarmInfo toJobAlarmInfo(JobBatchResponseDO jobBatchResponseDO);
List<WorkflowAlarmInfo> toWorkflowAlarmInfos(List<WorkflowBatchResponseDO> workflowBatchResponses);
WorkflowAlarmInfo toWorkflowAlarmInfo(WorkflowBatchResponseDO workflowBatchResponseDO);
}

View File

@ -20,7 +20,6 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipien
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
@ -65,12 +64,12 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
// 通知场景
Set<Integer> notifyScene = new HashSet<>();
// 通知配置
Set<Long> notifyIds = new HashSet<>();
// 转换AlarmDTO 为了下面循环发送使用
Map<Set<Long>, List<A>> waitSendAlarmInfos = convertAlarmDTO(alarmInfos, notifyScene, notifyIds);
Map<Set<Long>, List<A>> waitSendAlarmInfos = convertAlarmDTO(alarmInfos, notifyScene);
waitSendAlarmInfos.keySet().stream().map(i -> notifyIds.addAll(i)).collect(Collectors.toSet());
// 批量获取通知配置
Map<Set<Long>, List<NotifyConfigInfo>> notifyConfig = obtainNotifyConfig(notifyScene, notifyIds);
@ -134,12 +133,16 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
});
notifyConfigInfo.setRecipientInfos(recipients);
}
return ImmutableMap.of(notifyIds, notifyConfigInfos);
Map<Set<Long>, List<NotifyConfigInfo>> notifyConfigInfo = new HashMap<>();
for (Long notifyId : notifyIds) {
notifyConfigInfo.put(Collections.singleton(notifyId), notifyConfigInfos);
}
return notifyConfigInfo;
}
protected abstract List<SyetemTaskTypeEnum> getSystemTaskType();
protected abstract Map<Set<Long>, List<A>> convertAlarmDTO(List<A> alarmData, Set<Integer> notifyScene, Set<Long> notifyIds);
protected abstract Map<Set<Long>, List<A>> convertAlarmDTO(List<A> alarmData, Set<Integer> notifyScene);
protected abstract List<A> poll() throws InterruptedException;

View File

@ -2,14 +2,19 @@ package com.aizuda.snailjob.server.common.alarm;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author xiaowoniu
@ -18,16 +23,34 @@ import java.util.Set;
*/
public abstract class AbstractJobAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, JobAlarmInfo> {
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@Override
protected Map<Set<Long>, List<JobAlarmInfo>> convertAlarmDTO(List<JobAlarmInfo> jobAlarmInfoList, Set<Integer> notifyScene, Set<Long> notifyIds) {
protected Map<Set<Long>, List<JobAlarmInfo>> convertAlarmDTO(List<JobAlarmInfo> jobAlarmInfoList, Set<Integer> notifyScene) {
return StreamUtils.groupByKey(jobAlarmInfoList, jobAlarmInfo -> {
Map<Set<Long>, List<JobAlarmInfo>> jobAlarmInfoMap = new HashMap<>();
jobAlarmInfoList.stream().forEach(i -> notifyScene.add(i.getNotifyScene()));
Set<Long> jobNotifyIds = StrUtil.isBlank(jobAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(jobAlarmInfo.getNotifyIds(), Long.class));
Map<Long, JobAlarmInfo> jobAlarmInfoGroupMap = jobAlarmInfoList.stream().collect(Collectors.toMap(i -> i.getId(), Function.identity()));
notifyScene.add(jobAlarmInfo.getNotifyScene());
notifyIds.addAll(jobNotifyIds);
return jobNotifyIds;
});
// 查询数据库
QueryWrapper<JobTaskBatch> wrapper = new QueryWrapper<JobTaskBatch>()
.in("batch.id", jobAlarmInfoList.stream().map(i -> i.getId()).collect(Collectors.toSet()))
.eq("batch.deleted", 0);
List<JobBatchResponseDO> jobBatchResponseDOList = jobTaskBatchMapper.selectJobBatchListByIds(wrapper);
for (JobBatchResponseDO jobBatchResponseDO : jobBatchResponseDOList) {
Set<Long> jobNotifyIds = StrUtil.isBlank(jobBatchResponseDO.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(jobBatchResponseDO.getNotifyIds(), Long.class));
for (Long jobNotifyId : jobNotifyIds) {
JobAlarmInfo jobAlarmInfo = AlarmInfoConverter.INSTANCE.toJobAlarmInfo(jobBatchResponseDO);
JobAlarmInfo alarmInfo = jobAlarmInfoGroupMap.get(jobBatchResponseDO.getId());
jobAlarmInfo.setReason(alarmInfo.getReason());
jobAlarmInfoMap.put(Collections.singleton(jobNotifyId), Lists.newArrayList(jobAlarmInfo));
}
}
return jobAlarmInfoMap;
}
}

View File

@ -2,15 +2,15 @@ package com.aizuda.snailjob.server.common.alarm;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.google.common.collect.Lists;
import org.springframework.context.ApplicationEvent;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author xiaowoniu
@ -20,20 +20,42 @@ import java.util.Set;
public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, RetryAlarmInfo> {
@Override
protected Map<Set<Long>, List<RetryAlarmInfo>> convertAlarmDTO(List<RetryAlarmInfo> retryAlarmInfoList, Set<Integer> notifyScene, Set<Long> notifyIds) {
return StreamUtils.groupByKey(retryAlarmInfoList, retryAlarmInfo -> {
protected Map<Set<Long>, List<RetryAlarmInfo>> convertAlarmDTO(List<RetryAlarmInfo> retryAlarmInfoList, Set<Integer> notifyScene) {
Map<Set<Long>, List<RetryAlarmInfo>> retryAlarmInfoMap = new HashMap<>();
// 重试任务查询场景告警通知
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(
Set<String> groupNames = new HashSet<>(), sceneNames = new HashSet<>(), namespaceIds = new HashSet<>();
for (RetryAlarmInfo retryAlarmInfo : retryAlarmInfoList) {
groupNames.add(retryAlarmInfo.getGroupName());
sceneNames.add(retryAlarmInfo.getSceneName());
namespaceIds.add(retryAlarmInfo.getNamespaceId());
notifyScene.add(retryAlarmInfo.getNotifyScene());
}
// 按组名场景名命名空间分组
Map<ImmutableTriple<String, String, String>, RetrySceneConfig> retrySceneConfigMap = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneNameList(
groupNames,
sceneNames,
namespaceIds).stream().collect(Collectors.toMap(i -> ImmutableTriple.of(
i.getGroupName(),
i.getSceneName(),
i.getNamespaceId()),
Function.identity()
));
for (RetryAlarmInfo retryAlarmInfo : retryAlarmInfoList) {
RetrySceneConfig retrySceneConfig = retrySceneConfigMap.get(ImmutableTriple.of(
retryAlarmInfo.getGroupName(),
retryAlarmInfo.getSceneName(),
retryAlarmInfo.getNamespaceId());
retryAlarmInfo.getNamespaceId()));
Set<Long> retryNotifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class));
notifyScene.add(retryAlarmInfo.getNotifyScene());
notifyIds.addAll(retryNotifyIds);
return retryNotifyIds;
});
for (Long retryNotifyId : retryNotifyIds) {
retryAlarmInfoMap.put(Collections.singleton(retryNotifyId), Lists.newArrayList(retryAlarmInfo));
}
}
return retryAlarmInfoMap;
}
}

View File

@ -2,14 +2,19 @@ package com.aizuda.snailjob.server.common.alarm;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.WorkflowBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author xiaowoniu
@ -18,16 +23,33 @@ import java.util.Set;
*/
public abstract class AbstractWorkflowAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, WorkflowAlarmInfo> {
@Autowired
private WorkflowTaskBatchMapper workflowTaskBatchMapper;
@Override
protected Map<Set<Long>, List<WorkflowAlarmInfo>> convertAlarmDTO(List<WorkflowAlarmInfo> workflowAlarmInfoList, Set<Integer> notifyScene, Set<Long> notifyIds) {
protected Map<Set<Long>, List<WorkflowAlarmInfo>> convertAlarmDTO(List<WorkflowAlarmInfo> workflowAlarmInfoList, Set<Integer> notifyScene) {
return StreamUtils.groupByKey(workflowAlarmInfoList, workflowAlarmInfo -> {
Map<Set<Long>, List<WorkflowAlarmInfo>> workflowAlarmInfoMap = new HashMap<>();
workflowAlarmInfoList.stream().forEach(i -> notifyScene.add(i.getNotifyScene()));
Set<Long> workflowNotifyIds = StrUtil.isBlank(workflowAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(workflowAlarmInfo.getNotifyIds(), Long.class));
Map<Long, WorkflowAlarmInfo> workflowAlarmInfoGroupMap = workflowAlarmInfoList.stream().collect(Collectors.toMap(i -> i.getId(), Function.identity()));
notifyScene.add(workflowAlarmInfo.getNotifyScene());
notifyIds.addAll(workflowNotifyIds);
return workflowNotifyIds;
});
List<WorkflowBatchResponseDO> workflowBatchResponseDOList = workflowTaskBatchMapper.selectWorkflowBatchList(
new QueryWrapper<WorkflowTaskBatch>()
.in("batch.id", workflowAlarmInfoList.stream().map(i -> i.getId()).collect(Collectors.toSet()))
.eq("batch.deleted", 0));
for (WorkflowBatchResponseDO workflowBatchResponseDO : workflowBatchResponseDOList) {
Set<Long> workflowNotifyIds = StrUtil.isBlank(workflowBatchResponseDO.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(workflowBatchResponseDO.getNotifyIds(), Long.class));
for (Long workflowNotifyId : workflowNotifyIds) {
WorkflowAlarmInfo workflowAlarmInfo = AlarmInfoConverter.INSTANCE.toWorkflowAlarmInfo(workflowBatchResponseDO);
WorkflowAlarmInfo alarmInfo = workflowAlarmInfoGroupMap.get(workflowAlarmInfo.getId());
workflowAlarmInfo.setReason(alarmInfo.getReason());
workflowAlarmInfoMap.put(Collections.singleton(workflowNotifyId), Lists.newArrayList(workflowAlarmInfo));
}
}
return workflowAlarmInfoMap;
}
}

View File

@ -21,14 +21,14 @@ public class AlarmInfo {
*/
private String notifyIds;
/**
* 失败原因
*/
private String reason;
/**
* 通知场景
*/
private Integer notifyScene;
/**
* 失败原因
*/
private String reason;
}

View File

@ -0,0 +1,29 @@
package com.aizuda.snailjob.server.job.task.dto;
import lombok.Builder;
import lombok.Data;
/**
 * Payload carried by a workflow-task failure alarm event: identifies the failed
 * workflow batch, the notification scene that triggered the alarm, and the
 * failure reason text shown in the alert message.
 *
 * @author zhengweilin
 * @version 1.0.0
 * @date 2024/12/23
 */
@Data
@Builder
public class WorkflowTaskFailAlarmEventDTO {
/**
 * Id of the failed workflow task batch.
 */
private Long workflowTaskBatchId;
/**
 * Notification scene code (see JobNotifySceneEnum).
 */
private Integer notifyScene;
/**
 * Human-readable failure reason included in the alarm text.
 */
private String reason;
}

View File

@ -6,7 +6,6 @@ import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.job.task.dto.*;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext;
import com.aizuda.snailjob.server.job.task.support.callback.ClientCallbackContext;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
@ -73,8 +72,6 @@ public interface JobTaskConverter {
TaskStopJobContext toStopJobContext(JobExecutorResultContext context);
TaskStopJobContext toStopJobContext(JobExecutorResultDTO context);
@Mappings(
@Mapping(source = "id", target = "jobId")
)
@ -142,9 +139,14 @@ public interface JobTaskConverter {
JobLogMessage toJobLogMessage(JobLogMessage jobLogMessage);
ReduceTaskDTO toReduceTaskDTO(CompleteJobBatchDTO jobBatchDTO);
ReduceTaskDTO toReduceTaskDTO(JobExecutorResultContext context);
JobExecutorResultContext toJobExecutorResultContext(CompleteJobBatchDTO completeJobBatchDTO);
List<JobAlarmInfo> toJobTaskFailAlarmEventDTO(List<JobTaskFailAlarmEventDTO> jobTaskFailAlarmEventDTOList);
@Mappings(
@Mapping(source = "jobTaskBatchId", target = "id")
)
JobAlarmInfo toJobAlarmInfo(JobTaskFailAlarmEventDTO jobTaskFailAlarmEventDTO);
}

View File

@ -1,13 +1,13 @@
package com.aizuda.snailjob.server.job.task.support;
import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.server.job.task.dto.WorkflowPartitionTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.block.workflow.WorkflowBlockStrategyContext;
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.snailjob.server.model.dto.CallbackParamsDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
@ -54,5 +54,10 @@ public interface WorkflowTaskConverter {
WorkflowBlockStrategyContext toWorkflowBlockStrategyContext(WorkflowTaskPrepareDTO prepareDTO);
List<CallbackParamsDTO> toCallbackParamsDTO(List<JobTask> tasks);
List<WorkflowAlarmInfo> toWorkflowTaskFailAlarmEventDTO(List<WorkflowTaskFailAlarmEventDTO> workflowTaskFailAlarmEventDTOList);
@Mappings(
@Mapping(source = "workflowTaskBatchId", target = "id")
)
WorkflowAlarmInfo toWorkflowTaskFailAlarmEventDTO(WorkflowTaskFailAlarmEventDTO workflowTaskFailAlarmEventDTO);
}

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.alarm.event;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
@ -13,11 +14,11 @@ import org.springframework.context.ApplicationEvent;
@Getter
public class WorkflowTaskFailAlarmEvent extends ApplicationEvent {
private final Long workflowTaskBatchId;
private WorkflowTaskFailAlarmEventDTO workflowTaskFailAlarmEventDTO;
public WorkflowTaskFailAlarmEvent(Long workflowTaskBatchId) {
super(workflowTaskBatchId);
this.workflowTaskBatchId = workflowTaskBatchId;
public WorkflowTaskFailAlarmEvent(WorkflowTaskFailAlarmEventDTO workflowTaskFailAlarmEventDTO) {
super(workflowTaskFailAlarmEventDTO);
this.workflowTaskFailAlarmEventDTO = workflowTaskFailAlarmEventDTO;
}
}

View File

@ -5,25 +5,21 @@ import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.alarm.AbstractJobAlarm;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
@ -41,8 +37,6 @@ import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor
public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmEvent> {
private final JobTaskBatchMapper jobTaskBatchMapper;
/**
* job任务失败数据
*/
@ -69,13 +63,11 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmE
}
// 拉取200条
List<Long> jobTaskBatchIds = Lists.newArrayList(jobTaskFailAlarmEventDTO.getJobTaskBatchId());
queue.drainTo(Collections.singleton(jobTaskBatchIds), 200);
QueryWrapper<JobTaskBatch> wrapper = new QueryWrapper<JobTaskBatch>()
.in("batch.id", jobTaskBatchIds)
.eq("batch.deleted", 0);
List<JobBatchResponseDO> jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(wrapper);
return AlarmInfoConverter.INSTANCE.toJobAlarmInfos(jobTaskBatchList);
ArrayList<JobTaskFailAlarmEventDTO> lists = Lists.newArrayList(jobTaskFailAlarmEventDTO);
queue.drainTo(lists, 200);
// 数据类型转换
return JobTaskConverter.INSTANCE.toJobTaskFailAlarmEventDTO(lists);
}
@Override

View File

@ -5,23 +5,21 @@ import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.alarm.AbstractWorkflowAlarm;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.WorkflowBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
@ -39,13 +37,13 @@ import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor
public class WorkflowTaskFailAlarmListener extends AbstractWorkflowAlarm<WorkflowTaskFailAlarmEvent> {
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(1000);
private final LinkedBlockingQueue<WorkflowTaskFailAlarmEventDTO> queue = new LinkedBlockingQueue<>(1000);
private static final String MESSAGES_FORMATTER = """
<font face=微软雅黑 color=#ff0000 size=4>{}环境 Workflow任务执行失败</font>\s
> 空间ID:{} \s
> 组名称:{} \s
> 工作流名称:{} \s
> 通知场景:{} \s
> 失败原因:{} \s
> 时间:{};
""";
@ -53,25 +51,21 @@ public class WorkflowTaskFailAlarmListener extends AbstractWorkflowAlarm<Workflo
@Override
protected List<WorkflowAlarmInfo> poll() throws InterruptedException {
// 无数据时阻塞线程
Long workflowTaskBatchId = queue.poll(100, TimeUnit.MILLISECONDS);
if (Objects.isNull(workflowTaskBatchId)) {
WorkflowTaskFailAlarmEventDTO workflowTaskFailAlarmEventDTO = queue.poll(100, TimeUnit.MILLISECONDS);
if (Objects.isNull(workflowTaskFailAlarmEventDTO)) {
return Lists.newArrayList();
}
// 拉取200条
List<Long> workflowTaskBatchIds = Lists.newArrayList(workflowTaskBatchId);
queue.drainTo(workflowTaskBatchIds, 200);
ArrayList<WorkflowTaskFailAlarmEventDTO> lists = Lists.newArrayList(workflowTaskFailAlarmEventDTO);
queue.drainTo(lists, 200);
List<WorkflowBatchResponseDO> workflowTaskBatches = workflowTaskBatchMapper.selectWorkflowBatchList(
new QueryWrapper<WorkflowTaskBatch>()
.in("batch.id", workflowTaskBatchIds)
.eq("batch.deleted", 0));
return AlarmInfoConverter.INSTANCE.toWorkflowAlarmInfos(workflowTaskBatches);
// 数据类型转换
return WorkflowTaskConverter.INSTANCE.toWorkflowTaskFailAlarmEventDTO(lists);
}
@Override
protected AlarmContext buildAlarmContext(WorkflowAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) {
String desc = JobOperationReasonEnum.getByReason(alarmDTO.getOperationReason()).getDesc();
// 预警
return AlarmContext.build()
@ -80,7 +74,8 @@ public class WorkflowTaskFailAlarmListener extends AbstractWorkflowAlarm<Workflo
alarmDTO.getNamespaceId(),
alarmDTO.getGroupName(),
alarmDTO.getWorkflowName(),
desc,
JobOperationReasonEnum.getByReason(alarmDTO.getOperationReason()).getDesc(),
alarmDTO.getReason(),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN))
.title("{}环境 Workflow任务执行失败", EnvironmentUtils.getActiveProfile());
}
@ -103,7 +98,7 @@ public class WorkflowTaskFailAlarmListener extends AbstractWorkflowAlarm<Workflo
@Override
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
public void doOnApplicationEvent(WorkflowTaskFailAlarmEvent event) {
if (!queue.offer(event.getWorkflowTaskBatchId())) {
if (!queue.offer(event.getWorkflowTaskFailAlarmEventDTO())) {
SnailJobLog.LOCAL.warn("Workflow任务执行失败告警队列已满");
}
}

View File

@ -92,10 +92,6 @@ public class JobExecutorActor extends AbstractActor {
} catch (Exception e) {
SnailJobLog.LOCAL.error("job executor exception. [{}]", taskExecute, e);
handleTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(taskExecute.getTaskBatchId())
.build()));
} finally {
getContext().stop(getSelf());
}
@ -137,6 +133,7 @@ public class JobExecutorActor extends AbstractActor {
.reason(JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getDesc())
.notifyScene(JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getNotifyScene())
.build()));
return;
}
// 更新状态
@ -240,9 +237,10 @@ public class JobExecutorActor extends AbstractActor {
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(taskExecute.getTaskBatchId())
.reason(JobOperationReasonEnum.TASK_EXECUTION_ERROR.getDesc())
.notifyScene(JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene())
.build()));
}
}
}

View File

@ -5,10 +5,7 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.FailStrategyEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.snailjob.common.core.enums.*;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -16,6 +13,7 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.support.WorkflowExecutor;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
@ -76,10 +74,15 @@ public class WorkflowExecutorActor extends AbstractActor {
} catch (Exception e) {
SnailJobLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(),
handlerTaskBatch(taskExecute,
JobTaskBatchStatusEnum.FAIL.getStatus(),
JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
SnailSpringContext.getContext().publishEvent(
new WorkflowTaskFailAlarmEvent(taskExecute.getWorkflowTaskBatchId()));
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(WorkflowTaskFailAlarmEventDTO.builder()
.workflowTaskBatchId(taskExecute.getWorkflowTaskBatchId())
.notifyScene(JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene())
.reason(e.getMessage())
.build()));
} finally {
getContext().stop(getSelf());
}

View File

@ -5,6 +5,7 @@ import akka.actor.ActorRef;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.client.model.request.DispatchJobRequest;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
@ -124,6 +125,8 @@ public class RequestClientActor extends AbstractActor {
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(dispatchJobRequest.getTaskBatchId())
.reason(throwable.getMessage())
.notifyScene(JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene())
.build()));
}

View File

@ -9,6 +9,7 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.common.rpc.okhttp.RequestInterceptor;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.model.dto.CallbackParamsDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
@ -114,7 +115,11 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
}
message = throwable.getMessage();
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(context.getWorkflowTaskBatchId()));
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(WorkflowTaskFailAlarmEventDTO.builder()
.workflowTaskBatchId(context.getWorkflowTaskBatchId())
.notifyScene(JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene())
.reason(message)
.build()));
}
context.setEvaluationResult(result);

View File

@ -13,6 +13,7 @@ import com.aizuda.snailjob.server.common.dto.DecisionConfig;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
@ -96,7 +97,12 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
message = e.getMessage();
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(context.getWorkflowTaskBatchId()));
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(WorkflowTaskFailAlarmEventDTO.builder()
.workflowTaskBatchId(context.getWorkflowTaskBatchId())
.notifyScene(JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene())
.reason(message)
.build()));
}
} else {
result = Boolean.TRUE;

View File

@ -6,6 +6,7 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
@ -18,6 +19,7 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
@ -155,7 +157,12 @@ public class WorkflowBatchHandler {
if (JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() != jobTaskBatch.getOperationReason()
&& JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason() != jobTaskBatch.getOperationReason()) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId));
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(WorkflowTaskFailAlarmEventDTO.builder()
.workflowTaskBatchId(workflowTaskBatchId)
.notifyScene(JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene())
.reason("只要叶子节点不是无需处理的都是失败")
.build()));
}
}
}
@ -191,7 +198,12 @@ public class WorkflowBatchHandler {
Assert.isTrue(1 == workflowTaskBatchMapper.updateById(workflowTaskBatch),
() -> new SnailJobServerException("停止工作流批次失败. id:[{}]",
workflowTaskBatchId));
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId));
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(WorkflowTaskFailAlarmEventDTO.builder()
.workflowTaskBatchId(workflowTaskBatchId)
.notifyScene(JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene())
.reason("停止工作流批次失败")
.build()));
// 关闭已经触发的任务
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()

View File

@ -1,12 +1,13 @@
package com.aizuda.snailjob.server.job.task.support.prepare.workflow;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
@ -53,11 +54,15 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle
// 2. 判断DAG是否已经支持超时
// 计算超时时间到达超时时间中断任务
if (delay > DateUtils.toEpochMilli(prepare.getExecutorTimeout())) {
log.info("任务执行超时.workflowTaskBatchId:[{}] delay:[{}] executorTimeout:[{}]",
prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout()));
// 超时停止任务
workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(prepare.getWorkflowTaskBatchId()));
String reason = String.format("任务执行超时.workflowTaskBatchId:[%s] delay:[%s] executorTimeout:[%s]", prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout()));
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(WorkflowTaskFailAlarmEventDTO.builder()
.workflowTaskBatchId(prepare.getWorkflowTaskBatchId())
.notifyScene(JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene())
.reason(reason)
.build()));
log.info(reason);
}
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.result.job;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
@ -72,6 +73,7 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(context.getTaskBatchId())
.reason(context.getMessage())
.notifyScene(JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene())
.build()));
doHandleFail(context);
} else if (stopCount > 0) {

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.timer;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -71,6 +72,8 @@ public class JobTimeoutCheckTask implements TimerTask<String> {
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(taskBatchId)
.reason("超时中断.taskBatchId:[" + taskBatchId + "]")
.notifyScene(JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene())
.build()));
SnailJobLog.LOCAL.info("超时中断.taskBatchId:[{}]", taskBatchId);
}

View File

@ -1,10 +1,12 @@
package com.aizuda.snailjob.server.job.task.support.timer;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
@ -40,8 +42,15 @@ public class WorkflowTimeoutCheckTask implements TimerTask<String> {
// 超时停止任务
workflowBatchHandler.stop(workflowTaskBatchId, JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId));
SnailJobLog.LOCAL.info("超时中断.workflowTaskBatchId:[{}]", workflowTaskBatchId);
String reason = String.format("超时中断.workflowTaskBatchId:[%s]", workflowTaskBatchId);
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(WorkflowTaskFailAlarmEventDTO.builder()
.workflowTaskBatchId(workflowTaskBatchId)
.notifyScene(JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene())
.reason(reason)
.build()));
SnailJobLog.LOCAL.info(reason);
}
@Override

View File

@ -1,15 +1,12 @@
package com.aizuda.snailjob.template.datasource.persistence.dataobject;
package com.aizuda.snailjob.server.retry.task.dto;
import com.aizuda.snailjob.template.datasource.persistence.po.CreateUpdateDt;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 重试任务失败告警
*/
@Data
public class RetryTaskFailAlarmEventDO extends CreateUpdateDt {
public class RetryTaskExecutorDTO extends CreateUpdateDt {
private Long id;

View File

@ -0,0 +1,48 @@
package com.aizuda.snailjob.server.retry.task.dto;
import com.aizuda.snailjob.template.datasource.persistence.po.CreateUpdateDt;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * Payload published with {@code RetryTaskFailAlarmEvent} when a retry task fails;
 * consumed by {@code RetryTaskFailAlarmListener} to build the failure alarm
 * notification. Field names mirror the retry-task record they are mapped from
 * via {@code RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO(...)}.
 *
 * @author zhengweilin
 * @version 1.0.0
 * @date 2024/12/23
 */
@Data
public class RetryTaskFailAlarmEventDTO extends CreateUpdateDt {
    // Primary key of the originating retry task record
    private Long id;
    // Tenant namespace the task belongs to
    private String namespaceId;
    // Unique identifier of the retry task — presumably unique per group; TODO confirm scope
    private String uniqueId;
    // Group the retry task is registered under
    private String groupName;
    // Retry scene name within the group
    private String sceneName;
    // Idempotent key used to deduplicate retry submissions
    private String idempotentId;
    // Business number carried by the task — NOTE(review): semantics defined by the client side
    private String bizNo;
    // Serialized executor arguments (string form; format not visible here — likely JSON)
    private String argsStr;
    // Serialized extended attributes
    private String extAttrs;
    // Fully qualified name (or bean name) of the client-side executor — TODO confirm which
    private String executorName;
    // Time the task was next scheduled to fire
    private LocalDateTime nextTriggerAt;
    // Number of retry attempts already performed
    private Integer retryCount;
    // Current retry status code — see RetryStatusEnum; verify against caller
    private Integer retryStatus;
    // Task type code (e.g. retry vs. callback) — presumably SyetemTaskTypeEnum; TODO confirm
    private Integer taskType;
    // Alarm scene code; call sites pass RetryNotifySceneEnum values such as
    // RETRY_TASK_FAIL_ERROR and RETRY_NO_CLIENT_NODES_ERROR
    private Integer notifyScene;
    // Human-readable failure reason attached at the publishing call site
    private String reason;
}

View File

@ -41,6 +41,13 @@ public interface RetryContext<V> {
*/
void setException(Exception e);
/**
* 获取客户端发送异常信息
*
* @return
*/
Exception getException();
/**
* 是否发生异常
*

View File

@ -2,14 +2,16 @@ package com.aizuda.snailjob.server.retry.task.support;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO;
import com.aizuda.snailjob.server.model.dto.RetryLogTaskDTO;
import com.aizuda.snailjob.server.model.dto.RetryTaskDTO;
import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigPartitionTask;
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.generator.task.TaskContext;
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.RetryTaskFailAlarmEventDO;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
@ -29,9 +31,9 @@ public interface RetryTaskConverter {
RetryTaskConverter INSTANCE = Mappers.getMapper(RetryTaskConverter.class);
RetryTask toRetryTask(RetryTaskDTO retryTaskDTO);
RetryTask toRetryTask(RetryTaskExecutorDTO retryTaskExecutorDTO);
RetryTask toRetryTask(RetryTaskFailAlarmEventDO retryTaskFailAlarmEventDO);
RetryTask toRetryTask(RetryTaskDTO retryTaskDTO);
RetryTask toRetryTask(RetryTask retryTask);
@ -69,4 +71,18 @@ public interface RetryTaskConverter {
RetryLogMetaDTO toLogMetaDTO(RetryTask retryTask);
@Mappings({
@Mapping(source = "reason", target = "reason"),
@Mapping(source = "notifyScene", target = "notifyScene")
})
RetryTaskExecutorDTO toRetryTaskExecutorDTO(RetryTask retryTask, String reason, Integer notifyScene);
@Mappings({
@Mapping(source = "reason", target = "reason"),
@Mapping(source = "notifyScene", target = "notifyScene")
})
RetryTaskFailAlarmEventDTO toRetryTaskFailAlarmEventDTO(RetryTask retryTask, String reason, Integer notifyScene);
List<RetryAlarmInfo> toRetryTaskFailAlarmEventDTO(List<RetryTaskFailAlarmEventDTO> retryTaskFailAlarmEventDTOList);
}

View File

@ -71,6 +71,11 @@ public class MaxAttemptsPersistenceRetryContext<V> implements RetryContext<V> {
return Objects.nonNull(exception);
}
@Override
public Exception getException() {
return exception;
}
@Override
public RetrySceneConfig sceneConfig() {
return retrySceneConfig;

View File

@ -11,12 +11,13 @@ import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent;
import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler;
import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.RetryTaskFailAlarmEventDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
@ -43,6 +44,7 @@ import java.time.LocalDateTime;
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@RequiredArgsConstructor
public class FailureActor extends AbstractActor {
private final IdempotentStrategy<String> idempotentStrategy = IdempotentHolder.getRetryIdempotent();
private final AccessTemplate accessTemplate;
private final CallbackRetryTaskHandler callbackRetryTaskHandler;
@ -52,10 +54,10 @@ public class FailureActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryTaskFailAlarmEventDO.class, retryTaskFailAlarmEventDO -> {
return receiveBuilder().match(RetryTaskExecutorDTO.class, retryTaskExecutorDTO -> {
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTaskFailAlarmEventDO);
SnailJobLog.LOCAL.debug("FailureActor params:[{}]", retryTask);
SnailJobLog.LOCAL.debug("FailureActor params:[{}]", retryTaskExecutorDTO);
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTaskExecutorDTO);
try {
// 超过最大等级
RetrySceneConfig retrySceneConfig =
@ -93,7 +95,12 @@ public class FailureActor extends AbstractActor {
.eq(RetryTaskLog::getUniqueId, retryTask.getUniqueId())
.eq(RetryTaskLog::getGroupName, retryTask.getGroupName()));
SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(retryTaskFailAlarmEventDO));
RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO =
RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO(
retryTask,
retryTaskExecutorDTO.getReason(),
retryTaskExecutorDTO.getNotifyScene());
SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(retryTaskFailAlarmEventDTO));
}
});
} catch (Exception e) {

View File

@ -8,6 +8,8 @@ import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecutorDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler;
import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
@ -46,11 +48,11 @@ public class FinishActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryTask.class, retryTask -> {
SnailJobLog.LOCAL.debug("FinishActor params:[{}]", retryTask);
return receiveBuilder().match(RetryTaskExecutorDTO.class, retryTaskExecutorDTO -> {
SnailJobLog.LOCAL.debug("FinishActor params:[{}]", retryTaskExecutorDTO);
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTaskExecutorDTO);
retryTask.setRetryStatus(RetryStatusEnum.FINISH.getStatus());
try {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
@ -86,7 +88,6 @@ public class FinishActor extends AbstractActor {
}
}).build();
}

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.retry.task.support.event;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.RetryTaskFailAlarmEventDO;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
@ -14,10 +14,10 @@ import org.springframework.context.ApplicationEvent;
@Getter
public class RetryTaskFailAlarmEvent extends ApplicationEvent {
private RetryTaskFailAlarmEventDO retryTaskFailAlarmEventDO;
private RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO;
public RetryTaskFailAlarmEvent(RetryTaskFailAlarmEventDO retryTaskFailAlarmEventDO) {
super(retryTaskFailAlarmEventDO);
this.retryTaskFailAlarmEventDO = retryTaskFailAlarmEventDO;
public RetryTaskFailAlarmEvent(RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO) {
super(retryTaskFailAlarmEventDTO);
this.retryTaskFailAlarmEventDTO = retryTaskFailAlarmEventDTO;
}
}

View File

@ -1,20 +1,18 @@
package com.aizuda.snailjob.server.retry.task.support.listener;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.alarm.AbstractRetryAlarm;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.RetryTaskFailAlarmEventDO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -41,7 +39,7 @@ public class RetryTaskFailAlarmListener extends
/**
* 死信告警数据
*/
private final LinkedBlockingQueue<RetryTaskFailAlarmEventDO> queue = new LinkedBlockingQueue<>(1000);
private final LinkedBlockingQueue<RetryTaskFailAlarmEventDTO> queue = new LinkedBlockingQueue<>(1000);
private static final String retryTaskDeadTextMessagesFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务执行失败</font> \n" +
@ -63,22 +61,23 @@ public class RetryTaskFailAlarmListener extends
@Override
protected List<RetryAlarmInfo> poll() throws InterruptedException {
// 无数据时阻塞线程
RetryTaskFailAlarmEventDO retryTaskFailAlarmEventDO = queue.poll(100, TimeUnit.MILLISECONDS);
RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDO = queue.poll(100, TimeUnit.MILLISECONDS);
if (Objects.isNull(retryTaskFailAlarmEventDO)) {
return Lists.newArrayList();
}
// 拉取200条
List<RetryTaskFailAlarmEventDO> lists = Lists.newArrayList(retryTaskFailAlarmEventDO);
List<RetryTaskFailAlarmEventDTO> lists = Lists.newArrayList(retryTaskFailAlarmEventDO);
queue.drainTo(lists, 200);
return AlarmInfoConverter.INSTANCE.retryTaskToAlarmInfo(lists);
// 数据类型转换
return RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO(lists);
}
@Override
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
public void doOnApplicationEvent(RetryTaskFailAlarmEvent retryTaskFailAlarmEvent) {
if (!queue.offer(retryTaskFailAlarmEvent.getRetryTaskFailAlarmEventDO())) {
if (!queue.offer(retryTaskFailAlarmEvent.getRetryTaskFailAlarmEventDTO())) {
SnailJobLog.LOCAL.warn("任务重试失败告警队列已满");
}
}

View File

@ -2,19 +2,19 @@ package com.aizuda.snailjob.server.retry.task.support.retry;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecutorDTO;
import com.aizuda.snailjob.server.retry.task.support.FilterStrategy;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.StopStrategy;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.RetryTaskFailAlarmEventDO;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@ -98,12 +98,12 @@ public class RetryExecutor<V> {
actorRef = ActorGenerator.failureActor();
}
RetryTaskFailAlarmEventDO retryTaskFailAlarmEventDO =
AlarmInfoConverter.INSTANCE.toRetryTaskFailAlarmEventDTO(
RetryTaskExecutorDTO retryTaskExecutorDTO =
RetryTaskConverter.INSTANCE.toRetryTaskExecutorDTO(
retryContext.getRetryTask(),
((Result) call).getMessage(),
retryContext.hasException() ? retryContext.getException().getCause().getMessage() : ((DispatchRetryResultDTO) ((Result) call).getData()).getExceptionMsg(),
RetryNotifySceneEnum.RETRY_TASK_FAIL_ERROR.getNotifyScene());
actorRef.tell(retryTaskFailAlarmEventDO, actorRef);
actorRef.tell(retryTaskExecutorDTO, actorRef);
return call;
}

View File

@ -4,17 +4,17 @@ import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.support.FilterStrategy;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.cache.CacheGroupRateLimiter;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.RetryTaskFailAlarmEventDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
@ -205,12 +205,12 @@ public class FilterStrategies {
}
if (result == false) {
RetryTaskFailAlarmEventDO retryTaskFailAlarmEventDO =
AlarmInfoConverter.INSTANCE.toRetryTaskFailAlarmEventDTO(
RetryTaskFailAlarmEventDTO toRetryTaskFailAlarmEventDTO =
RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO(
retryTask,
description.toString(),
RetryNotifySceneEnum.RETRY_NO_CLIENT_NODES_ERROR.getNotifyScene());
SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(retryTaskFailAlarmEventDO));
SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(toRetryTaskFailAlarmEventDTO));
}
return Pair.of(result, description);