feat(sj_1.1.0-beta2): Optimize the MapReduce client load-balancing algorithm

opensnail 2024-07-02 23:30:27 +08:00
parent 8d5ae650b1
commit 88556aa7c1
8 changed files with 69 additions and 30 deletions
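The hunks below replace the MapReduce task generator's manual indexing into a snapshot of registered nodes with per-task allocation through ClientNodeAllocateHandler using the ROUND load-balancing algorithm; a task for which no client node can be allocated is created in CANCEL status instead of being dispatched. A condensed sketch of that flow, assembled from the diff itself (all types and calls appear in the hunks below; variable names are simplified):

// Condensed from MapReduceTaskGenerator#getClientNodeInfo in this commit;
// context and jobTask come from the surrounding generator code.
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(
        context.getJobId().toString(),
        context.getGroupName(),
        context.getNamespaceId(),
        ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType());
String clientInfo = StrUtil.EMPTY;
int taskStatus = JobTaskStatusEnum.RUNNING.getStatus();
if (Objects.isNull(serverNode)) {
    // No client node available: keep the task but mark it CANCEL.
    taskStatus = JobTaskStatusEnum.CANCEL.getStatus();
} else {
    clientInfo = ClientInfoUtils.generate(serverNode);
}
jobTask.setClientInfo(clientInfo);
jobTask.setTaskStatus(taskStatus);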

View File

@@ -22,7 +22,6 @@ import java.util.stream.Stream;
@Component
@RequiredArgsConstructor
public class ClientNodeAllocateHandler {
private final AccessTemplate accessTemplate;
/**
* 获取分配的节点

View File

@@ -67,7 +67,7 @@ public class ReduceActor extends AbstractActor {
String key = MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId());
distributedLockHandler.lockWithDisposableAndRetry(() -> {
doReduce(reduceTask);
}, key, Duration.ofSeconds(1), Duration.ofSeconds(2), 3);
}, key, Duration.ofSeconds(1), Duration.ofSeconds(2), 6);
} catch (Exception e) {
SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e);
}
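For context, the call above runs doReduce under a disposable distributed lock keyed by the task batch id and job id, and the final argument, presumably the number of lock-acquisition retries, is raised from 3 to 6. The exact semantics of lockWithDisposableAndRetry are not shown in this diff; the following is only a self-contained sketch of an acquire-with-retries pattern, using a local ReentrantLock as a stand-in for the distributed lock (all names here are illustrative, not the snailjob API):

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: a local lock standing in for a distributed lock service.
final class RetryingLockRunner {
    private final ReentrantLock lock = new ReentrantLock();

    // Try to acquire the lock within waitTime; on failure, sleep retryInterval and
    // try again, giving up after maxRetries attempts. The task runs only while locked.
    boolean runWithRetry(Runnable task, Duration waitTime, Duration retryInterval, int maxRetries)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            if (lock.tryLock(waitTime.toMillis(), TimeUnit.MILLISECONDS)) {
                try {
                    task.run();
                    return true;
                } finally {
                    lock.unlock();
                }
            }
            Thread.sleep(retryInterval.toMillis());
        }
        return false;
    }
}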

View File

@@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.executor.job;
import akka.actor.ActorRef;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
@@ -32,6 +33,9 @@ public class BroadcastTaskJobExecutor extends AbstractJobExecutor {
List<JobTask> taskList = context.getTaskList();
for (JobTask jobTask : taskList) {
if (StrUtil.isBlank(jobTask.getClientInfo())) {
continue;
}
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
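The three added lines above, like the equivalent guards added to the cluster, map-reduce, and sharding executors below, keep the executor from dispatching a task whose client info is blank, i.e. a task that never had a node allocated. A tiny standalone sketch of the same guard, using a simplified stand-in type rather than the real JobTask:

import java.util.List;

// Stand-in for the real JobTask, for illustration only.
record TaskSketch(long id, String clientInfo) {}

final class DispatchGuardSketch {
    // A blank clientInfo means no client node was assigned, so the task is skipped
    // rather than sent to a client.
    static void dispatch(List<TaskSketch> tasks) {
        for (TaskSketch task : tasks) {
            if (task.clientInfo() == null || task.clientInfo().isBlank()) {
                continue;
            }
            System.out.println("dispatching task " + task.id() + " to " + task.clientInfo());
        }
    }
}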

View File

@@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.executor.job;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
@@ -37,6 +38,9 @@ public class ClusterJobExecutor extends AbstractJobExecutor {
}
JobTask jobTask = taskList.get(0);
if (StrUtil.isBlank(jobTask.getClientInfo())) {
return;
}
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();

View File

@@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.executor.job;
import akka.actor.ActorRef;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
@@ -28,6 +29,9 @@ public class MapReduceJobExecutor extends AbstractJobExecutor {
protected void doExecute(final JobExecutorContext context) {
List<JobTask> taskList = context.getTaskList();
for (final JobTask jobTask : taskList) {
if (StrUtil.isBlank(jobTask.getClientInfo())) {
return;
}
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();

View File

@@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.executor.job;
import akka.actor.ActorRef;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
@@ -31,6 +32,9 @@ public class ShardingJobExecutor extends AbstractJobExecutor {
List<JobTask> taskList = context.getTaskList();
for (int i = 0; i < taskList.size(); i++) {
JobTask jobTask = taskList.get(i);
if (StrUtil.isBlank(jobTask.getClientInfo())) {
return;
}
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
realJobExecutor.setShardingIndex(i);

View File

@@ -12,9 +12,12 @@ import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.allocate.client.ClientLoadBalanceManager;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.snailjob.server.common.triple.Pair;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.MapReduceArgsStrDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
@@ -48,6 +51,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
private static final String REDUCE_TASK = "REDUCE_TASK";
private final JobTaskMapper jobTaskMapper;
private final TransactionTemplate transactionTemplate;
private final ClientNodeAllocateHandler clientNodeAllocateHandler;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
@@ -56,34 +60,34 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
@Override
protected List<JobTask> doGenerate(final JobTaskGenerateContext context) {
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(),
context.getNamespaceId());
if (CollUtil.isEmpty(serverNodes)) {
SnailJobLog.LOCAL.error("No executable client node available. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
// Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(),
// context.getNamespaceId());
// if (CollUtil.isEmpty(serverNodes)) {
// SnailJobLog.LOCAL.error("No executable client node available. jobId:[{}]", context.getJobId());
// return Lists.newArrayList();
// }
List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
// List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
MapReduceStageEnum mapReduceStageEnum = MapReduceStageEnum.ofStage(context.getMrStage());
Assert.notNull(mapReduceStageEnum, () -> new SnailJobServerException("Map reduce stage is not existed"));
switch (Objects.requireNonNull(mapReduceStageEnum)) {
case MAP -> {
// MAP task
return createMapJobTasks(context, nodeInfoList, serverNodes);
return createMapJobTasks(context);
}
case REDUCE -> {
// REDUCE task
return createReduceJobTasks(context, nodeInfoList, serverNodes);
return createReduceJobTasks(context);
}
case MERGE_REDUCE -> {
// MERGE_REDUCE task
return createMergeReduceJobTasks(context, nodeInfoList, serverNodes);
return createMergeReduceJobTasks(context);
}
default -> throw new SnailJobServerException("Map reduce stage is not existed");
}
}
private List<JobTask> createMergeReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList, Set<RegisterNodeInfo> serverNodes) {
private List<JobTask> createMergeReduceJobTasks(JobTaskGenerateContext context) {
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
@@ -92,16 +96,16 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
);
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(0);
Pair<String, Integer> clientInfo = getClientNodeInfo(context);
// Create the new task instance
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setClientInfo(clientInfo.getKey());
jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobArgsHolder.setReduces(JsonUtil.toJsonString(StreamUtils.toList(jobTasks, JobTask::getResultMessage)));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setTaskStatus(clientInfo.getValue());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage());
jobTask.setTaskName(MERGE_REDUCE_TASK);
@@ -111,8 +115,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
return Lists.newArrayList(jobTask);
}
private List<JobTask> createReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList,
Set<RegisterNodeInfo> serverNodes) {
private List<JobTask> createReduceJobTasks(JobTaskGenerateContext context) {
int reduceParallel = 1;
String jobParams = null;
@@ -142,21 +145,21 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTasks = new ArrayList<>(partition.size());
final List<JobTask> finalJobTasks = jobTasks;
String finalJobParams = jobParams;
final List<JobTask> finalJobTasks1 = jobTasks;
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
for (int index = 0; index < partition.size(); index++) {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
Pair<String, Integer> clientInfo = getClientNodeInfo(context);
// Create the new task instance
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setClientInfo(clientInfo.getKey());
jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(finalJobParams);
jobArgsHolder.setMaps(partition.get(index));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setTaskStatus(clientInfo.getValue());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage());
jobTask.setTaskName(REDUCE_TASK);
@@ -168,7 +171,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
finalJobTasks.add(jobTask);
}
batchSaveJobTasks(finalJobTasks1);
batchSaveJobTasks(finalJobTasks);
}
});
@@ -176,8 +179,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
}
@NotNull
private List<JobTask> createMapJobTasks(final JobTaskGenerateContext context,
final List<RegisterNodeInfo> nodeInfoList, final Set<RegisterNodeInfo> serverNodes) {
private List<JobTask> createMapJobTasks(final JobTaskGenerateContext context) {
List<?> mapSubTask = context.getMapSubTask();
if (CollUtil.isEmpty(mapSubTask)) {
SnailJobLog.LOCAL.warn("Map sub task is empty. TaskBatchId:[{}]", context.getTaskBatchId());
@@ -198,17 +200,18 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
protected void doInTransactionWithoutResult(final TransactionStatus status) {
for (int index = 0; index < mapSubTask.size(); index++) {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
Pair<String, Integer> clientInfo = getClientNodeInfo(context);
// Create the new task instance
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setClientInfo(clientInfo.getKey());
jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobArgsHolder.setMaps(mapSubTask.get(index));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setTaskStatus(clientInfo.getValue());
jobTask.setMrStage(MapReduceStageEnum.MAP.getStage());
jobTask.setTaskName(context.getTaskName());
jobTask.setLeaf(StatusEnum.YES.getStatus());
@@ -236,6 +239,25 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
return jobTasks;
}
private Pair<String, Integer> getClientNodeInfo(JobTaskGenerateContext context) {
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(
context.getJobId().toString(),
context.getGroupName(),
context.getNamespaceId(),
ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType()
);
String clientInfo = StrUtil.EMPTY;
int JobTaskStatus = JobTaskStatusEnum.RUNNING.getStatus();
if (Objects.isNull(serverNode)) {
JobTaskStatus = JobTaskStatusEnum.CANCEL.getStatus();
} else {
clientInfo = ClientInfoUtils.generate(serverNode);
}
return Pair.of(clientInfo, JobTaskStatus);
}
private List<List<String>> averageAlgorithm(List<String> allMapJobTasks, int shard) {
// The shard count is capped at allMapJobTasks.size()

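The generator above now delegates node selection to ClientNodeAllocateHandler with ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND. That allocator's implementation is not part of this diff; purely as an illustration of what round-robin selection over a node list typically looks like (not snailjob's actual ClientLoadBalanceManager), here is a minimal standalone sketch:

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: a minimal thread-safe round-robin selector.
final class RoundRobinSketch<T> {
    private final AtomicInteger counter = new AtomicInteger();

    // Returns null when no node is available, mirroring how getClientNodeInfo above
    // falls back to CANCEL status when no server node is returned.
    T next(List<T> nodes) {
        if (nodes == null || nodes.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(counter.getAndIncrement(), nodes.size());
        return nodes.get(index);
    }
}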
View File

@@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.generator.task;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import org.springframework.stereotype.Component;
@@ -17,8 +18,9 @@ import java.util.List;
public class MapTaskGenerator extends MapReduceTaskGenerator {
public MapTaskGenerator(final JobTaskMapper jobTaskMapper,
final TransactionTemplate transactionTemplate) {
super(jobTaskMapper, transactionTemplate);
final TransactionTemplate transactionTemplate,
final ClientNodeAllocateHandler clientNodeAllocateHandler) {
super(jobTaskMapper, transactionTemplate, clientNodeAllocateHandler);
}
@Override