From d7d5322e53a9b64b2d4f7db6c790be6a1fb1115f Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Mon, 20 May 2024 23:15:57 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.0.0):=20=E6=B7=BB=E5=8A=A0=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E6=A3=80=E6=9F=A5=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/dispatch/JobExecutorActor.java | 13 +++- .../dispatch/WorkflowExecutorActor.java | 23 ++++--- .../support/idempotent/TimerIdempotent.java | 12 ++-- .../support/timer/JobTimeoutCheckTask.java | 66 +++++++++++++++++++ .../job/task/support/timer/JobTimerWheel.java | 19 ++---- .../timer/WorkflowTimeoutCheckTask.java | 39 +++++++++++ .../task/support/timer/WorkflowTimerTask.java | 1 + 7 files changed, 142 insertions(+), 31 deletions(-) create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 8f44df810..ca7658f6f 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -32,6 +32,7 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask; +import com.aizuda.snailjob.server.job.task.support.timer.JobTimeoutCheckTask; import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; @@ -105,10 +106,8 @@ public class JobExecutorActor extends AbstractActor { } Job job = jobMapper.selectOne(queryWrapper.eq(Job::getId, taskExecute.getJobId())); - + int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus(); try { - - int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus(); int operationReason = JobOperationReasonEnum.NONE.getReason(); if (Objects.isNull(job)) { taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); @@ -153,6 +152,14 @@ public class JobExecutorActor extends AbstractActor { public void afterCompletion(int status) { // 清除时间轮的缓存 JobTimerWheel.clearCache(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId()); + + if (JobTaskBatchStatusEnum.RUNNING.getStatus() == status) { + // 运行中的任务,需要进行超时检查 + JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId(), + new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()), + job.getExecutorTimeout(), TimeUnit.SECONDS); + } + //方法内容 doHandlerResidentTask(job, taskExecute); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index 1394453a0..524c80101 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -11,6 +11,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; @@ -21,14 +22,11 @@ import com.aizuda.snailjob.server.job.task.support.cache.MutableGraphCache; import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext; import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorFactory; import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowNodeMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.Job; -import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; -import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowNode; -import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; +import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; +import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimeoutCheckTask; +import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimerTask; +import com.aizuda.snailjob.template.datasource.persistence.mapper.*; +import com.aizuda.snailjob.template.datasource.persistence.po.*; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.collect.Sets; import com.google.common.graph.MutableGraph; @@ -44,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -59,6 +58,7 @@ public class WorkflowExecutorActor extends AbstractActor { private final WorkflowTaskBatchMapper workflowTaskBatchMapper; private final WorkflowNodeMapper workflowNodeMapper; + private final WorkflowMapper workflowMapper; private final JobMapper jobMapper; private final JobTaskBatchMapper jobTaskBatchMapper; private final WorkflowBatchHandler workflowBatchHandler; @@ -88,6 +88,13 @@ public class WorkflowExecutorActor extends AbstractActor { if (SystemConstants.ROOT.equals(taskExecute.getParentId()) && JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) { handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason()); + + Workflow workflow = workflowMapper.selectById(workflowTaskBatch.getWorkflowId()); + JobTimerWheel.clearCache(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId()); + + // 超时检查 + JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId(), + new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), workflow.getExecutorTimeout(), TimeUnit.MILLISECONDS); } // 获取DAG图 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java index 1188cdda9..b99bf2d7e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java @@ -12,7 +12,7 @@ import java.util.concurrent.TimeUnit; * @date 2023-10-19 21:54:57 * @since 2.4.0 */ -public class TimerIdempotent implements IdempotentStrategy { +public class TimerIdempotent implements IdempotentStrategy { private static final String KEY_FORMAT = "{0}_{1}_{2}"; private static final Cache cache; @@ -26,28 +26,28 @@ public class TimerIdempotent implements IdempotentStrategy { } @Override - public boolean set(Long key, Long value) { + public boolean set(Integer key, Long value) { cache.put(getKey(key, value), value); return Boolean.TRUE; } @Override - public Long get(Long s) { + public Long get(Integer s) { throw new UnsupportedOperationException("不支持此操作"); } @Override - public boolean isExist(Long key, Long value) { + public boolean isExist(Integer key, Long value) { return cache.asMap().containsKey(getKey(key, value)); } @Override - public boolean clear(Long key, Long value) { + public boolean clear(Integer key, Long value) { cache.invalidate(getKey(key, value)); return Boolean.TRUE; } - private static String getKey(Long key, Long value) { + private static String getKey(Integer key, Long value) { return MessageFormat.format(KEY_FORMAT, key, value); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java new file mode 100644 index 000000000..970ff0e5a --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java @@ -0,0 +1,66 @@ +package com.aizuda.snailjob.server.job.task.support.timer; + +import com.aizuda.snailjob.common.core.context.SpringContext; +import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; +import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; +import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; +import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory; +import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import lombok.AllArgsConstructor; + +import java.util.Objects; + +/** + * 任务超时检查 + * + * @author opensnail + * @date 2024-05-20 21:16:09 + * @since sj_1.0.0 + */ +@AllArgsConstructor +public class JobTimeoutCheckTask implements TimerTask { + private final Long taskBatchId; + private final Long jobId; + + @Override + public void run(Timeout timeout) throws Exception { + JobTaskBatchMapper jobTaskBatchMapper = SpringContext.getBean(JobTaskBatchMapper.class); + JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId); + if (Objects.isNull(jobTaskBatch)) { + SnailJobLog.LOCAL.error("jobTaskBatch:[{}]不存在", taskBatchId); + return; + } + + // 已经完成了,无需重复停止任务 + if (JobTaskBatchStatusEnum.COMPLETED.contains(jobTaskBatch.getTaskBatchStatus())) { + return; + } + + JobMapper jobMapper = SpringContext.getBean(JobMapper.class); + Job job = jobMapper.selectById(jobId); + if (Objects.isNull(job)) { + SnailJobLog.LOCAL.error("job:[{}]不存在", jobId); + return; + } + + SnailJobLog.LOCAL.info("任务执行超时.taskBatchId:[{}]", taskBatchId); + // 超时停止任务 + JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(job.getTaskType()); + TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job); + stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason()); + stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); + instanceInterrupt.stop(stopJobContext); + SpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskBatchId)); + } + + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java index 45e52c661..2efbc9c62 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java @@ -1,14 +1,11 @@ package com.aizuda.snailjob.server.job.task.support.timer; import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.Lifecycle; -import com.aizuda.snailjob.server.job.task.support.idempotent.TimerIdempotent; import com.aizuda.snailjob.server.job.task.support.idempotent.TimerIdempotent; import io.netty.util.HashedWheelTimer; import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; -import org.springframework.stereotype.Component; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -19,9 +16,8 @@ import java.util.concurrent.TimeUnit; * @date : 2023-09-22 17:03 * @since : 2.4.0 */ -@Component @Slf4j -public class JobTimerWheel implements Lifecycle { +public class JobTimerWheel { private static final int TICK_DURATION = 100; private static final String THREAD_NAME_PREFIX = "job-task-timer-wheel-"; @@ -32,8 +28,7 @@ public class JobTimerWheel implements Lifecycle { private static final TimerIdempotent idempotent = new TimerIdempotent(); - @Override - public void start() { + static { timer = new HashedWheelTimer( new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, TimeUnit.MILLISECONDS, 512, true, -1, executor); @@ -47,7 +42,7 @@ public class JobTimerWheel implements Lifecycle { delay = delay < 0 ? 0 : delay; try { timer.newTimeout(task, delay, unit); - idempotent.set(uniqueId, uniqueId); + idempotent.set(taskType, uniqueId); } catch (Exception e) { SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e); } @@ -55,15 +50,11 @@ public class JobTimerWheel implements Lifecycle { } public static boolean isExisted(Integer taskType, Long uniqueId) { - return idempotent.isExist(Long.valueOf(taskType), uniqueId); + return idempotent.isExist(taskType, uniqueId); } public static void clearCache(Integer taskType, Long uniqueId) { - idempotent.clear(Long.valueOf(taskType), uniqueId); + idempotent.clear(taskType, uniqueId); } - @Override - public void close() { - timer.stop(); - } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java new file mode 100644 index 000000000..f19227b49 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java @@ -0,0 +1,39 @@ +package com.aizuda.snailjob.server.job.task.support.timer; + +import com.aizuda.snailjob.common.core.context.SpringContext; +import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; +import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; +import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; +import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import lombok.AllArgsConstructor; + +import java.util.Objects; + +/** + * @author opensnail + * @date 2024-05-20 22:25:12 + * @since sj_1.0.0 + */ +@AllArgsConstructor +public class WorkflowTimeoutCheckTask implements TimerTask { + private final Long workflowTaskBatchId; + @Override + public void run(Timeout timeout) throws Exception { + WorkflowTaskBatchMapper workflowTaskBatchMapper = SpringContext.getBean(WorkflowTaskBatchMapper.class); + WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(workflowTaskBatchId); + // 幂等检查 + if (Objects.isNull(workflowTaskBatch) || JobTaskBatchStatusEnum.COMPLETED.contains(workflowTaskBatch.getTaskBatchStatus())) { + return; + } + + WorkflowBatchHandler workflowBatchHandler = SpringContext.getBean(WorkflowBatchHandler.class); + + // 超时停止任务 + workflowBatchHandler.stop(workflowTaskBatchId, JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason()); + SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId)); + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java index 9427f8dcf..e073ab104 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java @@ -11,6 +11,7 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; +import java.util.concurrent.TimeUnit; /** * @author: xiaowoniu