diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/ReduceTaskDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/ReduceTaskDTO.java index c3388491f..3abac8fed 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/ReduceTaskDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/ReduceTaskDTO.java @@ -13,4 +13,5 @@ public class ReduceTaskDTO { private Long workflowTaskBatchId; private Long taskBatchId; private Long jobId; + private Integer mrStage; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index 2f12d4287..ba16a26f4 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -49,6 +49,10 @@ public class JobExecutorResultActor extends AbstractActor { return receiveBuilder().match(JobExecutorResultDTO.class, result -> { SnailJobLog.LOCAL.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); try { + Assert.notNull(result.getTaskId(), ()-> new SnailJobServerException("taskId can not be null")); + Assert.notNull(result.getJobId(), ()-> new SnailJobServerException("jobId can not be null")); + Assert.notNull(result.getTaskBatchId(), ()-> new SnailJobServerException("taskBatchId can not be null")); + JobTask jobTask = new JobTask(); jobTask.setTaskStatus(result.getTaskStatus()); if (Objects.nonNull(result.getResult())) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java index 2d6acf73d..b6f2f2bf0 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java @@ -2,11 +2,13 @@ package com.aizuda.snailjob.server.job.task.support.dispatch; import akka.actor.AbstractActor; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO; import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; @@ -50,14 +52,17 @@ public class ReduceActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> { + SnailJobLog.LOCAL.info("执行Reduce, [{}]", JsonUtil.toJsonString(reduceTask)); try { + + Assert.notNull(reduceTask.getMrStage(), ()-> new SnailJobServerException("mrStage can not be null")); + Assert.notNull(reduceTask.getJobId(), ()-> new SnailJobServerException("jobId can not be null")); + Assert.notNull(reduceTask.getTaskBatchId(), ()-> new SnailJobServerException("taskBatchId can not be null")); + String key = MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId()); distributedLockHandler.lockWithDisposableAndRetry(() -> { doReduce(reduceTask); - }, MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId()), - Duration.ofSeconds(1), - Duration.ofSeconds(1), - 3); + }, key, Duration.ofSeconds(1), Duration.ofSeconds(2), 3); } catch (Exception e) { SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e); } @@ -71,11 +76,11 @@ public class ReduceActor extends AbstractActor { new LambdaQueryWrapper() .select(JobTask::getId) .eq(JobTask::getTaskBatchId, reduceTask.getTaskBatchId()) - .eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage()) + .eq(JobTask::getMrStage, reduceTask.getMrStage()) ); if (CollUtil.isNotEmpty(jobTasks)) { - // 说明已经创建了reduce任务了 + // 说明已经创建了reduce 或者 merge reduce 任务了 return; } @@ -89,7 +94,7 @@ public class ReduceActor extends AbstractActor { JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType()); JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); context.setTaskBatchId(reduceTask.getTaskBatchId()); - context.setMrStage(MapReduceStageEnum.REDUCE.getStage()); + context.setMrStage(reduceTask.getMrStage()); List taskList = taskInstance.generate(context); if (CollUtil.isEmpty(taskList)) { SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId()); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 65b614f38..9e96d7ae9 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -28,6 +28,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; import java.util.*; +import java.util.stream.Collectors; /** * 生成Map Reduce任务 @@ -72,10 +73,39 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { // REDUCE任务 return createReduceJobTasks(context, nodeInfoList, serverNodes); } + case MERGE_REDUCE -> { + // REDUCE任务 + return createMergeReduceJobTasks(context, nodeInfoList, serverNodes); + } default -> throw new SnailJobServerException("Map reduce stage is not existed"); } } + private List createMergeReduceJobTasks(JobTaskGenerateContext context, List nodeInfoList, Set serverNodes) { + + List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() + .select(JobTask::getResultMessage) + .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) + .eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage()) + .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) + ); + + RegisterNodeInfo registerNodeInfo = nodeInfoList.get(0); + // 新增任务实例 + JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); + jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); + jobTask.setArgsType(context.getArgsType()); + jobTask.setArgsStr(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage))); + jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); + jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); + jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage()); + jobTask.setTaskName("MERGE_REDUCE_TASK"); + Assert.isTrue(1 == jobTaskMapper.insert(jobTask), + () -> new SnailJobServerException("新增任务实例失败")); + + return Lists.newArrayList(jobTask); + } + private List createReduceJobTasks(JobTaskGenerateContext context, List nodeInfoList, Set serverNodes) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/DistributedLockHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/DistributedLockHandler.java index df079ce40..1c3e38dc5 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/DistributedLockHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/DistributedLockHandler.java @@ -77,7 +77,7 @@ public class DistributedLockHandler { } } - SnailJobLog.LOCAL.error("lock execute error. lockName:[{}]", lockName, throwable); + SnailJobLog.LOCAL.warn("lock execute error. lockName:[{}]", lockName, throwable); } finally { if (lock) { lockProvider.unlock(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index 98430e06a..8c941e547 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -4,10 +4,7 @@ import akka.actor.ActorRef; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.context.SpringContext; -import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; -import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; -import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; -import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.enums.*; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.WaitStrategy; @@ -43,7 +40,7 @@ import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; -import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.MAP; +import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*; /** * @author: opensnail @@ -92,6 +89,7 @@ public class JobTaskBatchHandler { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); if (needReduceTask(completeJobBatchDTO, jobTasks)) { + // 此时中断批次完成,需要开启reduce任务 return false; } } @@ -124,16 +122,26 @@ public class JobTaskBatchHandler { * @return true-需要reduce false-不需要reduce */ private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List jobTasks) { - // 判断是否是mapreduce任务 + Integer mrStage = null; if (isAllMapTask(jobTasks)) { - // 开启reduce阶段 - try { - ActorRef actorRef = ActorGenerator.jobReduceActor(); - actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef); - return true; - } catch (Exception e) { - SnailJobLog.LOCAL.error("tell reduce actor error", e); - } + // 若都是MAP任务则开启Reduce任务 + mrStage = REDUCE.getStage(); + } else if (isALeastOneReduceTask(jobTasks)) { + // 若存在2个以上的reduce任务则开启merge reduce任务 + mrStage = MERGE_REDUCE.getStage(); + } else { + return false; + } + + // 开启reduce or mergeReduce阶段 + try { + ReduceTaskDTO reduceTaskDTO = JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO); + reduceTaskDTO.setMrStage(mrStage); + ActorRef actorRef = ActorGenerator.jobReduceActor(); + actorRef.tell(reduceTaskDTO, actorRef); + return true; + } catch (Exception e) { + SnailJobLog.LOCAL.error("tell reduce actor error", e); } return false; @@ -145,6 +153,12 @@ public class JobTaskBatchHandler { .count(); } + private static boolean isALeastOneReduceTask(final List jobTasks) { + return jobTasks.stream() + .filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && REDUCE.getStage() == jobTask.getMrStage()) + .count() > 1; + } + /** * 开启常驻任务 *