feat(sj_1.0.0): 工作流接入告警

This commit is contained in:
opensnail 2024-05-08 09:34:53 +08:00
parent ef5c2fd953
commit e98946580e
15 changed files with 53 additions and 17 deletions

View File

@ -102,7 +102,7 @@ public class SlidingWindow<T> {
ConcurrentLinkedQueue<T> list = new ConcurrentLinkedQueue<>(); ConcurrentLinkedQueue<T> list = new ConcurrentLinkedQueue<>();
list.add(data); list.add(data);
SnailJobLog.LOCAL.info("添加新数据 [{}] [{}] size:[{}]", windowPeriod, Thread.currentThread().getName(), list.size()); SnailJobLog.LOCAL.debug("添加新数据 [{}] [{}] size:[{}]", windowPeriod, Thread.currentThread().getName(), list.size());
saveData.put(windowPeriod, list); saveData.put(windowPeriod, list);
// 扫描n-1个窗口是否过期过期则删除 // 扫描n-1个窗口是否过期过期则删除
@ -213,7 +213,7 @@ public class SlidingWindow<T> {
LocalDateTime currentTime = LocalDateTime.now().minus(duration * 2, chronoUnit); LocalDateTime currentTime = LocalDateTime.now().minus(duration * 2, chronoUnit);
if (windowPeriod.isBefore(currentTime)) { if (windowPeriod.isBefore(currentTime)) {
SnailJobLog.LOCAL.info("删除过期窗口 windowPeriod:[{}] currentTime:[{}]", windowPeriod, currentTime); SnailJobLog.LOCAL.debug("删除过期窗口 windowPeriod:[{}] currentTime:[{}]", windowPeriod, currentTime);
saveData.remove(windowPeriod); saveData.remove(windowPeriod);
} }
@ -287,7 +287,7 @@ public class SlidingWindow<T> {
removeInvalidWindow(windowPeriod); removeInvalidWindow(windowPeriod);
if (windowPeriod.isBefore(condition)) { if (windowPeriod.isBefore(condition)) {
SnailJobLog.LOCAL.info("到达时间窗口期 [{}] [{}]", windowPeriod, JsonUtil.toJsonString(saveData)); SnailJobLog.LOCAL.debug("到达时间窗口期 [{}] [{}]", windowPeriod, JsonUtil.toJsonString(saveData));
doHandlerListener(windowPeriod); doHandlerListener(windowPeriod);
} }
} }

View File

@ -14,6 +14,11 @@ public class WorkflowBatchResponseDO {
private Long id; private Long id;
/**
* 命名空间id
*/
private String namespaceId;
/** /**
* 组名称 * 组名称
*/ */

View File

@ -24,4 +24,6 @@ import java.util.List;
public interface WorkflowTaskBatchMapper extends BaseMapper<WorkflowTaskBatch> { public interface WorkflowTaskBatchMapper extends BaseMapper<WorkflowTaskBatch> {
List<WorkflowBatchResponseDO> selectWorkflowBatchPageList(PageDTO<JobTaskBatch> pageDTO, @Param("ew") Wrapper<WorkflowTaskBatch> wrapper); List<WorkflowBatchResponseDO> selectWorkflowBatchPageList(PageDTO<JobTaskBatch> pageDTO, @Param("ew") Wrapper<WorkflowTaskBatch> wrapper);
List<WorkflowBatchResponseDO> selectWorkflowBatchPageList(@Param("ew") Wrapper<WorkflowTaskBatch> wrapper);
} }

View File

@ -26,5 +26,12 @@
JOIN sj_workflow b ON a.workflow_id = b.id JOIN sj_workflow b ON a.workflow_id = b.id
${ew.customSqlSegment} ${ew.customSqlSegment}
</select> </select>
<select id="selectWorkflowBatchList"
resultType="com.aizuda.snailjob.template.datasource.persistence.dataobject.WorkflowBatchResponseDO">
SELECT a.*,
b.workflow_name
FROM sj_workflow_task_batch a
JOIN sj_workflow b ON a.workflow_id = b.id
${ew.customSqlSegment}
</select>
</mapper> </mapper>

View File

@ -7,6 +7,7 @@ import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo; import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO; import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.WorkflowBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.po.JobNotifyConfig; import com.aizuda.snailjob.template.datasource.persistence.po.JobNotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
@ -62,6 +63,6 @@ public interface AlarmInfoConverter {
List<JobAlarmInfo> toJobAlarmInfos(List<JobBatchResponseDO> jobBatchResponse); List<JobAlarmInfo> toJobAlarmInfos(List<JobBatchResponseDO> jobBatchResponse);
List<WorkflowAlarmInfo> toWorkflowAlarmInfos(List<WorkflowTaskBatch> jobBatchResponse); List<WorkflowAlarmInfo> toWorkflowAlarmInfos(List<WorkflowBatchResponseDO> workflowBatchResponses);
} }

View File

@ -29,10 +29,8 @@ import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalApplicationListener;
import org.springframework.transaction.event.TransactionalEventListener; import org.springframework.transaction.event.TransactionalEventListener;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;

View File

@ -1,13 +1,10 @@
package com.aizuda.snailjob.server.common.alarm; package com.aizuda.snailjob.server.common.alarm;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.enums.SystemModeEnum;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.triple.Triple; import com.aizuda.snailjob.server.common.triple.Triple;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
import java.text.MessageFormat;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;

View File

@ -1,13 +1,10 @@
package com.aizuda.snailjob.server.common.alarm; package com.aizuda.snailjob.server.common.alarm;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SystemModeEnum;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.triple.Triple; import com.aizuda.snailjob.server.common.triple.Triple;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
import java.text.MessageFormat;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;

View File

@ -24,4 +24,9 @@ public class WorkflowAlarmInfo extends AlarmInfo {
*/ */
private Long workflowId; private Long workflowId;
/**
* 操作原因
*/
private Integer operationReason;
} }

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.alarm.listener;
import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum; import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils; import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter; import com.aizuda.snailjob.server.common.AlarmInfoConverter;
@ -11,8 +12,11 @@ import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.WorkflowBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -41,6 +45,7 @@ public class WorkflowTaskFailAlarmListener extends AbstractWorkflowAlarm<Workflo
> 空间ID:{} \s > 空间ID:{} \s
> 组名称:{} \s > 组名称:{} \s
> 工作流名称:{} \s > 工作流名称:{} \s
> 失败原因:{} \s
> 时间:{}; > 时间:{};
"""; """;
@ -53,14 +58,19 @@ public class WorkflowTaskFailAlarmListener extends AbstractWorkflowAlarm<Workflo
} }
// 拉取200条 // 拉取200条
List<Long> jobTaskBatchIds = Lists.newArrayList(workflowTaskBatchId); List<Long> workflowTaskBatchIds = Lists.newArrayList(workflowTaskBatchId);
queue.drainTo(jobTaskBatchIds, 200); queue.drainTo(workflowTaskBatchIds, 200);
List<WorkflowTaskBatch> workflowTaskBatches = workflowTaskBatchMapper.selectBatchIds(jobTaskBatchIds);
QueryWrapper<WorkflowTaskBatch> wrapper = new QueryWrapper<WorkflowTaskBatch>()
.in("a.id", workflowTaskBatchIds).eq("a.deleted", 0);
List<WorkflowBatchResponseDO> workflowTaskBatches = workflowTaskBatchMapper.selectWorkflowBatchPageList(wrapper);
return AlarmInfoConverter.INSTANCE.toWorkflowAlarmInfos(workflowTaskBatches); return AlarmInfoConverter.INSTANCE.toWorkflowAlarmInfos(workflowTaskBatches);
} }
@Override @Override
protected AlarmContext buildAlarmContext(WorkflowAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) { protected AlarmContext buildAlarmContext(WorkflowAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) {
String desc = JobOperationReasonEnum.getByReason(alarmDTO.getOperationReason()).getDesc();
// 预警 // 预警
return AlarmContext.build() return AlarmContext.build()
.text(MESSAGES_FORMATTER, .text(MESSAGES_FORMATTER,
@ -68,6 +78,7 @@ public class WorkflowTaskFailAlarmListener extends AbstractWorkflowAlarm<Workflo
alarmDTO.getNamespaceId(), alarmDTO.getNamespaceId(),
alarmDTO.getGroupName(), alarmDTO.getGroupName(),
alarmDTO.getWorkflowName(), alarmDTO.getWorkflowName(),
desc,
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN)) DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN))
.title("{}环境 Workflow任务执行失败", EnvironmentUtils.getActiveProfile()); .title("{}环境 Workflow任务执行失败", EnvironmentUtils.getActiveProfile());
} }

