feat(sj_1.0.0): 1、优化定时任务时间轮 2、 优化定时任务超时检查

This commit is contained in:
opensnail 2024-05-21 14:52:01 +08:00
parent 96b17b67db
commit 561f0efd13
15 changed files with 136 additions and 48 deletions

View File

@ -54,6 +54,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
@ -147,17 +148,22 @@ public class JobExecutorActor extends AbstractActor {
jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList));
} finally {
log.debug("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute));
final int finalTaskStatus = taskStatus;
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {
// 清除时间轮的缓存
JobTimerWheel.clearCache(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId());
if (JobTaskBatchStatusEnum.RUNNING.getStatus() == status) {
if (JobTaskBatchStatusEnum.RUNNING.getStatus() == finalTaskStatus) {
// 运行中的任务需要进行超时检查
JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId(),
new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()),
job.getExecutorTimeout(), TimeUnit.SECONDS);
JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()),
Duration.ofSeconds(job.getExecutorTimeout()));
// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId(),
// new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()),
// job.getExecutorTimeout(), TimeUnit.SECONDS);
}
//方法内容
@ -220,7 +226,7 @@ public class JobExecutorActor extends AbstractActor {
jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
jobTimerTaskDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job);
// ResidentJobTimerTask timerTask = ;
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());
Long preTriggerAt = ResidentTaskCache.get(job.getId());
@ -238,8 +244,8 @@ public class JobExecutorActor extends AbstractActor {
log.debug("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000));
// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
}
}

View File

@ -37,6 +37,7 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
@ -92,9 +93,11 @@ public class WorkflowExecutorActor extends AbstractActor {
Workflow workflow = workflowMapper.selectById(workflowTaskBatch.getWorkflowId());
JobTimerWheel.clearCache(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId());
JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()),
Duration.ofSeconds(workflow.getExecutorTimeout()));
// 超时检查
JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId(),
new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), workflow.getExecutorTimeout(), TimeUnit.MILLISECONDS);
// JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId(),
// new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), workflow.getExecutorTimeout(), TimeUnit.SECONDS);
}
// 获取DAG图

View File

@ -24,6 +24,7 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Optional;
@ -95,8 +96,9 @@ public class JobTaskBatchGenerator {
jobTimerTaskDTO.setTaskExecutorScene(context.getTaskExecutorScene());
jobTimerTaskDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
jobTimerTaskDTO.setWorkflowNodeId(context.getWorkflowNodeId());
JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTaskBatch.getId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
JobTimerWheel.registerWithJob(() -> new JobTimerTask(jobTimerTaskDTO), Duration.ofMillis(delay));
// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTaskBatch.getId(),
// new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
return jobTaskBatch;
}

View File

@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@ -52,7 +53,9 @@ public class WorkflowBatchGenerator {
workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId());
workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId());
workflowTimerTaskDTO.setTaskExecutorScene(context.getTaskExecutorScene());
JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskBatch.getId(),
new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimerTask(workflowTimerTaskDTO), Duration.ofMillis(delay));
// JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskBatch.getId(),
// new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}
}

View File

