feat(sj_1.0.0): 添加超时检查功能

This commit is contained in:
opensnail 2024-05-20 23:15:57 +08:00
parent 546e1b2097
commit d7d5322e53
7 changed files with 142 additions and 31 deletions

View File

@ -32,6 +32,7 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimeoutCheckTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
@ -105,10 +106,8 @@ public class JobExecutorActor extends AbstractActor {
}
Job job = jobMapper.selectOne(queryWrapper.eq(Job::getId, taskExecute.getJobId()));
try {
int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
try {
int operationReason = JobOperationReasonEnum.NONE.getReason();
if (Objects.isNull(job)) {
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
@ -153,6 +152,14 @@ public class JobExecutorActor extends AbstractActor {
public void afterCompletion(int status) {
// 清除时间轮的缓存
JobTimerWheel.clearCache(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId());
if (JobTaskBatchStatusEnum.RUNNING.getStatus() == status) {
// 运行中的任务需要进行超时检查
JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId(),
new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()),
job.getExecutorTimeout(), TimeUnit.SECONDS);
}
//方法内容
doHandlerResidentTask(job, taskExecute);
}

View File

@ -11,6 +11,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
@ -21,14 +22,11 @@ import com.aizuda.snailjob.server.job.task.support.cache.MutableGraphCache;
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorFactory;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimeoutCheckTask;
import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.*;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Sets;
import com.google.common.graph.MutableGraph;
@ -44,6 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -59,6 +58,7 @@ public class WorkflowExecutorActor extends AbstractActor {
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final WorkflowNodeMapper workflowNodeMapper;
private final WorkflowMapper workflowMapper;
private final JobMapper jobMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final WorkflowBatchHandler workflowBatchHandler;
@ -88,6 +88,13 @@ public class WorkflowExecutorActor extends AbstractActor {
if (SystemConstants.ROOT.equals(taskExecute.getParentId()) && JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) {
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason());
Workflow workflow = workflowMapper.selectById(workflowTaskBatch.getWorkflowId());
JobTimerWheel.clearCache(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId());
// 超时检查
JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId(),
new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), workflow.getExecutorTimeout(), TimeUnit.MILLISECONDS);
}
// 获取DAG图

View File

@ -12,7 +12,7 @@ import java.util.concurrent.TimeUnit;
* @date 2023-10-19 21:54:57
* @since 2.4.0
*/
public class TimerIdempotent implements IdempotentStrategy<Long, Long> {
public class TimerIdempotent implements IdempotentStrategy<Integer, Long> {
private static final String KEY_FORMAT = "{0}_{1}_{2}";
private static final Cache<String, Long> cache;
@ -26,28 +26,28 @@ public class TimerIdempotent implements IdempotentStrategy<Long, Long> {
}
@Override
public boolean set(Long key, Long value) {
public boolean set(Integer key, Long value) {
cache.put(getKey(key, value), value);
return Boolean.TRUE;
}
@Override
public Long get(Long s) {
public Long get(Integer s) {
throw new UnsupportedOperationException("不支持此操作");
}
@Override
public boolean isExist(Long key, Long value) {
public boolean isExist(Integer key, Long value) {
return cache.asMap().containsKey(getKey(key, value));
}
@Override
public boolean clear(Long key, Long value) {
public boolean clear(Integer key, Long value) {
cache.invalidate(getKey(key, value));
return Boolean.TRUE;
}
private static String getKey(Long key, Long value) {
private static String getKey(Integer key, Long value) {
return MessageFormat.format(KEY_FORMAT, key, value);
}
}

View File

@ -0,0 +1,66 @@
package com.aizuda.snailjob.server.job.task.support.timer;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.AllArgsConstructor;
import java.util.Objects;
/**
* 任务超时检查
*
* @author opensnail
* @date 2024-05-20 21:16:09
* @since sj_1.0.0
*/
@AllArgsConstructor
public class JobTimeoutCheckTask implements TimerTask {
private final Long taskBatchId;
private final Long jobId;
@Override
public void run(Timeout timeout) throws Exception {
JobTaskBatchMapper jobTaskBatchMapper = SpringContext.getBean(JobTaskBatchMapper.class);
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId);
if (Objects.isNull(jobTaskBatch)) {
SnailJobLog.LOCAL.error("jobTaskBatch:[{}]不存在", taskBatchId);
return;
}
// 已经完成了无需重复停止任务
if (JobTaskBatchStatusEnum.COMPLETED.contains(jobTaskBatch.getTaskBatchStatus())) {
return;
}
JobMapper jobMapper = SpringContext.getBean(JobMapper.class);
Job job = jobMapper.selectById(jobId);
if (Objects.isNull(job)) {
SnailJobLog.LOCAL.error("job:[{}]不存在", jobId);
return;
}
SnailJobLog.LOCAL.info("任务执行超时.taskBatchId:[{}]", taskBatchId);
// 超时停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(job.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job);
stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
SpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskBatchId));
}
}

View File

@ -1,14 +1,11 @@
package com.aizuda.snailjob.server.job.task.support.timer;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.job.task.support.idempotent.TimerIdempotent;
import com.aizuda.snailjob.server.job.task.support.idempotent.TimerIdempotent;
import io.netty.util.HashedWheelTimer;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@ -19,9 +16,8 @@ import java.util.concurrent.TimeUnit;
* @date : 2023-09-22 17:03
* @since : 2.4.0
*/
@Component
@Slf4j
public class JobTimerWheel implements Lifecycle {
public class JobTimerWheel {
private static final int TICK_DURATION = 100;
private static final String THREAD_NAME_PREFIX = "job-task-timer-wheel-";
@ -32,8 +28,7 @@ public class JobTimerWheel implements Lifecycle {
private static final TimerIdempotent idempotent = new TimerIdempotent();
@Override
public void start() {
static {
timer = new HashedWheelTimer(
new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION,
TimeUnit.MILLISECONDS, 512, true, -1, executor);
@ -47,7 +42,7 @@ public class JobTimerWheel implements Lifecycle {
delay = delay < 0 ? 0 : delay;
try {
timer.newTimeout(task, delay, unit);
idempotent.set(uniqueId, uniqueId);
idempotent.set(taskType, uniqueId);
} catch (Exception e) {
SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e);
}
@ -55,15 +50,11 @@ public class JobTimerWheel implements Lifecycle {
}
public static boolean isExisted(Integer taskType, Long uniqueId) {
return idempotent.isExist(Long.valueOf(taskType), uniqueId);
return idempotent.isExist(taskType, uniqueId);
}
public static void clearCache(Integer taskType, Long uniqueId) {
idempotent.clear(Long.valueOf(taskType), uniqueId);
idempotent.clear(taskType, uniqueId);
}
@Override
public void close() {
timer.stop();
}
}

View File

@ -0,0 +1,39 @@
package com.aizuda.snailjob.server.job.task.support.timer;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.AllArgsConstructor;
import java.util.Objects;
/**
* @author opensnail
* @date 2024-05-20 22:25:12
* @since sj_1.0.0
*/
@AllArgsConstructor
public class WorkflowTimeoutCheckTask implements TimerTask {
private final Long workflowTaskBatchId;
@Override
public void run(Timeout timeout) throws Exception {
WorkflowTaskBatchMapper workflowTaskBatchMapper = SpringContext.getBean(WorkflowTaskBatchMapper.class);
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(workflowTaskBatchId);
// 幂等检查
if (Objects.isNull(workflowTaskBatch) || JobTaskBatchStatusEnum.COMPLETED.contains(workflowTaskBatch.getTaskBatchStatus())) {
return;
}
WorkflowBatchHandler workflowBatchHandler = SpringContext.getBean(WorkflowBatchHandler.class);
// 超时停止任务
workflowBatchHandler.stop(workflowTaskBatchId, JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId));
}
}

View File

@ -11,6 +11,7 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/**
* @author: xiaowoniu