From a64ac93b06d1a580b2fc0178022cc150e97c937c Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Fri, 14 Jun 2024 15:00:57 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0):=20reduce=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E5=88=9B=E5=BB=BA=E6=88=90=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model/request/DispatchJobRequest.java | 2 ++ .../job/task/dto/CompleteJobBatchDTO.java | 1 + .../job/task/dto/RealJobExecutorDTO.java | 7 ++++++- .../support/dispatch/JobExecutorActor.java | 1 + .../dispatch/JobExecutorResultActor.java | 14 ++++++------- .../task/support/dispatch/ReduceActor.java | 3 +++ .../executor/job/JobExecutorContext.java | 2 ++ .../support/handler/JobTaskBatchHandler.java | 20 +++++++++---------- .../MapTaskPostHttpRequestHandler.java | 1 + 9 files changed, 32 insertions(+), 19 deletions(-) diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java index 8418f276c..ee076dcc3 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java @@ -46,6 +46,8 @@ public class DispatchJobRequest { */ private String mapName; + private String mrStage; + private String argsStr; private Integer shardingTotal; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java index e56f62430..9eb50a1b8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java @@ -11,6 +11,7 @@ import lombok.Data; @Data public class CompleteJobBatchDTO { + private Long jobId; private Long workflowNodeId; private Long workflowTaskBatchId; private Long taskBatchId; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java index a09fb2a24..9f7e66397 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java @@ -31,10 +31,15 @@ public class RealJobExecutorDTO extends BaseDTO { private String argsType; /** - * 任务名称 + * MAP名称 */ private String mapName; + /** + * MAP名称 + */ + private String mrState; + /** * 扩展字段 */ diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index bfbab20e2..72b65165e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -182,6 +182,7 @@ public class JobExecutorActor extends AbstractActor { context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); context.setWorkflowNodeId(taskExecute.getWorkflowNodeId()); context.setMapName(SystemConstants.MAP_ROOT); + context.setMrStage(MapReduceStageEnum.MAP.name()); return context; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index 219456634..54e70aa0c 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -18,6 +18,7 @@ import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -35,20 +36,17 @@ import java.util.Objects; */ @Component(ActorGenerator.JOB_EXECUTOR_RESULT_ACTOR) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) -@Slf4j +@RequiredArgsConstructor public class JobExecutorResultActor extends AbstractActor { private static final String KEY = "job_complete_{0}_{1}"; - @Autowired - private JobTaskMapper jobTaskMapper; - @Autowired - private JobTaskBatchHandler jobTaskBatchHandler; - @Autowired - private DistributedLockHandler distributedLockHandler; + private final JobTaskMapper jobTaskMapper; + private final JobTaskBatchHandler jobTaskBatchHandler; + private final DistributedLockHandler distributedLockHandler; @Override public Receive createReceive() { return receiveBuilder().match(JobExecutorResultDTO.class, result -> { - log.debug("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); + SnailJobLog.LOCAL.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); try { JobTask jobTask = new JobTask(); jobTask.setTaskStatus(result.getTaskStatus()); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java index 7990cfc0d..2e87c4c66 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java @@ -57,9 +57,11 @@ public class ReduceActor extends AbstractActor { // 创建reduce任务 JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType()); JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); + context.setTaskBatchId(reduceTask.getTaskBatchId()); context.setMrStage(MapReduceStageEnum.REDUCE); List taskList = taskInstance.generate(context); if (CollUtil.isEmpty(taskList)) { + SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId()); return; } @@ -76,6 +78,7 @@ public class ReduceActor extends AbstractActor { context.setTaskBatchId(reduceTask.getTaskBatchId()); context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId()); context.setWorkflowNodeId(reduceTask.getWorkflowNodeId()); + context.setMrStage(MapReduceStageEnum.REDUCE.name()); return context; } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java index 6a7db8681..5cd27e974 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java @@ -92,4 +92,6 @@ public class JobExecutorContext { private String mapName; + private String mrStage; + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index de234c8c2..45b02b9a2 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -70,9 +70,10 @@ public class JobTaskBatchHandler { List jobTasks = jobTaskMapper.selectList( new LambdaQueryWrapper() - .select(JobTask::getTaskStatus, JobTask::getResultMessage) + .select(JobTask::getTaskStatus, JobTask::getResultMessage, JobTask::getExtAttrs) .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); + SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks)); JobTaskBatch jobTaskBatch = new JobTaskBatch(); jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId()); @@ -106,15 +107,14 @@ public class JobTaskBatchHandler { JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(firstJobTask.getExtAttrs(), JobTaskExtAttrsDTO.class); Integer taskType = jobTaskExtAttrsDTO.getTaskType(); - if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType) { - if (isAllMapTask(jobTasks)) { - // 开启reduce阶段 - try { - ActorRef actorRef = ActorGenerator.jobReduceActor(); - actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef); - } catch (Exception e) { - SnailJobLog.LOCAL.error("tell reduce actor error", e); - } + if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) { + // 开启reduce阶段 + try { + ActorRef actorRef = ActorGenerator.jobReduceActor(); + actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef); + return false; + } catch (Exception e) { + SnailJobLog.LOCAL.error("tell reduce actor error", e); } } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java index f3fcb9df0..295c892a0 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java @@ -108,6 +108,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId()); context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId()); context.setMapName(mapTaskRequest.getMapName()); + context.setMrStage(MapReduceStageEnum.MAP.name()); return context; }