fix(sj_1.1.0): 添加动态分片的停止功能

This commit is contained in:
opensnail 2024-06-13 18:29:18 +08:00
parent 3f33255792
commit 100cd1271b
4 changed files with 54 additions and 14 deletions

View File

@ -82,7 +82,6 @@ public class CacheRegisterTable implements Lifecycle {
ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class);
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType())
.eq(ServerNode::getNamespaceId, namespaceId)
.eq(ServerNode::getGroupName, groupName)
.eq(ServerNode::getHostId, hostId)
@ -115,7 +114,6 @@ public class CacheRegisterTable implements Lifecycle {
ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class);
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType())
.eq(ServerNode::getNamespaceId, namespaceId)
.eq(ServerNode::getGroupName, groupName));
for (final ServerNode node : serverNodes) {

View File

@ -166,7 +166,10 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
// 节点数量不一致触发
|| isNodeSizeNotEqual(concurrentMap.size(), remotePods.size())
// 判断远程节点是不是和本地节点一致的如果不一致则重新分配
|| isNodeNotMatch(remoteHostIds, localHostIds)) {
|| isNodeNotMatch(remoteHostIds, localHostIds)
// 检查当前节点的消费桶是否为空为空则重新负载
|| checkConsumerBucket(remoteHostIds)
) {
// 删除本地缓存以下线的节点信息
removeNode(concurrentMap, remoteHostIds, localHostIds);
@ -223,9 +226,19 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
localHostIds,
remoteHostIds);
}
// 若在线节点小于总的Bucket数量且当前节点无任何分桶则需要重新负载
if (CollUtil.isEmpty(DistributeInstance.INSTANCE.getConsumerBucket()) && remoteHostIds.size() <= systemProperties.getBucketTotal()) {
return true;
}
return b;
}
public boolean checkConsumerBucket(Set<String> remoteHostIds) {
return CollUtil.isEmpty(DistributeInstance.INSTANCE.getConsumerBucket()) && remoteHostIds.size() <= systemProperties.getBucketTotal();
}
private boolean isNodeSizeNotEqual(int localNodeSize, int remoteNodeSize) {
boolean b = localNodeSize != remoteNodeSize;
if (b) {
@ -236,15 +249,4 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
return b;
}
private boolean isGroupSizeNotEqual(List<GroupConfig> removeGroupConfig, Set<String> allGroup) {
boolean b = allGroup.size() != removeGroupConfig.size();
if (b) {
SnailJobLog.LOCAL.info("若存在远程和本地缓存的组的数量不一致则触发rebalance. localGroupSize:[{}] remoteGroupSize:[{}]",
allGroup.size(),
removeGroupConfig.size());
}
return b;
}
}

View File

@ -16,6 +16,7 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
@ -132,6 +133,8 @@ public class JobExecutorActor extends AbstractActor {
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
instanceGenerateContext.setMapName("ROOT_TASK");
// TODO 此处需要判断任务类型
instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP);
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
if (CollUtil.isEmpty(taskList)) {
return;

View File

@ -0,0 +1,37 @@
package com.aizuda.snailjob.server.job.task.support.stop;
import akka.actor.ActorRef;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.RealStopTaskInstanceDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import org.springframework.stereotype.Component;
/**
* @author: opensnail
* @date : 2024-06-13
* @since : sj_1.1.0
*/
@Component
public class MapReduceTaskStopHandler extends AbstractJobTaskStopHandler {
@Override
public JobTaskTypeEnum getTaskType() {
return JobTaskTypeEnum.MAP_REDUCE;
}
@Override
protected void doStop(final TaskStopJobContext context) {
for (final JobTask jobTask : context.getJobTasks()) {
RealStopTaskInstanceDTO taskInstanceDTO = JobTaskConverter.INSTANCE.toRealStopTaskInstanceDTO(context);
taskInstanceDTO.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealStopTaskInstanceActor();
actorRef.tell(taskInstanceDTO, actorRef);
}
}
}