diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java index 93e84de2..38293d73 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java @@ -65,7 +65,7 @@ public class JobEndPoint { if (IJobExecutor.class.isAssignableFrom(executor.getClass())) { if (JobTaskTypeEnum.MAP.getType() == jobContext.getTaskType()) { jobExecutor = (AbstractMapExecutor) executor; - } else if (JobTaskTypeEnum.MAP_REDUCE.getType() == jobContext.getTaskId()) { + } else if (JobTaskTypeEnum.MAP_REDUCE.getType() == jobContext.getTaskType()) { jobExecutor = (AbstractMapReduceExecutor) executor; } else { jobExecutor = (AbstractJobExecutor) executor; @@ -115,6 +115,7 @@ public class JobEndPoint { jobContext.setWorkflowTaskBatchId(dispatchJob.getWorkflowTaskBatchId()); jobContext.setRetry(dispatchJob.isRetry()); jobContext.setRetryScene(dispatchJob.getRetryScene()); + jobContext.setMapName(dispatchJob.getMapName()); return jobContext; } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java new file mode 100644 index 00000000..f29b83cb --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java @@ -0,0 +1,16 @@ +package com.aizuda.snailjob.client.job.core.dto; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @author: opensnail + * @date : 2024-06-13 + * @since : sj_1.1.0 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class MapReduceArgs extends JobArgs { + + private String mapName; +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index ef882102..8e5cdffd 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -5,6 +5,7 @@ import com.aizuda.snailjob.client.job.core.IJobExecutor; import com.aizuda.snailjob.client.job.core.cache.FutureCache; import com.aizuda.snailjob.client.job.core.cache.ThreadPoolCache; import com.aizuda.snailjob.client.job.core.dto.JobArgs; +import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs; import com.aizuda.snailjob.client.job.core.dto.ShardingJobArgs; import com.aizuda.snailjob.client.job.core.log.JobLogMeta; import com.aizuda.snailjob.client.job.core.timer.StopTaskTimerTask; @@ -47,6 +48,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor { JobArgs jobArgs; if (jobContext.getTaskType() == JobTaskTypeEnum.SHARDING.getType()) { jobArgs = buildShardingJobArgs(jobContext); + } else if (jobContext.getTaskType() == JobTaskTypeEnum.MAP_REDUCE.getType()) { + jobArgs = buildMapReduceJobArgs(jobContext); } else { jobArgs = buildJobArgs(jobContext); } @@ -73,6 +76,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { logMeta.setJobId(jobContext.getJobId()); logMeta.setTaskBatchId(jobContext.getTaskBatchId()); SnailJobLogManager.initLogInfo(logMeta, LogTypeEnum.JOB); + JobContextManager.setJobContext(jobContext); } private static JobArgs buildJobArgs(JobContext jobContext) { @@ -92,5 +96,14 @@ public abstract class AbstractJobExecutor implements IJobExecutor { return jobArgs; } + private static JobArgs buildMapReduceJobArgs(JobContext jobContext) { + MapReduceArgs jobArgs = new MapReduceArgs(); + jobArgs.setArgsStr(jobContext.getArgsStr()); + jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); + jobArgs.setMapName(jobContext.getMapName()); + jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); + return jobArgs; + } + protected abstract ExecuteResult doJobExecute(JobArgs jobArgs); } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java index dbe748ca..1f6b9dae 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java @@ -1,11 +1,16 @@ package com.aizuda.snailjob.client.job.core.executor; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder; import com.aizuda.snailjob.client.job.core.IJobExecutor; import com.aizuda.snailjob.client.job.core.client.JobNettyClient; +import com.aizuda.snailjob.client.job.core.dto.JobArgs; +import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.client.model.request.MapTaskRequest; import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; import com.aizuda.snailjob.common.core.model.JobContext; +import com.aizuda.snailjob.common.core.model.MapContext; import com.aizuda.snailjob.common.core.model.NettyResult; import com.aizuda.snailjob.common.core.model.Result; import lombok.extern.slf4j.Slf4j; @@ -23,36 +28,45 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements private static final JobNettyClient CLIENT = RequestBuilder.newBuilder() .client(JobNettyClient.class) + .async(Boolean.FALSE) .build(); - public void doMapExecute(JobContext jobContext) { + @Override + protected ExecuteResult doJobExecute(final JobArgs jobArgs) { + MapReduceArgs mapReduceArgs = (MapReduceArgs) jobArgs; + return doJobExecute(mapReduceArgs); + } - List taskList = jobContext.getTaskList(); - String mapName = jobContext.getMapName(); - if (CollectionUtils.isEmpty(taskList)) { + public abstract ExecuteResult doJobExecute(MapReduceArgs mapReduceArgs); + + public void doMapExecute(List taskList, String nextMapName) { + + if (CollectionUtils.isEmpty(taskList) || StrUtil.isBlank(nextMapName)) { return; } // mapName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败) - if ("ROOT_TASK".equals(mapName) || "ROOT_TASK".equals(mapName)) { - log.warn("[Map-{}] illegal map task name : {}! please do not use 'OMS_ROOT_TASK' or 'OMS_LAST_TASK' as map task name. as a precaution, it will be renamed 'X-{}' automatically.", jobContext.getTaskId(), mapName, mapName); - mapName = "X-" + mapName; + if ("ROOT_TASK".equals(nextMapName)) { + throw new SnailJobMapReduceException("NextMapName can not be ROOT_TASK"); } + JobContext jobContext = JobContextManager.getJobContext(); + // 1. 构造请求 MapTaskRequest mapTaskRequest = new MapTaskRequest(); mapTaskRequest.setJobId(jobContext.getJobId()); mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId()); - mapTaskRequest.setTaskName(mapName); + mapTaskRequest.setMapName(nextMapName); mapTaskRequest.setSubTask(taskList); + mapTaskRequest.setParentId(jobContext.getTaskId()); // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) Result booleanResult = CLIENT.batchReportMapTask(mapTaskRequest); if (booleanResult.getData()) { - log.info("[Map-{}] map task[name={},num={}] successfully!", jobContext.getTaskId(), mapName, taskList.size()); + log.info("[Map-{}] map task[name={},num={}] successfully!", jobContext.getTaskId(), nextMapName, taskList.size()); } else { - throw new SnailJobMapReduceException("map failed for task: " + mapName); + throw new SnailJobMapReduceException("map failed for task: " + nextMapName); } } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobContextManager.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobContextManager.java new file mode 100644 index 00000000..eaa2b6e3 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobContextManager.java @@ -0,0 +1,26 @@ +package com.aizuda.snailjob.client.job.core.executor; + +import com.aizuda.snailjob.common.core.model.JobContext; + +/** + * @author: opensnail + * @date : 2024-06-13 + * @since : sj_1.1.0 + */ +public final class JobContextManager { + + private static final ThreadLocal JOB_CONTEXT_LOCAL = new ThreadLocal<>(); + + + public static void setJobContext(JobContext jobContext) { + JOB_CONTEXT_LOCAL.set(jobContext); + } + + public static JobContext getJobContext() { + return JOB_CONTEXT_LOCAL.get(); + } + + public static void removeJobContext() { + JOB_CONTEXT_LOCAL.remove(); + } +} diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java index 604a65ff..8418f276 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java @@ -44,7 +44,7 @@ public class DispatchJobRequest { /** * 任务名称 */ - private String taskName; + private String mapName; private String argsStr; diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java index 7f9480a2..b1f633bf 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java @@ -27,8 +27,8 @@ public class MapTaskRequest { private Long workflowNodeId; - @NotBlank(message = "taskName 不能为空") - private String taskName; + @NotBlank(message = "mapName 不能为空") + private String mapName; @NotEmpty(message = "subTask 不能为空") private List subTask; diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/MapContext.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/MapContext.java new file mode 100644 index 00000000..b9434f63 --- /dev/null +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/MapContext.java @@ -0,0 +1,30 @@ +package com.aizuda.snailjob.common.core.model; + +import lombok.Data; + +import java.util.List; + +/** + * @author: opensnail + * @date : 2024-06-13 + * @since : sj_1.1.0 + */ +@Data +public final class MapContext { + + /** + * Map集合列表 + */ + private List taskList; + + /** + * Map名称 + */ + private String mapName; + + private Long jobId; + + private Long taskBatchId; + + private Long taskId; +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java index 34c70595..181da8fc 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java @@ -32,11 +32,6 @@ public class JobTask extends CreateUpdateDt { */ private String groupName; - /** - * 任务名称 - */ - private String taskName; - /** * 任务信息id */ diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java index cfffeffb..a09fb2a2 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java @@ -33,7 +33,7 @@ public class RealJobExecutorDTO extends BaseDTO { /** * 任务名称 */ - private String taskName; + private String mapName; /** * 扩展字段 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java index 287de28c..8dc4b223 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java @@ -3,8 +3,10 @@ package com.aizuda.snailjob.server.job.task.support.callback; import akka.actor.ActorRef; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; +import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO; import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum; import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler; @@ -51,6 +53,12 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1); realJobExecutor.setRetry(Boolean.TRUE); realJobExecutor.setRetryScene(context.getRetryScene()); + String extAttrs = jobTask.getExtAttrs(); + // TODO 待优化 + if (StrUtil.isNotBlank(extAttrs)) { + JobTaskExtAttrsDTO extAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class); + realJobExecutor.setMapName(extAttrsDTO.getMapName()); + } ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); actorRef.tell(realJobExecutor, actorRef); return; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapReduceClientCallbackHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapReduceClientCallbackHandler.java new file mode 100644 index 00000000..f84696c1 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapReduceClientCallbackHandler.java @@ -0,0 +1,56 @@ +package com.aizuda.snailjob.server.job.task.support.callback; + +import akka.actor.ActorRef; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.RandomUtil; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; +import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; +import com.aizuda.snailjob.server.common.util.ClientInfoUtils; +import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Set; + +/** + * @author: opensnail + * @date : 2024-06-13 21:22x + * @since : sj_1.1.0 + */ +@Component +@Slf4j +public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandler { + + @Override + public JobTaskTypeEnum getTaskInstanceType() { + return JobTaskTypeEnum.MAP_REDUCE; + } + + @Override + protected void doCallback(final ClientCallbackContext context) { + + JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context); + jobExecutorResultDTO.setTaskId(context.getTaskId()); + jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage()); + jobExecutorResultDTO.setResult(context.getExecuteResult().getResult()); + jobExecutorResultDTO.setTaskType(getTaskInstanceType().getType()); + + ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor(); + actorRef.tell(jobExecutorResultDTO, actorRef); + } + + @Override + protected String chooseNewClient(ClientCallbackContext context) { + Set nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()); + if (CollUtil.isEmpty(nodes)) { + log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); + return null; + } + + RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0])); + return ClientInfoUtils.generate(serverNode); + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 8ffe492d..96fcb93e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.dispatch; import akka.actor.AbstractActor; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; @@ -36,6 +37,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; @@ -133,6 +135,7 @@ public class JobExecutorActor extends AbstractActor { JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId()); instanceGenerateContext.setMapName("ROOT_TASK"); + instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY)); // TODO 此处需要判断任务类型 instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP); List taskList = taskInstance.generate(instanceGenerateContext); @@ -176,6 +179,7 @@ public class JobExecutorActor extends AbstractActor { context.setJobId(job.getId()); context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); context.setWorkflowNodeId(taskExecute.getWorkflowNodeId()); + context.setMapName("ROOT_TASK"); return context; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java index dc0b0133..6a7db868 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java @@ -90,4 +90,6 @@ public class JobExecutorContext { private Long workflowNodeId; + private String mapName; + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java index 6e204dfa..3842f4e7 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java @@ -45,4 +45,9 @@ public class JobTaskGenerateContext { * 动态分片的阶段 */ private MapReduceStageEnum mrStage; + + /** + * 父任务id + */ + private Long parentId; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java index b2dae57c..f3fcb9df 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java @@ -58,7 +58,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { @Override public String doHandler(final String content, final UrlQuery query, final HttpHeaders headers) { - SnailJobLog.LOCAL.debug("map task Request. content:[{}]", content); + SnailJobLog.LOCAL.info("map task Request. content:[{}]", content); String groupName = HttpHeaderUtil.getGroupName(headers); String namespace = HttpHeaderUtil.getNamespace(headers); @@ -72,10 +72,11 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { context.setGroupName(HttpHeaderUtil.getGroupName(headers)); context.setNamespaceId(HttpHeaderUtil.getNamespace(headers)); context.setMrStage(MapReduceStageEnum.MAP); + context.setMapSubTask(mapTaskRequest.getSubTask()); List taskList = taskInstance.generate(context); if (CollUtil.isEmpty(taskList)) { return JsonUtil.toJsonString( - new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.TRUE, retryRequest.getReqId())); + new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.FALSE, retryRequest.getReqId())); } Job job = jobMapper.selectOne(new LambdaQueryWrapper() @@ -86,7 +87,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { if (Objects.isNull(job)) { return JsonUtil.toJsonString( - new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.TRUE, + new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.FALSE, retryRequest.getReqId())); } @@ -106,6 +107,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { context.setTaskBatchId(mapTaskRequest.getTaskBatchId()); context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId()); context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId()); + context.setMapName(mapTaskRequest.getMapName()); return context; }