feat(sj_1.1.0): 阻塞策略支持RECOVERY模式
This commit is contained in:
parent
c2fd01d1d7
commit
8d6213d165
@ -358,12 +358,15 @@ CREATE TABLE `sj_job_task`
|
|||||||
`job_id` bigint(20) NOT NULL COMMENT '任务信息id',
|
`job_id` bigint(20) NOT NULL COMMENT '任务信息id',
|
||||||
`task_batch_id` bigint(20) NOT NULL COMMENT '调度任务id',
|
`task_batch_id` bigint(20) NOT NULL COMMENT '调度任务id',
|
||||||
`parent_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '父执行器id',
|
`parent_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '父执行器id',
|
||||||
`task_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '执行的状态 0、失败 1、成功',
|
`task_status` tinyint NOT NULL DEFAULT 0 COMMENT '执行的状态 0、失败 1、成功',
|
||||||
`retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '重试次数',
|
`retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '重试次数',
|
||||||
|
`mr_stage` tinyint DEFAULT NULL COMMENT '动态分片所处阶段 1:map 2:reduce 3:mergeReduce',
|
||||||
|
`leaf` tinyint NOT NULL DEFAULT '1' COMMENT '叶子节点',
|
||||||
|
`task_name` varchar(255) NOT NULL DEFAULT '' COMMENT '任务名称',
|
||||||
`client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port',
|
`client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port',
|
||||||
`result_message` text NOT NULL COMMENT '执行结果',
|
`result_message` text NOT NULL COMMENT '执行结果',
|
||||||
`args_str` text DEFAULT NULL COMMENT '执行方法参数',
|
`args_str` text DEFAULT NULL COMMENT '执行方法参数',
|
||||||
`args_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '参数类型 ',
|
`args_type` tinyint NOT NULL DEFAULT 1 COMMENT '参数类型 ',
|
||||||
`ext_attrs` varchar(256) NULL DEFAULT '' COMMENT '扩展字段',
|
`ext_attrs` varchar(256) NULL DEFAULT '' COMMENT '扩展字段',
|
||||||
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
|
@ -66,6 +66,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
|||||||
return doJobExecute(jobArgs);
|
return doJobExecute(jobArgs);
|
||||||
} finally {
|
} finally {
|
||||||
SnailJobLogManager.removeLogMeta();
|
SnailJobLogManager.removeLogMeta();
|
||||||
|
JobContextManager.removeJobContext();
|
||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
|
@ -47,4 +47,6 @@ public enum JobTaskStatusEnum {
|
|||||||
public static final List<Integer> NOT_COMPLETE = Collections.singletonList(RUNNING.status);
|
public static final List<Integer> NOT_COMPLETE = Collections.singletonList(RUNNING.status);
|
||||||
|
|
||||||
public static final List<Integer> COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status);
|
public static final List<Integer> COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status);
|
||||||
|
|
||||||
|
public static final List<Integer> NOT_SUCCESS = Arrays.asList(FAIL.status, STOP.status);
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,12 @@ import lombok.Getter;
|
|||||||
public enum BlockStrategyEnum {
|
public enum BlockStrategyEnum {
|
||||||
DISCARD(1),
|
DISCARD(1),
|
||||||
OVERLAY(2),
|
OVERLAY(2),
|
||||||
CONCURRENCY(3);
|
CONCURRENCY(3),
|
||||||
|
/**
|
||||||
|
* 丢弃新的并重新触发老的任务(失败的任务)
|
||||||
|
*/
|
||||||
|
RECOVERY(4);
|
||||||
|
;
|
||||||
|
|
||||||
private final int blockStrategy;
|
private final int blockStrategy;
|
||||||
|
|
||||||
|
@ -0,0 +1,74 @@
|
|||||||
|
package com.aizuda.snailjob.server.job.task.support.block.job;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import cn.hutool.core.lang.Assert;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
|
||||||
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
|
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||||
|
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
|
||||||
|
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
|
||||||
|
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
|
||||||
|
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
|
||||||
|
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重新触发执行失败的任务
|
||||||
|
*
|
||||||
|
* @author opensnail
|
||||||
|
* @date 2024-06-16 11:30:59
|
||||||
|
* @since sj_1.1.0
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class DiscardRetryBlockStrategy extends AbstracJobBlockStrategy {
|
||||||
|
private final JobTaskMapper jobTaskMapper;
|
||||||
|
private final JobMapper jobMapper;
|
||||||
|
@Override
|
||||||
|
protected void doBlock(BlockStrategyContext context) {
|
||||||
|
Assert.notNull(context.getJobId(), () -> new SnailJobServerException("job id can not be null"));
|
||||||
|
Assert.notNull(context.getTaskBatchId(), () -> new SnailJobServerException("task batch id can not be null"));
|
||||||
|
Assert.notNull(context.getTaskType(), () -> new SnailJobServerException("task type can not be null"));
|
||||||
|
|
||||||
|
List<JobTask> jobTasks = jobTaskMapper.selectList(
|
||||||
|
new LambdaQueryWrapper<JobTask>()
|
||||||
|
.select(JobTask::getId, JobTask::getTaskStatus)
|
||||||
|
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
|
||||||
|
.eq(JobTask::getTaskStatus, JobTaskStatusEnum.NOT_SUCCESS)
|
||||||
|
);
|
||||||
|
|
||||||
|
if (CollUtil.isEmpty(jobTasks)) {
|
||||||
|
SnailJobLog.LOCAL.warn("No executable job task. taskBatchId:[{}]", context.getTaskBatchId());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Job job = jobMapper.selectById(context.getJobId());
|
||||||
|
// 执行任务
|
||||||
|
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(context.getTaskType());
|
||||||
|
jobExecutor.execute(buildJobExecutorContext(context, job, jobTasks));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BlockStrategyEnum blockStrategyEnum() {
|
||||||
|
return BlockStrategyEnum.RECOVERY;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JobExecutorContext buildJobExecutorContext(BlockStrategyContext strategyContext, Job job,
|
||||||
|
List<JobTask> taskList) {
|
||||||
|
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
|
||||||
|
context.setTaskList(taskList);
|
||||||
|
context.setTaskBatchId(strategyContext.getTaskBatchId());
|
||||||
|
context.setWorkflowTaskBatchId(strategyContext.getWorkflowTaskBatchId());
|
||||||
|
context.setWorkflowNodeId(strategyContext.getWorkflowNodeId());
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user