diff --git a/doc/sql/snail_job_mysql.sql b/doc/sql/snail_job_mysql.sql index e943b9645..af1af5911 100644 --- a/doc/sql/snail_job_mysql.sql +++ b/doc/sql/snail_job_mysql.sql @@ -358,12 +358,15 @@ CREATE TABLE `sj_job_task` `job_id` bigint(20) NOT NULL COMMENT '任务信息id', `task_batch_id` bigint(20) NOT NULL COMMENT '调度任务id', `parent_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '父执行器id', - `task_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '执行的状态 0、失败 1、成功', + `task_status` tinyint NOT NULL DEFAULT 0 COMMENT '执行的状态 0、失败 1、成功', `retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '重试次数', + `mr_stage` tinyint DEFAULT NULL COMMENT '动态分片所处阶段 1:map 2:reduce 3:mergeReduce', + `leaf` tinyint NOT NULL DEFAULT '1' COMMENT '叶子节点', + `task_name` varchar(255) NOT NULL DEFAULT '' COMMENT '任务名称', `client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port', `result_message` text NOT NULL COMMENT '执行结果', `args_str` text DEFAULT NULL COMMENT '执行方法参数', - `args_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '参数类型 ', + `args_type` tinyint NOT NULL DEFAULT 1 COMMENT '参数类型 ', `ext_attrs` varchar(256) NULL DEFAULT '' COMMENT '扩展字段', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index a081d8615..22bbd034d 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -66,6 +66,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { return doJobExecute(jobArgs); } finally { SnailJobLogManager.removeLogMeta(); + JobContextManager.removeJobContext(); } }); diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskStatusEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskStatusEnum.java index c7316fbcd..9f59534aa 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskStatusEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskStatusEnum.java @@ -47,4 +47,6 @@ public enum JobTaskStatusEnum { public static final List NOT_COMPLETE = Collections.singletonList(RUNNING.status); public static final List COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status); + + public static final List NOT_SUCCESS = Arrays.asList(FAIL.status, STOP.status); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/BlockStrategyEnum.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/BlockStrategyEnum.java index dcfc05832..c2facd4d5 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/BlockStrategyEnum.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/BlockStrategyEnum.java @@ -14,7 +14,12 @@ import lombok.Getter; public enum BlockStrategyEnum { DISCARD(1), OVERLAY(2), - CONCURRENCY(3); + CONCURRENCY(3), + /** + * 丢弃新的并重新触发老的任务(失败的任务) + */ + RECOVERY(4); + ; private final int blockStrategy; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/DiscardRetryBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/DiscardRetryBlockStrategy.java new file mode 100644 index 000000000..83b925e21 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/DiscardRetryBlockStrategy.java @@ -0,0 +1,74 @@ +package com.aizuda.snailjob.server.job.task.support.block.job; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; +import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.server.job.task.support.JobExecutor; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext; +import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 重新触发执行失败的任务 + * + * @author opensnail + * @date 2024-06-16 11:30:59 + * @since sj_1.1.0 + */ +@Component +@RequiredArgsConstructor +public class DiscardRetryBlockStrategy extends AbstracJobBlockStrategy { + private final JobTaskMapper jobTaskMapper; + private final JobMapper jobMapper; + @Override + protected void doBlock(BlockStrategyContext context) { + Assert.notNull(context.getJobId(), () -> new SnailJobServerException("job id can not be null")); + Assert.notNull(context.getTaskBatchId(), () -> new SnailJobServerException("task batch id can not be null")); + Assert.notNull(context.getTaskType(), () -> new SnailJobServerException("task type can not be null")); + + List jobTasks = jobTaskMapper.selectList( + new LambdaQueryWrapper() + .select(JobTask::getId, JobTask::getTaskStatus) + .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) + .eq(JobTask::getTaskStatus, JobTaskStatusEnum.NOT_SUCCESS) + ); + + if (CollUtil.isEmpty(jobTasks)) { + SnailJobLog.LOCAL.warn("No executable job task. taskBatchId:[{}]", context.getTaskBatchId()); + return; + } + + Job job = jobMapper.selectById(context.getJobId()); + // 执行任务 + JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(context.getTaskType()); + jobExecutor.execute(buildJobExecutorContext(context, job, jobTasks)); + } + + @Override + protected BlockStrategyEnum blockStrategyEnum() { + return BlockStrategyEnum.RECOVERY; + } + + private static JobExecutorContext buildJobExecutorContext(BlockStrategyContext strategyContext, Job job, + List taskList) { + JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); + context.setTaskList(taskList); + context.setTaskBatchId(strategyContext.getTaskBatchId()); + context.setWorkflowTaskBatchId(strategyContext.getWorkflowTaskBatchId()); + context.setWorkflowNodeId(strategyContext.getWorkflowNodeId()); + return context; + } + +}