From 27adf034202bd6ae7fdfe7dcdc3b542fce1b5229 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Thu, 4 Jul 2024 11:25:43 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0-beta2):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=B7=A5=E4=BD=9C=E6=B5=81=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../task/MapReduceTaskGenerator.java | 47 ++++++++++--------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index d9019d954..5bd20e1f3 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -8,12 +8,12 @@ import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.allocate.client.ClientLoadBalanceManager; -import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; @@ -24,7 +24,6 @@ import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import org.jetbrains.annotations.NotNull; @@ -90,10 +89,10 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { private List createMergeReduceJobTasks(JobTaskGenerateContext context) { List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() - .select(JobTask::getResultMessage) - .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) - .eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage()) - .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) + .select(JobTask::getResultMessage) + .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) + .eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage()) + .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) ); Pair clientInfo = getClientNodeInfo(context); @@ -110,7 +109,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage()); jobTask.setTaskName(MERGE_REDUCE_TASK); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), - () -> new SnailJobServerException("新增任务实例失败")); + () -> new SnailJobServerException("新增任务实例失败")); return Lists.newArrayList(jobTask); } @@ -120,7 +119,8 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { int reduceParallel = 1; String jobParams = null; try { - MapReduceArgsStrDTO mapReduceArgsStrDTO = JsonUtil.parseObject(context.getArgsStr(), MapReduceArgsStrDTO.class); + MapReduceArgsStrDTO mapReduceArgsStrDTO = JsonUtil.parseObject(context.getArgsStr(), + MapReduceArgsStrDTO.class); reduceParallel = Optional.ofNullable(mapReduceArgsStrDTO.getShardNum()).orElse(1); jobParams = mapReduceArgsStrDTO.getArgsStr(); } catch (Exception e) { @@ -128,10 +128,10 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { } List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() - .select(JobTask::getResultMessage) - .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) - .eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage()) - .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) + .select(JobTask::getResultMessage, JobTask::getId) + .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) + .eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage()) + .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) ); if (CollUtil.isEmpty(jobTasks)) { @@ -187,10 +187,11 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { } // 判定父节点是不是叶子节点,若是则不更新否则更新为非叶子节点 - List parentJobTasks = jobTaskMapper.selectList(new PageDTO<>(1, 1), - new LambdaQueryWrapper().select(JobTask::getId) - .eq(JobTask::getId, context.getParentId()) - .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) + JobTask parentJobTask = jobTaskMapper.selectOne( + new LambdaQueryWrapper() + .select(JobTask::getId) + .eq(JobTask::getId, context.getParentId()) + .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) ); List jobTasks = new ArrayList<>(mapSubTask.size()); @@ -226,26 +227,26 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { batchSaveJobTasks(jobTasks); // 更新父节点的为非叶子节点 - if (CollUtil.isNotEmpty(parentJobTasks)) { + if (Objects.nonNull(parentJobTask)) { JobTask parentJobTask = new JobTask(); parentJobTask.setId(context.getParentId()); parentJobTask.setLeaf(StatusEnum.NO.getStatus()); - jobTaskMapper.updateById(parentJobTask); + Assert.isTrue(1 == jobTaskMapper.updateById(parentJobTask), + () -> new SnailJobMapReduceException("更新父节点失败")); } } }); - return jobTasks; } private Pair getClientNodeInfo(JobTaskGenerateContext context) { RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode( - context.getJobId().toString(), - context.getGroupName(), - context.getNamespaceId(), - ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType() + context.getJobId().toString(), + context.getGroupName(), + context.getNamespaceId(), + ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType() ); String clientInfo = StrUtil.EMPTY; int JobTaskStatus = JobTaskStatusEnum.RUNNING.getStatus();