From 875f7d5cf3e50dfdb3fbef2dac2346c04ca22c7d Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sat, 28 Dec 2024 15:17:15 +0800 Subject: [PATCH] =?UTF-8?q?feat:(1.3.0-beta1):=201.=E6=89=8B=E5=8A=A8?= =?UTF-8?q?=E8=A7=A6=E5=8F=91map=20reduce=E4=BB=BB=E5=8A=A1=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E6=97=B6=E6=94=AF=E6=8C=81=E5=85=A8=E9=93=BE=E8=B7=AF?= =?UTF-8?q?=E4=BC=A0=E9=80=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/common/client/CommonRpcClient.java | 3 +- .../PullRemoteNodeClientRegisterInfoDTO.java | 12 +++++ .../common/register/ClientRegister.java | 10 ++-- .../rpc/client/RpcClientInvokeHandler.java | 2 +- .../task/support/dispatch/ReduceActor.java | 5 ++ .../support/handler/JobTaskBatchHandler.java | 54 ++++++++++++++++--- .../MapTaskPostHttpRequestHandler.java | 6 ++- 7 files changed, 74 insertions(+), 18 deletions(-) create mode 100644 snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/PullRemoteNodeClientRegisterInfoDTO.java diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java index 21025810b..cfbe3a573 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.common.client; import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.server.common.dto.PullRemoteNodeClientRegisterInfoDTO; import com.aizuda.snailjob.server.common.rpc.client.RequestMethod; import com.aizuda.snailjob.server.common.rpc.client.annotation.Body; import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; @@ -22,5 +23,5 @@ public interface CommonRpcClient { Result syncConfig(@Body ConfigDTO configDTO); @Mapping(path = GET_REG_NODES_AND_REFRESH, method = RequestMethod.POST) - Result getRegNodesAndFlush(); + Result pullRemoteNodeClientRegisterInfo(@Body PullRemoteNodeClientRegisterInfoDTO registerInfo); } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/PullRemoteNodeClientRegisterInfoDTO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/PullRemoteNodeClientRegisterInfoDTO.java new file mode 100644 index 000000000..4e073cfb3 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/PullRemoteNodeClientRegisterInfoDTO.java @@ -0,0 +1,12 @@ +package com.aizuda.snailjob.server.common.dto; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2024-12-28 + */ +public class PullRemoteNodeClientRegisterInfoDTO { +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java index ee4b25897..141c3c431 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java @@ -6,25 +6,21 @@ import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; import com.aizuda.snailjob.common.core.enums.NodeTypeEnum; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.aizuda.snailjob.common.core.util.NetUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.client.CommonRpcClient; +import com.aizuda.snailjob.server.common.dto.PullRemoteNodeClientRegisterInfoDTO; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; -import com.aizuda.snailjob.server.common.triple.Pair; import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.Lists; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; -import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Component; import java.time.Duration; @@ -34,7 +30,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.*; -import java.util.stream.Collectors; /** * 客户端注册 @@ -189,7 +184,8 @@ public class ClientRegister extends AbstractRegister { nodeInfo.setHostPort(serverNode.getHostPort()); nodeInfo.setHostIp(serverNode.getHostIp()); CommonRpcClient serverRpcClient = buildRpcClient(nodeInfo); - Result regNodesAndFlush = serverRpcClient.getRegNodesAndFlush(); + Result regNodesAndFlush = + serverRpcClient.pullRemoteNodeClientRegisterInfo(new PullRemoteNodeClientRegisterInfoDTO()); return regNodesAndFlush.getData(); } catch (Exception e) { return StrUtil.EMPTY; diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcClientInvokeHandler.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcClientInvokeHandler.java index 9335b3689..3dcf1346a 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcClientInvokeHandler.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/RpcClientInvokeHandler.java @@ -130,7 +130,7 @@ public class RpcClientInvokeHandler implements InvocationHandler { // 统一设置Token requestHeaders.set(SystemConstants.SNAIL_JOB_AUTH_TOKEN, CacheToken.get(groupName, namespaceId)); - SnailJobRequest snailJobRequest = new SnailJobRequest(args); + SnailJobRequest snailJobRequest = new SnailJobRequest(parasResult.body); Result result = retryer.call(() -> { StopWatch sw = new StopWatch(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java index 06182d905..0f4f88e6a 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java @@ -17,6 +17,7 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory; import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler; +import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; @@ -52,6 +53,7 @@ public class ReduceActor extends AbstractActor { private final JobMapper jobMapper; private final JobTaskMapper jobTaskMapper; private final WorkflowTaskBatchMapper workflowTaskBatchMapper; + private final JobTaskBatchHandler jobTaskBatchHandler; @Override public Receive createReceive() { @@ -95,12 +97,15 @@ public class ReduceActor extends AbstractActor { return; } + String argStr = jobTaskBatchHandler.getArgStr(reduceTask.getTaskBatchId(), job); + // 创建reduce任务 JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType()); JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); context.setTaskBatchId(reduceTask.getTaskBatchId()); context.setMrStage(reduceTask.getMrStage()); context.setWfContext(reduceTask.getWfContext()); + context.setArgsStr(argStr); List taskList = taskInstance.generate(context); if (CollUtil.isEmpty(taskList)) { SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId()); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index b4d7f4d75..b6485c6c9 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -1,15 +1,12 @@ package com.aizuda.snailjob.server.job.task.support.handler; import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.core.enums.*; import com.aizuda.snailjob.common.core.enums.StatusEnum; -import akka.actor.ActorRef; -import cn.hutool.core.collection.CollUtil; -import com.aizuda.snailjob.common.core.context.SnailSpringContext; -import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; -import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; -import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; -import com.aizuda.snailjob.common.core.enums.StatusEnum; -import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.common.core.model.JobArgsHolder; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.WaitStrategy; import com.aizuda.snailjob.server.common.dto.DistributeInstance; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; @@ -25,10 +22,13 @@ import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask; import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -50,6 +50,7 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPL public class JobTaskBatchHandler { private final JobTaskBatchMapper jobTaskBatchMapper; + private final JobTaskMapper jobTaskMapper; private final GroupConfigMapper groupConfigMapper; private final List resultHandlerList; @@ -136,4 +137,41 @@ public class JobTaskBatchHandler { JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration); ResidentTaskCache.refresh(job.getId(), nextTriggerAt); } + + /** + * 这里为了兼容MAP或者MAP_REDUCE场景下手动执行任务的时候参数丢失问题, + * 需要从JobTask中获取任务类型为MAP的且是taskName是ROOT_MAP的任务的参数作为执行参数下发给客户端 + * + * @param taskBatchId 任务批次 + * @param job 任务 + * @return 需要给客户端下发的参数 + */ + public String getArgStr(Long taskBatchId, Job job) { + JobTask rootMapTask = jobTaskMapper.selectList(new PageDTO<>(1, 1), + new LambdaQueryWrapper() + .select(JobTask::getId, JobTask::getArgsStr) + .eq(JobTask::getTaskBatchId, taskBatchId) + .eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage()) + .eq(JobTask::getTaskName, SystemConstants.ROOT_MAP) + .orderByAsc(JobTask::getId) + ).stream().findFirst().orElse(null); + + // {"jobParams":"测试参数传递","maps":""} + String argsStr = job.getArgsStr(); + if (Objects.nonNull(rootMapTask) && StrUtil.isNotBlank(rootMapTask.getArgsStr())) { + JobArgsHolder jobArgsHolder = JsonUtil.parseObject(rootMapTask.getArgsStr(), JobArgsHolder.class); + // MAP_REDUCE的参数: {"shardNum":2,"argsStr":"测试参数传递"} 这里得解析出来覆盖argsStr + if (JobTaskTypeEnum.MAP_REDUCE.getType() == job.getTaskType()) { + MapReduceArgsStrDTO mapReduceArgsStrDTO = JsonUtil.parseObject(argsStr, MapReduceArgsStrDTO.class); + mapReduceArgsStrDTO.setArgsStr((String) jobArgsHolder.getJobParams()); + argsStr = JsonUtil.toJsonString(mapReduceArgsStrDTO); + } else { + // MAP的参数: 测试参数传递 直接覆盖即可 + argsStr = (String) jobArgsHolder.getJobParams(); + } + } + + return argsStr; + } + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java index 14e028489..e82c40501 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java @@ -22,6 +22,7 @@ import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFacto import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory; +import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; @@ -49,6 +50,7 @@ import java.util.Objects; public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { private final WorkflowTaskBatchMapper workflowTaskBatchMapper; private final JobMapper jobMapper; + private final JobTaskBatchHandler jobTaskBatchHandler; @Override public boolean supports(final String path) { @@ -81,11 +83,13 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { retryRequest.getReqId()); } + String argStr = jobTaskBatchHandler.getArgStr(mapTaskRequest.getTaskBatchId(), job); + // 创建map任务 JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType()); JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest); context.setGroupName(HttpHeaderUtil.getGroupName(headers)); - context.setArgsStr(job.getArgsStr()); + context.setArgsStr(argStr); context.setNamespaceId(HttpHeaderUtil.getNamespace(headers)); context.setMrStage(MapReduceStageEnum.MAP.getStage()); context.setMapSubTask(mapTaskRequest.getSubTask());