@ -1,10 +1,10 @@
package com.aizuda.snailjob.server.job.task.support.idempotent;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.aizuda.snailjob.server.common.triple.Pair;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.text.MessageFormat;
import java.util.concurrent.TimeUnit;
/**
@ -13,9 +13,8 @@ import java.util.concurrent.TimeUnit;
* @since 2.4.0
*/
public class TimerIdempotent implements IdempotentStrategy<Integer, Long> {
private static final String KEY_FORMAT = "{0}_{1}_{2}";
private static final Cache<String, Long> cache;
private static final Cache<Pair<Integer/*任务类型: SyetemTaskTypeEnum*/, Long /*批次id*/>, Long> cache;
static {
cache = CacheBuilder.newBuilder()
@ -26,8 +25,8 @@ public class TimerIdempotent implements IdempotentStrategy<Integer, Long> {
}
@Override
public boolean set(Integer key, Long value) {
cache.put(getKey(key, value), value);
public boolean set(Integer type, Long value) {
cache.put(getKey(type, value), value);
return Boolean.TRUE;
}
@ -37,17 +36,17 @@ public class TimerIdempotent implements IdempotentStrategy<Integer, Long> {
}
@Override
public boolean isExist(Integer key, Long value) {
return cache.asMap().containsKey(getKey(key, value));
public boolean isExist(Integer type, Long value) {
return cache.asMap().containsKey(getKey(type, value));
}
@Override
public boolean clear(Integer key, Long value) {
cache.invalidate(getKey(key, value));
public boolean clear(Integer type, Long value) {
cache.invalidate(getKey(type, value));
return Boolean.TRUE;
}
private static String getKey(Integer key, Long value) {
return MessageFormat.format(KEY_FORMAT, key, value);
private static Pair<Integer, Long> getKey(Integer type, Long value) {
return Pair.of(type, value);
}
}

View File

@ -10,6 +10,7 @@ import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
@ -41,8 +42,10 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobPrepareDTO.getTaskBatchId());
jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId());
JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
JobTimerWheel.registerWithJob(() -> new JobTimerTask(jobTimerTaskDTO), Duration.ofMillis(delay));
// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId(),
// new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}
}

View File

@ -10,6 +10,7 @@ import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@ -43,8 +44,10 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskPrepareDTO.getWorkflowTaskBatchId());
workflowTimerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId());
workflowTimerTaskDTO.setTaskExecutorScene(workflowTaskPrepareDTO.getTaskExecutorScene());
JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId(),
new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimerTask(workflowTimerTaskDTO), Duration.ofMillis(delay));
// JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId(),
// new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}
}
}

View File

@ -51,6 +51,7 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(context.getTaskBatchId());
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
jobTaskBatch.setOperationReason(context.getJobOperationReason());
jobTaskBatchMapper.updateById(jobTaskBatch);
return;
}

View File

@ -4,6 +4,8 @@ import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
@ -14,7 +16,6 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMa
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.AllArgsConstructor;
import java.util.Objects;
@ -27,7 +28,7 @@ import java.util.Objects;
* @since sj_1.0.0
*/
@AllArgsConstructor
public class JobTimeoutCheckTask implements TimerTask {
public class JobTimeoutCheckTask implements TimerTask<Long> {
private final Long taskBatchId;
private final Long jobId;
@ -52,15 +53,21 @@ public class JobTimeoutCheckTask implements TimerTask {
return;
}
SnailJobLog.LOCAL.info("任务执行超时.taskBatchId:[{}]", taskBatchId);
// 超时停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(job.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job);
stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
stopJobContext.setForceStop(Boolean.TRUE);
stopJobContext.setTaskBatchId(taskBatchId);
instanceInterrupt.stop(stopJobContext);
SpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskBatchId));
SnailJobLog.REMOTE.info("超时中断.taskBatchId:[{}]", taskBatchId);
}
@Override
public Long getUniqueId() {
return taskBatchId;
}
}

View File

@ -5,7 +5,6 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -18,7 +17,7 @@ import java.time.LocalDateTime;
*/
@AllArgsConstructor
@Slf4j
public class JobTimerTask implements TimerTask {
public class JobTimerTask implements TimerTask<Long> {
private JobTimerTaskDTO jobTimerTaskDTO;
@ -42,4 +41,8 @@ public class JobTimerTask implements TimerTask {
}
}
@Override
public Long getUniqueId() {
return jobTimerTaskDTO.getTaskBatchId();
}
}

View File

