feat(sj_1.3.0-beta1):

1、新增重试场景告警通知配置
2、新增工作流任务告警通知配置
This commit is contained in:
wodeyangzipingpingwuqi 2024-12-12 17:11:04 +08:00
parent edf4f0f80c
commit da85ee21d2
33 changed files with 214 additions and 61 deletions

View File

@ -42,4 +42,8 @@ public class RetrySceneConfig extends CreateUpdateDt {
private Integer executorTimeout;
/**
* 通知告警场景配置id列表
*/
private String notifyIds;
}

View File

@ -105,4 +105,8 @@ public class Workflow extends CreateUpdateDt {
*/
private Integer deleted;
/**
* 通知告警场景配置id列表
*/
private String notifyIds;
}

View File

@ -8,7 +8,6 @@ import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.WorkflowBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.po.JobNotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
@ -58,8 +57,6 @@ public interface AlarmInfoConverter {
return new HashSet<>(JsonUtil.parseList(notifyRecipientIdsStr, Long.class));
}
List<NotifyConfigInfo> jobToNotifyConfigInfos(List<JobNotifyConfig> notifyConfigs);
List<JobAlarmInfo> toJobAlarmInfos(List<JobBatchResponseDO> jobBatchResponse);
List<WorkflowAlarmInfo> toWorkflowAlarmInfos(List<WorkflowBatchResponseDO> workflowBatchResponses);

View File

