From 5f50e31c2a1184bf0aa28bcb55ea6544eac05936 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sat, 15 Jun 2024 23:44:24 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.0.1):=20=E5=AE=8C=E6=88=90reduce=20?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/job/core/client/JobEndPoint.java | 7 +- .../snailjob/client/job/core/dto/MapArgs.java | 2 +- .../client/job/core/dto/MapReduceArgs.java | 2 +- .../core/executor/AbstractJobExecutor.java | 4 +- .../core/executor/AbstractMapExecutor.java | 32 +++--- .../executor/AbstractMapReduceExecutor.java | 6 +- .../model/request/DispatchJobRequest.java | 4 +- .../client/model/request/MapTaskRequest.java | 4 +- .../common/core/enums/MapReduceStageEnum.java | 28 ++++- .../common/core/model/JobContext.java | 4 +- .../common/core/model/MapContext.java | 2 +- .../datasource/persistence/po/JobTask.java | 18 +++ .../job/task/dto/JobExecutorResultDTO.java | 1 + .../job/task/dto/JobTaskExtAttrsDTO.java | 37 ------- .../job/task/dto/RealJobExecutorDTO.java | 10 +- .../job/task/support/JobTaskConverter.java | 4 +- .../AbstractClientCallbackHandler.java | 9 +- .../MapReduceClientCallbackHandler.java | 20 +++- .../support/dispatch/JobExecutorActor.java | 6 +- .../dispatch/JobExecutorResultActor.java | 7 ++ .../task/support/dispatch/ReduceActor.java | 40 +++++-- .../executor/job/JobExecutorContext.java | 4 - .../task/JobTaskGenerateContext.java | 4 +- .../task/MapReduceTaskGenerator.java | 84 +++++++------- .../support/handler/JobTaskBatchHandler.java | 103 +++++++----------- .../MapTaskPostHttpRequestHandler.java | 4 +- 26 files changed, 234 insertions(+), 212 deletions(-) delete mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobTaskExtAttrsDTO.java diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java index 61487c8cb..3025ab44b 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java @@ -21,15 +21,10 @@ import com.aizuda.snailjob.common.core.model.JobContext; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.log.enums.LogTypeEnum; -import jakarta.validation.ConstraintViolation; import jakarta.validation.Valid; -import jakarta.validation.Validation; -import jakarta.validation.Validator; -import jakarta.validation.ValidatorFactory; import org.springframework.validation.annotation.Validated; import java.util.Objects; -import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH; @@ -119,7 +114,7 @@ public class JobEndPoint { jobContext.setWorkflowTaskBatchId(dispatchJob.getWorkflowTaskBatchId()); jobContext.setRetry(dispatchJob.isRetry()); jobContext.setRetryScene(dispatchJob.getRetryScene()); - jobContext.setMapName(dispatchJob.getMapName()); + jobContext.setTaskName(dispatchJob.getTaskName()); jobContext.setMrStage(dispatchJob.getMrStage()); return jobContext; } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java index c6310a5e3..8c18744dd 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java @@ -13,6 +13,6 @@ import lombok.EqualsAndHashCode; @Data public class MapArgs extends JobArgs { - private String mapName; + private String taskName; } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java index 4cead9a2f..d4e0a3e3c 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java @@ -14,7 +14,7 @@ import java.util.List; @Data public class MapReduceArgs extends JobArgs { - private String mapName; + private String taskName; private List mapArgsList; } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index 951c910cc..a081d8615 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -50,7 +50,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { jobArgs = buildShardingJobArgs(jobContext); } else if (Lists.newArrayList(JobTaskTypeEnum.MAP_REDUCE.getType(), JobTaskTypeEnum.MAP.getType()) .contains(jobContext.getTaskType())) { - if (MapReduceStageEnum.MAP.name().equals(jobContext.getMrStage())) { + if (MapReduceStageEnum.MAP.getStage() == jobContext.getMrStage()) { jobArgs = buildMapJobArgs(jobContext); } else { jobArgs = buildReduceJobArgs(jobContext); @@ -106,7 +106,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { MapArgs jobArgs = new MapArgs(); jobArgs.setArgsStr(jobContext.getArgsStr()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); - jobArgs.setMapName(jobContext.getMapName()); + jobArgs.setTaskName(jobContext.getTaskName()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); return jobArgs; } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java index fc551d6e7..c5a6aa47e 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java @@ -13,6 +13,7 @@ import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; import com.aizuda.snailjob.common.core.model.JobContext; import com.aizuda.snailjob.common.core.model.NettyResult; import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.common.log.SnailJobLog; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; @@ -39,15 +40,19 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements public abstract ExecuteResult doJobMapExecute(MapArgs mapArgs); - public void doMapExecute(List taskList, String nextMapName) { + public ExecuteResult doMap(List taskList, String nextTaskName) { - if (CollectionUtils.isEmpty(taskList) || StrUtil.isBlank(nextMapName)) { - return; + if (StrUtil.isBlank(nextTaskName)) { + throw new SnailJobMapReduceException("The next task name can not blank or null {}", nextTaskName); } - // mapName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败) - if (SystemConstants.MAP_ROOT.equals(nextMapName)) { - throw new SnailJobMapReduceException("The Next mapName can not be {}", SystemConstants.MAP_ROOT); + if (CollectionUtils.isEmpty(taskList)) { + throw new SnailJobMapReduceException("The task list can not empty {}", nextTaskName); + } + + // taskName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败) + if (SystemConstants.MAP_ROOT.equals(nextTaskName)) { + throw new SnailJobMapReduceException("The Next taskName can not be {}", SystemConstants.MAP_ROOT); } JobContext jobContext = JobContextManager.getJobContext(); @@ -56,17 +61,18 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements MapTaskRequest mapTaskRequest = new MapTaskRequest(); mapTaskRequest.setJobId(jobContext.getJobId()); mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId()); - mapTaskRequest.setMapName(nextMapName); + mapTaskRequest.setTaskName(nextTaskName); mapTaskRequest.setSubTask(taskList); mapTaskRequest.setParentId(jobContext.getTaskId()); - // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) - Result booleanResult = CLIENT.batchReportMapTask(mapTaskRequest); - - if (booleanResult.getData()) { - log.info("[Map-{}] map task[name={},num={}] successfully!", jobContext.getTaskId(), nextMapName, taskList.size()); + // 2. 同步发送请求 + Result result = CLIENT.batchReportMapTask(mapTaskRequest); + if (result.getData()) { + SnailJobLog.LOCAL.info("Map task create successfully!. taskName:[{}] TaskId:[{}] ", nextTaskName, jobContext.getTaskId()); } else { - throw new SnailJobMapReduceException("map failed for task: " + nextMapName); + throw new SnailJobMapReduceException("map failed for task: " + nextTaskName); } + + return ExecuteResult.success(); } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java index 0d5a776b4..0c801e233 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java @@ -5,6 +5,7 @@ import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs; import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; import com.aizuda.snailjob.common.core.model.JobContext; @@ -20,12 +21,13 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor { @Override public ExecuteResult doJobExecute(final JobArgs jobArgs) { JobContext jobContext = JobContextManager.getJobContext(); - if (jobContext.getMrStage().equals("MAP")) { + if (jobContext.getMrStage().equals(MapReduceStageEnum.MAP.getStage())) { return super.doJobExecute(jobArgs); - } else if(jobContext.getMrStage().equals("REDUCE")) { + } else if(jobContext.getMrStage().equals(MapReduceStageEnum.REDUCE.getStage())) { ReduceArgs reduceArgs = (ReduceArgs) jobArgs; return doReduceExecute(reduceArgs); } + throw new SnailJobMapReduceException("非法的MapReduceStage"); } diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java index ee076dcc3..b6ccf2d41 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java @@ -44,9 +44,9 @@ public class DispatchJobRequest { /** * 任务名称 */ - private String mapName; + private String taskName; - private String mrStage; + private Integer mrStage; private String argsStr; diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java index b1f633bf3..7f9480a2e 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java @@ -27,8 +27,8 @@ public class MapTaskRequest { private Long workflowNodeId; - @NotBlank(message = "mapName 不能为空") - private String mapName; + @NotBlank(message = "taskName 不能为空") + private String taskName; @NotEmpty(message = "subTask 不能为空") private List subTask; diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/MapReduceStageEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/MapReduceStageEnum.java index b0642723d..6a7c33d68 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/MapReduceStageEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/MapReduceStageEnum.java @@ -1,11 +1,35 @@ package com.aizuda.snailjob.common.core.enums; +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.Objects; + /** * @author: opensnail * @date : 2024-06-12 * @since : sj_1.1.0 */ +@Getter +@AllArgsConstructor public enum MapReduceStageEnum { - MAP, - REDUCE + MAP(1), + REDUCE(2), + MERGE_REDUCE(3); + + private final int stage; + + public static MapReduceStageEnum ofStage(Integer stage) { + if (Objects.isNull(stage)) { + return null; + } + + for (MapReduceStageEnum value : MapReduceStageEnum.values()) { + if (value.getStage() == stage) { + return value; + } + } + + return null; + } } diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java index 520efa1c3..5b976dc1f 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java @@ -57,7 +57,7 @@ public class JobContext { /** * Map名称 */ - private String mapName; + private String taskName; - private String mrStage; + private Integer mrStage; } diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/MapContext.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/MapContext.java index b9434f63e..77b295530 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/MapContext.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/MapContext.java @@ -20,7 +20,7 @@ public final class MapContext { /** * Map名称 */ - private String mapName; + private String taskName; private Long jobId; diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java index a5490561a..00891a6c8 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java @@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; +import java.io.Serial; import java.io.Serializable; import java.time.LocalDateTime; @@ -19,6 +20,7 @@ import java.time.LocalDateTime; @TableName("sj_job_task") public class JobTask implements Serializable { + @Serial private static final long serialVersionUID = 1L; /** @@ -37,6 +39,11 @@ public class JobTask implements Serializable { */ private String groupName; + /** + * 任务名称 + */ + private String taskName; + /** * 任务信息id */ @@ -83,6 +90,17 @@ public class JobTask implements Serializable { */ private Integer argsType; + /** + * 叶子节点(0:非叶子节点 1:叶子节点) + */ + private Integer leaf; + + /** + * 动态分片使用 + * 1:map 2:reduce 3:mergeReduce + */ + private Integer mrStage; + /** * 扩展字段 */ diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java index c2a58226c..b0951098a 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java @@ -40,5 +40,6 @@ public class JobExecutorResultDTO { private Integer jobOperationReason; + private Integer isLeaf; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobTaskExtAttrsDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobTaskExtAttrsDTO.java deleted file mode 100644 index 8d12a16e1..000000000 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobTaskExtAttrsDTO.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.aizuda.snailjob.server.job.task.dto; - -import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; -import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; -import lombok.Data; - -/** - * @author: opensnail - * @date : 2024-06-12 - * @since : sj_1.1.0 - */ -@Data -public class JobTaskExtAttrsDTO { - - /** - * 任务名称(目前只有map reduce使用) - */ - private String mapName; - - /** - * see: {@link JobTaskTypeEnum} - */ - private Integer taskType; - - /** - * 当前任务处于map reduce的哪个阶段 - * see: {@link MapReduceStageEnum} - */ - private String mrStage; - - - @Override - public String toString() { - return JsonUtil.toJsonString(this); - } -} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java index 9f7e66397..a7e865908 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.server.job.task.dto; +import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import lombok.Data; import lombok.EqualsAndHashCode; @@ -31,14 +32,15 @@ public class RealJobExecutorDTO extends BaseDTO { private String argsType; /** - * MAP名称 + * 任务名称 */ - private String mapName; + private String taskName; /** - * MAP名称 + * 动态分片的阶段 + * {@link MapReduceStageEnum} */ - private String mrState; + private Integer mrStage; /** * 扩展字段 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java index f63afb44d..0b9069b25 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java @@ -104,7 +104,9 @@ public interface JobTaskConverter { @Mapping(source = "jobTask.argsStr", target = "argsStr"), @Mapping(source = "jobTask.argsType", target = "argsType"), @Mapping(source = "jobTask.extAttrs", target = "extAttrs"), - @Mapping(source = "jobTask.namespaceId", target = "namespaceId") + @Mapping(source = "jobTask.namespaceId", target = "namespaceId"), + @Mapping(source = "jobTask.taskName", target = "taskName"), + @Mapping(source = "jobTask.mrStage", target = "mrStage") }) RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java index 8dc4b2235..11d6eb4ec 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java @@ -3,10 +3,8 @@ package com.aizuda.snailjob.server.job.task.support.callback; import akka.actor.ActorRef; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; -import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; -import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO; import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum; import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler; @@ -53,12 +51,7 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1); realJobExecutor.setRetry(Boolean.TRUE); realJobExecutor.setRetryScene(context.getRetryScene()); - String extAttrs = jobTask.getExtAttrs(); - // TODO 待优化 - if (StrUtil.isNotBlank(extAttrs)) { - JobTaskExtAttrsDTO extAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class); - realJobExecutor.setMapName(extAttrsDTO.getMapName()); - } + realJobExecutor.setTaskName(jobTask.getTaskName()); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); actorRef.tell(realJobExecutor, actorRef); return; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapReduceClientCallbackHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapReduceClientCallbackHandler.java index f84696c1d..cf0a5f46d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapReduceClientCallbackHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapReduceClientCallbackHandler.java @@ -2,17 +2,25 @@ package com.aizuda.snailjob.server.job.task.support.callback; import akka.actor.ActorRef; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; import cn.hutool.core.util.RandomUtil; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; -import lombok.extern.slf4j.Slf4j; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; +import java.util.Objects; import java.util.Set; /** @@ -21,8 +29,9 @@ import java.util.Set; * @since : sj_1.1.0 */ @Component -@Slf4j +@RequiredArgsConstructor public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandler { + private final JobTaskMapper jobTaskMapper; @Override public JobTaskTypeEnum getTaskInstanceType() { @@ -31,13 +40,16 @@ public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandle @Override protected void doCallback(final ClientCallbackContext context) { + JobTask jobTask = jobTaskMapper.selectOne(new LambdaQueryWrapper() + .eq(JobTask::getId, context.getTaskId())); + Assert.notNull(jobTask, () -> new SnailJobServerException("job task is null")); JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context); jobExecutorResultDTO.setTaskId(context.getTaskId()); jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage()); jobExecutorResultDTO.setResult(context.getExecuteResult().getResult()); jobExecutorResultDTO.setTaskType(getTaskInstanceType().getType()); - + jobExecutorResultDTO.setIsLeaf(jobTask.getLeaf()); ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor(); actorRef.tell(jobExecutorResultDTO, actorRef); } @@ -46,7 +58,7 @@ public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandle protected String chooseNewClient(ClientCallbackContext context) { Set nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()); if (CollUtil.isEmpty(nodes)) { - log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); + SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); return null; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 43db0e2fa..d49a1873e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -135,10 +135,10 @@ public class JobExecutorActor extends AbstractActor { JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType()); JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId()); - instanceGenerateContext.setMapName(SystemConstants.MAP_ROOT); + instanceGenerateContext.setTaskName(SystemConstants.MAP_ROOT); instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY)); // TODO 此处需要判断任务类型 - instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP); + instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP.getStage()); List taskList = taskInstance.generate(instanceGenerateContext); if (CollUtil.isEmpty(taskList)) { SnailJobLog.LOCAL.warn("Generate job task is empty, taskBatchId:[{}]", taskExecute.getTaskBatchId()); @@ -181,8 +181,6 @@ public class JobExecutorActor extends AbstractActor { context.setJobId(job.getId()); context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); context.setWorkflowNodeId(taskExecute.getWorkflowNodeId()); - context.setMapName(SystemConstants.MAP_ROOT); - context.setMrStage(MapReduceStageEnum.MAP.name()); return context; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index 54e70aa0c..2f12d4287 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.dispatch; import akka.actor.AbstractActor; import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; @@ -57,6 +58,12 @@ public class JobExecutorResultActor extends AbstractActor { Assert.isTrue(1 == jobTaskMapper.update(jobTask, new LambdaUpdateWrapper().eq(JobTask::getId, result.getTaskId())), () -> new SnailJobServerException("更新任务实例失败")); + + // 除MAP和MAP_REDUCE 任务之外,其他任务都是叶子节点 + if (Objects.nonNull(result.getIsLeaf()) && StatusEnum.NO.getStatus().equals(result.getIsLeaf())) { + return; + } + // 先尝试完成,若已完成则不需要通过获取分布式锁来完成 boolean tryCompleteAndStop = tryCompleteAndStop(result); if (!tryCompleteAndStop) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java index 99cfabe5f..2d6acf73d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java @@ -3,10 +3,11 @@ package com.aizuda.snailjob.server.job.task.support.dispatch; import akka.actor.AbstractActor; import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO; -import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext; @@ -16,8 +17,11 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory; import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; @@ -41,18 +45,19 @@ public class ReduceActor extends AbstractActor { private static final String KEY = "job_generate_reduce_{0}_{1}"; private final DistributedLockHandler distributedLockHandler; private final JobMapper jobMapper; - - + private final JobTaskMapper jobTaskMapper; @Override public Receive createReceive() { return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> { - + SnailJobLog.LOCAL.info("执行Reduce, [{}]", JsonUtil.toJsonString(reduceTask)); try { distributedLockHandler.lockWithDisposableAndRetry(() -> { - doReduce(reduceTask); - }, MessageFormat.format(KEY, reduceTask.getTaskBatchId(), - reduceTask.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3); + doReduce(reduceTask); + }, MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId()), + Duration.ofSeconds(1), + Duration.ofSeconds(1), + 3); } catch (Exception e) { SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e); } @@ -62,13 +67,29 @@ public class ReduceActor extends AbstractActor { private void doReduce(final ReduceTaskDTO reduceTask) { + List jobTasks = jobTaskMapper.selectList(new PageDTO<>(1, 1), + new LambdaQueryWrapper() + .select(JobTask::getId) + .eq(JobTask::getTaskBatchId, reduceTask.getTaskBatchId()) + .eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage()) + ); + + if (CollUtil.isNotEmpty(jobTasks)) { + // 说明已经创建了reduce任务了 + return; + } + Job job = jobMapper.selectById(reduceTask.getJobId()); + // 非MAP_REDUCE任务不处理 + if (JobTaskTypeEnum.MAP_REDUCE.getType() != job.getTaskType()) { + return; + } // 创建reduce任务 JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType()); JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); context.setTaskBatchId(reduceTask.getTaskBatchId()); - context.setMrStage(MapReduceStageEnum.REDUCE); + context.setMrStage(MapReduceStageEnum.REDUCE.getStage()); List taskList = taskInstance.generate(context); if (CollUtil.isEmpty(taskList)) { SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId()); @@ -82,13 +103,12 @@ public class ReduceActor extends AbstractActor { } private static JobExecutorContext buildJobExecutorContext(ReduceTaskDTO reduceTask, Job job, - List taskList) { + List taskList) { JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); context.setTaskList(taskList); context.setTaskBatchId(reduceTask.getTaskBatchId()); context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId()); context.setWorkflowNodeId(reduceTask.getWorkflowNodeId()); - context.setMrStage(MapReduceStageEnum.REDUCE.name()); return context; } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java index 5cd27e974..dc0b01336 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java @@ -90,8 +90,4 @@ public class JobExecutorContext { private Long workflowNodeId; - private String mapName; - - private String mrStage; - } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java index 6e3094a1b..cea89c8d4 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java @@ -39,12 +39,12 @@ public class JobTaskGenerateContext { /** * 任务名称 */ - private String mapName; + private String taskName; /** * 动态分片的阶段 */ - private MapReduceStageEnum mrStage; + private Integer mrStage; /** * 父任务id diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 9ffb5039a..65b614f38 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -5,6 +5,8 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; @@ -12,24 +14,20 @@ import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; -import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO; -import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; -import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.Set; +import java.util.*; /** * 生成Map Reduce任务 @@ -54,15 +52,18 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { protected List doGenerate(final JobTaskGenerateContext context) { // TODO 若没有客户端节点JobTask是否需要创建???? Set serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), - context.getNamespaceId()); + context.getNamespaceId()); if (CollUtil.isEmpty(serverNodes)) { SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); return Lists.newArrayList(); } List nodeInfoList = new ArrayList<>(serverNodes); + MapReduceStageEnum mapReduceStageEnum = MapReduceStageEnum.ofStage(context.getMrStage()); + Assert.notNull(mapReduceStageEnum, () -> new SnailJobServerException("Map reduce stage is not existed")); - switch (context.getMrStage()) { + // todo 待优化 + switch (mapReduceStageEnum) { case MAP -> { // MAP任务 return createMapJobTasks(context, nodeInfoList, serverNodes); @@ -76,37 +77,24 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { } private List createReduceJobTasks(JobTaskGenerateContext context, List nodeInfoList, - Set serverNodes) { + Set serverNodes) { // TODO reduce阶段的并行度 - int reduceParallel = 10; + int reduceParallel = 2; List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() - .select(JobTask::getResultMessage, JobTask::getExtAttrs) - .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); - - // 若存在已经生成的reduce任务不需要重新生成 - boolean existedReduce = jobTasks.stream() - .filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs())) - .map(jobTask -> JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class)) - .anyMatch(jobTaskExtAttrsDTO -> MapReduceStageEnum.REDUCE.name().equals(jobTaskExtAttrsDTO.getMrStage())); - if (existedReduce) { - SnailJobLog.LOCAL.warn("The reduce task already exists. taskBatchId:[{}]", context.getTaskBatchId()); - return Lists.newArrayList(); - } + .select(JobTask::getResultMessage) + .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) + .eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage()) + .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) + ); // 这里需要判断是否是map List allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage); List> partition = Lists.partition(allMapJobTasks, reduceParallel); - JobTaskExtAttrsDTO jobTaskExtAttrsDTO = new JobTaskExtAttrsDTO(); - jobTaskExtAttrsDTO.setMapName(context.getMapName()); - jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType()); - jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.REDUCE.name()); - jobTasks = new ArrayList<>(partition.size()); - final List finalJobTasks = jobTasks; transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override @@ -120,9 +108,11 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTask.setArgsStr(JsonUtil.toJsonString(partition.get(index))); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); - jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString()); + jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage()); + jobTask.setTaskName("REDUCE_TASK"); + // TODO 改批量插入 Assert.isTrue(1 == jobTaskMapper.insert(jobTask), - () -> new SnailJobServerException("新增任务实例失败")); + () -> new SnailJobServerException("新增任务实例失败")); finalJobTasks.add(jobTask); } } @@ -131,18 +121,22 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { return finalJobTasks; } - private @Nullable List createMapJobTasks(final JobTaskGenerateContext context, - final List nodeInfoList, final Set serverNodes) { + @NotNull + private List createMapJobTasks(final JobTaskGenerateContext context, + final List nodeInfoList, final Set serverNodes) { List mapSubTask = context.getMapSubTask(); if (CollUtil.isEmpty(mapSubTask)) { SnailJobLog.LOCAL.warn("Map sub task is empty. TaskBatchId:[{}]", context.getTaskBatchId()); return Lists.newArrayList(); } - JobTaskExtAttrsDTO jobTaskExtAttrsDTO = new JobTaskExtAttrsDTO(); - jobTaskExtAttrsDTO.setMapName(context.getMapName()); - jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType()); - jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.MAP.name()); + // 判定父节点是不是叶子节点,若是则不更新否则更新为非叶子节点 + List parentJobTasks = jobTaskMapper.selectList(new PageDTO<>(1, 1), + new LambdaQueryWrapper().select(JobTask::getId) + .eq(JobTask::getId, context.getParentId()) + .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) + ); + List jobTasks = new ArrayList<>(mapSubTask.size()); transactionTemplate.execute(new TransactionCallbackWithoutResult() { @@ -157,12 +151,24 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTask.setArgsType(context.getArgsType()); jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index))); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); + jobTask.setMrStage(MapReduceStageEnum.MAP.getStage()); + jobTask.setTaskName(context.getTaskName()); + jobTask.setLeaf(StatusEnum.YES.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); - jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString()); + // TODO 改批量插入 Assert.isTrue(1 == jobTaskMapper.insert(jobTask), - () -> new SnailJobServerException("新增任务实例失败")); + () -> new SnailJobServerException("新增任务实例失败")); jobTasks.add(jobTask); } + + // 更新父节点的为非叶子节点 + if (CollUtil.isNotEmpty(parentJobTasks)) { + JobTask parentJobTask = new JobTask(); + parentJobTask.setId(context.getParentId()); + parentJobTask.setLeaf(StatusEnum.NO.getStatus()); + jobTaskMapper.updateById(parentJobTask); + } + } }); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index 12199822d..98430e06a 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -16,12 +16,7 @@ import com.aizuda.snailjob.server.common.dto.DistributeInstance; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.server.common.util.DateUtils; -import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO; -import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO; -import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; -import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; -import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; -import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; +import com.aizuda.snailjob.server.job.task.dto.*; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache; @@ -48,6 +43,8 @@ import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; +import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.MAP; + /** * @author: opensnail * @date : 2023-10-10 16:50 @@ -62,29 +59,24 @@ public class JobTaskBatchHandler { private final WorkflowBatchHandler workflowBatchHandler; private final GroupConfigMapper groupConfigMapper; - @Transactional public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) { List jobTasks = jobTaskMapper.selectList( - new LambdaQueryWrapper() - .select(JobTask::getTaskStatus, JobTask::getExtAttrs) - .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); + new LambdaQueryWrapper() + .select(JobTask::getTaskStatus, JobTask::getMrStage) + .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); + + if (CollUtil.isEmpty(jobTasks) || + jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { + return false; + } - SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks)); JobTaskBatch jobTaskBatch = new JobTaskBatch(); jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId()); - if (CollUtil.isEmpty(jobTasks)) { - return false; - } - - if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { - return false; - } - Map statusCountMap = jobTasks.stream() - .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting())); + .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting())); long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); @@ -95,8 +87,10 @@ public class JobTaskBatchHandler { } else if (stopCount > 0) { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus()); } else { - jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); + // todo 调试完成删除 + SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks)); + jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); if (needReduceTask(completeJobBatchDTO, jobTasks)) { return false; } @@ -115,9 +109,9 @@ public class JobTaskBatchHandler { jobTaskBatch.setUpdateDt(LocalDateTime.now()); return 1 == jobTaskBatchMapper.update(jobTaskBatch, - new LambdaUpdateWrapper() - .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) - .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) + new LambdaUpdateWrapper() + .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) ); } @@ -126,26 +120,19 @@ public class JobTaskBatchHandler { * 若需要执行reduce则返回false 不需要更新批次状态, 否则需要更新批次状态 * * @param completeJobBatchDTO 需要执行批次完成所需的参数信息 - * @param jobTasks 任务项列表 + * @param jobTasks 任务项列表 * @return true-需要reduce false-不需要reduce */ private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List jobTasks) { // 判断是否是mapreduce任务 - // todo 此处待优化 - JobTask firstJobTask = jobTasks.get(0); - String extAttrs = firstJobTask.getExtAttrs(); - if (StrUtil.isNotBlank(extAttrs)) { - JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class); - Integer taskType = jobTaskExtAttrsDTO.getTaskType(); - if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) { - // 开启reduce阶段 - try { - ActorRef actorRef = ActorGenerator.jobReduceActor(); - actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef); - return true; - } catch (Exception e) { - SnailJobLog.LOCAL.error("tell reduce actor error", e); - } + if (isAllMapTask(jobTasks)) { + // 开启reduce阶段 + try { + ActorRef actorRef = ActorGenerator.jobReduceActor(); + actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef); + return true; + } catch (Exception e) { + SnailJobLog.LOCAL.error("tell reduce actor error", e); } } @@ -153,17 +140,9 @@ public class JobTaskBatchHandler { } private static boolean isAllMapTask(final List jobTasks) { - return jobTasks.size() == jobTasks.stream() - .filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs())) - .map(jobTask -> JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class)) - .filter(jobTaskExtAttrsDTO -> { - String mrStage = jobTaskExtAttrsDTO.getMrStage(); - if (StrUtil.isNotBlank(mrStage) && MapReduceStageEnum.MAP.name().equals(mrStage)) { - return true; - } - - return false; - }).count(); + return jobTasks.size() == jobTasks.stream() + .filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage()) + .count(); } /** @@ -174,21 +153,21 @@ public class JobTaskBatchHandler { */ public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { if (Objects.isNull(job) - || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - // 是否是常驻任务 - || Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) - // 防止任务已经分配到其他节点导致的任务重复执行 - || !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex()) + || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + // 是否是常驻任务 + || Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) + // 防止任务已经分配到其他节点导致的任务重复执行 + || !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex()) ) { return; } long count = groupConfigMapper.selectCount(new LambdaQueryWrapper() - .eq(GroupConfig::getNamespaceId, job.getNamespaceId()) - .eq(GroupConfig::getGroupName, job.getGroupName()) - .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())); + .eq(GroupConfig::getNamespaceId, job.getNamespaceId()) + .eq(GroupConfig::getGroupName, job.getGroupName()) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())); if (count == 0) { return; } @@ -215,7 +194,7 @@ public class JobTaskBatchHandler { Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000); log.debug("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds, - DateUtils.toNowMilli() % 1000); + DateUtils.toNowMilli() % 1000); job.setNextTriggerAt(nextTriggerAt); JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration); ResidentTaskCache.refresh(job.getId(), nextTriggerAt); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java index af9bc9358..7fdb2b4b5 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java @@ -71,7 +71,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest); context.setGroupName(HttpHeaderUtil.getGroupName(headers)); context.setNamespaceId(HttpHeaderUtil.getNamespace(headers)); - context.setMrStage(MapReduceStageEnum.MAP); + context.setMrStage(MapReduceStageEnum.MAP.getStage()); context.setMapSubTask(mapTaskRequest.getSubTask()); List taskList = taskInstance.generate(context); if (CollUtil.isEmpty(taskList)) { @@ -107,8 +107,6 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { context.setTaskBatchId(mapTaskRequest.getTaskBatchId()); context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId()); context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId()); - context.setMapName(mapTaskRequest.getMapName()); - context.setMrStage(MapReduceStageEnum.MAP.name()); return context; }