feat(sj_1.0.0): integrate workflows with the failure alarm module

opensnail 2024-05-05 20:55:30 +08:00
parent 05c9c06bd0
commit 6104f3a34d
17 changed files with 292 additions and 108 deletions


@ -11,7 +11,14 @@ import lombok.Getter;
@Getter
public enum JobNotifySceneEnum {
JOB_TASK_ERROR(1, "JOB任务执行失败", NodeTypeEnum.SERVER);
/********************************Job****************************************/
JOB_TASK_ERROR(1, "JOB任务执行失败", NodeTypeEnum.SERVER),
/********************************Workflow****************************************/
WORKFLOW_TASK_ERROR(100, "Workflow任务执行失败", NodeTypeEnum.SERVER),
WORKFLOW_TASK_CALLBACK_ERROR(101, "回调节点任务执行失败", NodeTypeEnum.SERVER)
;
/**
* Notify scene


@ -5,14 +5,13 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.po.JobNotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
@ -63,4 +62,6 @@ public interface AlarmInfoConverter {
List<JobAlarmInfo> toJobAlarmInfos(List<JobBatchResponseDO> jobBatchResponse);
List<WorkflowAlarmInfo> toWorkflowAlarmInfos(List<WorkflowTaskBatch> jobBatchResponse);
}


@ -26,10 +26,13 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -40,60 +43,64 @@ import java.util.stream.Collectors;
* @since 2.5.0
*/
@Slf4j
public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmInfo> implements ApplicationListener<E>, Runnable,
public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmInfo> implements ApplicationListener<E>,
Runnable,
Lifecycle {
@Autowired
@Qualifier("alarmExecutorService")
protected TaskScheduler taskScheduler;
@Autowired
protected AccessTemplate accessTemplate;
@Autowired
protected NotifyRecipientMapper recipientMapper;
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// Fetch data from the queue
List<A> alarmInfos = poll();
if (CollectionUtils.isEmpty(alarmInfos)) {
continue;
}
// Collect all namespace ids
Set<String> namespaceIds = new HashSet<>();
// Collect all group names
Set<String> groupNames = new HashSet<>();
// Collect all scene names
Set<String> sceneNames = new HashSet<>();
// Convert to AlarmDTO for the send loop below
Map<Triple<String, String, String>, List<A>> waitSendAlarmInfos = convertAlarmDTO(
alarmInfos, namespaceIds, groupNames, sceneNames);
// Fetch notify configs in batch
Map<Triple<String, String, String>, List<NotifyConfigInfo>> notifyConfig = obtainNotifyConfig(namespaceIds, groupNames, sceneNames);
// Send the messages in a loop
waitSendAlarmInfos.forEach((key, list) -> {
List<NotifyConfigInfo> notifyConfigsList = notifyConfig.getOrDefault(key, Lists.newArrayList());
for (A alarmDTO : list) {
sendAlarm(notifyConfigsList, alarmDTO);
}
});
} catch (InterruptedException e) {
SnailJobLog.LOCAL.info("retry task fail dead letter alarm stop");
Thread.currentThread().interrupt();
} catch (Exception e) {
SnailJobLog.LOCAL.error("RetryTaskFailDeadLetterAlarmListener queue poll Exception", e);
try {
// Fetch data from the queue
List<A> alarmInfos = poll();
if (CollectionUtils.isEmpty(alarmInfos)) {
return;
}
// Collect all namespace ids
Set<String> namespaceIds = new HashSet<>();
// Collect all group names
Set<String> groupNames = new HashSet<>();
// Collect all business ids
Set<String> businessIds = new HashSet<>();
// Convert to AlarmDTO for the send loop below
Map<Triple<String, String, String>, List<A>> waitSendAlarmInfos = convertAlarmDTO(
alarmInfos, namespaceIds, groupNames, businessIds);
// Fetch notify configs in batch
Map<Triple<String, String, String>, List<NotifyConfigInfo>> notifyConfig = obtainNotifyConfig(namespaceIds,
groupNames, businessIds);
// Send the messages in a loop
waitSendAlarmInfos.forEach((key, list) -> {
List<NotifyConfigInfo> notifyConfigsList = notifyConfig.getOrDefault(key, Lists.newArrayList());
for (A alarmDTO : list) {
sendAlarm(notifyConfigsList, alarmDTO);
}
});
} catch (InterruptedException e) {
SnailJobLog.LOCAL.info("retry task fail dead letter alarm stop");
Thread.currentThread().interrupt();
} catch (Exception e) {
SnailJobLog.LOCAL.error("RetryTaskFailDeadLetterAlarmListener queue poll Exception", e);
}
}
protected Map<Triple<String, String, String>, List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds, Set<String> groupNames, Set<String> businessIds) {
protected Map<Triple<String, String, String>, List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds,
Set<String> groupNames, Set<String> businessIds) {
// Fetch the required notify configs in batch
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.in(NotifyConfig::getSystemTaskType, getSystemTaskType())
@ -119,7 +126,7 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
List<NotifyRecipient> notifyRecipients = recipientMapper.selectBatchIds(recipientIds);
Map<Long, NotifyRecipient> recipientMap = notifyRecipients.stream()
.collect(Collectors.toMap(NotifyRecipient::getId, i->i));
.collect(Collectors.toMap(NotifyRecipient::getId, i -> i));
if (CollectionUtils.isEmpty(recipientIds)) {
return Maps.newHashMap();
@ -150,29 +157,23 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
protected abstract List<SyetemTaskTypeEnum> getSystemTaskType();
protected abstract Map<Triple<String, String, String>, List<A>> convertAlarmDTO(List<A> alarmData, Set<String> namespaceIds, Set<String> groupNames, Set<String> sceneNames);
protected abstract Map<Triple<String, String, String>, List<A>> convertAlarmDTO(List<A> alarmData,
Set<String> namespaceIds, Set<String> groupNames, Set<String> sceneNames);
protected abstract List<A> poll() throws InterruptedException;
protected abstract AlarmContext buildAlarmContext(A alarmDTO, NotifyConfigInfo notifyConfig);
private Thread thread;
@Override
public void start() {
thread = new Thread(this);
thread.start();
startLog();
taskScheduler.scheduleAtFixedRate(this, Duration.parse("PT1S"));
}
protected abstract void startLog();
@Override
public void close() {
if (Objects.nonNull(thread)) {
thread.interrupt();
}
}
protected void sendAlarm(List<NotifyConfigInfo> notifyConfigsList, A alarmDTO) {
@ -180,7 +181,7 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) {
// Rate limiting
RateLimiter rateLimiter = getRateLimiter(String.valueOf(notifyConfig.getId()),
notifyConfig.getRateLimiterThreshold());
notifyConfig.getRateLimiterThreshold());
// Send at most rateLimiterThreshold alarms per second
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
continue;
@ -188,9 +189,9 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
}
if (Objects.nonNull(alarmDTO.getCount()) && Objects.nonNull(notifyConfig.getNotifyThreshold())) {
if (notifyConfig.getNotifyThreshold() >= alarmDTO.getCount()) {
continue;
}
if (notifyConfig.getNotifyThreshold() >= alarmDTO.getCount()) {
continue;
}
}
for (final RecipientInfo recipientInfo : notifyConfig.getRecipientInfos()) {
@ -199,9 +200,9 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
}
AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig);
context.setNotifyAttribute(recipientInfo.getNotifyAttribute());
Alarm<AlarmContext> alarmType = SnailJobAlarmFactory.getAlarmType(
Alarm<AlarmContext> alarm = SnailJobAlarmFactory.getAlarmType(
recipientInfo.getNotifyType());
alarmType.asyncSendMessage(context);
alarm.asyncSendMessage(context);
}
}
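A minimal, self-contained sketch (class name, thread-name prefix and queue are illustrative, not part of this commit) of the scheduling model AbstractAlarm switches to above: instead of owning a dedicated blocking thread, the polling Runnable is registered on a Spring TaskScheduler at a fixed one-second rate, and each pass uses a bounded poll so it returns quickly.

import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class FixedRatePollSketch {

    private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(1000);

    public static void main(String[] args) {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(1);
        scheduler.setThreadNamePrefix("alarm-sketch-");
        scheduler.initialize();

        FixedRatePollSketch sketch = new FixedRatePollSketch();
        // One pass per second, mirroring taskScheduler.scheduleAtFixedRate(this, Duration.parse("PT1S"))
        scheduler.scheduleAtFixedRate(sketch::runOnePass, Duration.parse("PT1S"));
    }

    private void runOnePass() {
        try {
            // Bounded wait so a pass never hangs the shared scheduler thread
            Long alarmId = queue.poll(100, TimeUnit.MILLISECONDS);
            if (alarmId == null) {
                return; // nothing to alarm on in this tick
            }
            // ... convert the ids, look up notify configs and send, as in run() above
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}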


@ -0,0 +1,33 @@
package com.aizuda.snailjob.server.common.alarm;
import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.triple.Triple;
import org.springframework.context.ApplicationEvent;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author xiaowoniu
* @date 2023-12-03 10:19:19
* @since 2.5.0
*/
public abstract class AbstractWorkflowAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, WorkflowAlarmInfo> {
@Override
protected Map<Triple<String, String, String>, List<WorkflowAlarmInfo>> convertAlarmDTO(List<WorkflowAlarmInfo> alarmInfos, Set<String> namespaceIds, Set<String> groupNames, Set<String> jobIds) {
return alarmInfos.stream().collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
String jobId = String.valueOf(i.getWorkflowId());
namespaceIds.add(namespaceId);
groupNames.add(groupName);
jobIds.add(jobId);
return ImmutableTriple.of(namespaceId, groupName, jobId);
}));
}
}


@ -24,4 +24,12 @@ public class SnailJobServerCommonAutoConfiguration {
scheduler.setThreadNamePrefix("snail-job-scheduled-thread-");
return scheduler;
}
@Bean
public TaskScheduler alarmExecutorService() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(4);
scheduler.setThreadNamePrefix("snail-job-alarm-thread-");
return scheduler;
}
}
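Because this configuration now exposes a second TaskScheduler bean next to the existing scheduled-thread one, injection points must disambiguate by bean name, which is what the @Qualifier("alarmExecutorService") on AbstractAlarm does. A hedged sketch of the same pattern with constructor injection (the consumer class is hypothetical):

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;

@Component
public class AlarmSchedulerConsumer { // hypothetical consumer, not part of the commit

    private final TaskScheduler alarmScheduler;

    public AlarmSchedulerConsumer(@Qualifier("alarmExecutorService") TaskScheduler alarmScheduler) {
        // Without the qualifier, two TaskScheduler beans would make this injection ambiguous
        this.alarmScheduler = alarmScheduler;
    }
}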


@ -13,17 +13,6 @@ import lombok.EqualsAndHashCode;
public class JobAlarmInfo extends AlarmInfo {
private Long id;
/**
* 命名空间
*/
private String namespaceId;
/**
* 组名称
*/
private String groupName;
/**
* 名称
*/


@ -0,0 +1,27 @@
package com.aizuda.snailjob.server.common.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author opensnail
* @date 2024-05-05
* @since sj_1.0.0
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class WorkflowAlarmInfo extends AlarmInfo {
private Long id;
/**
* 名称
*/
private String workflowName;
/**
* 任务信息id
*/
private Long workflowId;
}


@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support;
import com.aizuda.snailjob.client.model.request.DispatchJobRequest;
import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.job.task.dto.BaseDTO;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
@ -10,6 +11,7 @@ import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.snailjob.server.job.task.dto.RealStopTaskInstanceDTO;
import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
@ -23,13 +25,6 @@ import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext;
import com.aizuda.snailjob.server.job.task.support.callback.ClientCallbackContext;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
@ -145,4 +140,6 @@ public interface JobTaskConverter {
JobLogMessage toJobLogMessage(JobLogMessage jobLogMessage);
List<JobAlarmInfo> toJobAlarmInfos(List<JobTaskFailAlarmEvent> events);
}


@ -1,4 +1,4 @@
package com.aizuda.snailjob.server.job.task.support.event;
package com.aizuda.snailjob.server.job.task.support.alarm.event;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;


@ -0,0 +1,23 @@
package com.aizuda.snailjob.server.job.task.support.alarm.event;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
* Workflow task failure event
*
* @author: opensnail
* @date : 2023-12-02 21:40
* @since sj_1.0.0
*/
@Getter
public class WorkflowTaskFailAlarmEvent extends ApplicationEvent {
private final Long workflowTaskBatchId;
public WorkflowTaskFailAlarmEvent(Long workflowTaskBatchId) {
super(workflowTaskBatchId);
this.workflowTaskBatchId = workflowTaskBatchId;
}
}
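A hedged usage sketch for the new event (the publisher class is illustrative; the commit itself publishes it from WorkflowExecutorActor via SpringContext.getContext().publishEvent(...)):

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

@Component
public class WorkflowFailureNotifier { // hypothetical helper, not part of the commit

    private final ApplicationEventPublisher publisher;

    public WorkflowFailureNotifier(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    public void reportFailedBatch(Long workflowTaskBatchId) {
        // WorkflowTaskFailAlarmListener receives this event and enqueues the batch id for alarming
        publisher.publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId));
    }
}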


@ -1,31 +1,28 @@
package com.aizuda.snailjob.server.job.task.support.listener;
package com.aizuda.snailjob.server.job.task.support.alarm.listener;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.alarm.AbstractJobAlarm;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.triple.Triple;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.support.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
@ -38,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue;
@Component
@RequiredArgsConstructor
public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmEvent> {
private final JobTaskBatchMapper jobTaskBatchMapper;
/**
@ -45,24 +43,29 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmE
*/
private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(1000);
private static String jobTaskFailTextMessagesFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 Job任务执行失败</font> \n" +
"> 空间ID:{} \n" +
"> 组名称:{} \n" +
"> 任务名称:{} \n" +
"> 执行器名称:{} \n" +
"> 方法参数:{} \n" +
"> 时间:{} \n";
private static final String MESSAGES_FORMATTER = """
<font face=微软雅黑 color=#ff0000 size=4>{}环境 Job任务执行失败</font>\s
> 空间ID:{} \s
> 组名称:{} \s
> 任务名称:{} \s
> 执行器名称:{} \s
> 方法参数:{} \s
> 时间:{};
""";
@Override
protected List<JobAlarmInfo> poll() throws InterruptedException {
// Block the thread when no data is available
Long jobTaskBatchId = queue.take();
Long jobTaskBatchId = queue.poll(100, TimeUnit.MILLISECONDS);
if (Objects.isNull(jobTaskBatchId)) {
return Lists.newArrayList();
}
// Pull up to 200 entries
List<Long> jobTaskBatchIds = Lists.newArrayList(jobTaskBatchId);
queue.drainTo(jobTaskBatchIds, 200);
QueryWrapper<JobTaskBatch> wrapper = new QueryWrapper<JobTaskBatch>()
.in("a.id", jobTaskBatchIds).eq("a.deleted", 0);
.in("a.id", jobTaskBatchIds).eq("a.deleted", 0);
List<JobBatchResponseDO> jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(wrapper);
return AlarmInfoConverter.INSTANCE.toJobAlarmInfos(jobTaskBatchList);
}
@ -71,7 +74,7 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmE
protected AlarmContext buildAlarmContext(JobAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) {
// Build the alarm
return AlarmContext.build()
.text(jobTaskFailTextMessagesFormatter,
.text(MESSAGES_FORMATTER,
EnvironmentUtils.getActiveProfile(),
alarmDTO.getNamespaceId(),
alarmDTO.getGroupName(),
@ -84,7 +87,7 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmE
@Override
protected void startLog() {
SnailJobLog.LOCAL.info("JobTaskFailAlarmListener started");
SnailJobLog.LOCAL.info("JobTaskFailAlarmListener started");
}
@Override
@ -100,7 +103,7 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmE
@Override
public void onApplicationEvent(JobTaskFailAlarmEvent event) {
if (!queue.offer(event.getJobTaskBatchId())) {
SnailJobLog.LOCAL.warn("JOB任务执行失败告警队列已满");
SnailJobLog.LOCAL.warn("JOB任务执行失败告警队列已满");
}
}
}
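A minimal standalone sketch (names and sample data are illustrative) of the bounded-poll plus drainTo batching used by the listener's poll() above, so a burst of failed batches becomes a single database lookup:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BatchDrainSketch {

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(1000);
        for (long id = 1; id <= 5; id++) {
            queue.offer(id);
        }

        // Wait at most 100ms for the first id so the scheduled pass never blocks for long
        Long first = queue.poll(100, TimeUnit.MILLISECONDS);
        if (first == null) {
            return; // nothing to report this tick
        }

        // Drain up to 200 more ids in one go, mirroring queue.drainTo(jobTaskBatchIds, 200)
        List<Long> batchIds = new ArrayList<>();
        batchIds.add(first);
        queue.drainTo(batchIds, 200);

        System.out.println("would build alarm infos for batch ids: " + batchIds);
    }
}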


@ -0,0 +1,96 @@
package com.aizuda.snailjob.server.job.task.support.alarm.listener;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.alarm.AbstractWorkflowAlarm;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Workflow task execution failure alarm
*
* @author: zuoJunLin
* @date : 2023-12-02 21:40
* @since 2.5.0
*/
@Component
@RequiredArgsConstructor
public class WorkflowTaskFailAlarmListener extends AbstractWorkflowAlarm<WorkflowTaskFailAlarmEvent> {
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(1000);
private static final String MESSAGES_FORMATTER = """
<font face=微软雅黑 color=#ff0000 size=4>{}环境 Workflow任务执行失败</font>\s
> 空间ID:{} \s
> 组名称:{} \s
> 工作流名称:{} \s
> 时间:{};
""";
@Override
protected List<WorkflowAlarmInfo> poll() throws InterruptedException {
// Block the thread when no data is available
Long workflowTaskBatchId = queue.poll(100, TimeUnit.MILLISECONDS);
if (Objects.isNull(workflowTaskBatchId)) {
return Lists.newArrayList();
}
// Pull up to 200 entries
List<Long> jobTaskBatchIds = Lists.newArrayList(workflowTaskBatchId);
queue.drainTo(jobTaskBatchIds, 200);
List<WorkflowTaskBatch> workflowTaskBatches = workflowTaskBatchMapper.selectBatchIds(jobTaskBatchIds);
return AlarmInfoConverter.INSTANCE.toWorkflowAlarmInfos(workflowTaskBatches);
}
@Override
protected AlarmContext buildAlarmContext(WorkflowAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) {
// Build the alarm
return AlarmContext.build()
.text(MESSAGES_FORMATTER,
EnvironmentUtils.getActiveProfile(),
alarmDTO.getNamespaceId(),
alarmDTO.getGroupName(),
alarmDTO.getWorkflowName(),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN))
.title("{}环境 Workflow任务执行失败", EnvironmentUtils.getActiveProfile());
}
@Override
protected void startLog() {
SnailJobLog.LOCAL.info("WorkflowTaskFailAlarmListener started");
}
@Override
protected int getNotifyScene() {
return JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene();
}
@Override
protected List<SyetemTaskTypeEnum> getSystemTaskType() {
return Lists.newArrayList(SyetemTaskTypeEnum.WORKFLOW);
}
@Override
public void onApplicationEvent(WorkflowTaskFailAlarmEvent event) {
if (!queue.offer(event.getWorkflowTaskBatchId())) {
SnailJobLog.LOCAL.warn("Workflow任务执行失败告警队列已满");
}
}
}


@ -23,7 +23,7 @@ import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.snailjob.server.job.task.support.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
@ -39,11 +39,6 @@ import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;


@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.FailStrategyEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
@ -14,6 +15,8 @@ import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.WorkflowExecutor;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.cache.MutableGraphCache;
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorFactory;
@ -74,7 +77,7 @@ public class WorkflowExecutorActor extends AbstractActor {
SnailJobLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(),
JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
// TODO send notification
SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(taskExecute.getTaskBatchId()));
} finally {
getContext().stop(getSelf());
}


@ -6,7 +6,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;


@ -56,7 +56,7 @@ public class RetryTaskFailDeadLetterAlarmListener extends
@Override
protected List<RetryAlarmInfo> poll() throws InterruptedException {
List<RetryDeadLetter> allRetryDeadLetterList = queue.poll(5, TimeUnit.SECONDS);
List<RetryDeadLetter> allRetryDeadLetterList = queue.poll(100, TimeUnit.MILLISECONDS);
if (CollectionUtils.isEmpty(allRetryDeadLetterList)) {
return Lists.newArrayList();
}


@ -19,6 +19,7 @@ import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Listener for retry task failures exceeding the threshold
@ -49,7 +50,7 @@ public class RetryTaskFailMoreThresholdAlarmListener extends
@Override
protected List<RetryAlarmInfo> poll() throws InterruptedException {
// Block the thread when no data is available
RetryTask retryTask = queue.take();
RetryTask retryTask = queue.poll(100, TimeUnit.MILLISECONDS);
// Pull up to 100 entries
List<RetryTask> lists = Lists.newArrayList(retryTask);