fix(sj_1.1.0): 优化reduce任务生成

This commit is contained in:
opensnail 2024-06-14 23:21:05 +08:00
parent d9e4d5cde4
commit 4c8b1415bb
3 changed files with 115 additions and 62 deletions

View File

@ -14,7 +14,9 @@ import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFacto
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import lombok.RequiredArgsConstructor;
@ -22,6 +24,8 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.List;
/**
@ -35,14 +39,21 @@ import java.util.List;
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@RequiredArgsConstructor
public class ReduceActor extends AbstractActor {
private static final String KEY = "job_generate_reduce_{0}_{1}";
private final DistributedLockHandler distributedLockHandler;
private final JobMapper jobMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
try {
doReduce(reduceTask);
distributedLockHandler.lockWithDisposableAndRetry(() -> {
doReduce(reduceTask);
}, MessageFormat.format(KEY, reduceTask.getTaskBatchId(),
reduceTask.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
} catch (Exception e) {
SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e);
}

View File

@ -22,6 +22,9 @@ import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.Nullable;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.List;
@ -40,6 +43,7 @@ import java.util.Set;
public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
private final JobTaskMapper jobTaskMapper;
private final TransactionTemplate transactionTemplate;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
@ -61,30 +65,38 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
switch (context.getMrStage()) {
case MAP -> {
// MAP任务
List<JobTask> newArrayList = createMapJobTasks(context, nodeInfoList, serverNodes);
if (newArrayList != null) {
return newArrayList;
}
return createMapJobTasks(context, nodeInfoList, serverNodes);
}
case REDUCE -> {
createReduceJobTasks(context, nodeInfoList, serverNodes);
// REDUCE任务
return createReduceJobTasks(context, nodeInfoList, serverNodes);
}
default -> throw new SnailJobServerException("Map reduce stage is not existed");
}
return Lists.newArrayList();
}
private void createReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList,
private List<JobTask> createReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList,
Set<RegisterNodeInfo> serverNodes) {
// TODO reduce阶段的并行度
int reduceParallel = 10;
List<String> allMapJobTasks = StreamUtils.toList(jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
), JobTask::getResultMessage);
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage, JobTask::getExtAttrs)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
// 若存在已经生成的reduce任务不需要重新生成
boolean existedReduce = jobTasks.stream()
.filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs()))
.map(jobTask -> JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class))
.anyMatch(jobTaskExtAttrsDTO -> MapReduceStageEnum.REDUCE.name().equals(jobTaskExtAttrsDTO.getMrStage()));
if (existedReduce) {
SnailJobLog.LOCAL.warn("The reduce task already exists. taskBatchId:[{}]", context.getTaskBatchId());
return Lists.newArrayList();
}
// 这里需要判断是否是map
List<String> allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage);
List<List<String>> partition = Lists.partition(allMapJobTasks, reduceParallel);
@ -93,21 +105,30 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.REDUCE.name());
List<JobTask> jobTasks = new ArrayList<>(partition.size());
for (int index = 0; index < partition.size(); index++) {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(JsonUtil.toJsonString(partition.get(index)));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
jobTasks.add(jobTask);
}
jobTasks = new ArrayList<>(partition.size());
final List<JobTask> finalJobTasks = jobTasks;
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
for (int index = 0; index < partition.size(); index++) {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(JsonUtil.toJsonString(partition.get(index)));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
finalJobTasks.add(jobTask);
}
}
});
return finalJobTasks;
}
private @Nullable List<JobTask> createMapJobTasks(final JobTaskGenerateContext context,
@ -122,22 +143,29 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTaskExtAttrsDTO.setMapName(context.getMapName());
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.MAP.name());
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
for (int index = 0; index < mapSubTask.size(); index++) {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index)));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
jobTasks.add(jobTask);
}
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
for (int index = 0; index < mapSubTask.size(); index++) {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index)));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
jobTasks.add(jobTask);
}
}
});
return jobTasks;
}

View File

@ -70,7 +70,7 @@ public class JobTaskBatchHandler {
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getTaskStatus, JobTask::getResultMessage, JobTask::getExtAttrs)
.select(JobTask::getTaskStatus, JobTask::getExtAttrs)
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
@ -99,24 +99,8 @@ public class JobTaskBatchHandler {
} else {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
// 判断是否是mapreduce任务
// todo 此处待优化
JobTask firstJobTask = jobTasks.get(0);
String extAttrs = firstJobTask.getExtAttrs();
if (StrUtil.isNotBlank(extAttrs)) {
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(firstJobTask.getExtAttrs(),
JobTaskExtAttrsDTO.class);
Integer taskType = jobTaskExtAttrsDTO.getTaskType();
if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) {
// 开启reduce阶段
try {
ActorRef actorRef = ActorGenerator.jobReduceActor();
actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
return false;
} catch (Exception e) {
SnailJobLog.LOCAL.error("tell reduce actor error", e);
}
}
if (needReduceTask(completeJobBatchDTO, jobTasks)) {
return false;
}
}
@ -140,6 +124,36 @@ public class JobTaskBatchHandler {
}
/**
 * Decides whether this map-reduce batch must enter the REDUCE stage and, if so,
 * dispatches a reduce task to the reduce actor.
 *
 * <p>Note for the caller: when this method returns {@code true} the batch status
 * must NOT be updated yet (the reduce stage has just been started); when it
 * returns {@code false} the batch status update may proceed.
 *
 * @param completeJobBatchDTO parameters required to complete the batch
 * @param jobTasks            the task items of this batch
 * @return true - a reduce stage is needed (reduce task dispatched); false - no reduce needed
 */
private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List<JobTask> jobTasks) {
    // Determine whether this is a map-reduce job
    // todo: to be optimized — only the first task's ext attrs are inspected here
    JobTask firstJobTask = jobTasks.get(0);
    String extAttrs = firstJobTask.getExtAttrs();
    if (StrUtil.isNotBlank(extAttrs)) {
        JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class);
        Integer taskType = jobTaskExtAttrsDTO.getTaskType();
        if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) {
            // All MAP tasks are finished: start the REDUCE stage
            try {
                ActorRef actorRef = ActorGenerator.jobReduceActor();
                actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
                return true;
            } catch (Exception e) {
                // Best-effort dispatch: on failure, log and fall through to return false
                // so the batch completion is not blocked
                SnailJobLog.LOCAL.error("tell reduce actor error", e);
            }
        }
    }
    return false;
}
private static boolean isAllMapTask(final List<JobTask> jobTasks) {
return jobTasks.size() == jobTasks.stream()
.filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs()))