From 6f37691be1dca903fdce616d4886401d6b50e8c8 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Fri, 14 Jun 2024 09:12:23 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0):=20map=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=94=9F=E6=88=90=E5=92=8C=E8=B0=83=E5=BA=A6=E6=88=90=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../snailjob/client/job/core/client/JobEndPoint.java | 7 ------- .../client/job/core/executor/AbstractMapExecutor.java | 5 +++-- .../snailjob/common/core/constant/SystemConstants.java | 5 +++++ .../server/job/task/support/dispatch/JobExecutorActor.java | 6 ++++-- .../support/generator/task/MapReduceTaskGenerator.java | 3 ++- 5 files changed, 14 insertions(+), 12 deletions(-) diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java index 38293d730..b977c5414 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java @@ -122,13 +122,6 @@ public class JobEndPoint { @Mapping(path = JOB_STOP, method = RequestMethod.POST) public Result stopJob(@Valid StopJobDTO interruptJob) { - ValidatorFactory vf = Validation.buildDefaultValidatorFactory(); - Validator validator = vf.getValidator(); - Set> set = validator.validate(interruptJob); - for (final ConstraintViolation violation : set) { - return new Result<>(violation.getMessage(), Boolean.FALSE); - } - ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskBatchId()); if (Objects.isNull(threadPool) || threadPool.isShutdown() || threadPool.isTerminated()) { return new Result<>(Boolean.TRUE); diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java index 1f6b9dae3..2ba2dc634 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java @@ -8,6 +8,7 @@ import com.aizuda.snailjob.client.job.core.dto.JobArgs; import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs; import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.client.model.request.MapTaskRequest; +import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; import com.aizuda.snailjob.common.core.model.JobContext; import com.aizuda.snailjob.common.core.model.MapContext; @@ -46,8 +47,8 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements } // mapName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败) - if ("ROOT_TASK".equals(nextMapName)) { - throw new SnailJobMapReduceException("NextMapName can not be ROOT_TASK"); + if (SystemConstants.MAP_ROOT.equals(nextMapName)) { + throw new SnailJobMapReduceException("The Next mapName can not be {}", SystemConstants.MAP_ROOT); } JobContext jobContext = JobContextManager.getJobContext(); diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java index a6dbc098a..e92c3a462 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java @@ -208,4 +208,9 @@ public interface SystemConstants { */ String YYYY_MM_DD = "yyyy-MM-dd"; + + /** + * 动态分片的root节点 + */ + String MAP_ROOT = "MAP_ROOT"; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 96fcb93e8..bfbab20e2 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -4,6 +4,7 @@ import akka.actor.AbstractActor; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; @@ -134,12 +135,13 @@ public class JobExecutorActor extends AbstractActor { JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType()); JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId()); - instanceGenerateContext.setMapName("ROOT_TASK"); + instanceGenerateContext.setMapName(SystemConstants.MAP_ROOT); instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY)); // TODO 此处需要判断任务类型 instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP); List taskList = taskInstance.generate(instanceGenerateContext); if (CollUtil.isEmpty(taskList)) { + SnailJobLog.LOCAL.warn("Generate job task is empty, taskBatchId:[{}]", taskExecute.getTaskBatchId()); return; } @@ -179,7 +181,7 @@ public class JobExecutorActor extends AbstractActor { context.setJobId(job.getId()); context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); context.setWorkflowNodeId(taskExecute.getWorkflowNodeId()); - context.setMapName("ROOT_TASK"); + context.setMapName(SystemConstants.MAP_ROOT); return context; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 52254fd10..974419e34 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -114,6 +114,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { final List nodeInfoList, final Set serverNodes) { List mapSubTask = context.getMapSubTask(); if (CollUtil.isEmpty(mapSubTask)) { + SnailJobLog.LOCAL.warn("Map sub task is empty. TaskBatchId:[{}]", context.getTaskBatchId()); return Lists.newArrayList(); } @@ -138,6 +139,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTasks.add(jobTask); } - return Lists.newArrayList(); + return jobTasks; } }