From 88556aa7c1f89104db595760b865f11d1651f975 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Tue, 2 Jul 2024 23:30:27 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.1.0-beta2):=20=E4=BC=98=E5=8C=96Map?= =?UTF-8?q?Reduce=E5=AE=A2=E6=88=B7=E7=AB=AF=E8=B4=9F=E8=BD=BD=E5=9D=87?= =?UTF-8?q?=E8=A1=A1=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/ClientNodeAllocateHandler.java | 1 - .../task/support/dispatch/ReduceActor.java | 2 +- .../job/BroadcastTaskJobExecutor.java | 4 + .../executor/job/ClusterJobExecutor.java | 4 + .../executor/job/MapReduceJobExecutor.java | 4 + .../executor/job/ShardingJobExecutor.java | 4 + .../task/MapReduceTaskGenerator.java | 74 ++++++++++++------- .../generator/task/MapTaskGenerator.java | 6 +- 8 files changed, 69 insertions(+), 30 deletions(-) diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ClientNodeAllocateHandler.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ClientNodeAllocateHandler.java index 682fe9b7..27fc6d13 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ClientNodeAllocateHandler.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ClientNodeAllocateHandler.java @@ -22,7 +22,6 @@ import java.util.stream.Stream; @Component @RequiredArgsConstructor public class ClientNodeAllocateHandler { - private final AccessTemplate accessTemplate; /** * 获取分配的节点 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java index 5eb9384d..4a5d569f 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java @@ -67,7 +67,7 @@ public class ReduceActor extends AbstractActor { String key = MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId()); distributedLockHandler.lockWithDisposableAndRetry(() -> { doReduce(reduceTask); - }, key, Duration.ofSeconds(1), Duration.ofSeconds(2), 3); + }, key, Duration.ofSeconds(1), Duration.ofSeconds(2), 6); } catch (Exception e) { SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/BroadcastTaskJobExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/BroadcastTaskJobExecutor.java index 9e6d36f3..c7ae1dc9 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/BroadcastTaskJobExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/BroadcastTaskJobExecutor.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.job.task.support.executor.job; import akka.actor.ActorRef; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; @@ -32,6 +33,9 @@ public class BroadcastTaskJobExecutor extends AbstractJobExecutor { List taskList = context.getTaskList(); for (JobTask jobTask : taskList) { + if (StrUtil.isBlank(jobTask.getClientInfo())) { + continue; + } RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java index b4b1d500..8fa81215 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.executor.job; import akka.actor.ActorRef; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; @@ -37,6 +38,9 @@ public class ClusterJobExecutor extends AbstractJobExecutor { } JobTask jobTask = taskList.get(0); + if (StrUtil.isBlank(jobTask.getClientInfo())) { + return; + } RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapReduceJobExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapReduceJobExecutor.java index ea6d1245..569dbebf 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapReduceJobExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapReduceJobExecutor.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.job.task.support.executor.job; import akka.actor.ActorRef; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; @@ -28,6 +29,9 @@ public class MapReduceJobExecutor extends AbstractJobExecutor { protected void doExecute(final JobExecutorContext context) { List taskList = context.getTaskList(); for (final JobTask jobTask : taskList) { + if (StrUtil.isBlank(jobTask.getClientInfo())) { + return; + } RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ShardingJobExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ShardingJobExecutor.java index 05115a9d..0bf7876a 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ShardingJobExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ShardingJobExecutor.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.job.task.support.executor.job; import akka.actor.ActorRef; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; @@ -31,6 +32,9 @@ public class ShardingJobExecutor extends AbstractJobExecutor { List taskList = context.getTaskList(); for (int i = 0; i < taskList.size(); i++) { JobTask jobTask = taskList.get(i); + if (StrUtil.isBlank(jobTask.getClientInfo())) { + return; + } RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); realJobExecutor.setShardingIndex(i); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index a69a27fa..d9019d95 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -12,9 +12,12 @@ import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.allocate.client.ClientLoadBalanceManager; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.snailjob.server.common.triple.Pair; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; import com.aizuda.snailjob.server.job.task.dto.MapReduceArgsStrDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; @@ -48,6 +51,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { private static final String REDUCE_TASK = "REDUCE_TASK"; private final JobTaskMapper jobTaskMapper; private final TransactionTemplate transactionTemplate; + private final ClientNodeAllocateHandler clientNodeAllocateHandler; @Override public JobTaskTypeEnum getTaskInstanceType() { @@ -56,34 +60,34 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { @Override protected List doGenerate(final JobTaskGenerateContext context) { - Set serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), - context.getNamespaceId()); - if (CollUtil.isEmpty(serverNodes)) { - SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); - return Lists.newArrayList(); - } +// Set serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), +// context.getNamespaceId()); +// if (CollUtil.isEmpty(serverNodes)) { +// SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); +// return Lists.newArrayList(); +// } - List nodeInfoList = new ArrayList<>(serverNodes); +// List nodeInfoList = new ArrayList<>(serverNodes); MapReduceStageEnum mapReduceStageEnum = MapReduceStageEnum.ofStage(context.getMrStage()); Assert.notNull(mapReduceStageEnum, () -> new SnailJobServerException("Map reduce stage is not existed")); switch (Objects.requireNonNull(mapReduceStageEnum)) { case MAP -> { // MAP任务 - return createMapJobTasks(context, nodeInfoList, serverNodes); + return createMapJobTasks(context); } case REDUCE -> { // REDUCE任务 - return createReduceJobTasks(context, nodeInfoList, serverNodes); + return createReduceJobTasks(context); } case MERGE_REDUCE -> { // REDUCE任务 - return createMergeReduceJobTasks(context, nodeInfoList, serverNodes); + return createMergeReduceJobTasks(context); } default -> throw new SnailJobServerException("Map reduce stage is not existed"); } } - private List createMergeReduceJobTasks(JobTaskGenerateContext context, List nodeInfoList, Set serverNodes) { + private List createMergeReduceJobTasks(JobTaskGenerateContext context) { List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() .select(JobTask::getResultMessage) @@ -92,16 +96,16 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) ); - RegisterNodeInfo registerNodeInfo = nodeInfoList.get(0); + Pair clientInfo = getClientNodeInfo(context); // 新增任务实例 JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); - jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); + jobTask.setClientInfo(clientInfo.getKey()); jobTask.setArgsType(context.getArgsType()); JobArgsHolder jobArgsHolder = new JobArgsHolder(); jobArgsHolder.setJobParams(context.getArgsStr()); jobArgsHolder.setReduces(JsonUtil.toJsonString(StreamUtils.toList(jobTasks, JobTask::getResultMessage))); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); - jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); + jobTask.setTaskStatus(clientInfo.getValue()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage()); jobTask.setTaskName(MERGE_REDUCE_TASK); @@ -111,8 +115,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { return Lists.newArrayList(jobTask); } - private List createReduceJobTasks(JobTaskGenerateContext context, List nodeInfoList, - Set serverNodes) { + private List createReduceJobTasks(JobTaskGenerateContext context) { int reduceParallel = 1; String jobParams = null; @@ -142,21 +145,21 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTasks = new ArrayList<>(partition.size()); final List finalJobTasks = jobTasks; String finalJobParams = jobParams; - final List finalJobTasks1 = jobTasks; transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(final TransactionStatus status) { for (int index = 0; index < partition.size(); index++) { - RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size()); + Pair clientInfo = getClientNodeInfo(context); + // 新增任务实例 JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); - jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); + jobTask.setClientInfo(clientInfo.getKey()); jobTask.setArgsType(context.getArgsType()); JobArgsHolder jobArgsHolder = new JobArgsHolder(); jobArgsHolder.setJobParams(finalJobParams); jobArgsHolder.setMaps(partition.get(index)); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); - jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); + jobTask.setTaskStatus(clientInfo.getValue()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage()); jobTask.setTaskName(REDUCE_TASK); @@ -168,7 +171,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { finalJobTasks.add(jobTask); } - batchSaveJobTasks(finalJobTasks1); + batchSaveJobTasks(finalJobTasks); } }); @@ -176,8 +179,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { } @NotNull - private List createMapJobTasks(final JobTaskGenerateContext context, - final List nodeInfoList, final Set serverNodes) { + private List createMapJobTasks(final JobTaskGenerateContext context) { List mapSubTask = context.getMapSubTask(); if (CollUtil.isEmpty(mapSubTask)) { SnailJobLog.LOCAL.warn("Map sub task is empty. TaskBatchId:[{}]", context.getTaskBatchId()); @@ -198,17 +200,18 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { protected void doInTransactionWithoutResult(final TransactionStatus status) { for (int index = 0; index < mapSubTask.size(); index++) { - RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size()); + Pair clientInfo = getClientNodeInfo(context); + // 新增任务实例 JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); - jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); + jobTask.setClientInfo(clientInfo.getKey()); jobTask.setArgsType(context.getArgsType()); JobArgsHolder jobArgsHolder = new JobArgsHolder(); jobArgsHolder.setJobParams(context.getArgsStr()); jobArgsHolder.setMaps(mapSubTask.get(index)); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType()); - jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); + jobTask.setTaskStatus(clientInfo.getValue()); jobTask.setMrStage(MapReduceStageEnum.MAP.getStage()); jobTask.setTaskName(context.getTaskName()); jobTask.setLeaf(StatusEnum.YES.getStatus()); @@ -236,6 +239,25 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { return jobTasks; } + + private Pair getClientNodeInfo(JobTaskGenerateContext context) { + RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode( + context.getJobId().toString(), + context.getGroupName(), + context.getNamespaceId(), + ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType() + ); + String clientInfo = StrUtil.EMPTY; + int JobTaskStatus = JobTaskStatusEnum.RUNNING.getStatus(); + if (Objects.isNull(serverNode)) { + JobTaskStatus = JobTaskStatusEnum.CANCEL.getStatus(); + } else { + clientInfo = ClientInfoUtils.generate(serverNode); + } + + return Pair.of(clientInfo, JobTaskStatus); + } + private List> averageAlgorithm(List allMapJobTasks, int shard) { // 最多分片数为allMapJobTasks.size() diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapTaskGenerator.java index ab7341f8..b52bbe9d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapTaskGenerator.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.task; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import org.springframework.stereotype.Component; @@ -17,8 +18,9 @@ import java.util.List; public class MapTaskGenerator extends MapReduceTaskGenerator { public MapTaskGenerator(final JobTaskMapper jobTaskMapper, - final TransactionTemplate transactionTemplate) { - super(jobTaskMapper, transactionTemplate); + final TransactionTemplate transactionTemplate, + final ClientNodeAllocateHandler clientNodeAllocateHandler) { + super(jobTaskMapper, transactionTemplate, clientNodeAllocateHandler); } @Override