fix(sj_1.1.0): 优化reduce任务生成
This commit is contained in:
parent
a64ac93b06
commit
14497be6b9
@ -14,7 +14,9 @@ import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFacto
|
|||||||
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
|
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
|
||||||
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
|
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
|
||||||
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
|
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
|
||||||
|
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
|
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
@ -22,6 +24,8 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
|||||||
import org.springframework.context.annotation.Scope;
|
import org.springframework.context.annotation.Scope;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.text.MessageFormat;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -35,14 +39,21 @@ import java.util.List;
|
|||||||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class ReduceActor extends AbstractActor {
|
public class ReduceActor extends AbstractActor {
|
||||||
|
private static final String KEY = "job_generate_reduce_{0}_{1}";
|
||||||
|
private final DistributedLockHandler distributedLockHandler;
|
||||||
private final JobMapper jobMapper;
|
private final JobMapper jobMapper;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
|
return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
distributedLockHandler.lockWithDisposableAndRetry(() -> {
|
||||||
doReduce(reduceTask);
|
doReduce(reduceTask);
|
||||||
|
}, MessageFormat.format(KEY, reduceTask.getTaskBatchId(),
|
||||||
|
reduceTask.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e);
|
SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e);
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,9 @@ import com.google.common.collect.Lists;
|
|||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.transaction.TransactionStatus;
|
||||||
|
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
||||||
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -40,6 +43,7 @@ import java.util.Set;
|
|||||||
public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
||||||
|
|
||||||
private final JobTaskMapper jobTaskMapper;
|
private final JobTaskMapper jobTaskMapper;
|
||||||
|
private final TransactionTemplate transactionTemplate;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public JobTaskTypeEnum getTaskInstanceType() {
|
public JobTaskTypeEnum getTaskInstanceType() {
|
||||||
@ -61,30 +65,38 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
switch (context.getMrStage()) {
|
switch (context.getMrStage()) {
|
||||||
case MAP -> {
|
case MAP -> {
|
||||||
// MAP任务
|
// MAP任务
|
||||||
List<JobTask> newArrayList = createMapJobTasks(context, nodeInfoList, serverNodes);
|
return createMapJobTasks(context, nodeInfoList, serverNodes);
|
||||||
if (newArrayList != null) {
|
|
||||||
return newArrayList;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
case REDUCE -> {
|
case REDUCE -> {
|
||||||
createReduceJobTasks(context, nodeInfoList, serverNodes);
|
// REDUCE任务
|
||||||
|
return createReduceJobTasks(context, nodeInfoList, serverNodes);
|
||||||
}
|
}
|
||||||
default -> throw new SnailJobServerException("Map reduce stage is not existed");
|
default -> throw new SnailJobServerException("Map reduce stage is not existed");
|
||||||
}
|
}
|
||||||
|
|
||||||
return Lists.newArrayList();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList,
|
private List<JobTask> createReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList,
|
||||||
Set<RegisterNodeInfo> serverNodes) {
|
Set<RegisterNodeInfo> serverNodes) {
|
||||||
|
|
||||||
// TODO reduce阶段的并行度
|
// TODO reduce阶段的并行度
|
||||||
int reduceParallel = 10;
|
int reduceParallel = 10;
|
||||||
|
|
||||||
List<String> allMapJobTasks = StreamUtils.toList(jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
||||||
.select(JobTask::getResultMessage)
|
.select(JobTask::getResultMessage, JobTask::getExtAttrs)
|
||||||
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
|
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
|
||||||
), JobTask::getResultMessage);
|
|
||||||
|
// 若存在已经生成的reduce任务不需要重新生成
|
||||||
|
boolean existedReduce = jobTasks.stream()
|
||||||
|
.filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs()))
|
||||||
|
.map(jobTask -> JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class))
|
||||||
|
.anyMatch(jobTaskExtAttrsDTO -> MapReduceStageEnum.REDUCE.name().equals(jobTaskExtAttrsDTO.getMrStage()));
|
||||||
|
if (existedReduce) {
|
||||||
|
SnailJobLog.LOCAL.warn("The reduce task already exists. taskBatchId:[{}]", context.getTaskBatchId());
|
||||||
|
return Lists.newArrayList();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 这里需要判断是否是map
|
||||||
|
List<String> allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage);
|
||||||
|
|
||||||
List<List<String>> partition = Lists.partition(allMapJobTasks, reduceParallel);
|
List<List<String>> partition = Lists.partition(allMapJobTasks, reduceParallel);
|
||||||
|
|
||||||
@ -93,7 +105,12 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
|
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
|
||||||
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.REDUCE.name());
|
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.REDUCE.name());
|
||||||
|
|
||||||
List<JobTask> jobTasks = new ArrayList<>(partition.size());
|
jobTasks = new ArrayList<>(partition.size());
|
||||||
|
|
||||||
|
final List<JobTask> finalJobTasks = jobTasks;
|
||||||
|
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||||
|
@Override
|
||||||
|
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
||||||
for (int index = 0; index < partition.size(); index++) {
|
for (int index = 0; index < partition.size(); index++) {
|
||||||
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
|
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
|
||||||
// 新增任务实例
|
// 新增任务实例
|
||||||
@ -106,9 +123,13 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
|
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
|
||||||
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
|
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
|
||||||
() -> new SnailJobServerException("新增任务实例失败"));
|
() -> new SnailJobServerException("新增任务实例失败"));
|
||||||
jobTasks.add(jobTask);
|
finalJobTasks.add(jobTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return finalJobTasks;
|
||||||
|
}
|
||||||
|
|
||||||
private @Nullable List<JobTask> createMapJobTasks(final JobTaskGenerateContext context,
|
private @Nullable List<JobTask> createMapJobTasks(final JobTaskGenerateContext context,
|
||||||
final List<RegisterNodeInfo> nodeInfoList, final Set<RegisterNodeInfo> serverNodes) {
|
final List<RegisterNodeInfo> nodeInfoList, final Set<RegisterNodeInfo> serverNodes) {
|
||||||
@ -122,8 +143,12 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
jobTaskExtAttrsDTO.setMapName(context.getMapName());
|
jobTaskExtAttrsDTO.setMapName(context.getMapName());
|
||||||
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
|
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
|
||||||
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.MAP.name());
|
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.MAP.name());
|
||||||
|
|
||||||
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
|
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
|
||||||
|
|
||||||
|
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||||
|
@Override
|
||||||
|
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
||||||
|
|
||||||
for (int index = 0; index < mapSubTask.size(); index++) {
|
for (int index = 0; index < mapSubTask.size(); index++) {
|
||||||
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
|
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
|
||||||
// 新增任务实例
|
// 新增任务实例
|
||||||
@ -138,6 +163,9 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
() -> new SnailJobServerException("新增任务实例失败"));
|
() -> new SnailJobServerException("新增任务实例失败"));
|
||||||
jobTasks.add(jobTask);
|
jobTasks.add(jobTask);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
return jobTasks;
|
return jobTasks;
|
||||||
}
|
}
|
||||||
|
@ -70,7 +70,7 @@ public class JobTaskBatchHandler {
|
|||||||
|
|
||||||
List<JobTask> jobTasks = jobTaskMapper.selectList(
|
List<JobTask> jobTasks = jobTaskMapper.selectList(
|
||||||
new LambdaQueryWrapper<JobTask>()
|
new LambdaQueryWrapper<JobTask>()
|
||||||
.select(JobTask::getTaskStatus, JobTask::getResultMessage, JobTask::getExtAttrs)
|
.select(JobTask::getTaskStatus, JobTask::getExtAttrs)
|
||||||
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
|
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
|
||||||
|
|
||||||
SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
|
SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
|
||||||
@ -99,24 +99,8 @@ public class JobTaskBatchHandler {
|
|||||||
} else {
|
} else {
|
||||||
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
|
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
|
||||||
|
|
||||||
// 判断是否是mapreduce任务
|
if (needReduceTask(completeJobBatchDTO, jobTasks)) {
|
||||||
// todo 此处待优化
|
|
||||||
JobTask firstJobTask = jobTasks.get(0);
|
|
||||||
String extAttrs = firstJobTask.getExtAttrs();
|
|
||||||
if (StrUtil.isNotBlank(extAttrs)) {
|
|
||||||
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(firstJobTask.getExtAttrs(),
|
|
||||||
JobTaskExtAttrsDTO.class);
|
|
||||||
Integer taskType = jobTaskExtAttrsDTO.getTaskType();
|
|
||||||
if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) {
|
|
||||||
// 开启reduce阶段
|
|
||||||
try {
|
|
||||||
ActorRef actorRef = ActorGenerator.jobReduceActor();
|
|
||||||
actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
|
|
||||||
return false;
|
return false;
|
||||||
} catch (Exception e) {
|
|
||||||
SnailJobLog.LOCAL.error("tell reduce actor error", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -140,6 +124,36 @@ public class JobTaskBatchHandler {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 若需要执行reduce则返回false 不需要更新批次状态, 否则需要更新批次状态
|
||||||
|
*
|
||||||
|
* @param completeJobBatchDTO 需要执行批次完成所需的参数信息
|
||||||
|
* @param jobTasks 任务项列表
|
||||||
|
* @return true-需要reduce false-不需要reduce
|
||||||
|
*/
|
||||||
|
private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List<JobTask> jobTasks) {
|
||||||
|
// 判断是否是mapreduce任务
|
||||||
|
// todo 此处待优化
|
||||||
|
JobTask firstJobTask = jobTasks.get(0);
|
||||||
|
String extAttrs = firstJobTask.getExtAttrs();
|
||||||
|
if (StrUtil.isNotBlank(extAttrs)) {
|
||||||
|
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class);
|
||||||
|
Integer taskType = jobTaskExtAttrsDTO.getTaskType();
|
||||||
|
if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) {
|
||||||
|
// 开启reduce阶段
|
||||||
|
try {
|
||||||
|
ActorRef actorRef = ActorGenerator.jobReduceActor();
|
||||||
|
actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
|
||||||
|
return true;
|
||||||
|
} catch (Exception e) {
|
||||||
|
SnailJobLog.LOCAL.error("tell reduce actor error", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean isAllMapTask(final List<JobTask> jobTasks) {
|
private static boolean isAllMapTask(final List<JobTask> jobTasks) {
|
||||||
return jobTasks.size() == jobTasks.stream()
|
return jobTasks.size() == jobTasks.stream()
|
||||||
.filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs()))
|
.filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs()))
|
||||||
|
Loading…
Reference in New Issue
Block a user