diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index ca7658f6..22d99a3a 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -54,6 +54,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; +import java.time.Duration; import java.time.LocalDateTime; import java.util.List; import java.util.Objects; @@ -147,17 +148,22 @@ public class JobExecutorActor extends AbstractActor { jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList)); } finally { log.debug("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute)); + final int finalTaskStatus = taskStatus; TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCompletion(int status) { // 清除时间轮的缓存 JobTimerWheel.clearCache(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId()); - if (JobTaskBatchStatusEnum.RUNNING.getStatus() == status) { + if (JobTaskBatchStatusEnum.RUNNING.getStatus() == finalTaskStatus) { + // 运行中的任务,需要进行超时检查 - JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId(), - new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()), - job.getExecutorTimeout(), TimeUnit.SECONDS); + JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()), + Duration.ofSeconds(job.getExecutorTimeout())); + +// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId(), +// new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()), +// job.getExecutorTimeout(), TimeUnit.SECONDS); } //方法内容 @@ -220,7 +226,7 @@ public class JobExecutorActor extends AbstractActor { jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId()); jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId()); jobTimerTaskDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType()); - ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job); +// ResidentJobTimerTask timerTask = ; WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); Long preTriggerAt = ResidentTaskCache.get(job.getId()); @@ -238,8 +244,8 @@ public class JobExecutorActor extends AbstractActor { log.debug("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000); job.setNextTriggerAt(nextTriggerAt); - - JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS); + JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000)); +// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS); ResidentTaskCache.refresh(job.getId(), nextTriggerAt); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index 524c8010..a419207d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -37,6 +37,7 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.time.Duration; import java.time.LocalDateTime; import java.util.List; import java.util.Map; @@ -92,9 +93,11 @@ public class WorkflowExecutorActor extends AbstractActor { Workflow workflow = workflowMapper.selectById(workflowTaskBatch.getWorkflowId()); JobTimerWheel.clearCache(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId()); + JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), + Duration.ofSeconds(workflow.getExecutorTimeout())); // 超时检查 - JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId(), - new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), workflow.getExecutorTimeout(), TimeUnit.MILLISECONDS); +// JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId(), +// new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), workflow.getExecutorTimeout(), TimeUnit.SECONDS); } // 获取DAG图 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index 521c9a07..b58a921e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -24,6 +24,7 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; +import java.time.Duration; import java.time.LocalDateTime; import java.util.Objects; import java.util.Optional; @@ -95,8 +96,9 @@ public class JobTaskBatchGenerator { jobTimerTaskDTO.setTaskExecutorScene(context.getTaskExecutorScene()); jobTimerTaskDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); jobTimerTaskDTO.setWorkflowNodeId(context.getWorkflowNodeId()); - JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTaskBatch.getId(), - new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); + JobTimerWheel.registerWithJob(() -> new JobTimerTask(jobTimerTaskDTO), Duration.ofMillis(delay)); +// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTaskBatch.getId(), +// new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); return jobTaskBatch; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/WorkflowBatchGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/WorkflowBatchGenerator.java index 9aec3668..407fec4b 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/WorkflowBatchGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/WorkflowBatchGenerator.java @@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -52,7 +53,9 @@ public class WorkflowBatchGenerator { workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId()); workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId()); workflowTimerTaskDTO.setTaskExecutorScene(context.getTaskExecutorScene()); - JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskBatch.getId(), - new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS); + + JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimerTask(workflowTimerTaskDTO), Duration.ofMillis(delay)); +// JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskBatch.getId(), +// new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java index b99bf2d7..175f8e6f 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java @@ -1,10 +1,10 @@ package com.aizuda.snailjob.server.job.task.support.idempotent; import com.aizuda.snailjob.server.common.IdempotentStrategy; +import com.aizuda.snailjob.server.common.triple.Pair; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import java.text.MessageFormat; import java.util.concurrent.TimeUnit; /** @@ -13,9 +13,8 @@ import java.util.concurrent.TimeUnit; * @since 2.4.0 */ public class TimerIdempotent implements IdempotentStrategy { - private static final String KEY_FORMAT = "{0}_{1}_{2}"; - private static final Cache cache; + private static final Cache, Long> cache; static { cache = CacheBuilder.newBuilder() @@ -26,8 +25,8 @@ public class TimerIdempotent implements IdempotentStrategy { } @Override - public boolean set(Integer key, Long value) { - cache.put(getKey(key, value), value); + public boolean set(Integer type, Long value) { + cache.put(getKey(type, value), value); return Boolean.TRUE; } @@ -37,17 +36,17 @@ public class TimerIdempotent implements IdempotentStrategy { } @Override - public boolean isExist(Integer key, Long value) { - return cache.asMap().containsKey(getKey(key, value)); + public boolean isExist(Integer type, Long value) { + return cache.asMap().containsKey(getKey(type, value)); } @Override - public boolean clear(Integer key, Long value) { - cache.invalidate(getKey(key, value)); + public boolean clear(Integer type, Long value) { + cache.invalidate(getKey(type, value)); return Boolean.TRUE; } - private static String getKey(Integer key, Long value) { - return MessageFormat.format(KEY_FORMAT, key, value); + private static Pair getKey(Integer type, Long value) { + return Pair.of(type, value); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/WaitJobPrepareHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/WaitJobPrepareHandler.java index 7798fb19..e81d67ce 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/WaitJobPrepareHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/WaitJobPrepareHandler.java @@ -10,6 +10,7 @@ import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.concurrent.TimeUnit; /** @@ -41,8 +42,10 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler { JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); jobTimerTaskDTO.setTaskBatchId(jobPrepareDTO.getTaskBatchId()); jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId()); - JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId(), - new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); + + JobTimerWheel.registerWithJob(() -> new JobTimerTask(jobTimerTaskDTO), Duration.ofMillis(delay)); +// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId(), +// new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java index 8e724064..d58f92d1 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java @@ -10,6 +10,7 @@ import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -43,8 +44,10 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler { workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskPrepareDTO.getWorkflowTaskBatchId()); workflowTimerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId()); workflowTimerTaskDTO.setTaskExecutorScene(workflowTaskPrepareDTO.getTaskExecutorScene()); - JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId(), - new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS); + + JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimerTask(workflowTimerTaskDTO), Duration.ofMillis(delay)); +// JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId(), +// new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS); } } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java index 6761e26c..97e59a1e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/AbstractJobTaskStopHandler.java @@ -51,6 +51,7 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler, JobTaskBatch jobTaskBatch = new JobTaskBatch(); jobTaskBatch.setId(context.getTaskBatchId()); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus()); + jobTaskBatch.setOperationReason(context.getJobOperationReason()); jobTaskBatchMapper.updateById(jobTaskBatch); return; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java index 970ff0e5..b6053125 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimeoutCheckTask.java @@ -4,6 +4,8 @@ import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; +import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; @@ -14,7 +16,6 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMa import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import io.netty.util.Timeout; -import io.netty.util.TimerTask; import lombok.AllArgsConstructor; import java.util.Objects; @@ -27,7 +28,7 @@ import java.util.Objects; * @since sj_1.0.0 */ @AllArgsConstructor -public class JobTimeoutCheckTask implements TimerTask { +public class JobTimeoutCheckTask implements TimerTask { private final Long taskBatchId; private final Long jobId; @@ -52,15 +53,21 @@ public class JobTimeoutCheckTask implements TimerTask { return; } - SnailJobLog.LOCAL.info("任务执行超时.taskBatchId:[{}]", taskBatchId); // 超时停止任务 JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(job.getTaskType()); TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job); stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason()); stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); + stopJobContext.setForceStop(Boolean.TRUE); + stopJobContext.setTaskBatchId(taskBatchId); instanceInterrupt.stop(stopJobContext); + SpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskBatchId)); + SnailJobLog.REMOTE.info("超时中断.taskBatchId:[{}]", taskBatchId); } - + @Override + public Long getUniqueId() { + return taskBatchId; + } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java index 042eb691..4a233e99 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerTask.java @@ -5,7 +5,6 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import io.netty.util.Timeout; -import io.netty.util.TimerTask; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -18,7 +17,7 @@ import java.time.LocalDateTime; */ @AllArgsConstructor @Slf4j -public class JobTimerTask implements TimerTask { +public class JobTimerTask implements TimerTask { private JobTimerTaskDTO jobTimerTaskDTO; @@ -42,4 +41,8 @@ public class JobTimerTask implements TimerTask { } } + @Override + public Long getUniqueId() { + return jobTimerTaskDTO.getTaskBatchId(); + } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java index 2efbc9c6..5104940e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java @@ -1,22 +1,25 @@ package com.aizuda.snailjob.server.job.task.support.timer; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; +import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.snailjob.server.job.task.support.idempotent.TimerIdempotent; import io.netty.util.HashedWheelTimer; -import io.netty.util.TimerTask; -import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import java.time.Duration; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; /** * @author: opensnail * @date : 2023-09-22 17:03 * @since : 2.4.0 */ -@Slf4j public class JobTimerWheel { private static final int TICK_DURATION = 100; @@ -35,13 +38,47 @@ public class JobTimerWheel { timer.start(); } - public static void register(Integer taskType, Long uniqueId, TimerTask task, long delay, TimeUnit unit) { +// @Deprecated +// public static synchronized void register(Integer taskType, Long uniqueId, TimerTask task, long delay, TimeUnit unit) { +// +// if (!isExisted(taskType, uniqueId)) { +// SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId); +// delay = delay < 0 ? 0 : delay; +// try { +// timer.newTimeout(task, delay, unit); +// idempotent.set(taskType, uniqueId); +// } catch (Exception e) { +// SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e); +// } +// } +// } + + public static synchronized void registerWithWorkflow(Supplier> task, Duration delay) { + TimerTask timerTask = task.get(); + register(SyetemTaskTypeEnum.WORKFLOW.getType(), timerTask.getUniqueId(), timerTask, delay); + } + + public static synchronized void registerWithJob(Supplier> task, Duration delay) { + TimerTask timerTask = task.get(); + register(SyetemTaskTypeEnum.JOB.getType(), timerTask.getUniqueId(), timerTask, delay); + } + + public static synchronized void register(Integer taskType, Long uniqueId, TimerTask task, Duration delay) { + + register(taskType, uniqueId, new Consumer() { + @Override + public void accept(final HashedWheelTimer hashedWheelTimer) { + SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId); + timer.newTimeout(task, Math.max(delay.toMillis(), 0), TimeUnit.MILLISECONDS); + } + }); + } + + public static synchronized void register(Integer taskType, Long uniqueId, Consumer consumer) { if (!isExisted(taskType, uniqueId)) { - log.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId); - delay = delay < 0 ? 0 : delay; try { - timer.newTimeout(task, delay, unit); + consumer.accept(timer); idempotent.set(taskType, uniqueId); } catch (Exception e) { SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/ResidentJobTimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/ResidentJobTimerTask.java index b4394d28..7c3af067 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/ResidentJobTimerTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/ResidentJobTimerTask.java @@ -9,7 +9,6 @@ import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import io.netty.util.Timeout; -import io.netty.util.TimerTask; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -20,7 +19,7 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j @AllArgsConstructor -public class ResidentJobTimerTask implements TimerTask { +public class ResidentJobTimerTask implements TimerTask { private JobTimerTaskDTO jobTimerTaskDTO; private Job job; @@ -39,4 +38,9 @@ public class ResidentJobTimerTask implements TimerTask { log.error("任务调度执行失败", e); } } + + @Override + public Long getUniqueId() { + return jobTimerTaskDTO.getTaskBatchId(); + } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/TimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/TimerTask.java new file mode 100644 index 00000000..e164b9a8 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/TimerTask.java @@ -0,0 +1,11 @@ +package com.aizuda.snailjob.server.job.task.support.timer; + +/** + * @author: opensnail + * @date : 2024-05-21 + * @since : sj_1.0.0 + */ +public interface TimerTask extends io.netty.util.TimerTask{ + + T getUniqueId(); +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java index f19227b4..87b67175 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java @@ -8,7 +8,6 @@ import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import io.netty.util.Timeout; -import io.netty.util.TimerTask; import lombok.AllArgsConstructor; import java.util.Objects; @@ -19,7 +18,7 @@ import java.util.Objects; * @since sj_1.0.0 */ @AllArgsConstructor -public class WorkflowTimeoutCheckTask implements TimerTask { +public class WorkflowTimeoutCheckTask implements TimerTask { private final Long workflowTaskBatchId; @Override public void run(Timeout timeout) throws Exception { @@ -36,4 +35,9 @@ public class WorkflowTimeoutCheckTask implements TimerTask { workflowBatchHandler.stop(workflowTaskBatchId, JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason()); SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId)); } + + @Override + public Long getUniqueId() { + return workflowTaskBatchId; + } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java index e073ab10..1243ec9b 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimerTask.java @@ -6,12 +6,10 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO; import io.netty.util.Timeout; -import io.netty.util.TimerTask; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; -import java.util.concurrent.TimeUnit; /** * @author: xiaowoniu @@ -20,7 +18,7 @@ import java.util.concurrent.TimeUnit; */ @AllArgsConstructor @Slf4j -public class WorkflowTimerTask implements TimerTask { +public class WorkflowTimerTask implements TimerTask { private WorkflowTimerTaskDTO workflowTimerTaskDTO; @@ -43,4 +41,8 @@ public class WorkflowTimerTask implements TimerTask { } } + @Override + public Long getUniqueId() { + return workflowTimerTaskDTO.getWorkflowTaskBatchId(); + } }