diff --git a/doc/sql/snail_job_sqlserver.sql b/doc/sql/snail_job_sqlserver.sql index d72ecd09b..c0a949cc0 100644 --- a/doc/sql/snail_job_sqlserver.sql +++ b/doc/sql/snail_job_sqlserver.sql @@ -1138,7 +1138,7 @@ GO -- sj_distributed_lock CREATE TABLE sj_distributed_lock ( - name nvarchar(64) NOT NULL PRIMARY KEY IDENTITY, + name nvarchar(64) NOT NULL PRIMARY KEY, lock_until datetime2 NOT NULL DEFAULT CURRENT_TIMESTAMP, locked_at datetime2 NOT NULL DEFAULT CURRENT_TIMESTAMP, locked_by nvarchar(255) NOT NULL, diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java index aa0d9fad2..06182d905 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java @@ -4,7 +4,6 @@ import akka.actor.AbstractActor; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; -import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; @@ -63,7 +62,7 @@ public class ReduceActor extends AbstractActor { Assert.notNull(reduceTask.getMrStage(), () -> new SnailJobServerException("mrStage can not be null")); Assert.notNull(reduceTask.getJobId(), () -> new SnailJobServerException("jobId can not be null")); Assert.notNull(reduceTask.getTaskBatchId(), - () -> new SnailJobServerException("taskBatchId can not be null")); + () -> new SnailJobServerException("taskBatchId can not be null")); String key = MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId()); distributedLockHandler.lockWithDisposableAndRetry(() -> { doReduce(reduceTask); @@ -78,10 +77,11 @@ public class ReduceActor extends AbstractActor { private void doReduce(final ReduceTaskDTO reduceTask) { List jobTasks = jobTaskMapper.selectList(new PageDTO<>(1, 1), - new LambdaQueryWrapper() - .select(JobTask::getId) - .eq(JobTask::getTaskBatchId, reduceTask.getTaskBatchId()) - .eq(JobTask::getMrStage, reduceTask.getMrStage()) + new LambdaQueryWrapper() + .select(JobTask::getId) + .eq(JobTask::getTaskBatchId, reduceTask.getTaskBatchId()) + .eq(JobTask::getMrStage, reduceTask.getMrStage()) + .orderByAsc(JobTask::getId) ); if (CollUtil.isNotEmpty(jobTasks)) { @@ -110,9 +110,9 @@ public class ReduceActor extends AbstractActor { String wfContext = null; if (Objects.nonNull(reduceTask.getWorkflowTaskBatchId())) { WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne( - new LambdaQueryWrapper() - .select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getId) - .eq(WorkflowTaskBatch::getId, reduceTask.getWorkflowTaskBatchId()) + new LambdaQueryWrapper() + .select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getId) + .eq(WorkflowTaskBatch::getId, reduceTask.getWorkflowTaskBatchId()) ); wfContext = workflowTaskBatch.getWfContext(); } @@ -124,10 +124,10 @@ public class ReduceActor extends AbstractActor { } private static JobExecutorContext buildJobExecutorContext( - ReduceTaskDTO reduceTask, - Job job, - List taskList, - String wfContext) { + ReduceTaskDTO reduceTask, + Job job, + List taskList, + String wfContext) { JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); context.setTaskList(taskList); context.setTaskBatchId(reduceTask.getTaskBatchId()); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/AbstractJobTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/AbstractJobTaskGenerator.java index cf968a510..bf8928b83 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/AbstractJobTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/AbstractJobTaskGenerator.java @@ -7,6 +7,7 @@ import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.utils.DbUtils; +import com.google.common.collect.Sets; import org.apache.ibatis.executor.BatchResult; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; @@ -39,17 +40,14 @@ public abstract class AbstractJobTaskGenerator implements JobTaskGenerator, Init protected void batchSaveJobTasks(List jobTasks) { // ORACLE 批次插入不能直接返回id,因此此处特殊处理 // 后期版本会对snail-job-datasource进行重构,在考虑此处的兼容逻辑 - if (DbUtils.getDbType().getDb().equals(DbTypeEnum.ORACLE.getDb())) { - List inserts = jobTaskMapper.insert(jobTasks); - - if (CollUtil.isNotEmpty(inserts)) { - BatchResult batchResult = inserts.get(0); - Assert.isTrue(jobTasks.size() == Arrays.stream(batchResult.getUpdateCounts()).sum(), () -> new SnailJobServerException("新增任务实例失败")); - } else { - throw new SnailJobServerException("新增任务实例失败"); + if (Sets.newHashSet(DbTypeEnum.ORACLE.getDb(), DbTypeEnum.SQLSERVER.getDb()) + .contains(DbUtils.getDbType().getDb())) { + // sqlserver oracle 不支持返回批量id,故暂时先这样处理 + for (JobTask jobTask : jobTasks) { + Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败")); } + } else { + Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败")); } - - Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败")); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 3c4268f2e..2f9272bf9 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -124,6 +124,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { MapReduceArgsStrDTO.class); reduceParallel = Optional.ofNullable(mapReduceArgsStrDTO.getShardNum()).orElse(1); jobParams = mapReduceArgsStrDTO.getArgsStr(); + reduceParallel = Math.max(1, reduceParallel); } catch (Exception e) { SnailJobLog.LOCAL.error("map reduce args parse error. argsStr:[{}]", context.getArgsStr()); } @@ -190,7 +191,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { JobTask parentJobTask = jobTaskMapper.selectOne( new LambdaQueryWrapper() .select(JobTask::getId) - .eq(JobTask::getId, context.getParentId()) + .eq(JobTask::getId, Optional.ofNullable(context.getParentId()).orElse(0L)) .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) );