feat(sj_1.1.0): 完成merge reduce 任务

This commit is contained in:
opensnail 2024-06-16 10:54:07 +08:00
parent 7c84e9645b
commit 6fa8f49daa
6 changed files with 76 additions and 22 deletions

View File

@ -13,4 +13,5 @@ public class ReduceTaskDTO {
private Long workflowTaskBatchId;
private Long taskBatchId;
private Long jobId;
private Integer mrStage;
}

View File

@ -49,6 +49,10 @@ public class JobExecutorResultActor extends AbstractActor {
return receiveBuilder().match(JobExecutorResultDTO.class, result -> {
SnailJobLog.LOCAL.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
try {
Assert.notNull(result.getTaskId(), ()-> new SnailJobServerException("taskId can not be null"));
Assert.notNull(result.getJobId(), ()-> new SnailJobServerException("jobId can not be null"));
Assert.notNull(result.getTaskBatchId(), ()-> new SnailJobServerException("taskBatchId can not be null"));
JobTask jobTask = new JobTask();
jobTask.setTaskStatus(result.getTaskStatus());
if (Objects.nonNull(result.getResult())) {

View File

@ -2,11 +2,13 @@ package com.aizuda.snailjob.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
@ -50,14 +52,17 @@ public class ReduceActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
SnailJobLog.LOCAL.info("执行Reduce, [{}]", JsonUtil.toJsonString(reduceTask));
try {
Assert.notNull(reduceTask.getMrStage(), ()-> new SnailJobServerException("mrStage can not be null"));
Assert.notNull(reduceTask.getJobId(), ()-> new SnailJobServerException("jobId can not be null"));
Assert.notNull(reduceTask.getTaskBatchId(), ()-> new SnailJobServerException("taskBatchId can not be null"));
String key = MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId());
distributedLockHandler.lockWithDisposableAndRetry(() -> {
doReduce(reduceTask);
}, MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId()),
Duration.ofSeconds(1),
Duration.ofSeconds(1),
3);
}, key, Duration.ofSeconds(1), Duration.ofSeconds(2), 3);
} catch (Exception e) {
SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e);
}
@ -71,11 +76,11 @@ public class ReduceActor extends AbstractActor {
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getId)
.eq(JobTask::getTaskBatchId, reduceTask.getTaskBatchId())
.eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage())
.eq(JobTask::getMrStage, reduceTask.getMrStage())
);
if (CollUtil.isNotEmpty(jobTasks)) {
// 说明已经创建了reduce任务了
// 说明已经创建了reduce 或者 merge reduce 任务了
return;
}
@ -89,7 +94,7 @@ public class ReduceActor extends AbstractActor {
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType());
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
context.setTaskBatchId(reduceTask.getTaskBatchId());
context.setMrStage(MapReduceStageEnum.REDUCE.getStage());
context.setMrStage(reduceTask.getMrStage());
List<JobTask> taskList = taskInstance.generate(context);
if (CollUtil.isEmpty(taskList)) {
SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId());

View File

@ -28,6 +28,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.*;
import java.util.stream.Collectors;
/**
* 生成Map Reduce任务
@ -72,10 +73,39 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
// REDUCE任务
return createReduceJobTasks(context, nodeInfoList, serverNodes);
}
case MERGE_REDUCE -> {
// MERGE_REDUCE task
return createMergeReduceJobTasks(context, nodeInfoList, serverNodes);
}
default -> throw new SnailJobServerException("Map reduce stage is not existed");
}
}
/**
 * Creates and persists the single MERGE_REDUCE task of a task batch.
 *
 * <p>The result messages of the batch's REDUCE-stage tasks flagged as leaf are loaded,
 * collected through {@code StreamUtils.toSet} (so identical results are presumably
 * de-duplicated) and serialized to JSON as the argument string of the new task. The task
 * is bound to the first entry of {@code nodeInfoList}, inserted with status RUNNING and
 * returned as the only element of the result list.
 *
 * @param context      generation context supplying the task batch id and args type
 * @param nodeInfoList candidate client nodes; only the first one is used
 *                     (NOTE(review): assumed non-empty, otherwise {@code get(0)} throws - confirm at caller)
 * @param serverNodes  not used by this method
 * @return a mutable list holding the newly inserted merge-reduce task
 * @throws SnailJobServerException when the insert does not affect exactly one row
 */
