feat:(1.3.0-beta1): 1.手动触发map reduce任务执行时支持全链路传递

This commit is contained in:
opensnail 2024-12-28 15:17:15 +08:00
parent a83a76e733
commit 875f7d5cf3
7 changed files with 74 additions and 18 deletions

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.common.client;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.server.common.dto.PullRemoteNodeClientRegisterInfoDTO;
import com.aizuda.snailjob.server.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.server.common.rpc.client.annotation.Body;
import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
@ -22,5 +23,5 @@ public interface CommonRpcClient {
Result syncConfig(@Body ConfigDTO configDTO);
@Mapping(path = GET_REG_NODES_AND_REFRESH, method = RequestMethod.POST)
Result<String> getRegNodesAndFlush();
Result<String> pullRemoteNodeClientRegisterInfo(@Body PullRemoteNodeClientRegisterInfoDTO registerInfo);
}

View File

@ -0,0 +1,12 @@
package com.aizuda.snailjob.server.common.dto;
/**
* <p>
*
* </p>
*
* @author opensnail
* @date 2024-12-28
*/
public class PullRemoteNodeClientRegisterInfoDTO {
}

View File

@ -6,25 +6,21 @@ import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.client.CommonRpcClient;
import com.aizuda.snailjob.server.common.dto.PullRemoteNodeClientRegisterInfoDTO;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.server.common.triple.Pair;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import java.time.Duration;
@ -34,7 +30,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* 客户端注册
@ -189,7 +184,8 @@ public class ClientRegister extends AbstractRegister {
nodeInfo.setHostPort(serverNode.getHostPort());
nodeInfo.setHostIp(serverNode.getHostIp());
CommonRpcClient serverRpcClient = buildRpcClient(nodeInfo);
Result<String> regNodesAndFlush = serverRpcClient.getRegNodesAndFlush();
Result<String> regNodesAndFlush =
serverRpcClient.pullRemoteNodeClientRegisterInfo(new PullRemoteNodeClientRegisterInfoDTO());
return regNodesAndFlush.getData();
} catch (Exception e) {
return StrUtil.EMPTY;

View File

@ -130,7 +130,7 @@ public class RpcClientInvokeHandler implements InvocationHandler {
// 统一设置Token
requestHeaders.set(SystemConstants.SNAIL_JOB_AUTH_TOKEN, CacheToken.get(groupName, namespaceId));
SnailJobRequest snailJobRequest = new SnailJobRequest(args);
SnailJobRequest snailJobRequest = new SnailJobRequest(parasResult.body);
Result result = retryer.call(() -> {
StopWatch sw = new StopWatch();

View File

@ -17,6 +17,7 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
@ -52,6 +53,7 @@ public class ReduceActor extends AbstractActor {
private final JobMapper jobMapper;
private final JobTaskMapper jobTaskMapper;
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final JobTaskBatchHandler jobTaskBatchHandler;
@Override
public Receive createReceive() {
@ -95,12 +97,15 @@ public class ReduceActor extends AbstractActor {
return;
}
String argStr = jobTaskBatchHandler.getArgStr(reduceTask.getTaskBatchId(), job);
// 创建reduce任务
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType());
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
context.setTaskBatchId(reduceTask.getTaskBatchId());
context.setMrStage(reduceTask.getMrStage());
context.setWfContext(reduceTask.getWfContext());
context.setArgsStr(argStr);
List<JobTask> taskList = taskInstance.generate(context);
if (CollUtil.isEmpty(taskList)) {
SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId());

View File

@ -1,15 +1,12 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.*;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
@ -25,10 +22,13 @@ import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -50,6 +50,7 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPL
public class JobTaskBatchHandler {
private final JobTaskBatchMapper jobTaskBatchMapper;
private final JobTaskMapper jobTaskMapper;
private final GroupConfigMapper groupConfigMapper;
private final List<JobExecutorResultHandler> resultHandlerList;
@ -136,4 +137,41 @@ public class JobTaskBatchHandler {
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
}
/**
* 这里为了兼容MAP或者MAP_REDUCE场景下手动执行任务的时候参数丢失问题
* 需要从JobTask中获取任务类型为MAP的且是taskName是ROOT_MAP的任务的参数作为执行参数下发给客户端
*
* @param taskBatchId 任务批次
* @param job 任务
* @return 需要给客户端下发的参数
*/
public String getArgStr(Long taskBatchId, Job job) {
JobTask rootMapTask = jobTaskMapper.selectList(new PageDTO<>(1, 1),
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getId, JobTask::getArgsStr)
.eq(JobTask::getTaskBatchId, taskBatchId)
.eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage())
.eq(JobTask::getTaskName, SystemConstants.ROOT_MAP)
.orderByAsc(JobTask::getId)
).stream().findFirst().orElse(null);
// {"jobParams":"测试参数传递","maps":""}
String argsStr = job.getArgsStr();
if (Objects.nonNull(rootMapTask) && StrUtil.isNotBlank(rootMapTask.getArgsStr())) {
JobArgsHolder jobArgsHolder = JsonUtil.parseObject(rootMapTask.getArgsStr(), JobArgsHolder.class);
// MAP_REDUCE的参数: {"shardNum":2,"argsStr":"测试参数传递"} 这里得解析出来覆盖argsStr
if (JobTaskTypeEnum.MAP_REDUCE.getType() == job.getTaskType()) {
MapReduceArgsStrDTO mapReduceArgsStrDTO = JsonUtil.parseObject(argsStr, MapReduceArgsStrDTO.class);
mapReduceArgsStrDTO.setArgsStr((String) jobArgsHolder.getJobParams());
argsStr = JsonUtil.toJsonString(mapReduceArgsStrDTO);
} else {
// MAP的参数: 测试参数传递 直接覆盖即可
argsStr = (String) jobArgsHolder.getJobParams();
}
}
return argsStr;
}
}

View File

@ -22,6 +22,7 @@ import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFacto
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
@ -49,6 +50,7 @@ import java.util.Objects;
public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final JobMapper jobMapper;
private final JobTaskBatchHandler jobTaskBatchHandler;
@Override
public boolean supports(final String path) {
@ -81,11 +83,13 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
retryRequest.getReqId());
}
String argStr = jobTaskBatchHandler.getArgStr(mapTaskRequest.getTaskBatchId(), job);
// 创建map任务
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest);
context.setGroupName(HttpHeaderUtil.getGroupName(headers));
context.setArgsStr(job.getArgsStr());
context.setArgsStr(argStr);
context.setNamespaceId(HttpHeaderUtil.getNamespace(headers));
context.setMrStage(MapReduceStageEnum.MAP.getStage());
context.setMapSubTask(mapTaskRequest.getSubTask());