From ca2ada38753c45854e3a8dc328ed8d7756a45cf8 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Tue, 25 Jun 2024 23:04:44 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0-beta2):=201.=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8DMAP=E4=BB=BB=E5=8A=A1=E5=A4=B1=E8=B4=A5=202.=20?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E9=85=8D=E7=BD=AE=E7=9A=84reduce=20=E5=88=86?= =?UTF-8?q?=E7=89=87=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/task/dto/CompleteJobBatchDTO.java | 1 + .../job/task/dto/MapReduceArgsStrDTO.java | 16 ++++++++++++ ...rategy.java => RecoveryBlockStrategy.java} | 24 +++++++++++++---- .../dispatch/JobExecutorResultActor.java | 4 +-- .../task/MapReduceTaskGenerator.java | 26 ++++++++++++------- .../support/handler/JobTaskBatchHandler.java | 6 ++--- 6 files changed, 56 insertions(+), 21 deletions(-) create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/MapReduceArgsStrDTO.java rename snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/{DiscardRetryBlockStrategy.java => RecoveryBlockStrategy.java} (72%) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java index 9eb50a1b8..004930cb3 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java @@ -17,5 +17,6 @@ public class CompleteJobBatchDTO { private Long taskBatchId; private Integer jobOperationReason; private Object result; + private Integer taskType; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/MapReduceArgsStrDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/MapReduceArgsStrDTO.java new file mode 100644 index 000000000..8156c6fd9 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/MapReduceArgsStrDTO.java @@ -0,0 +1,16 @@ +package com.aizuda.snailjob.server.job.task.dto; + +import lombok.Data; + +/** + * @author opensnail + * @date 2024-06-25 22:58:05 + * @since sj_1.1.0 + */ +@Data +public class MapReduceArgsStrDTO { + + private Integer shardNum; + + private String argsStr; +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/DiscardRetryBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/RecoveryBlockStrategy.java similarity index 72% rename from snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/DiscardRetryBlockStrategy.java rename to snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/RecoveryBlockStrategy.java index 83b925e21..520573a3e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/DiscardRetryBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/RecoveryBlockStrategy.java @@ -1,10 +1,15 @@ package com.aizuda.snailjob.server.job.task.support.block.job; +import akka.actor.ActorRef; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; +import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; @@ -19,6 +24,7 @@ import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import java.util.List; +import java.util.stream.Stream; /** * 重新触发执行失败的任务 @@ -29,7 +35,7 @@ import java.util.List; */ @Component @RequiredArgsConstructor -public class DiscardRetryBlockStrategy extends AbstracJobBlockStrategy { +public class RecoveryBlockStrategy extends AbstracJobBlockStrategy { private final JobTaskMapper jobTaskMapper; private final JobMapper jobMapper; @Override @@ -42,18 +48,26 @@ public class DiscardRetryBlockStrategy extends AbstracJobBlockStrategy { new LambdaQueryWrapper() .select(JobTask::getId, JobTask::getTaskStatus) .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) - .eq(JobTask::getTaskStatus, JobTaskStatusEnum.NOT_SUCCESS) ); + // 若任务项为空则生成任务项 if (CollUtil.isEmpty(jobTasks)) { - SnailJobLog.LOCAL.warn("No executable job task. taskBatchId:[{}]", context.getTaskBatchId()); + TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); + taskExecuteDTO.setTaskBatchId(context.getTaskBatchId()); + taskExecuteDTO.setJobId(context.getJobId()); + taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType()); + taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); + taskExecuteDTO.setWorkflowNodeId(context.getWorkflowNodeId()); + ActorRef actorRef = ActorGenerator.jobTaskExecutorActor(); + actorRef.tell(taskExecuteDTO, actorRef); return; } Job job = jobMapper.selectById(context.getJobId()); - // 执行任务 + // 执行任务 Stop or Fail 任务 JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(context.getTaskType()); - jobExecutor.execute(buildJobExecutorContext(context, job, jobTasks)); + jobExecutor.execute(buildJobExecutorContext(context, job, + StreamUtils.filter(jobTasks, (jobTask) -> JobTaskStatusEnum.NOT_SUCCESS.contains(jobTask.getTaskStatus())))); } @Override diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index 3710f644b..3a3439c80 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -51,6 +51,7 @@ public class JobExecutorResultActor extends AbstractActor { Assert.notNull(result.getTaskId(), ()-> new SnailJobServerException("taskId can not be null")); Assert.notNull(result.getJobId(), ()-> new SnailJobServerException("jobId can not be null")); Assert.notNull(result.getTaskBatchId(), ()-> new SnailJobServerException("taskBatchId can not be null")); + Assert.notNull(result.getTaskType(), ()-> new SnailJobServerException("taskType can not be null")); JobTask jobTask = new JobTask(); jobTask.setTaskStatus(result.getTaskStatus()); @@ -63,9 +64,6 @@ public class JobExecutorResultActor extends AbstractActor { new LambdaUpdateWrapper().eq(JobTask::getId, result.getTaskId())), () -> new SnailJobServerException("更新任务实例失败")); - // 更新工作流的全局上下文 如果并发更新失败则需要自旋重试更新 -// workflowBatchHandler.mergeWorkflowContextAndRetry(result.getWorkflowTaskBatchId(), result.getWfContext()); - // 除MAP和MAP_REDUCE 任务之外,其他任务都是叶子节点 if (Objects.nonNull(result.getIsLeaf()) && StatusEnum.NO.getStatus().equals(result.getIsLeaf())) { return; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 3c66b5e24..c59223749 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -16,6 +16,7 @@ import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.util.ClientInfoUtils; +import com.aizuda.snailjob.server.job.task.dto.MapReduceArgsStrDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; @@ -43,6 +44,8 @@ import java.util.*; @RequiredArgsConstructor public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { + private static final String MERGE_REDUCE_TASK = "MERGE_REDUCE_TASK"; + private static final String REDUCE_TASK = "REDUCE_TASK"; private final JobTaskMapper jobTaskMapper; private final TransactionTemplate transactionTemplate; @@ -53,7 +56,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { @Override protected List doGenerate(final JobTaskGenerateContext context) { - // TODO 若没有客户端节点JobTask是否需要创建???? Set serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()); if (CollUtil.isEmpty(serverNodes)) { @@ -64,9 +66,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { List nodeInfoList = new ArrayList<>(serverNodes); MapReduceStageEnum mapReduceStageEnum = MapReduceStageEnum.ofStage(context.getMrStage()); Assert.notNull(mapReduceStageEnum, () -> new SnailJobServerException("Map reduce stage is not existed")); - - // todo 待优化 - switch (mapReduceStageEnum) { + switch (Objects.requireNonNull(mapReduceStageEnum)) { case MAP -> { // MAP任务 return createMapJobTasks(context, nodeInfoList, serverNodes); @@ -105,7 +105,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage()); - jobTask.setTaskName("MERGE_REDUCE_TASK"); + jobTask.setTaskName(MERGE_REDUCE_TASK); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败")); @@ -115,8 +115,15 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { private List createReduceJobTasks(JobTaskGenerateContext context, List nodeInfoList, Set serverNodes) { - // TODO reduce阶段的并行度 - int reduceParallel = 2; + int reduceParallel = 1; + String jobParams = null; + try { + MapReduceArgsStrDTO mapReduceArgsStrDTO = JsonUtil.parseObject(context.getArgsStr(), MapReduceArgsStrDTO.class); + reduceParallel = Optional.ofNullable(mapReduceArgsStrDTO.getShardNum()).orElse(1); + jobParams = mapReduceArgsStrDTO.getArgsStr(); + } catch (Exception e) { + SnailJobLog.LOCAL.error("map reduce args parse error. argsStr:[{}]", context.getArgsStr()); + } List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() .select(JobTask::getResultMessage) @@ -132,6 +139,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTasks = new ArrayList<>(partition.size()); final List finalJobTasks = jobTasks; + String finalJobParams = jobParams; transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(final TransactionStatus status) { @@ -142,13 +150,13 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); jobTask.setArgsType(context.getArgsType()); JobArgsHolder jobArgsHolder = new JobArgsHolder(); - jobArgsHolder.setJobParams(StrUtil.isBlank(context.getArgsStr()) ? null : context.getArgsStr()); + jobArgsHolder.setJobParams(finalJobParams); jobArgsHolder.setMaps(JsonUtil.toJsonString(partition.get(index))); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage()); - jobTask.setTaskName("REDUCE_TASK"); + jobTask.setTaskName(REDUCE_TASK); jobTask.setParentId(0L); jobTask.setRetryCount(0); jobTask.setLeaf(StatusEnum.YES.getStatus()); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index 80004964e..5998fea22 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -95,12 +95,10 @@ public class JobTaskBatchHandler { } else if (stopCount > 0) { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus()); } else { - // todo 调试完成删除 - SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), - JsonUtil.toJsonString(jobTasks)); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); - if (needReduceTask(completeJobBatchDTO, jobTasks)) { + if (needReduceTask(completeJobBatchDTO, jobTasks) + && JobTaskTypeEnum.MAP_REDUCE.getType() == completeJobBatchDTO.getTaskType()) { // 此时中断批次完成,需要开启reduce任务 return false; }