diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java
index f7e0ef23..8a80e5e3 100644
--- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java
+++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java
@@ -69,7 +69,7 @@ public class JobEndPoint {
if (IJobExecutor.class.isAssignableFrom(executor.getClass())) {
if (JobTaskTypeEnum.MAP.getType() == jobContext.getTaskType()) {
jobExecutor = (AbstractMapExecutor) executor;
- } else if (JobTaskTypeEnum.MAP_REDUCE.getType() == jobContext.getTaskId()) {
+ } else if (JobTaskTypeEnum.MAP_REDUCE.getType() == jobContext.getTaskType()) {
jobExecutor = (AbstractMapReduceExecutor) executor;
} else {
jobExecutor = (AbstractJobExecutor) executor;
@@ -119,6 +119,7 @@ public class JobEndPoint {
jobContext.setWorkflowTaskBatchId(dispatchJob.getWorkflowTaskBatchId());
jobContext.setRetry(dispatchJob.isRetry());
jobContext.setRetryScene(dispatchJob.getRetryScene());
+ jobContext.setMapName(dispatchJob.getMapName());
return jobContext;
}
diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java
new file mode 100644
index 00000000..f29b83cb
--- /dev/null
+++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java
@@ -0,0 +1,16 @@
+package com.aizuda.snailjob.client.job.core.dto;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * @author: opensnail
+ * @date : 2024-06-13
+ * @since : sj_1.1.0
+ */
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class MapReduceArgs extends JobArgs {
+
+ private String mapName;
+}
diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java
index ef882102..8e5cdffd 100644
--- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java
+++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java
@@ -5,6 +5,7 @@ import com.aizuda.snailjob.client.job.core.IJobExecutor;
import com.aizuda.snailjob.client.job.core.cache.FutureCache;
import com.aizuda.snailjob.client.job.core.cache.ThreadPoolCache;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
+import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ShardingJobArgs;
import com.aizuda.snailjob.client.job.core.log.JobLogMeta;
import com.aizuda.snailjob.client.job.core.timer.StopTaskTimerTask;
@@ -47,6 +48,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
JobArgs jobArgs;
if (jobContext.getTaskType() == JobTaskTypeEnum.SHARDING.getType()) {
jobArgs = buildShardingJobArgs(jobContext);
+ } else if (jobContext.getTaskType() == JobTaskTypeEnum.MAP_REDUCE.getType()) {
+ jobArgs = buildMapReduceJobArgs(jobContext);
} else {
jobArgs = buildJobArgs(jobContext);
}
@@ -73,6 +76,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
logMeta.setJobId(jobContext.getJobId());
logMeta.setTaskBatchId(jobContext.getTaskBatchId());
SnailJobLogManager.initLogInfo(logMeta, LogTypeEnum.JOB);
+ JobContextManager.setJobContext(jobContext);
}
private static JobArgs buildJobArgs(JobContext jobContext) {
@@ -92,5 +96,14 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
return jobArgs;
}
+ private static JobArgs buildMapReduceJobArgs(JobContext jobContext) {
+ MapReduceArgs jobArgs = new MapReduceArgs();
+ jobArgs.setArgsStr(jobContext.getArgsStr());
+ jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
+ jobArgs.setMapName(jobContext.getMapName());
+ jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
+ return jobArgs;
+ }
+
protected abstract ExecuteResult doJobExecute(JobArgs jobArgs);
}
diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java
index dbe748ca..1f6b9dae 100644
--- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java
+++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java
@@ -1,11 +1,16 @@
package com.aizuda.snailjob.client.job.core.executor;
+import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.client.job.core.IJobExecutor;
import com.aizuda.snailjob.client.job.core.client.JobNettyClient;
+import com.aizuda.snailjob.client.job.core.dto.JobArgs;
+import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
+import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
import com.aizuda.snailjob.common.core.model.JobContext;
+import com.aizuda.snailjob.common.core.model.MapContext;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.Result;
import lombok.extern.slf4j.Slf4j;
@@ -23,36 +28,45 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, NettyResult>newBuilder()
.client(JobNettyClient.class)
+ .async(Boolean.FALSE)
.build();
- public void doMapExecute(JobContext jobContext) {
+ @Override
+ protected ExecuteResult doJobExecute(final JobArgs jobArgs) {
+ MapReduceArgs mapReduceArgs = (MapReduceArgs) jobArgs;
+ return doJobExecute(mapReduceArgs);
+ }
- List<Object> taskList = jobContext.getTaskList();
- String mapName = jobContext.getMapName();
- if (CollectionUtils.isEmpty(taskList)) {
+ public abstract ExecuteResult doJobExecute(MapReduceArgs mapReduceArgs);
+
+ public void doMapExecute(List<Object> taskList, String nextMapName) {
+
+ if (CollectionUtils.isEmpty(taskList) || StrUtil.isBlank(nextMapName)) {
return;
}
// mapName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败)
- if ("ROOT_TASK".equals(mapName) || "ROOT_TASK".equals(mapName)) {
- log.warn("[Map-{}] illegal map task name : {}! please do not use 'OMS_ROOT_TASK' or 'OMS_LAST_TASK' as map task name. as a precaution, it will be renamed 'X-{}' automatically.", jobContext.getTaskId(), mapName, mapName);
- mapName = "X-" + mapName;
+ if ("ROOT_TASK".equals(nextMapName)) {
+ throw new SnailJobMapReduceException("NextMapName can not be ROOT_TASK");
}
+ JobContext jobContext = JobContextManager.getJobContext();
+
// 1. 构造请求
MapTaskRequest mapTaskRequest = new MapTaskRequest();
mapTaskRequest.setJobId(jobContext.getJobId());
mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId());
- mapTaskRequest.setTaskName(mapName);
+ mapTaskRequest.setMapName(nextMapName);
mapTaskRequest.setSubTask(taskList);
+ mapTaskRequest.setParentId(jobContext.getTaskId());
// 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常)
Result<Boolean> booleanResult = CLIENT.batchReportMapTask(mapTaskRequest);
if (booleanResult.getData()) {
- log.info("[Map-{}] map task[name={},num={}] successfully!", jobContext.getTaskId(), mapName, taskList.size());
+ log.info("[Map-{}] map task[name={},num={}] successfully!", jobContext.getTaskId(), nextMapName, taskList.size());
} else {
- throw new SnailJobMapReduceException("map failed for task: " + mapName);
+ throw new SnailJobMapReduceException("map failed for task: " + nextMapName);
}
}
}
diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobContextManager.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobContextManager.java
new file mode 100644
index 00000000..eaa2b6e3
--- /dev/null
+++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobContextManager.java
@@ -0,0 +1,26 @@
+package com.aizuda.snailjob.client.job.core.executor;
+
+import com.aizuda.snailjob.common.core.model.JobContext;
+
+/**
+ * @author: opensnail
+ * @date : 2024-06-13
+ * @since : sj_1.1.0
+ */
+public final class JobContextManager {
+
+ private static final ThreadLocal<JobContext> JOB_CONTEXT_LOCAL = new ThreadLocal<>();
+
+
+ public static void setJobContext(JobContext jobContext) {
+ JOB_CONTEXT_LOCAL.set(jobContext);
+ }
+
+ public static JobContext getJobContext() {
+ return JOB_CONTEXT_LOCAL.get();
+ }
+
+ public static void removeJobContext() {
+ JOB_CONTEXT_LOCAL.remove();
+ }
+}
diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java
index 604a65ff..8418f276 100644
--- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java
+++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java
@@ -44,7 +44,7 @@ public class DispatchJobRequest {
/**
* 任务名称
*/
- private String taskName;
+ private String mapName;
private String argsStr;
diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java
index 7f9480a2..b1f633bf 100644
--- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java
+++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java
@@ -27,8 +27,8 @@ public class MapTaskRequest {
private Long workflowNodeId;
- @NotBlank(message = "taskName 不能为空")
- private String taskName;
+ @NotBlank(message = "mapName 不能为空")
+ private String mapName;
@NotEmpty(message = "subTask 不能为空")
private List<Object> subTask;
diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/MapContext.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/MapContext.java
new file mode 100644
index 00000000..b9434f63
--- /dev/null
+++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/MapContext.java
@@ -0,0 +1,30 @@
+package com.aizuda.snailjob.common.core.model;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * @author: opensnail
+ * @date : 2024-06-13
+ * @since : sj_1.1.0
+ */
+@Data
+public final class MapContext {
+
+ /**
+ * Map集合列表
+ */
+ private List<Object> taskList;
+
+ /**
+ * Map名称
+ */
+ private String mapName;
+
+ private Long jobId;
+
+ private Long taskBatchId;
+
+ private Long taskId;
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java
index e9c1bcc5..a5490561 100644
--- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java
@@ -37,11 +37,6 @@ public class JobTask implements Serializable {
*/
private String groupName;
- /**
- * 任务名称
- */
- private String taskName;
-
/**
* 任务信息id
*/
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java
index cfffeffb..a09fb2a2 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java
@@ -33,7 +33,7 @@ public class RealJobExecutorDTO extends BaseDTO {
/**
* 任务名称
*/
- private String taskName;
+ private String mapName;
/**
* 扩展字段
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java
index 287de28c..8dc4b223 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java
@@ -3,8 +3,10 @@ package com.aizuda.snailjob.server.job.task.support.callback;
import akka.actor.ActorRef;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
+import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
+import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum;
import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler;
@@ -51,6 +53,12 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1);
realJobExecutor.setRetry(Boolean.TRUE);
realJobExecutor.setRetryScene(context.getRetryScene());
+ String extAttrs = jobTask.getExtAttrs();
+ // TODO 待优化
+ if (StrUtil.isNotBlank(extAttrs)) {
+ JobTaskExtAttrsDTO extAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class);
+ realJobExecutor.setMapName(extAttrsDTO.getMapName());
+ }
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
return;
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapReduceClientCallbackHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapReduceClientCallbackHandler.java
new file mode 100644
index 00000000..f84696c1
--- /dev/null
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapReduceClientCallbackHandler.java
@@ -0,0 +1,56 @@
+package com.aizuda.snailjob.server.job.task.support.callback;
+
+import akka.actor.ActorRef;
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.RandomUtil;
+import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
+import com.aizuda.snailjob.server.common.akka.ActorGenerator;
+import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
+import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
+import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
+import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO;
+import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Set;
+
+/**
+ * @author: opensnail
+ * @date : 2024-06-13 21:22x
+ * @since : sj_1.1.0
+ */
+@Component
+@Slf4j
+public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandler {
+
+ @Override
+ public JobTaskTypeEnum getTaskInstanceType() {
+ return JobTaskTypeEnum.MAP_REDUCE;
+ }
+
+ @Override
+ protected void doCallback(final ClientCallbackContext context) {
+
+ JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
+ jobExecutorResultDTO.setTaskId(context.getTaskId());
+ jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage());
+ jobExecutorResultDTO.setResult(context.getExecuteResult().getResult());
+ jobExecutorResultDTO.setTaskType(getTaskInstanceType().getType());
+
+ ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
+ actorRef.tell(jobExecutorResultDTO, actorRef);
+ }
+
+ @Override
+ protected String chooseNewClient(ClientCallbackContext context) {
+ Set<RegisterNodeInfo> nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
+ if (CollUtil.isEmpty(nodes)) {
+ log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
+ return null;
+ }
+
+ RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0]));
+ return ClientInfoUtils.generate(serverNode);
+ }
+}
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java
index 8ffe492d..96fcb93e 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java
@@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
+import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
@@ -36,6 +37,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
@@ -133,6 +135,7 @@ public class JobExecutorActor extends AbstractActor {
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
instanceGenerateContext.setMapName("ROOT_TASK");
+ instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY));
// TODO 此处需要判断任务类型
instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP);
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
@@ -176,6 +179,7 @@ public class JobExecutorActor extends AbstractActor {
context.setJobId(job.getId());
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
+ context.setMapName("ROOT_TASK");
return context;
}
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java
index dc0b0133..6a7db868 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java
@@ -90,4 +90,6 @@ public class JobExecutorContext {
private Long workflowNodeId;
+ private String mapName;
+
}
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java
index 6e204dfa..3842f4e7 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java
@@ -45,4 +45,9 @@ public class JobTaskGenerateContext {
* 动态分片的阶段
*/
private MapReduceStageEnum mrStage;
+
+ /**
+ * 父任务id
+ */
+ private Long parentId;
}
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java
index b2dae57c..f3fcb9df 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java
@@ -58,7 +58,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
@Override
public String doHandler(final String content, final UrlQuery query, final HttpHeaders headers) {
- SnailJobLog.LOCAL.debug("map task Request. content:[{}]", content);
+ SnailJobLog.LOCAL.info("map task Request. content:[{}]", content);
String groupName = HttpHeaderUtil.getGroupName(headers);
String namespace = HttpHeaderUtil.getNamespace(headers);
@@ -72,10 +72,11 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
context.setGroupName(HttpHeaderUtil.getGroupName(headers));
context.setNamespaceId(HttpHeaderUtil.getNamespace(headers));
context.setMrStage(MapReduceStageEnum.MAP);
+ context.setMapSubTask(mapTaskRequest.getSubTask());
List<JobTask> taskList = taskInstance.generate(context);
if (CollUtil.isEmpty(taskList)) {
return JsonUtil.toJsonString(
- new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.TRUE, retryRequest.getReqId()));
+ new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.FALSE, retryRequest.getReqId()));
}
Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
@@ -86,7 +87,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
if (Objects.isNull(job)) {
return JsonUtil.toJsonString(
- new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.TRUE,
+ new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.FALSE,
retryRequest.getReqId()));
}
@@ -106,6 +107,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
context.setTaskBatchId(mapTaskRequest.getTaskBatchId());
context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId());
context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId());
+ context.setMapName(mapTaskRequest.getMapName());
return context;
}