diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java index 2e87c4c66..5871a5483 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java @@ -14,7 +14,9 @@ import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFacto import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory; +import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import lombok.RequiredArgsConstructor; @@ -22,6 +24,8 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import java.text.MessageFormat; +import java.time.Duration; import java.util.List; /** @@ -35,14 +39,21 @@ import java.util.List; @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @RequiredArgsConstructor public class ReduceActor extends AbstractActor { + private static final String KEY = "job_generate_reduce_{0}_{1}"; + private final DistributedLockHandler distributedLockHandler; private final JobMapper jobMapper; + + @Override public Receive createReceive() { return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> { try { - doReduce(reduceTask); + distributedLockHandler.lockWithDisposableAndRetry(() -> { + doReduce(reduceTask); + }, MessageFormat.format(KEY, reduceTask.getTaskBatchId(), + reduceTask.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3); } catch (Exception e) { SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 974419e34..891c501b8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -22,6 +22,9 @@ import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import org.jetbrains.annotations.Nullable; import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; import java.util.ArrayList; import java.util.List; @@ -40,6 +43,7 @@ import java.util.Set; public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { private final JobTaskMapper jobTaskMapper; + private final TransactionTemplate transactionTemplate; @Override public JobTaskTypeEnum getTaskInstanceType() { @@ -61,30 +65,38 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { switch (context.getMrStage()) { case MAP -> { // MAP任务 - List newArrayList = createMapJobTasks(context, nodeInfoList, serverNodes); - if (newArrayList != null) { - return newArrayList; - } + return createMapJobTasks(context, nodeInfoList, serverNodes); } case REDUCE -> { - createReduceJobTasks(context, nodeInfoList, serverNodes); + // REDUCE任务 + return createReduceJobTasks(context, nodeInfoList, serverNodes); } default -> throw new SnailJobServerException("Map reduce stage is not existed"); } - - return Lists.newArrayList(); } - private void createReduceJobTasks(JobTaskGenerateContext context, List nodeInfoList, + private List createReduceJobTasks(JobTaskGenerateContext context, List nodeInfoList, Set serverNodes) { // TODO reduce阶段的并行度 int reduceParallel = 10; - List allMapJobTasks = StreamUtils.toList(jobTaskMapper.selectList(new LambdaQueryWrapper() - .select(JobTask::getResultMessage) - .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) - ), JobTask::getResultMessage); + List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() + .select(JobTask::getResultMessage, JobTask::getExtAttrs) + .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); + + // 若存在已经生成的reduce任务不需要重新生成 + boolean existedReduce = jobTasks.stream() + .filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs())) + .map(jobTask -> JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class)) + .anyMatch(jobTaskExtAttrsDTO -> MapReduceStageEnum.REDUCE.name().equals(jobTaskExtAttrsDTO.getMrStage())); + if (existedReduce) { + SnailJobLog.LOCAL.warn("The reduce task already exists. taskBatchId:[{}]", context.getTaskBatchId()); + return Lists.newArrayList(); + } + + // 这里需要判断是否是map + List allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage); List> partition = Lists.partition(allMapJobTasks, reduceParallel); @@ -93,21 +105,30 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType()); jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.REDUCE.name()); - List jobTasks = new ArrayList<>(partition.size()); - for (int index = 0; index < partition.size(); index++) { - RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size()); - // 新增任务实例 - JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); - jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); - jobTask.setArgsType(context.getArgsType()); - jobTask.setArgsStr(JsonUtil.toJsonString(partition.get(index))); - jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); - jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); - jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString()); - Assert.isTrue(1 == jobTaskMapper.insert(jobTask), - () -> new SnailJobServerException("新增任务实例失败")); - jobTasks.add(jobTask); - } + jobTasks = new ArrayList<>(partition.size()); + + final List finalJobTasks = jobTasks; + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { + for (int index = 0; index < partition.size(); index++) { + RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size()); + // 新增任务实例 + JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); + jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); + jobTask.setArgsType(context.getArgsType()); + jobTask.setArgsStr(JsonUtil.toJsonString(partition.get(index))); + jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); + jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); + jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString()); + Assert.isTrue(1 == jobTaskMapper.insert(jobTask), + () -> new SnailJobServerException("新增任务实例失败")); + finalJobTasks.add(jobTask); + } + } + }); + + return finalJobTasks; } private @Nullable List createMapJobTasks(final JobTaskGenerateContext context, @@ -122,22 +143,29 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTaskExtAttrsDTO.setMapName(context.getMapName()); jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType()); jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.MAP.name()); - List jobTasks = new ArrayList<>(mapSubTask.size()); - for (int index = 0; index < mapSubTask.size(); index++) { - RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size()); - // 新增任务实例 - JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); - jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); - jobTask.setArgsType(context.getArgsType()); - jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index))); - jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); - jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); - jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString()); - Assert.isTrue(1 == jobTaskMapper.insert(jobTask), - () -> new SnailJobServerException("新增任务实例失败")); - jobTasks.add(jobTask); - } + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { + + for (int index = 0; index < mapSubTask.size(); index++) { + RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size()); + // 新增任务实例 + JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); + jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); + jobTask.setArgsType(context.getArgsType()); + jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index))); + jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); + jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); + jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString()); + Assert.isTrue(1 == jobTaskMapper.insert(jobTask), + () -> new SnailJobServerException("新增任务实例失败")); + jobTasks.add(jobTask); + } + } + }); + return jobTasks; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index 45b02b9a2..4f71c5fa4 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -70,7 +70,7 @@ public class JobTaskBatchHandler { List jobTasks = jobTaskMapper.selectList( new LambdaQueryWrapper() - .select(JobTask::getTaskStatus, JobTask::getResultMessage, JobTask::getExtAttrs) + .select(JobTask::getTaskStatus, JobTask::getExtAttrs) .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks)); @@ -99,24 +99,8 @@ public class JobTaskBatchHandler { } else { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); - // 判断是否是mapreduce任务 - // todo 此处待优化 - JobTask firstJobTask = jobTasks.get(0); - String extAttrs = firstJobTask.getExtAttrs(); - if (StrUtil.isNotBlank(extAttrs)) { - JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(firstJobTask.getExtAttrs(), - JobTaskExtAttrsDTO.class); - Integer taskType = jobTaskExtAttrsDTO.getTaskType(); - if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) { - // 开启reduce阶段 - try { - ActorRef actorRef = ActorGenerator.jobReduceActor(); - actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef); - return false; - } catch (Exception e) { - SnailJobLog.LOCAL.error("tell reduce actor error", e); - } - } + if (needReduceTask(completeJobBatchDTO, jobTasks)) { + return false; } } @@ -140,6 +124,36 @@ public class JobTaskBatchHandler { } + /** + * 若需要执行reduce则返回false 不需要更新批次状态, 否则需要更新批次状态 + * + * @param completeJobBatchDTO 需要执行批次完成所需的参数信息 + * @param jobTasks 任务项列表 + * @return true-需要reduce false-不需要reduce + */ + private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List jobTasks) { + // 判断是否是mapreduce任务 + // todo 此处待优化 + JobTask firstJobTask = jobTasks.get(0); + String extAttrs = firstJobTask.getExtAttrs(); + if (StrUtil.isNotBlank(extAttrs)) { + JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class); + Integer taskType = jobTaskExtAttrsDTO.getTaskType(); + if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) { + // 开启reduce阶段 + try { + ActorRef actorRef = ActorGenerator.jobReduceActor(); + actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef); + return true; + } catch (Exception e) { + SnailJobLog.LOCAL.error("tell reduce actor error", e); + } + } + } + + return false; + } + private static boolean isAllMapTask(final List jobTasks) { return jobTasks.size() == jobTasks.stream() .filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs()))