@ -1,22 +1,25 @@
package com.aizuda.snailjob.server.job.task.support.timer;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.support.idempotent.TimerIdempotent;
import io.netty.util.HashedWheelTimer;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* @author: opensnail
* @date : 2023-09-22 17:03
* @since : 2.4.0
*/
@Slf4j
public class JobTimerWheel {
private static final int TICK_DURATION = 100;
@ -35,13 +38,47 @@ public class JobTimerWheel {
timer.start();
}
public static void register(Integer taskType, Long uniqueId, TimerTask task, long delay, TimeUnit unit) {
// @Deprecated
// public static synchronized void register(Integer taskType, Long uniqueId, TimerTask<Long> task, long delay, TimeUnit unit) {
//
// if (!isExisted(taskType, uniqueId)) {
// SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId);
// delay = delay < 0 ? 0 : delay;
// try {
// timer.newTimeout(task, delay, unit);
// idempotent.set(taskType, uniqueId);
// } catch (Exception e) {
// SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e);
// }
// }
// }
public static synchronized void registerWithWorkflow(Supplier<TimerTask<Long>> task, Duration delay) {
TimerTask<Long> timerTask = task.get();
register(SyetemTaskTypeEnum.WORKFLOW.getType(), timerTask.getUniqueId(), timerTask, delay);
}
public static synchronized void registerWithJob(Supplier<TimerTask<Long>> task, Duration delay) {
TimerTask<Long> timerTask = task.get();
register(SyetemTaskTypeEnum.JOB.getType(), timerTask.getUniqueId(), timerTask, delay);
}
public static synchronized void register(Integer taskType, Long uniqueId, TimerTask<Long> task, Duration delay) {
register(taskType, uniqueId, new Consumer<HashedWheelTimer>() {
@Override
public void accept(final HashedWheelTimer hashedWheelTimer) {
SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId);
timer.newTimeout(task, Math.max(delay.toMillis(), 0), TimeUnit.MILLISECONDS);
}
});
}
public static synchronized void register(Integer taskType, Long uniqueId, Consumer<HashedWheelTimer> consumer) {
if (!isExisted(taskType, uniqueId)) {
log.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId);
delay = delay < 0 ? 0 : delay;
try {
timer.newTimeout(task, delay, unit);
consumer.accept(timer);
idempotent.set(taskType, uniqueId);
} catch (Exception e) {
SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e);

View File

@ -9,7 +9,6 @@ import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -20,7 +19,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
@AllArgsConstructor
public class ResidentJobTimerTask implements TimerTask {
public class ResidentJobTimerTask implements TimerTask<Long> {
private JobTimerTaskDTO jobTimerTaskDTO;
private Job job;
@ -39,4 +38,9 @@ public class ResidentJobTimerTask implements TimerTask {
log.error("任务调度执行失败", e);
}
}
@Override
public Long getUniqueId() {
return jobTimerTaskDTO.getTaskBatchId();
}
}

View File

@ -0,0 +1,11 @@
package com.aizuda.snailjob.server.job.task.support.timer;
/**
* @author: opensnail
* @date : 2024-05-21
* @since : sj_1.0.0
*/
public interface TimerTask<T> extends io.netty.util.TimerTask{
T getUniqueId();
}

View File

@ -8,7 +8,6 @@ import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.AllArgsConstructor;
import java.util.Objects;
@ -19,7 +18,7 @@ import java.util.Objects;
* @since sj_1.0.0
*/
@AllArgsConstructor
public class WorkflowTimeoutCheckTask implements TimerTask {
public class WorkflowTimeoutCheckTask implements TimerTask<Long> {
private final Long workflowTaskBatchId;
@Override
public void run(Timeout timeout) throws Exception {
@ -36,4 +35,9 @@ public class WorkflowTimeoutCheckTask implements TimerTask {
workflowBatchHandler.stop(workflowTaskBatchId, JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId));
}
@Override
public Long getUniqueId() {
return workflowTaskBatchId;
}
}

View File

@ -6,12 +6,10 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/**
* @author: xiaowoniu
@ -20,7 +18,7 @@ import java.util.concurrent.TimeUnit;
*/
@AllArgsConstructor
@Slf4j
public class WorkflowTimerTask implements TimerTask {
public class WorkflowTimerTask implements TimerTask<Long> {
private WorkflowTimerTaskDTO workflowTimerTaskDTO;
@ -43,4 +41,8 @@ public class WorkflowTimerTask implements TimerTask {
}
}
@Override
public Long getUniqueId() {
return workflowTimerTaskDTO.getWorkflowTaskBatchId();
}
}