@ -1,4 +1,4 @@
package com.aizuda.snailjob.client.common.annotation;
package com.aizuda.snailjob.server.common.alarm;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.alarm.Alarm;
@ -90,12 +90,10 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
} catch (Exception e) {
SnailJobLog.LOCAL.error("RetryTaskFailDeadLetterAlarmListener queue poll Exception", e);
}
}
protected Map<Triple<String, String, Set<Long>>, List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds,
Set<String> groupNames, Set<Long> notifyIds) {
Set<String> groupNames, Set<Long> notifyIds) {
// 批量获取所需的通知配置
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
new LambdaQueryWrapper<NotifyConfig>()
@ -138,13 +136,12 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
return ImmutableTriple.of(configInfo.getNamespaceId(), configInfo.getGroupName(), notifyIds);
});
}
protected abstract List<SyetemTaskTypeEnum> getSystemTaskType();
protected abstract Map<Triple<String, String, Set<Long>>, List<A>> convertAlarmDTO(List<A> alarmData,
Set<String> namespaceIds, Set<String> groupNames, Set<Long> notifyIds);
Set<String> namespaceIds, Set<String> groupNames, Set<Long> notifyIds);
protected abstract List<A> poll() throws InterruptedException;
@ -186,11 +183,9 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
}
AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig);
context.setNotifyAttribute(recipientInfo.getNotifyAttribute());
Alarm<AlarmContext> alarm = SnailJobAlarmFactory.getAlarmType(
recipientInfo.getNotifyType());
Alarm<AlarmContext> alarm = SnailJobAlarmFactory.getAlarmType(recipientInfo.getNotifyType());
alarm.asyncSendMessage(context);
}
}
}

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.common.alarm;
import com.aizuda.snailjob.client.common.annotation.AbstractAlarm;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
@ -23,7 +23,7 @@ public abstract class AbstractJobAlarm<E extends ApplicationEvent> extends Abstr
return StreamUtils.groupByKey(alarmInfos, alarmInfo -> {
String namespaceId = alarmInfo.getNamespaceId();
String groupName = alarmInfo.getGroupName();
HashSet<Long> notifyIdsSet = Objects.isNull(alarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(alarmInfo.getNotifyIds(), Long.class));
HashSet<Long> notifyIdsSet = StrUtil.isBlank(alarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(alarmInfo.getNotifyIds(), Long.class));
namespaceIds.add(namespaceId);
groupNames.add(groupName);

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.common.alarm;
import com.aizuda.snailjob.client.common.annotation.AbstractAlarm;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
@ -17,16 +17,12 @@ import java.util.*;
*/
public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, RetryAlarmInfo> {
@Override
protected Map<Triple<String, String, Set<Long>>, List<RetryAlarmInfo>> convertAlarmDTO(
List<RetryAlarmInfo> retryAlarmInfoList,
Set<String> namespaceIds,
Set<String> groupNames,
Set<Long> notifyIds) {
protected Map<Triple<String, String, Set<Long>>, List<RetryAlarmInfo>> convertAlarmDTO(List<RetryAlarmInfo> retryAlarmInfoList, Set<String> namespaceIds, Set<String> groupNames, Set<Long> notifyIds) {
return StreamUtils.groupByKey(retryAlarmInfoList, retryAlarmInfo -> {
String namespaceId = retryAlarmInfo.getNamespaceId();
String groupName = retryAlarmInfo.getGroupName();
HashSet<Long> notifyIdsSet = Objects.isNull(retryAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retryAlarmInfo.getNotifyIds(), Long.class));
HashSet<Long> notifyIdsSet = StrUtil.isBlank(retryAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retryAlarmInfo.getNotifyIds(), Long.class));
namespaceIds.add(namespaceId);
groupNames.add(groupName);

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.common.alarm;
import com.aizuda.snailjob.client.common.annotation.AbstractAlarm;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
@ -23,7 +23,7 @@ public abstract class AbstractWorkflowAlarm<E extends ApplicationEvent> extends
return alarmInfos.stream().collect(Collectors.groupingBy(workflowAlarmInfo -> {
String namespaceId = workflowAlarmInfo.getNamespaceId();
String groupName = workflowAlarmInfo.getGroupName();
HashSet<Long> notifyIdsSet = Objects.isNull(workflowAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(workflowAlarmInfo.getNotifyIds(), Long.class));
HashSet<Long> notifyIdsSet = StrUtil.isBlank(workflowAlarmInfo.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(workflowAlarmInfo.getNotifyIds(), Long.class));
namespaceIds.add(namespaceId);
groupNames.add(groupName);

View File

@ -37,6 +37,12 @@ public class JobAlarmInfo extends AlarmInfo {
*/
private Integer operationReason;
/**
* 原因
*/
private String reason;
/**
* 通知告警场景
*/

View File

@ -19,6 +19,7 @@ public class CompleteJobBatchDTO extends BaseDTO {
private Long taskBatchId;
private Integer jobOperationReason;
private Object result;
private String message;
private Integer taskType;
private Boolean retryStatus;

View File

@ -0,0 +1,18 @@
package com.aizuda.snailjob.server.job.task.dto;

import lombok.Builder;
import lombok.Data;

/**
 * Payload carried by {@code JobTaskFailAlarmEvent}: identifies the failed job
 * task batch and optionally a human-readable failure reason shown in the alarm.
 *
 * @author zhengweilin
 * @version 1.0.0
 * @date 2024/12/12
 */
@Data
@Builder
public class JobTaskFailAlarmEventDTO {

    /** ID of the job task batch that failed. */
    private Long jobTaskBatchId;

    /** Failure reason for the alarm message; may be null when only the batch id is known. */
    private String reason;
}

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.alarm.event;
import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
@ -13,11 +14,11 @@ import org.springframework.context.ApplicationEvent;
@Getter
public class JobTaskFailAlarmEvent extends ApplicationEvent {
private final Long jobTaskBatchId;
private final JobTaskFailAlarmEventDTO jobTaskFailAlarmEventDTO;
public JobTaskFailAlarmEvent(Long jobTaskBatchId) {
super(jobTaskBatchId);
this.jobTaskBatchId = jobTaskBatchId;
public JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO jobTaskFailAlarmEventDTO) {
super(jobTaskFailAlarmEventDTO);
this.jobTaskFailAlarmEventDTO = jobTaskFailAlarmEventDTO;
}
}

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.alarm.listener;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
@ -11,6 +12,7 @@ import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
@ -22,6 +24,7 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
@ -44,7 +47,7 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmE
/**
* job任务失败数据
*/
private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(1000);
private final LinkedBlockingQueue<JobTaskFailAlarmEventDTO> queue = new LinkedBlockingQueue<>(1000);
private static final String MESSAGES_FORMATTER = """
<font face=微软雅黑 color=#ff0000 size=4>{}环境 Job任务执行失败</font>\s
@ -60,24 +63,26 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmE
@Override
protected List<JobAlarmInfo> poll() throws InterruptedException {
// 无数据时阻塞线程
Long jobTaskBatchId = queue.poll(100, TimeUnit.MILLISECONDS);
if (Objects.isNull(jobTaskBatchId)) {
JobTaskFailAlarmEventDTO jobTaskFailAlarmEventDTO = queue.poll(100, TimeUnit.MILLISECONDS);
if (Objects.isNull(jobTaskFailAlarmEventDTO)) {
return Lists.newArrayList();
}
// 拉取200条
List<Long> jobTaskBatchIds = Lists.newArrayList(jobTaskBatchId);
queue.drainTo(jobTaskBatchIds, 200);
List<Long> jobTaskBatchIds = Lists.newArrayList(jobTaskFailAlarmEventDTO.getJobTaskBatchId());
// drainTo requires a mutable target collection; Collections.singleton(...) is immutable and
// would throw UnsupportedOperationException, losing the queued batch ids.
List<JobTaskFailAlarmEventDTO> drainedEvents = Lists.newArrayList();
queue.drainTo(drainedEvents, 200);
drainedEvents.forEach(e -> jobTaskBatchIds.add(e.getJobTaskBatchId()));
QueryWrapper<JobTaskBatch> wrapper = new QueryWrapper<JobTaskBatch>()
.in("batch.id", jobTaskBatchIds)
.eq("batch.deleted", 0);
List<JobBatchResponseDO> jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(wrapper);
return AlarmInfoConverter.INSTANCE.toJobAlarmInfos(jobTaskBatchList);
List<JobAlarmInfo> jobAlarmInfos = AlarmInfoConverter.INSTANCE.toJobAlarmInfos(jobTaskBatchList);
// NOTE(review): only the first polled event's reason is applied to every alarm in this batch;
// reasons of drained events are dropped — consider mapping reason per jobTaskBatchId. TODO confirm intent.
jobAlarmInfos.forEach(info -> info.setReason(jobTaskFailAlarmEventDTO.getReason()));
return jobAlarmInfos;
}
@Override
protected AlarmContext buildAlarmContext(JobAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) {
String desc = JobOperationReasonEnum.getByReason(alarmDTO.getOperationReason()).getDesc();
String desc = StrUtil.isNotBlank(alarmDTO.getReason()) ? alarmDTO.getReason() : JobOperationReasonEnum.getByReason(alarmDTO.getOperationReason()).getDesc();
// 预警
return AlarmContext.build()
.text(MESSAGES_FORMATTER,
@ -109,8 +114,8 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmE
@Override
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
public void doOnApplicationEvent(JobTaskFailAlarmEvent event) {
if (!queue.offer(event.getJobTaskBatchId())) {
public void doOnApplicationEvent(JobTaskFailAlarmEvent jobTaskFailAlarmEvent) {
if (!queue.offer(jobTaskFailAlarmEvent.getJobTaskFailAlarmEventDTO())) {
SnailJobLog.LOCAL.warn("JOB任务执行失败告警队列已满");
}
}

View File

@ -17,6 +17,7 @@ import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
@ -94,7 +95,8 @@ public class JobExecutorActor extends AbstractActor {
} catch (Exception e) {
SnailJobLog.LOCAL.error("job executor exception. [{}]", taskExecute, e);
handleTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskExecute.getTaskBatchId()));
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(taskExecute.getTaskBatchId()).build()));
} finally {
getContext().stop(getSelf());
}
@ -128,6 +130,12 @@ public class JobExecutorActor extends AbstractActor {
workflowBatchHandler.openNextNode(taskExecuteDTO);
}
// 无客户端节点-告警通知
if (JobTaskBatchStatusEnum.CANCEL.getStatus() == taskStatus && JobOperationReasonEnum.NOT_CLIENT.getReason() == operationReason) {
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(taskExecute.getTaskBatchId()).build()));
}
// 更新状态
handleTaskBatch(taskExecute, taskStatus, operationReason);
@ -223,7 +231,8 @@ public class JobExecutorActor extends AbstractActor {
() -> new SnailJobServerException("更新任务失败"));
if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(taskStatus)) {
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskExecute.getTaskBatchId()));
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(taskExecute.getTaskBatchId()).build()));
}
}

View File

@ -74,8 +74,8 @@ public class JobExecutorResultActor extends AbstractActor {
}
private void tryCompleteAndStop(JobExecutorResultDTO result) {
CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result);
private void tryCompleteAndStop(JobExecutorResultDTO jobExecutorResultDTO) {
CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(jobExecutorResultDTO);
jobTaskBatchHandler.handleResult(completeJobBatchDTO);
}
}

View File

@ -17,6 +17,7 @@ import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.client.JobRpcClient;
import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
@ -114,12 +115,14 @@ public class RequestClientActor extends AbstractActor {
SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败执行重试 重试次数:[{}]. <|>{}<|>", jobLogMetaDTO.getTaskId(),
realJobExecutorDTO.getRetryCount(), jobLogMetaDTO, throwable);
} else {
SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败. <|>{}<|>", jobLogMetaDTO.getTaskId(),
SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败. <|>{}<|>",
jobLogMetaDTO.getTaskId(),
jobLogMetaDTO, throwable);
}
taskExecuteFailure(realJobExecutorDTO, throwable.getMessage());
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(dispatchJobRequest.getTaskBatchId()));
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(dispatchJobRequest.getTaskBatchId()).build()));
}
}

View File

@ -9,6 +9,7 @@ import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
@ -82,9 +83,10 @@ public class JobTaskBatchGenerator {
);
}
// 无执行的节点-告警通知
if (JobTaskBatchStatusEnum.CANCEL.getStatus() == jobTaskBatch.getTaskBatchStatus()) {
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(jobTaskBatch.getId()));
// 无客户端节点-告警通知
if (JobTaskBatchStatusEnum.CANCEL.getStatus() == jobTaskBatch.getTaskBatchStatus() && JobOperationReasonEnum.NOT_CLIENT.getReason() == jobTaskBatch.getOperationReason()) {
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(jobTaskBatch.getId()).build()));
}
// 非待处理状态无需进入时间轮中

View File

@ -1,17 +1,17 @@
package com.aizuda.snailjob.server.job.task.support.prepare.job;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext;
import com.aizuda.snailjob.server.job.task.support.block.job.JobBlockStrategyFactory;
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
@ -64,7 +64,9 @@ public class RunningJobPrepareHandler extends AbstractJobPrepareHandler {
stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(prepare.getTaskBatchId()));
SnailSpringContext.getContext().publishEvent(
        new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(prepare.getTaskBatchId()).build()));
}
}

View File

@ -6,6 +6,7 @@ import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobExecutorResultHandler;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
@ -67,7 +68,8 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes
int taskBatchStatus;
if (failCount > 0) {
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(context.getTaskBatchId()));
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(context.getTaskBatchId()).reason(context.getMessage()).build()));
doHandleFail(context);
} else if (stopCount > 0) {
taskBatchStatus = JobTaskBatchStatusEnum.STOP.getStatus();

View File

@ -32,5 +32,9 @@ public class JobExecutorResultContext extends BaseDTO {
*/
private boolean taskBatchComplete;
/**
* 原因
*/
private String message;
}

View File

@ -5,6 +5,7 @@ import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.job.task.dto.JobTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
@ -67,7 +68,8 @@ public class JobTimeoutCheckTask implements TimerTask<String> {
stopJobContext.setWorkflowTaskBatchId(jobTaskBatch.getWorkflowTaskBatchId());
instanceInterrupt.stop(stopJobContext);
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskBatchId));
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(taskBatchId).build()));
SnailJobLog.LOCAL.info("超时中断.taskBatchId:[{}]", taskBatchId);
}

View File

@ -4,6 +4,8 @@ import com.aizuda.snailjob.common.core.constant.SystemConstants;
import jakarta.validation.constraints.*;
import lombok.Data;
import java.util.Set;
/**
* @author opensnail
* @date 2023-10-25 08:40:57
@ -62,4 +64,8 @@ public class SceneConfigRequestVO {
*/
private Integer isDeleted;
/**
* 通知告警场景配置id列表
*/
private Set<Long> notifyIds;
}

View File

@ -10,6 +10,7 @@ import jakarta.validation.constraints.Pattern;
import lombok.Data;
import java.util.List;
import java.util.Set;
/**
* @author xiaowoniu
@ -131,4 +132,8 @@ public class WorkflowRequestVO {
private CallbackConfig callback;
}
/**
* 通知告警场景配置id列表
*/
private Set<Long> notifyIds;
}

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.web.model.response;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.Set;
/**
* @author opensnail
@ -129,4 +130,8 @@ public class JobResponseVO {
*/
private Integer deleted;
/**
* 通知告警场景配置id列表
*/
private Set<Long> notifyIds;
}

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.web.model.response;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.Set;
/**
* @author: opensnail
@ -37,4 +38,8 @@ public class SceneConfigResponseVO {
private LocalDateTime updateDt;
/**
* 通知告警场景配置id列表
*/
private Set<Long> notifyIds;
}

View File

@ -6,6 +6,7 @@ import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
import lombok.Data;
import java.util.List;
import java.util.Set;
/**
* @author xiaowoniu
@ -155,4 +156,9 @@ public class WorkflowDetailResponseVO {
}
/**
* 通知告警场景配置id列表
*/
private Set<Long> notifyIds;
}

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.web.model.response;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.Set;
/**
* @author: xiaowoniu
@ -59,4 +60,8 @@ public class WorkflowResponseVO {
*/
private LocalDateTime updateDt;
/**
* 通知告警场景配置id列表
*/
private Set<Long> notifyIds;
}

View File

@ -22,13 +22,14 @@ public interface JobResponseVOConverter {
JobResponseVOConverter INSTANCE = Mappers.getMapper(JobResponseVOConverter.class);
// @Mappings({
// @Mapping(source = "nextTriggerAt", target = "nextTriggerAt", expression = "java(DateUtils.toLocalDateTime())")
// })
// @Mapping is not supported on iterable mapping methods in MapStruct; the element-level
// notifyIds mapping declared on convert(Job) below is applied to each list element.
List<JobResponseVO> convertList(List<Job> jobs);
@Mappings({
@Mapping(target = "nextTriggerAt", expression = "java(JobResponseVOConverter.toLocalDateTime(job.getNextTriggerAt()))")
@Mapping(target = "nextTriggerAt", expression = "java(JobResponseVOConverter.toLocalDateTime(job.getNextTriggerAt()))"),
@Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))")
})
JobResponseVO convert(Job job);

View File

@ -1,11 +1,18 @@
package com.aizuda.snailjob.server.web.service.convert;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.web.model.request.SceneConfigRequestVO;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @author: opensnail
@ -16,11 +23,31 @@ public interface SceneConfigConverter {
SceneConfigConverter INSTANCE = Mappers.getMapper(SceneConfigConverter.class);
@Mappings({
@Mapping(target = "notifyIds", expression = "java(SceneConfigConverter.toNotifyIdsStr(requestVO.getNotifyIds()))")
})
RetrySceneConfig toRetrySceneConfig(SceneConfigRequestVO requestVO);
List<RetrySceneConfig> toRetrySceneConfigs(List<SceneConfigRequestVO> requestVOs);
// @Mapping is not supported on iterable mapping methods in MapStruct; declare the
// per-element mapping on a single-object method and let the list method delegate to it.
@Mappings({
@Mapping(target = "notifyIds", expression = "java(SceneConfigConverter.toNotifyIds(retrySceneConfig.getNotifyIds()))")
})
SceneConfigRequestVO toSceneConfigRequestVO(RetrySceneConfig retrySceneConfig);
List<SceneConfigRequestVO> toSceneConfigRequestVOs(List<RetrySceneConfig> requestVOs);
static Set<Long> toNotifyIds(String notifyIds) {
if (StrUtil.isBlank(notifyIds)) {
return new HashSet<>();
}
return new HashSet<>(JsonUtil.parseList(notifyIds, Long.class));
}
static String toNotifyIdsStr(Set<Long> notifyIds) {
if (CollUtil.isEmpty(notifyIds)) {
return null;
}
return JsonUtil.toJsonString(notifyIds);
}
}

View File

@ -1,11 +1,17 @@
package com.aizuda.snailjob.server.web.service.convert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.web.model.response.SceneConfigResponseVO;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @author: opensnail
@ -16,7 +22,18 @@ public interface SceneConfigResponseVOConverter {
SceneConfigResponseVOConverter INSTANCE = Mappers.getMapper(SceneConfigResponseVOConverter.class);
List<SceneConfigResponseVO> convertList(List<RetrySceneConfig> retrySceneConfigs);
@Mappings({
@Mapping(target = "notifyIds", expression = "java(SceneConfigResponseVOConverter.toNotifyIds(retrySceneConfig.getNotifyIds()))")
})
SceneConfigResponseVO convert(RetrySceneConfig retrySceneConfig);
List<SceneConfigResponseVO> convertList(List<RetrySceneConfig> retrySceneConfigs);
static Set<Long> toNotifyIds(String notifyIds) {
if (StrUtil.isBlank(notifyIds)) {
return new HashSet<>();
}
return new HashSet<>(JsonUtil.parseList(notifyIds, Long.class));
}
}

View File

@ -1,5 +1,7 @@
package com.aizuda.snailjob.server.web.service.convert;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
@ -20,8 +22,10 @@ import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* @author: xiaowoniu
@ -37,6 +41,9 @@ public interface WorkflowConverter {
WorkflowNode convert(WorkflowRequestVO.NodeInfo nodeInfo);
@Mappings({
@Mapping(target = "notifyIds", expression = "java(WorkflowConverter.toNotifyIds(workflow.getNotifyIds()))")
})
WorkflowDetailResponseVO convert(Workflow workflow);
List<WorkflowDetailResponseVO.NodeInfo> convertList(List<WorkflowNode> workflowNodes);
@ -51,7 +58,8 @@ public interface WorkflowConverter {
List<WorkflowResponseVO> convertListToWorkflowList(List<Workflow> workflowList);
@Mappings({
@Mapping(target = "nextTriggerAt", expression = "java(WorkflowConverter.toLocalDateTime(workflow.getNextTriggerAt()))")
@Mapping(target = "nextTriggerAt", expression = "java(WorkflowConverter.toLocalDateTime(workflow.getNextTriggerAt()))"),
@Mapping(target = "notifyIds", expression = "java(WorkflowConverter.toNotifyIds(workflow.getNotifyIds()))")
})
WorkflowResponseVO convertToWorkflow(Workflow workflow);
@ -99,5 +107,20 @@ public interface WorkflowConverter {
return null;
}
static Set<Long> toNotifyIds(String notifyIds) {
if (StrUtil.isBlank(notifyIds)) {
return new HashSet<>();
}
return new HashSet<>(JsonUtil.parseList(notifyIds, Long.class));
}
static String toNotifyIdsStr(Set<Long> notifyIds) {
if (CollUtil.isEmpty(notifyIds)) {
return null;
}
return JsonUtil.toJsonString(notifyIds);
}
}

View File

@ -139,6 +139,7 @@ public class JobServiceImpl implements JobService {
% systemProperties.getBucketTotal());
job.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli()));
job.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId());
job.setNotifyIds(JsonUtil.toJsonString(jobRequestVO.getNotifyIds()));
job.setId(null);
return 1 == jobMapper.insert(job);
}

View File

@ -86,12 +86,10 @@ public class SceneConfigServiceImpl implements SceneConfigService {
.eq(RetrySceneConfig::getNamespaceId, userSessionVO.getNamespaceId())
.in(CollUtil.isNotEmpty(groupNames), RetrySceneConfig::getGroupName, groupNames)
.eq(Objects.nonNull(queryVO.getSceneStatus()), RetrySceneConfig::getSceneStatus, queryVO.getSceneStatus())
.likeRight(StrUtil.isNotBlank(queryVO.getSceneName()),
RetrySceneConfig::getSceneName, StrUtil.trim(queryVO.getSceneName()))
.likeRight(StrUtil.isNotBlank(queryVO.getSceneName()), RetrySceneConfig::getSceneName, StrUtil.trim(queryVO.getSceneName()))
.orderByDesc(RetrySceneConfig::getCreateDt));
return new PageResult<>(pageDTO, SceneConfigResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords()));
}
@Override
@ -127,6 +125,7 @@ public class SceneConfigServiceImpl implements SceneConfigService {
RetrySceneConfig retrySceneConfig = SceneConfigConverter.INSTANCE.toRetrySceneConfig(requestVO);
retrySceneConfig.setCreateDt(LocalDateTime.now());
retrySceneConfig.setNamespaceId(namespaceId);
retrySceneConfig.setNotifyIds(JsonUtil.toJsonString(requestVO.getNotifyIds()));
if (requestVO.getBackOff() == WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType()) {
retrySceneConfig.setTriggerInterval(StrUtil.EMPTY);
}
@ -154,6 +153,7 @@ public class SceneConfigServiceImpl implements SceneConfigService {
retrySceneConfig.setTriggerInterval(
Optional.ofNullable(retrySceneConfig.getTriggerInterval()).orElse(StrUtil.EMPTY));
retrySceneConfig.setNotifyIds(JsonUtil.toJsonString(requestVO.getNotifyIds()));
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().update(retrySceneConfig,
new LambdaUpdateWrapper<RetrySceneConfig>()
.eq(RetrySceneConfig::getNamespaceId, namespaceId)

View File

@ -127,6 +127,7 @@ public class WorkflowServiceImpl implements WorkflowService {
HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName())
% systemProperties.getBucketTotal());
workflow.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId());
workflow.setNotifyIds(JsonUtil.toJsonString(workflowRequestVO.getNotifyIds()));
workflow.setId(null);
Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new SnailJobServerException("新增工作流失败"));
@ -190,8 +191,7 @@ public class WorkflowServiceImpl implements WorkflowService {
queryVO.getWorkflowStatus())
.orderByDesc(Workflow::getId));
List<WorkflowResponseVO> jobResponseList = WorkflowConverter.INSTANCE.convertListToWorkflowList(
page.getRecords());
List<WorkflowResponseVO> jobResponseList = WorkflowConverter.INSTANCE.convertListToWorkflowList(page.getRecords());
return new PageResult<>(pageDTO, jobResponseList);
}
@ -226,6 +226,7 @@ public class WorkflowServiceImpl implements WorkflowService {
workflow.setVersion(version);
workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
workflow.setNotifyIds(JsonUtil.toJsonString(workflowRequestVO.getNotifyIds()));
// 不允许更新组
workflow.setGroupName(null);
Assert.isTrue(