diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/BlockStrategyEnum.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/BlockStrategyEnum.java index c2facd4d..a4f508bf 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/BlockStrategyEnum.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/BlockStrategyEnum.java @@ -5,6 +5,8 @@ import lombok.AllArgsConstructor; import lombok.Getter; /** + * 阻塞策略针对处于待处理 or 运行中的批次做了一种异常容错策略 + * * @author: xiaowoniu * @date : 2024-01-18 * @since : 2.6.0 @@ -12,11 +14,21 @@ import lombok.Getter; @AllArgsConstructor @Getter public enum BlockStrategyEnum { + + /** + * 不创建新的批次,等待当前批次执行完成 + */ DISCARD(1), + /** + * 停止当前的批次,然后新增一个新的批次 + */ OVERLAY(2), + /** + * 每次都创建一个新的批次 + */ CONCURRENCY(3), /** - * 丢弃新的并重新触发老的任务(失败的任务) + * 不创建新的批次, 重新执行当前的批次中已经失败的任务 */ RECOVERY(4); ; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/RecoveryBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/RecoveryBlockStrategy.java index 520573a3..7516ebc6 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/RecoveryBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/RecoveryBlockStrategy.java @@ -20,6 +20,7 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @@ -36,8 +37,10 @@ import java.util.stream.Stream; @Component @RequiredArgsConstructor public class RecoveryBlockStrategy extends AbstracJobBlockStrategy { + private final JobTaskMapper jobTaskMapper; private final JobMapper jobMapper; + @Override protected void doBlock(BlockStrategyContext context) { Assert.notNull(context.getJobId(), () -> new SnailJobServerException("job id can not be null")); @@ -45,9 +48,8 @@ public class RecoveryBlockStrategy extends AbstracJobBlockStrategy { Assert.notNull(context.getTaskType(), () -> new SnailJobServerException("task type can not be null")); List jobTasks = jobTaskMapper.selectList( - new LambdaQueryWrapper() - .select(JobTask::getId, JobTask::getTaskStatus) - .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) + new LambdaQueryWrapper() + .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) ); // 若任务项为空则生成任务项 @@ -67,7 +69,9 @@ public class RecoveryBlockStrategy extends AbstracJobBlockStrategy { // 执行任务 Stop or Fail 任务 JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(context.getTaskType()); jobExecutor.execute(buildJobExecutorContext(context, job, - StreamUtils.filter(jobTasks, (jobTask) -> JobTaskStatusEnum.NOT_SUCCESS.contains(jobTask.getTaskStatus())))); + StreamUtils.filter(jobTasks, + (jobTask) -> JobTaskStatusEnum.NOT_SUCCESS.contains(jobTask.getTaskStatus()) + || JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus())))); } @Override @@ -76,7 +80,7 @@ public class RecoveryBlockStrategy extends AbstracJobBlockStrategy { } private static JobExecutorContext buildJobExecutorContext(BlockStrategyContext strategyContext, Job job, - List taskList) { + List taskList) { JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); context.setTaskList(taskList); context.setTaskBatchId(strategyContext.getTaskBatchId()); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java index 2da23f21..b4b1d500 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java @@ -1,8 +1,10 @@ package com.aizuda.snailjob.server.job.task.support.executor.job; import akka.actor.ActorRef; +import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; @@ -30,6 +32,10 @@ public class ClusterJobExecutor extends AbstractJobExecutor { // 调度客户端 List taskList = context.getTaskList(); + if (CollUtil.isEmpty(taskList)) { + throw new SnailJobServerException("No executable job task"); + } + JobTask jobTask = taskList.get(0); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/AbstractJobTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/AbstractJobTaskGenerator.java index d18d8b05..cf968a51 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/AbstractJobTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/AbstractJobTaskGenerator.java @@ -1,8 +1,17 @@ package com.aizuda.snailjob.server.job.task.support.generator.task; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import com.aizuda.snailjob.template.datasource.utils.DbUtils; +import org.apache.ibatis.executor.BatchResult; import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import java.util.Arrays; import java.util.List; /** @@ -12,6 +21,9 @@ import java.util.List; */ public abstract class AbstractJobTaskGenerator implements JobTaskGenerator, InitializingBean { + @Autowired + private JobTaskMapper jobTaskMapper; + @Override public List generate(JobTaskGenerateContext context) { return doGenerate(context); @@ -23,4 +35,21 @@ public abstract class AbstractJobTaskGenerator implements JobTaskGenerator, Init public void afterPropertiesSet() throws Exception { JobTaskGeneratorFactory.registerTaskInstance(getTaskInstanceType(), this); } + + protected void batchSaveJobTasks(List jobTasks) { + // ORACLE 批次插入不能直接返回id,因此此处特殊处理 + // 后期版本会对snail-job-datasource进行重构,在考虑此处的兼容逻辑 + if (DbUtils.getDbType().getDb().equals(DbTypeEnum.ORACLE.getDb())) { + List inserts = jobTaskMapper.insert(jobTasks); + + if (CollUtil.isNotEmpty(inserts)) { + BatchResult batchResult = inserts.get(0); + Assert.isTrue(jobTasks.size() == Arrays.stream(batchResult.getUpdateCounts()).sum(), () -> new SnailJobServerException("新增任务实例失败")); + } else { + throw new SnailJobServerException("新增任务实例失败"); + } + } + + Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败")); + } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java index a2ac0bb5..4dca7ef9 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java @@ -8,6 +8,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; @@ -16,6 +17,7 @@ import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -30,11 +32,10 @@ import java.util.*; * @since 2.4.0 */ @Component -@Slf4j +@RequiredArgsConstructor public class BroadcastTaskGenerator extends AbstractJobTaskGenerator { - - @Autowired - private JobTaskMapper jobTaskMapper; + private static final String TASK_NAME ="BROADCAST_TASK"; + private final JobTaskMapper jobTaskMapper; @Override public JobTaskTypeEnum getTaskInstanceType() { @@ -46,7 +47,7 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator { public List doGenerate(JobTaskGenerateContext context) { Set serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()); if (CollUtil.isEmpty(serverNodes)) { - log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); + SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); return Lists.newArrayList(); } @@ -70,13 +71,14 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator { jobTask.setParentId(0L); jobTask.setLeaf(StatusEnum.YES.getStatus()); jobTask.setRetryCount(0); + jobTask.setTaskName(TASK_NAME); jobTask.setCreateDt(LocalDateTime.now()); jobTask.setUpdateDt(LocalDateTime.now()); clientInfoSet.add(address); jobTasks.add(jobTask); } - Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败")); + batchSaveJobTasks(jobTasks); return jobTasks; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java index 7f67777e..e9aa2a06 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java @@ -7,6 +7,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; @@ -15,6 +16,7 @@ import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -29,13 +31,11 @@ import java.util.Optional; * @since 2.4.0 */ @Component -@Slf4j +@RequiredArgsConstructor public class ClusterTaskGenerator extends AbstractJobTaskGenerator { - - @Autowired - protected ClientNodeAllocateHandler clientNodeAllocateHandler; - @Autowired - private JobTaskMapper jobTaskMapper; + private static final String TASK_NAME ="CLUSTER_TASK"; + private final ClientNodeAllocateHandler clientNodeAllocateHandler; + private final JobTaskMapper jobTaskMapper; @Override public JobTaskTypeEnum getTaskInstanceType() { @@ -48,7 +48,7 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator { RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getJobId().toString(), context.getGroupName(), context.getNamespaceId(), context.getRouteKey()); if (Objects.isNull(serverNode)) { - log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); + SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); return Lists.newArrayList(); } @@ -60,6 +60,7 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator { jobArgsHolder.setJobParams(context.getArgsStr()); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); + jobTask.setTaskName(TASK_NAME); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败")); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index c5922374..107c796d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -140,6 +140,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTasks = new ArrayList<>(partition.size()); final List finalJobTasks = jobTasks; String finalJobParams = jobParams; + final List finalJobTasks1 = jobTasks; transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(final TransactionStatus status) { @@ -165,8 +166,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { finalJobTasks.add(jobTask); } - Assert.isTrue(finalJobTasks.size() == jobTaskMapper.insertBatch(finalJobTasks), () -> new SnailJobServerException("新增任务实例失败")); - + batchSaveJobTasks(finalJobTasks1); } }); @@ -218,7 +218,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTasks.add(jobTask); } - Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败")); + batchSaveJobTasks(jobTasks); // 更新父节点的为非叶子节点 if (CollUtil.isNotEmpty(parentJobTasks)) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java index b66e8875..71b75ede 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java @@ -41,7 +41,7 @@ import java.util.Set; @Component @RequiredArgsConstructor public class ShardingTaskGenerator extends AbstractJobTaskGenerator { - private final ClientNodeAllocateHandler clientNodeAllocateHandler; + private static final String TASK_NAME ="SHARDING_TASK"; private final JobTaskMapper jobTaskMapper; @Override @@ -90,12 +90,11 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator { jobTask.setLeaf(StatusEnum.YES.getStatus()); jobTask.setCreateDt(LocalDateTime.now()); jobTask.setUpdateDt(LocalDateTime.now()); + jobTask.setTaskName(TASK_NAME); jobTasks.add(jobTask); } - Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败")); - - + batchSaveJobTasks(jobTasks); return jobTasks; }