feat(sj_1.0.0): 修复常驻任务执行阻塞策略时无法准时开启下一次任务

This commit is contained in:
opensnail 2024-06-02 14:03:19 +08:00
parent ef15020613
commit 71a03b083e
3 changed files with 112 additions and 69 deletions

View File

@ -9,36 +9,28 @@ import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimeoutCheckTask;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
@ -74,8 +66,8 @@ public class JobExecutorActor extends AbstractActor {
private final JobMapper jobMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final TransactionTemplate transactionTemplate;
private final GroupConfigMapper groupConfigMapper;
private final WorkflowBatchHandler workflowBatchHandler;
private final JobTaskBatchHandler jobTaskBatchHandler;
@Override
public Receive createReceive() {
@ -164,8 +156,8 @@ public class JobExecutorActor extends AbstractActor {
Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500));
}
//方法内容
doHandleResidentTask(job, taskExecute);
// 开启下一个常驻任务
jobTaskBatchHandler.openResidentTask(job, taskExecute);
}
});
}
@ -199,52 +191,4 @@ public class JobExecutorActor extends AbstractActor {
}
private void doHandleResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job)
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
// 是否是常驻任务
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
// 防止任务已经分配到其他节点导致的任务重复执行
|| !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
) {
return;
}
long count = groupConfigMapper.selectCount(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
.eq(GroupConfig::getGroupName, job.getGroupName())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
if (count == 0) {
return;
}
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
jobTimerTaskDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());
Long preTriggerAt = ResidentTaskCache.get(job.getId());
if (Objects.isNull(preTriggerAt) || preTriggerAt < job.getNextTriggerAt()) {
preTriggerAt = job.getNextTriggerAt();
}
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(preTriggerAt);
Long nextTriggerAt = waitStrategy.computeTriggerTime(waitStrategyContext);
// 获取时间差的毫秒数
long milliseconds = nextTriggerAt - preTriggerAt;
Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000);
log.info("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds, DateUtils.toNowMilli() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
}
}

View File

@ -9,12 +9,16 @@ import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
@ -40,6 +44,8 @@ public class JobTaskBatchGenerator {
private final JobTaskBatchMapper jobTaskBatchMapper;
private final WorkflowBatchHandler workflowBatchHandler;
private final JobTaskBatchHandler jobTaskBatchHandler;
private final JobMapper jobMapper;
@Transactional
public JobTaskBatch generateJobTaskBatch(JobTaskBatchGeneratorContext context) {
@ -76,13 +82,12 @@ public class JobTaskBatchGenerator {
// 非待处理状态无需进入时间轮中
if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(jobTaskBatch.getId());
workflowBatchHandler.openNextNode(taskExecuteDTO);
// 开启下一个工作流
openNextWorkflow(context, jobTaskBatch);
// 若是常驻任务则需要再次进入时间轮
openNextResidentTask(context, jobTaskBatch);
return jobTaskBatch;
}
@ -95,10 +100,35 @@ public class JobTaskBatchGenerator {
jobTimerTaskDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
jobTimerTaskDTO.setWorkflowNodeId(context.getWorkflowNodeId());
JobTimerWheel.registerWithJob(() -> new JobTimerTask(jobTimerTaskDTO), Duration.ofMillis(delay));
// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTaskBatch.getId(),
// new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
return jobTaskBatch;
}
private void openNextResidentTask(JobTaskBatchGeneratorContext context, JobTaskBatch jobTaskBatch) {
// 手动触发的定时任务工作流场景下不存在常驻任务无需触发
if (JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(context.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(context.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(context.getTaskExecutorScene())) {
return;
}
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
taskExecuteDTO.setTaskBatchId(jobTaskBatch.getId());
taskExecuteDTO.setJobId(context.getJobId());
taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene());
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setWorkflowNodeId(context.getWorkflowNodeId());
Job job = jobMapper.selectById(context.getJobId());
jobTaskBatchHandler.openResidentTask(job, taskExecuteDTO);
}
private void openNextWorkflow(JobTaskBatchGeneratorContext context, JobTaskBatch jobTaskBatch) {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(jobTaskBatch.getId());
workflowBatchHandler.openNextNode(taskExecuteDTO);
}
}

View File

@ -4,12 +4,25 @@ import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -19,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
@ -36,6 +50,7 @@ public class JobTaskBatchHandler {
private final JobTaskMapper jobTaskMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final WorkflowBatchHandler workflowBatchHandler;
private final GroupConfigMapper groupConfigMapper;
@Transactional
@ -91,4 +106,58 @@ public class JobTaskBatchHandler {
);
}
/**
* 开启常驻任务
*
* @param job 定时任务配置信息
* @param taskExecuteDTO 任务执行新
*/
public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job)
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
// 是否是常驻任务
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
// 防止任务已经分配到其他节点导致的任务重复执行
|| !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
) {
return;
}
long count = groupConfigMapper.selectCount(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
.eq(GroupConfig::getGroupName, job.getGroupName())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
if (count == 0) {
return;
}
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
jobTimerTaskDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());
Long preTriggerAt = ResidentTaskCache.get(job.getId());
if (Objects.isNull(preTriggerAt) || preTriggerAt < job.getNextTriggerAt()) {
preTriggerAt = job.getNextTriggerAt();
}
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(preTriggerAt);
Long nextTriggerAt = waitStrategy.computeTriggerTime(waitStrategyContext);
// 获取时间差的毫秒数
long milliseconds = nextTriggerAt - preTriggerAt;
Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000);
log.info("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds, DateUtils.toNowMilli() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
}
}