From acaa2e966822ec55e24e99e0cc38db5a8993eb3b Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Wed, 4 Sep 2024 19:27:16 +0800 Subject: [PATCH] =?UTF-8?q?fix:(1.2.0-beta1):=20=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E7=BB=93=E6=9E=9C=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/task/dto/CompleteJobBatchDTO.java | 1 + .../job/task/dto/JobExecutorResultDTO.java | 2 + .../support/JobExecutorResultHandler.java | 17 ++ .../job/task/support/JobTaskConverter.java | 7 + .../dispatch/JobExecutorResultActor.java | 21 +-- .../support/handler/JobTaskBatchHandler.java | 166 +++--------------- .../prepare/job/RunningJobPrepareHandler.java | 2 +- .../job/AbstractJobExecutorResultHandler.java | 125 +++++++++++++ .../job/BroadcastJobExecutorHandler.java | 45 +++++ .../result/job/ClusterJobExecutorHandler.java | 52 ++++++ .../result/job/JobExecutorResultContext.java | 35 ++++ .../result/job/MapJobExecutorHandler.java | 45 +++++ .../job/MapReduceJobExecutorHandler.java | 126 +++++++++++++ .../job/ShardingJobExecutorHandler.java | 45 +++++ 14 files changed, 524 insertions(+), 165 deletions(-) create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobExecutorResultHandler.java create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/AbstractJobExecutorResultHandler.java create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/BroadcastJobExecutorHandler.java create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/ClusterJobExecutorHandler.java create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/JobExecutorResultContext.java create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/MapJobExecutorHandler.java create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/MapReduceJobExecutorHandler.java create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/ShardingJobExecutorHandler.java diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java index 004930cb..b85002dc 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java @@ -18,5 +18,6 @@ public class CompleteJobBatchDTO { private Integer jobOperationReason; private Object result; private Integer taskType; + private boolean isRetry; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java index 41b8b803..6667e543 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java @@ -44,4 +44,6 @@ public class JobExecutorResultDTO { private String wfContext; + private boolean isRetry; + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobExecutorResultHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobExecutorResultHandler.java new file mode 100644 index 00000000..15ea9e1a --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobExecutorResultHandler.java @@ -0,0 +1,17 @@ +package com.aizuda.snailjob.server.job.task.support; + +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO; +import com.aizuda.snailjob.server.job.task.support.result.job.JobExecutorResultContext; + +/** + * @author: opensnail + * @date : 2024-09-04 + * @since :1.2.0 + */ +public interface JobExecutorResultHandler { + + JobTaskTypeEnum getTaskInstanceType(); + + void handleResult(JobExecutorResultContext context); +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java index f9805265..da5c559f 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java @@ -13,6 +13,7 @@ import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorConte import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext; import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext; +import com.aizuda.snailjob.server.job.task.support.result.job.JobExecutorResultContext; import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.snailjob.server.model.dto.LogTaskDTO; import com.aizuda.snailjob.template.datasource.persistence.po.Job; @@ -70,6 +71,8 @@ public interface JobTaskConverter { TaskStopJobContext toStopJobContext(BlockStrategyContext context); + TaskStopJobContext toStopJobContext(JobExecutorResultContext context); + TaskStopJobContext toStopJobContext(JobExecutorResultDTO context); @Mappings( @@ -140,4 +143,8 @@ public interface JobTaskConverter { JobLogMessage toJobLogMessage(JobLogMessage jobLogMessage); ReduceTaskDTO toReduceTaskDTO(CompleteJobBatchDTO jobBatchDTO); + + ReduceTaskDTO toReduceTaskDTO(JobExecutorResultContext context); + + JobExecutorResultContext toJobExecutorResultContext(CompleteJobBatchDTO completeJobBatchDTO); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index 2f228ffe..610d4e58 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -2,7 +2,6 @@ package com.aizuda.snailjob.server.job.task.support.dispatch; import akka.actor.AbstractActor; import cn.hutool.core.lang.Assert; -import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; @@ -11,12 +10,8 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO; import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; -import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler; import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler; -import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; -import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory; -import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; @@ -62,7 +57,6 @@ public class JobExecutorResultActor extends AbstractActor { } else { jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult())); } - } Assert.isTrue(1 == jobTaskMapper.update(jobTask, @@ -95,19 +89,6 @@ public class JobExecutorResultActor extends AbstractActor { private boolean tryCompleteAndStop(JobExecutorResultDTO result) { CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result); - boolean complete = jobTaskBatchHandler.complete(completeJobBatchDTO); - if (complete) { - // 尝试停止任务 - // 若是集群任务则客户端会主动关闭 - if (result.getTaskType() != JobTaskTypeEnum.CLUSTER.getType()) { - JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType()); - TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result); - stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE); - stopJobContext.setForceStop(Boolean.TRUE); - instanceInterrupt.stop(stopJobContext); - } - } - - return complete; + return jobTaskBatchHandler.handleResult(completeJobBatchDTO); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index f42b9336..3e1d06f0 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -1,45 +1,34 @@ package com.aizuda.snailjob.server.job.task.support.handler; -import akka.actor.ActorRef; -import cn.hutool.core.collection.CollUtil; -import com.aizuda.snailjob.common.core.context.SnailSpringContext; -import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; -import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; -import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.common.core.enums.StatusEnum; -import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.WaitStrategy; -import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.dto.DistributeInstance; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.*; +import com.aizuda.snailjob.server.job.task.support.JobExecutorResultHandler; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; -import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache; +import com.aizuda.snailjob.server.job.task.support.result.job.JobExecutorResultContext; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask; import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; import com.aizuda.snailjob.template.datasource.persistence.po.Job; -import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.time.Duration; -import java.time.LocalDateTime; import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED; import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*; @@ -53,148 +42,37 @@ import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*; @Slf4j public class JobTaskBatchHandler { - private final JobTaskMapper jobTaskMapper; private final JobTaskBatchMapper jobTaskBatchMapper; - private final WorkflowBatchHandler workflowBatchHandler; private final GroupConfigMapper groupConfigMapper; + private final List resultHandlerList; @Transactional - public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) { + public boolean handleResult(CompleteJobBatchDTO completeJobBatchDTO) { + Assert.notNull(completeJobBatchDTO.getTaskType(), ()-> new SnailJobServerException("taskType can not be null")); - // 幂等处理 - Long countJobTaskBatch = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper() + // 非重试流量幂等处理 + if(!completeJobBatchDTO.isRetry()) { + // 幂等处理 + Long countJobTaskBatch = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper() .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) .in(JobTaskBatch::getTaskBatchStatus, COMPLETED) - ); - if (countJobTaskBatch > 0) { - // 批次已经完成了,不需要重复更新 - return true; - } - - List jobTasks = jobTaskMapper.selectList( - new LambdaQueryWrapper() - .select(JobTask::getTaskStatus, JobTask::getMrStage) - .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); - - if (CollUtil.isEmpty(jobTasks) || - jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { - return false; - } - - JobTaskBatch jobTaskBatch = new JobTaskBatch(); - jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId()); - - Map statusCountMap = jobTasks.stream() - .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting())); - - long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); - long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); - - if (failCount > 0) { - jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus()); - SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(completeJobBatchDTO.getTaskBatchId())); - } else if (stopCount > 0) { - jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus()); - } else { - - jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); - if (needReduceTask(completeJobBatchDTO, jobTasks) - && JobTaskTypeEnum.MAP_REDUCE.getType() == completeJobBatchDTO.getTaskType()) { - // 此时中断批次完成,需要开启reduce任务 - return false; + ); + if (countJobTaskBatch > 0) { + // 批次已经完成了,不需要重复更新 + return true; } } - if (Objects.nonNull(completeJobBatchDTO.getJobOperationReason())) { - jobTaskBatch.setOperationReason(completeJobBatchDTO.getJobOperationReason()); - } - - WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); - taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId()); - taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType()); - taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId()); - taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId()); - workflowBatchHandler.openNextNode(taskExecuteDTO); - - jobTaskBatch.setUpdateDt(LocalDateTime.now()); - return 1 == jobTaskBatchMapper.update(jobTaskBatch, - new LambdaUpdateWrapper() - .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) - .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) - ); - - } - - /** - * 若需要执行reduce则返回false 不需要更新批次状态, 否则需要更新批次状态 - * - * @param completeJobBatchDTO 需要执行批次完成所需的参数信息 - * @param jobTasks 任务项列表 - * @return true-需要reduce false-不需要reduce - */ - private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List jobTasks) { - Integer mrStage = null; - - int reduceCount = 0; - int mapCount = 0; - for (final JobTask jobTask : jobTasks) { - if (Objects.isNull(jobTask.getMrStage())) { - continue; - } - - // 存在MERGE_REDUCE任务了不需要生成 - if (MERGE_REDUCE.getStage() == jobTask.getMrStage()) { - return false; - } - - // REDUCE任务累加 - if (REDUCE.getStage() == jobTask.getMrStage()) { - reduceCount++; - continue; - } - - // MAP任务累加 - if (MAP.getStage() == jobTask.getMrStage()) { - mapCount++; + JobExecutorResultContext context = JobTaskConverter.INSTANCE.toJobExecutorResultContext(completeJobBatchDTO); + for (final JobExecutorResultHandler jobExecutorResultHandler : resultHandlerList) { + if (completeJobBatchDTO.getTaskType().equals(jobExecutorResultHandler.getTaskInstanceType().getType())) { + jobExecutorResultHandler.handleResult(context); + break; } } - // 若存在2个以上的reduce任务则开启merge reduce任务 - if (reduceCount > 1) { - mrStage = MERGE_REDUCE.getStage(); - } else if (mapCount == jobTasks.size()) { - // 若都是MAP任务则开启Reduce任务 - mrStage = REDUCE.getStage(); - } else { - // 若既不是MAP也是不REDUCE则是其他类型的任务,直接返回即可 - return false; - } - - // 开启reduce or mergeReduce阶段 - try { - ReduceTaskDTO reduceTaskDTO = JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO); - reduceTaskDTO.setMrStage(mrStage); - ActorRef actorRef = ActorGenerator.jobReduceActor(); - actorRef.tell(reduceTaskDTO, actorRef); - return true; - } catch (Exception e) { - SnailJobLog.LOCAL.error("tell reduce actor error", e); - } - - return false; - } - - private static boolean isAllMapTask(final List jobTasks) { - return jobTasks.size() == jobTasks.stream() - .filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage()) - .count(); - } - - private static boolean isALeastOneReduceTask(final List jobTasks) { - return jobTasks.stream() - .filter( - jobTask -> Objects.nonNull(jobTask.getMrStage()) && REDUCE.getStage() == jobTask.getMrStage()) - .count() > 1; + // 处理的结果 若已经更新成功 或者 需要开启reduce任务都算是已经处理了 + return context.isTaskBatchComplete() || context.isCreateReduceTask(); } /** diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/RunningJobPrepareHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/RunningJobPrepareHandler.java index 7da8d69a..97dcec2d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/RunningJobPrepareHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/RunningJobPrepareHandler.java @@ -49,7 +49,7 @@ public class RunningJobPrepareHandler extends AbstractJobPrepareHandler { JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE; CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.completeJobBatchDTO(prepare); completeJobBatchDTO.setJobOperationReason(jobOperationReasonEnum.getReason()); - if (jobTaskBatchHandler.complete(completeJobBatchDTO)) { + if (jobTaskBatchHandler.handleResult(completeJobBatchDTO)) { blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy(); } else { // 计算超时时间 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/AbstractJobExecutorResultHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/AbstractJobExecutorResultHandler.java new file mode 100644 index 00000000..6f4249db --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/AbstractJobExecutorResultHandler.java @@ -0,0 +1,125 @@ +package com.aizuda.snailjob.server.job.task.support.result.job; + +import cn.hutool.core.collection.CollUtil; +import com.aizuda.snailjob.common.core.context.SnailSpringContext; +import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; +import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; +import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; +import com.aizuda.snailjob.server.job.task.support.JobExecutorResultHandler; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; +import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; +import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; +import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory; +import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext; +import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import lombok.RequiredArgsConstructor; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * @author: opensnail + * @date : 2024-09-04 + * @since :1.2.0 + */ +@RequiredArgsConstructor +public abstract class AbstractJobExecutorResultHandler implements JobExecutorResultHandler { + + private final JobTaskMapper jobTaskMapper; + private final JobTaskBatchMapper jobTaskBatchMapper; + private final WorkflowBatchHandler workflowBatchHandler; + private final GroupConfigMapper groupConfigMapper; + + @Override + public void handleResult(final JobExecutorResultContext context) { + + List jobTasks = jobTaskMapper.selectList( + new LambdaQueryWrapper() + .select(JobTask::getTaskStatus, JobTask::getMrStage) + .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); + + if (CollUtil.isEmpty(jobTasks) || + jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { + return; + } + + Map statusCountMap = jobTasks.stream() + .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting())); + + long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); + long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); + + int taskBatchStatus; + if (failCount > 0) { + taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); + SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(context.getTaskBatchId())); + doHandleFail(context); + } else if (stopCount > 0) { + taskBatchStatus = JobTaskBatchStatusEnum.STOP.getStatus(); + doHandleStop(context); + } else { + taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus(); + doHandleSuccess(context); + } + + // 开启下一个工作流节点 + openNextWorkflowNode(context); + + boolean res = updateStatus(context, taskBatchStatus); + context.setTaskBatchComplete(res); + if (res) { + // 停止客户端的任务 + stop(context); + } + } + + protected void openNextWorkflowNode(final JobExecutorResultContext context) { + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); + taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType()); + taskExecuteDTO.setParentId(context.getWorkflowNodeId()); + taskExecuteDTO.setTaskBatchId(context.getTaskBatchId()); + workflowBatchHandler.openNextNode(taskExecuteDTO); + } + + protected boolean updateStatus(final JobExecutorResultContext context, final Integer taskBatchStatus) { + JobTaskBatch jobTaskBatch = new JobTaskBatch(); + jobTaskBatch.setId(context.getTaskBatchId()); + jobTaskBatch.setTaskBatchStatus(taskBatchStatus); + jobTaskBatch.setUpdateDt(LocalDateTime.now()); + if (Objects.nonNull(context.getJobOperationReason())) { + jobTaskBatch.setOperationReason(context.getJobOperationReason()); + } + return 1 == jobTaskBatchMapper.update(jobTaskBatch, + new LambdaUpdateWrapper() + .eq(JobTaskBatch::getId, context.getTaskBatchId()) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) + ); + } + + protected void stop(JobExecutorResultContext context) { + JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(getTaskInstanceType().getType()); + TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(context); + stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE); + stopJobContext.setForceStop(Boolean.TRUE); + instanceInterrupt.stop(stopJobContext); + } + + protected abstract void doHandleSuccess(final JobExecutorResultContext context); + + protected abstract void doHandleStop(final JobExecutorResultContext context); + + protected abstract void doHandleFail(final JobExecutorResultContext context); + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/BroadcastJobExecutorHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/BroadcastJobExecutorHandler.java new file mode 100644 index 00000000..5553d2e3 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/BroadcastJobExecutorHandler.java @@ -0,0 +1,45 @@ +package com.aizuda.snailjob.server.job.task.support.result.job; + +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; +import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import org.springframework.stereotype.Component; + +/** + * @author: opensnail + * @date : 2024-09-04 + * @since :1.2.0 + */ +@Component +public class BroadcastJobExecutorHandler extends AbstractJobExecutorResultHandler { + + public BroadcastJobExecutorHandler( + final JobTaskMapper jobTaskMapper, + final JobTaskBatchMapper jobTaskBatchMapper, + final WorkflowBatchHandler workflowBatchHandler, + final GroupConfigMapper groupConfigMapper) { + super(jobTaskMapper, jobTaskBatchMapper, workflowBatchHandler, groupConfigMapper); + } + + @Override + public JobTaskTypeEnum getTaskInstanceType() { + return JobTaskTypeEnum.BROADCAST; + } + + @Override + protected void doHandleSuccess(final JobExecutorResultContext context) { + } + + @Override + protected void doHandleStop(final JobExecutorResultContext context) { + + } + + @Override + protected void doHandleFail(final JobExecutorResultContext context) { + + } + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/ClusterJobExecutorHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/ClusterJobExecutorHandler.java new file mode 100644 index 00000000..d748c39a --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/ClusterJobExecutorHandler.java @@ -0,0 +1,52 @@ +package com.aizuda.snailjob.server.job.task.support.result.job; + +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; +import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import org.springframework.stereotype.Component; + +import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.MAP; +import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.MERGE_REDUCE; +import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.REDUCE; + +/** + * @author: opensnail + * @date : 2024-09-04 + * @since :1.2.0 + */ +@Component +public class ClusterJobExecutorHandler extends AbstractJobExecutorResultHandler { + + public ClusterJobExecutorHandler( + final JobTaskMapper jobTaskMapper, + final JobTaskBatchMapper jobTaskBatchMapper, + final WorkflowBatchHandler workflowBatchHandler, + final GroupConfigMapper groupConfigMapper) { + super(jobTaskMapper, jobTaskBatchMapper, workflowBatchHandler, groupConfigMapper); + } + + @Override + public JobTaskTypeEnum getTaskInstanceType() { + return JobTaskTypeEnum.CLUSTER; + } + + @Override + protected void doHandleSuccess(final JobExecutorResultContext context) { + } + + @Override + protected void doHandleStop(final JobExecutorResultContext context) { + + } + + @Override + protected void doHandleFail(final JobExecutorResultContext context) { + + } + + @Override + protected void stop(final JobExecutorResultContext context) { + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/JobExecutorResultContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/JobExecutorResultContext.java new file mode 100644 index 00000000..020cb2c8 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/JobExecutorResultContext.java @@ -0,0 +1,35 @@ +package com.aizuda.snailjob.server.job.task.support.result.job; + +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import lombok.Data; + +import java.util.List; + +/** + * @author: opensnail + * @date : 2024-09-04 + * @since :1.2.0 + */ +@Data +public class JobExecutorResultContext { + + private Long jobId; + private Long workflowNodeId; + private Long workflowTaskBatchId; + private Long taskBatchId; + private Integer jobOperationReason; + private Integer taskType; + private boolean isRetry; + private List jobTaskList; + + /** + * 是否开启创建Reduce任务 + */ + private boolean createReduceTask; + + /** + * 是否更新批次完成 + */ + private boolean taskBatchComplete; + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/MapJobExecutorHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/MapJobExecutorHandler.java new file mode 100644 index 00000000..ce838664 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/MapJobExecutorHandler.java @@ -0,0 +1,45 @@ +package com.aizuda.snailjob.server.job.task.support.result.job; + +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; +import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import org.springframework.stereotype.Component; + +/** + * @author: opensnail + * @date : 2024-09-04 + * @since :1.2.0 + */ +@Component +public class MapJobExecutorHandler extends AbstractJobExecutorResultHandler { + + public MapJobExecutorHandler( + final JobTaskMapper jobTaskMapper, + final JobTaskBatchMapper jobTaskBatchMapper, + final WorkflowBatchHandler workflowBatchHandler, + final GroupConfigMapper groupConfigMapper) { + super(jobTaskMapper, jobTaskBatchMapper, workflowBatchHandler, groupConfigMapper); + } + + @Override + public JobTaskTypeEnum getTaskInstanceType() { + return JobTaskTypeEnum.MAP; + } + + @Override + protected void doHandleSuccess(final JobExecutorResultContext context) { + } + + @Override + protected void doHandleStop(final JobExecutorResultContext context) { + + } + + @Override + protected void doHandleFail(final JobExecutorResultContext context) { + + } + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/MapReduceJobExecutorHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/MapReduceJobExecutorHandler.java new file mode 100644 index 00000000..30287b70 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/MapReduceJobExecutorHandler.java @@ -0,0 +1,126 @@ +package com.aizuda.snailjob.server.job.task.support.result.job; + +import akka.actor.ActorRef; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; +import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.MAP; +import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.MERGE_REDUCE; +import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.REDUCE; + +/** + * @author: opensnail + * @date : 2024-09-04 + * @since :1.2.0 + */ +@Component +public class MapReduceJobExecutorHandler extends AbstractJobExecutorResultHandler { + + public MapReduceJobExecutorHandler( + final JobTaskMapper jobTaskMapper, + final JobTaskBatchMapper jobTaskBatchMapper, + final WorkflowBatchHandler workflowBatchHandler, + final GroupConfigMapper groupConfigMapper) { + super(jobTaskMapper, jobTaskBatchMapper, workflowBatchHandler, groupConfigMapper); + } + + @Override + public JobTaskTypeEnum getTaskInstanceType() { + return JobTaskTypeEnum.MAP_REDUCE; + } + + @Override + protected void doHandleSuccess(final JobExecutorResultContext context) { + // 判断是否需要创建Reduce任务 + context.setCreateReduceTask(needReduceTask(context)); + } + + @Override + protected void doHandleStop(final JobExecutorResultContext context) { + + } + + @Override + protected void doHandleFail(final JobExecutorResultContext context) { + + } + + @Override + protected boolean updateStatus(final JobExecutorResultContext context, final Integer status) { + if (context.isCreateReduceTask()) { + // 此时中断批次完成,需要开启reduce任务 + return false; + } + return super.updateStatus(context, status); + } + + /** + * 若需要执行reduce则返回false 不需要更新批次状态, 否则需要更新批次状态 + * + * @param context 需要执行批次完成所需的参数信息 + * @return true-需要reduce false-不需要reduce + */ + private boolean needReduceTask(final JobExecutorResultContext context) { + int mrStage; + + int reduceCount = 0; + int mapCount = 0; + for (final JobTask jobTask : context.getJobTaskList()) { + if (Objects.isNull(jobTask.getMrStage())) { + continue; + } + + // 存在MERGE_REDUCE任务了不需要生成 + if (MERGE_REDUCE.getStage() == jobTask.getMrStage()) { + return false; + } + + // REDUCE任务累加 + if (REDUCE.getStage() == jobTask.getMrStage()) { + reduceCount++; + continue; + } + + // MAP任务累加 + if (MAP.getStage() == jobTask.getMrStage()) { + mapCount++; + } + } + + // 若存在2个以上的reduce任务则开启merge reduce任务 + if (reduceCount > 1) { + mrStage = MERGE_REDUCE.getStage(); + } else if (mapCount == context.getJobTaskList().size()) { + // 若都是MAP任务则开启Reduce任务 + mrStage = REDUCE.getStage(); + } else { + // 若既不是MAP也是不REDUCE则是其他类型的任务,直接返回即可 + return false; + } + + // 开启reduce or mergeReduce阶段 + try { + ReduceTaskDTO reduceTaskDTO = JobTaskConverter.INSTANCE.toReduceTaskDTO(context); + reduceTaskDTO.setMrStage(mrStage); + ActorRef actorRef = ActorGenerator.jobReduceActor(); + actorRef.tell(reduceTaskDTO, actorRef); + return true; + } catch (Exception e) { + SnailJobLog.LOCAL.error("tell reduce actor error", e); + } + + return false; + } + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/ShardingJobExecutorHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/ShardingJobExecutorHandler.java new file mode 100644 index 00000000..cac2ff42 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/result/job/ShardingJobExecutorHandler.java @@ -0,0 +1,45 @@ +package com.aizuda.snailjob.server.job.task.support.result.job; + +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; +import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import org.springframework.stereotype.Component; + +/** + * @author: opensnail + * @date : 2024-09-04 + * @since :1.2.0 + */ +@Component +public class ShardingJobExecutorHandler extends AbstractJobExecutorResultHandler { + + public ShardingJobExecutorHandler( + final JobTaskMapper jobTaskMapper, + final JobTaskBatchMapper jobTaskBatchMapper, + final WorkflowBatchHandler workflowBatchHandler, + final GroupConfigMapper groupConfigMapper) { + super(jobTaskMapper, jobTaskBatchMapper, workflowBatchHandler, groupConfigMapper); + } + + @Override + public JobTaskTypeEnum getTaskInstanceType() { + return JobTaskTypeEnum.SHARDING; + } + + @Override + protected void doHandleSuccess(final JobExecutorResultContext context) { + } + + @Override + protected void doHandleStop(final JobExecutorResultContext context) { + + } + + @Override + protected void doHandleFail(final JobExecutorResultContext context) { + + } + +}