View File

@ -77,7 +77,7 @@ public class WorkflowExecutorActor extends AbstractActor {
SnailJobLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e); SnailJobLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(),
JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason()); JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(taskExecute.getTaskBatchId())); SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(taskExecute.getWorkflowTaskBatchId()));
} finally { } finally {
getContext().stop(getSelf()); getContext().stop(getSelf());
} }

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.executor.workflow;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.*; import com.aizuda.snailjob.common.core.enums.*;
import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.log.SnailJobLog;
@ -9,6 +10,7 @@ import com.aizuda.snailjob.server.common.rpc.okhttp.RequestInterceptor;
import com.aizuda.snailjob.server.common.dto.CallbackConfig; import com.aizuda.snailjob.server.common.dto.CallbackConfig;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter; import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.model.dto.CallbackParamsDTO; import com.aizuda.snailjob.server.model.dto.CallbackParamsDTO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
@ -123,6 +125,7 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
} }
message = throwable.getMessage(); message = throwable.getMessage();
SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(context.getWorkflowTaskBatchId()));
} }
context.setEvaluationResult(result); context.setEvaluationResult(result);

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.executor.workflow;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
@ -17,6 +18,7 @@ import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum;
import com.aizuda.snailjob.server.common.enums.LogicalConditionEnum; import com.aizuda.snailjob.server.common.enums.LogicalConditionEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler; import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
@ -116,6 +118,7 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason(); operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus(); jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
message = e.getMessage(); message = e.getMessage();
SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(context.getWorkflowTaskBatchId()));
} }
} else { } else {
result = Boolean.TRUE; result = Boolean.TRUE;

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.handler;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
@ -15,6 +16,7 @@ import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.cache.MutableGraphCache; import com.aizuda.snailjob.server.job.task.support.cache.MutableGraphCache;
import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory; import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
@ -115,6 +117,7 @@ public class WorkflowBatchHandler {
if (JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() != jobTaskBatch.getOperationReason() if (JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() != jobTaskBatch.getOperationReason()
&& JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason() != jobTaskBatch.getOperationReason()) { && JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason() != jobTaskBatch.getOperationReason()) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId));
} }
} }
} }
@ -178,6 +181,7 @@ public class WorkflowBatchHandler {
Assert.isTrue(1 == workflowTaskBatchMapper.updateById(workflowTaskBatch), Assert.isTrue(1 == workflowTaskBatchMapper.updateById(workflowTaskBatch),
() -> new SnailJobServerException("停止工作流批次失败. id:[{}]", () -> new SnailJobServerException("停止工作流批次失败. id:[{}]",
workflowTaskBatchId)); workflowTaskBatchId));
SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId));
// 关闭已经触发的任务 // 关闭已经触发的任务
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>() List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.prepare.workflow; package com.aizuda.snailjob.server.job.task.support.prepare.workflow;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.JsonUtil;
@ -7,6 +8,7 @@ import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy; import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter; import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.block.workflow.WorkflowBlockStrategyContext; import com.aizuda.snailjob.server.job.task.support.block.workflow.WorkflowBlockStrategyContext;
import com.aizuda.snailjob.server.job.task.support.block.workflow.WorkflowBlockStrategyFactory; import com.aizuda.snailjob.server.job.task.support.block.workflow.WorkflowBlockStrategyFactory;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
@ -56,6 +58,7 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle
prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout())); prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout()));
// 超时停止任务 // 超时停止任务
workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason()); workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(prepare.getWorkflowTaskBatchId()));
} }
} }