private List<JobTask> createMergeReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList, Set<RegisterNodeInfo> serverNodes) {

    // Only the result message column of the leaf REDUCE tasks of this batch is needed.
    LambdaQueryWrapper<JobTask> reduceLeafQuery = new LambdaQueryWrapper<JobTask>()
            .select(JobTask::getResultMessage)
            .eq(JobTask::getTaskBatchId, context.getTaskBatchId())
            .eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage())
            .eq(JobTask::getLeaf, StatusEnum.YES.getStatus());
    List<JobTask> reduceTasks = jobTaskMapper.selectList(reduceLeafQuery);

    // The merge task is dispatched to the first available node.
    RegisterNodeInfo targetNode = nodeInfoList.get(0);

    // Build the new task instance.
    JobTask mergeTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
    mergeTask.setClientInfo(ClientInfoUtils.generate(targetNode));
    mergeTask.setArgsType(context.getArgsType());
    // The reduce results become the input arguments of the merge task.
    mergeTask.setArgsStr(
            JsonUtil.toJsonString(StreamUtils.toSet(reduceTasks, JobTask::getResultMessage)));
    mergeTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
    // Never persist a null result message; fall back to the empty string.
    mergeTask.setResultMessage(
            Objects.requireNonNullElse(mergeTask.getResultMessage(), StrUtil.EMPTY));
    mergeTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage());
    mergeTask.setTaskName("MERGE_REDUCE_TASK");

    // Exactly one row must be written, otherwise the generation fails.
    Assert.isTrue(jobTaskMapper.insert(mergeTask) == 1,
            () -> new SnailJobServerException("新增任务实例失败"));

    return Lists.newArrayList(mergeTask);
}
private List<JobTask> createReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList,
Set<RegisterNodeInfo> serverNodes) {

View File

@ -77,7 +77,7 @@ public class DistributedLockHandler {
}
}
SnailJobLog.LOCAL.error("lock execute error. lockName:[{}]", lockName, throwable);
SnailJobLog.LOCAL.warn("lock execute error. lockName:[{}]", lockName, throwable);
} finally {
if (lock) {
lockProvider.unlock();

View File

@ -4,10 +4,7 @@ import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.enums.*;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
@ -43,7 +40,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.MAP;
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*;
/**
* @author: opensnail
@ -92,6 +89,7 @@ public class JobTaskBatchHandler {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
if (needReduceTask(completeJobBatchDTO, jobTasks)) {
// 此时中断批次完成需要开启reduce任务
return false;
}
}
@ -124,16 +122,26 @@ public class JobTaskBatchHandler {
* @return true-需要reduce false-不需要reduce
*/
private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List<JobTask> jobTasks) {
// 判断是否是mapreduce任务
Integer mrStage = null;
if (isAllMapTask(jobTasks)) {
// 开启reduce阶段
try {
ActorRef actorRef = ActorGenerator.jobReduceActor();
actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
return true;
} catch (Exception e) {
SnailJobLog.LOCAL.error("tell reduce actor error", e);
}
// 若都是MAP任务则开启Reduce任务
mrStage = REDUCE.getStage();
} else if (isALeastOneReduceTask(jobTasks)) {
// 若存在2个以上的reduce任务则开启merge reduce任务
mrStage = MERGE_REDUCE.getStage();
} else {
return false;
}
// 开启reduce or mergeReduce阶段
try {
ReduceTaskDTO reduceTaskDTO = JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO);
reduceTaskDTO.setMrStage(mrStage);
ActorRef actorRef = ActorGenerator.jobReduceActor();
actorRef.tell(reduceTaskDTO, actorRef);
return true;
} catch (Exception e) {
SnailJobLog.LOCAL.error("tell reduce actor error", e);
}
return false;
@ -145,6 +153,12 @@ public class JobTaskBatchHandler {
.count();
}
/**
 * Checks whether the given tasks contain more than one REDUCE-stage task.
 *
 * <p>Despite the method name, a single REDUCE task is not enough: the result is
 * {@code true} only when two or more tasks carry the REDUCE stage. Tasks whose
 * stage is {@code null} are ignored.
 *
 * @param jobTasks tasks of the batch to inspect
 * @return {@code true} when at least two tasks are in the REDUCE stage
 */
private static boolean isALeastOneReduceTask(final List<JobTask> jobTasks) {
    int reduceTaskCount = 0;
    for (JobTask candidate : jobTasks) {
        if (Objects.isNull(candidate.getMrStage())) {
            // Tasks without a map-reduce stage never count.
            continue;
        }
        if (REDUCE.getStage() == candidate.getMrStage()) {
            reduceTaskCount++;
        }
    }
    return reduceTaskCount > 1;
}
/**
* 开启常驻任务
*