feat(sj_1.1.0): 阻塞策略支持RECOVERY模式

This commit is contained in:
opensnail 2024-06-16 13:01:32 +08:00
parent 6fa8f49daa
commit 3820dc140c
5 changed files with 88 additions and 3 deletions

View File

@ -358,12 +358,15 @@ CREATE TABLE `sj_job_task`
`job_id` bigint(20) NOT NULL COMMENT '任务信息id',
`task_batch_id` bigint(20) NOT NULL COMMENT '调度任务id',
`parent_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '父执行器id',
`task_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '执行的状态 0、失败 1、成功',
`task_status` tinyint NOT NULL DEFAULT 0 COMMENT '执行的状态 0、失败 1、成功',
`retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '重试次数',
`mr_stage` tinyint DEFAULT NULL COMMENT '动态分片所处阶段 1:map 2:reduce 3:mergeReduce',
`leaf` tinyint NOT NULL DEFAULT '1' COMMENT '叶子节点',
`task_name` varchar(255) NOT NULL DEFAULT '' COMMENT '任务名称',
`client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port',
`result_message` text NOT NULL COMMENT '执行结果',
`args_str` text DEFAULT NULL COMMENT '执行方法参数',
`args_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '参数类型 ',
`args_type` tinyint NOT NULL DEFAULT 1 COMMENT '参数类型 ',
`ext_attrs` varchar(256) NULL DEFAULT '' COMMENT '扩展字段',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',

View File

@ -66,6 +66,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
return doJobExecute(jobArgs);
} finally {
SnailJobLogManager.removeLogMeta();
JobContextManager.removeJobContext();
}
});

View File

@ -47,4 +47,6 @@ public enum JobTaskStatusEnum {
public static final List<Integer> NOT_COMPLETE = Collections.singletonList(RUNNING.status);
public static final List<Integer> COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status);
public static final List<Integer> NOT_SUCCESS = Arrays.asList(FAIL.status, STOP.status);
}

View File

@ -14,7 +14,12 @@ import lombok.Getter;
public enum BlockStrategyEnum {
DISCARD(1),
OVERLAY(2),
CONCURRENCY(3);
CONCURRENCY(3),
/**
* 丢弃新的并重新触发老的任务(失败的任务)
*/
RECOVERY(4);
;
private final int blockStrategy;

View File

@ -0,0 +1,74 @@
package com.aizuda.snailjob.server.job.task.support.block.job;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 重新触发执行失败的任务
*
* @author opensnail
* @date 2024-06-16 11:30:59
* @since sj_1.1.0
*/
@Component
@RequiredArgsConstructor
public class DiscardRetryBlockStrategy extends AbstracJobBlockStrategy {
private final JobTaskMapper jobTaskMapper;
private final JobMapper jobMapper;
@Override
protected void doBlock(BlockStrategyContext context) {
Assert.notNull(context.getJobId(), () -> new SnailJobServerException("job id can not be null"));
Assert.notNull(context.getTaskBatchId(), () -> new SnailJobServerException("task batch id can not be null"));
Assert.notNull(context.getTaskType(), () -> new SnailJobServerException("task type can not be null"));
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getId, JobTask::getTaskStatus)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
.eq(JobTask::getTaskStatus, JobTaskStatusEnum.NOT_SUCCESS)
);
if (CollUtil.isEmpty(jobTasks)) {
SnailJobLog.LOCAL.warn("No executable job task. taskBatchId:[{}]", context.getTaskBatchId());
return;
}
Job job = jobMapper.selectById(context.getJobId());
// 执行任务
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(context.getTaskType());
jobExecutor.execute(buildJobExecutorContext(context, job, jobTasks));
}
@Override
protected BlockStrategyEnum blockStrategyEnum() {
return BlockStrategyEnum.RECOVERY;
}
private static JobExecutorContext buildJobExecutorContext(BlockStrategyContext strategyContext, Job job,
List<JobTask> taskList) {
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
context.setTaskList(taskList);
context.setTaskBatchId(strategyContext.getTaskBatchId());
context.setWorkflowTaskBatchId(strategyContext.getWorkflowTaskBatchId());
context.setWorkflowNodeId(strategyContext.getWorkflowNodeId());
return context;
}
}