feat:(1.2.0-beta1): 1. map和map reduce支持配置参数全路径传递

This commit is contained in:
opensnail 2024-09-07 12:57:13 +08:00
parent 876f48f1c2
commit 01d24d754a
5 changed files with 98 additions and 130 deletions

View File

@ -15,12 +15,6 @@ import java.util.Objects;
@Data @Data
public class JobArgs { public class JobArgs {
/**
* 此字段即将废弃请使用see: jobParams
*/
@Deprecated
private String argsStr;
private Object jobParams; private Object jobParams;
private String executorInfo; private String executorInfo;

View File

@ -101,13 +101,6 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
private static JobArgs buildJobArgs(JobContext jobContext) { private static JobArgs buildJobArgs(JobContext jobContext) {
JobArgs jobArgs = new JobArgs(); JobArgs jobArgs = new JobArgs();
// 下一个版本即将删除本期兼容此问题
Object jobParams = jobContext.getJobArgsHolder().getJobParams();
if (jobParams instanceof String) {
jobArgs.setArgsStr((String) jobParams);
} else {
jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams));
}
jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams()); jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
@ -117,14 +110,6 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
private static JobArgs buildShardingJobArgs(JobContext jobContext) { private static JobArgs buildShardingJobArgs(JobContext jobContext) {
ShardingJobArgs jobArgs = new ShardingJobArgs(); ShardingJobArgs jobArgs = new ShardingJobArgs();
jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams()); jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams());
// 下一个版本即将删除本期兼容此问题
Object jobParams = jobContext.getJobArgsHolder().getJobParams();
if (jobParams instanceof String) {
jobArgs.setArgsStr((String) jobParams);
} else {
jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams));
}
jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setShardingIndex(jobContext.getShardingIndex()); jobArgs.setShardingIndex(jobContext.getShardingIndex());
jobArgs.setShardingTotal(jobContext.getShardingTotal()); jobArgs.setShardingTotal(jobContext.getShardingTotal());
@ -134,13 +119,6 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
private static JobArgs buildMapJobArgs(JobContext jobContext) { private static JobArgs buildMapJobArgs(JobContext jobContext) {
MapArgs jobArgs = new MapArgs(); MapArgs jobArgs = new MapArgs();
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
// 下一个版本即将删除本期兼容此问题
Object jobParams = jobContext.getJobArgsHolder().getJobParams();
if (jobParams instanceof String) {
jobArgs.setArgsStr((String) jobParams);
} else {
jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams));
}
jobArgs.setJobParams(jobArgsHolder.getJobParams()); jobArgs.setJobParams(jobArgsHolder.getJobParams());
jobArgs.setMapResult(jobArgsHolder.getMaps()); jobArgs.setMapResult(jobArgsHolder.getMaps());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
@ -152,13 +130,6 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
private static JobArgs buildReduceJobArgs(JobContext jobContext) { private static JobArgs buildReduceJobArgs(JobContext jobContext) {
ReduceArgs jobArgs = new ReduceArgs(); ReduceArgs jobArgs = new ReduceArgs();
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
// 下一个版本即将删除本期兼容此问题
Object jobParams = jobContext.getJobArgsHolder().getJobParams();
if (jobParams instanceof String) {
jobArgs.setArgsStr((String) jobParams);
} else {
jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams));
}
jobArgs.setJobParams(jobArgsHolder.getJobParams()); jobArgs.setJobParams(jobArgsHolder.getJobParams());
Object maps = jobArgsHolder.getMaps(); Object maps = jobArgsHolder.getMaps();
if (Objects.nonNull(maps)) { if (Objects.nonNull(maps)) {

View File

@ -3,11 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.task;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum; import com.aizuda.snailjob.common.core.enums.*;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.JsonUtil;
@ -26,15 +22,16 @@ import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.*; import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/** /**
* 生成Map Reduce任务 * 生成Map Reduce任务
@ -60,14 +57,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
@Override @Override
protected List<JobTask> doGenerate(final JobTaskGenerateContext context) { protected List<JobTask> doGenerate(final JobTaskGenerateContext context) {
// Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(),
// context.getNamespaceId());
// if (CollUtil.isEmpty(serverNodes)) {
// SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
// return Lists.newArrayList();
// }
// List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
MapReduceStageEnum mapReduceStageEnum = MapReduceStageEnum.ofStage(context.getMrStage()); MapReduceStageEnum mapReduceStageEnum = MapReduceStageEnum.ofStage(context.getMrStage());
Assert.notNull(mapReduceStageEnum, () -> new SnailJobServerException("Map reduce stage is not existed")); Assert.notNull(mapReduceStageEnum, () -> new SnailJobServerException("Map reduce stage is not existed"));
switch (Objects.requireNonNull(mapReduceStageEnum)) { switch (Objects.requireNonNull(mapReduceStageEnum)) {
@ -90,19 +79,20 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
private List<JobTask> createMergeReduceJobTasks(JobTaskGenerateContext context) { private List<JobTask> createMergeReduceJobTasks(JobTaskGenerateContext context) {
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>() List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage) .select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()) .eq(JobTask::getTaskBatchId, context.getTaskBatchId())
.eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage()) .eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage())
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) .eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
); );
MapReduceArgsStrDTO jobParams = getJobParams(context);
Pair<String, Integer> clientInfo = getClientNodeInfo(context); Pair<String, Integer> clientInfo = getClientNodeInfo(context);
// 新增任务实例 // 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(clientInfo.getKey()); jobTask.setClientInfo(clientInfo.getKey());
jobTask.setArgsType(context.getArgsType()); jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder(); JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr()); jobArgsHolder.setJobParams(jobParams.getArgsStr());
jobArgsHolder.setReduces(StreamUtils.toList(jobTasks, JobTask::getResultMessage)); jobArgsHolder.setReduces(StreamUtils.toList(jobTasks, JobTask::getResultMessage));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(clientInfo.getValue()); jobTask.setTaskStatus(clientInfo.getValue());
@ -110,30 +100,22 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage()); jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage());
jobTask.setTaskName(MERGE_REDUCE_TASK); jobTask.setTaskName(MERGE_REDUCE_TASK);
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败")); () -> new SnailJobServerException("新增任务实例失败"));
return Lists.newArrayList(jobTask); return Lists.newArrayList(jobTask);
} }
private List<JobTask> createReduceJobTasks(JobTaskGenerateContext context) { private List<JobTask> createReduceJobTasks(JobTaskGenerateContext context) {
int reduceParallel = 1; MapReduceArgsStrDTO jobParams = getJobParams(context);
String jobParams = null; int reduceParallel = Math.max(1,
try { Optional.ofNullable(jobParams.getShardNum()).orElse(1));
MapReduceArgsStrDTO mapReduceArgsStrDTO = JsonUtil.parseObject(context.getArgsStr(),
MapReduceArgsStrDTO.class);
reduceParallel = Optional.ofNullable(mapReduceArgsStrDTO.getShardNum()).orElse(1);
jobParams = mapReduceArgsStrDTO.getArgsStr();
reduceParallel = Math.max(1, reduceParallel);
} catch (Exception e) {
SnailJobLog.LOCAL.error("map reduce args parse error. argsStr:[{}]", context.getArgsStr());
}
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>() List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage, JobTask::getId) .select(JobTask::getResultMessage, JobTask::getId)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()) .eq(JobTask::getTaskBatchId, context.getTaskBatchId())
.eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage()) .eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage())
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) .eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
); );
if (CollUtil.isEmpty(jobTasks)) { if (CollUtil.isEmpty(jobTasks)) {
@ -146,7 +128,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTasks = new ArrayList<>(partition.size()); jobTasks = new ArrayList<>(partition.size());
final List<JobTask> finalJobTasks = jobTasks; final List<JobTask> finalJobTasks = jobTasks;
String finalJobParams = jobParams;
transactionTemplate.execute(new TransactionCallbackWithoutResult() { transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override @Override
protected void doInTransactionWithoutResult(final TransactionStatus status) { protected void doInTransactionWithoutResult(final TransactionStatus status) {
@ -158,7 +139,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTask.setClientInfo(clientInfo.getKey()); jobTask.setClientInfo(clientInfo.getKey());
jobTask.setArgsType(context.getArgsType()); jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder(); JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(finalJobParams); jobArgsHolder.setJobParams(jobParams.getArgsStr());
jobArgsHolder.setMaps(partition.get(index)); jobArgsHolder.setMaps(partition.get(index));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(clientInfo.getValue()); jobTask.setTaskStatus(clientInfo.getValue());
@ -187,62 +168,74 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
return Lists.newArrayList(); return Lists.newArrayList();
} }
MapReduceArgsStrDTO jobParams = getJobParams(context);
// 判定父节点是不是叶子节点若是则不更新否则更新为非叶子节点 // 判定父节点是不是叶子节点若是则不更新否则更新为非叶子节点
JobTask parentJobTask = jobTaskMapper.selectOne( JobTask parentJobTask = jobTaskMapper.selectOne(
new LambdaQueryWrapper<JobTask>() new LambdaQueryWrapper<JobTask>()
.select(JobTask::getId) .select(JobTask::getId)
.eq(JobTask::getId, Optional.ofNullable(context.getParentId()).orElse(0L)) .eq(JobTask::getId, Optional.ofNullable(context.getParentId()).orElse(0L))
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) .eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
); );
return transactionTemplate.execute(status -> { return transactionTemplate.execute(status -> {
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size()); List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
for (int index = 0; index < mapSubTask.size(); index++) { for (int index = 0; index < mapSubTask.size(); index++) {
Pair<String, Integer> clientInfo = getClientNodeInfo(context); Pair<String, Integer> clientInfo = getClientNodeInfo(context);
// 新增任务实例 // 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(clientInfo.getKey()); jobTask.setClientInfo(clientInfo.getKey());
jobTask.setArgsType(context.getArgsType()); jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder(); JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr()); jobArgsHolder.setJobParams(jobParams.getArgsStr());
jobArgsHolder.setMaps(mapSubTask.get(index)); jobArgsHolder.setMaps(mapSubTask.get(index));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType()); jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
jobTask.setTaskStatus(clientInfo.getValue()); jobTask.setTaskStatus(clientInfo.getValue());
jobTask.setMrStage(MapReduceStageEnum.MAP.getStage()); jobTask.setMrStage(MapReduceStageEnum.MAP.getStage());
jobTask.setTaskName(context.getTaskName()); jobTask.setTaskName(context.getTaskName());
jobTask.setLeaf(StatusEnum.YES.getStatus()); jobTask.setLeaf(StatusEnum.YES.getStatus());
jobTask.setParentId(Objects.isNull(context.getParentId()) ? 0L : context.getParentId()); jobTask.setParentId(Objects.isNull(context.getParentId()) ? 0L : context.getParentId());
jobTask.setRetryCount(0); jobTask.setRetryCount(0);
jobTask.setCreateDt(LocalDateTime.now()); jobTask.setCreateDt(LocalDateTime.now());
jobTask.setUpdateDt(LocalDateTime.now()); jobTask.setUpdateDt(LocalDateTime.now());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTasks.add(jobTask); jobTasks.add(jobTask);
} }
batchSaveJobTasks(jobTasks); batchSaveJobTasks(jobTasks);
// 更新父节点的为非叶子节点 // 更新父节点的为非叶子节点
if (Objects.nonNull(parentJobTask)) { if (Objects.nonNull(parentJobTask)) {
JobTask parentJobTask1 = new JobTask(); JobTask parentJobTask1 = new JobTask();
parentJobTask1.setId(context.getParentId()); parentJobTask1.setId(context.getParentId());
parentJobTask1.setLeaf(StatusEnum.NO.getStatus()); parentJobTask1.setLeaf(StatusEnum.NO.getStatus());
Assert.isTrue(1 == jobTaskMapper.updateById(parentJobTask1), Assert.isTrue(1 == jobTaskMapper.updateById(parentJobTask1),
() -> new SnailJobMapReduceException("更新父节点失败")); () -> new SnailJobMapReduceException("更新父节点失败"));
} }
return jobTasks; return jobTasks;
}); });
} }
protected MapReduceArgsStrDTO getJobParams(JobTaskGenerateContext context) {
try {
return JsonUtil.parseObject(context.getArgsStr(), MapReduceArgsStrDTO.class);
} catch (Exception e) {
SnailJobLog.LOCAL.error("map reduce args parse error. argsStr:[{}]", context.getArgsStr());
}
return new MapReduceArgsStrDTO();
}
private Pair<String, Integer> getClientNodeInfo(JobTaskGenerateContext context) { private Pair<String, Integer> getClientNodeInfo(JobTaskGenerateContext context) {
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode( RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(
context.getJobId().toString(), context.getJobId().toString(),
context.getGroupName(), context.getGroupName(),
context.getNamespaceId(), context.getNamespaceId(),
ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType() ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType()
); );
String clientInfo = StrUtil.EMPTY; String clientInfo = StrUtil.EMPTY;
int JobTaskStatus = JobTaskStatusEnum.RUNNING.getStatus(); int JobTaskStatus = JobTaskStatusEnum.RUNNING.getStatus();

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.task;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.snailjob.server.job.task.dto.MapReduceArgsStrDTO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -32,4 +33,12 @@ public class MapTaskGenerator extends MapReduceTaskGenerator {
protected List<JobTask> doGenerate(final JobTaskGenerateContext context) { protected List<JobTask> doGenerate(final JobTaskGenerateContext context) {
return super.doGenerate(context); return super.doGenerate(context);
} }
@Override
protected MapReduceArgsStrDTO getJobParams(JobTaskGenerateContext context) {
// 这里复用map reduce的参数能力
MapReduceArgsStrDTO mapReduceArgsStrDTO = new MapReduceArgsStrDTO();
mapReduceArgsStrDTO.setArgsStr(context.getArgsStr());
return mapReduceArgsStrDTO;
}
} }

View File

@ -69,10 +69,23 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
Object[] args = retryRequest.getArgs(); Object[] args = retryRequest.getArgs();
MapTaskRequest mapTaskRequest = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), MapTaskRequest.class); MapTaskRequest mapTaskRequest = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), MapTaskRequest.class);
Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
.eq(Job::getId, mapTaskRequest.getJobId())
.eq(Job::getGroupName, groupName)
.eq(Job::getNamespaceId, namespace)
);
if (Objects.isNull(job)) {
return JsonUtil.toJsonString(
new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.FALSE,
retryRequest.getReqId()));
}
// 创建map任务 // 创建map任务
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType()); JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest); JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest);
context.setGroupName(HttpHeaderUtil.getGroupName(headers)); context.setGroupName(HttpHeaderUtil.getGroupName(headers));
context.setArgsStr(job.getArgsStr());
context.setNamespaceId(HttpHeaderUtil.getNamespace(headers)); context.setNamespaceId(HttpHeaderUtil.getNamespace(headers));
context.setMrStage(MapReduceStageEnum.MAP.getStage()); context.setMrStage(MapReduceStageEnum.MAP.getStage());
context.setMapSubTask(mapTaskRequest.getSubTask()); context.setMapSubTask(mapTaskRequest.getSubTask());
@ -84,18 +97,6 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
retryRequest.getReqId())); retryRequest.getReqId()));
} }
Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
.eq(Job::getId, mapTaskRequest.getJobId())
.eq(Job::getGroupName, groupName)
.eq(Job::getNamespaceId, namespace)
);
if (Objects.isNull(job)) {
return JsonUtil.toJsonString(
new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.FALSE,
retryRequest.getReqId()));
}
String newWfContext = null; String newWfContext = null;
if (Objects.nonNull(mapTaskRequest.getWorkflowTaskBatchId())) { if (Objects.nonNull(mapTaskRequest.getWorkflowTaskBatchId())) {
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne( WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(