From 43c03c7fe2d5162a0eb334205be49c9f6c136dd0 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Thu, 13 Jun 2024 18:29:18 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0):=20=E6=B7=BB=E5=8A=A0=E5=8A=A8?= =?UTF-8?q?=E6=80=81=E5=88=86=E7=89=87=E7=9A=84=E5=81=9C=E6=AD=A2=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/cache/CacheRegisterTable.java | 2 - .../common/handler/ServerNodeBalance.java | 26 +++++++------ .../support/dispatch/JobExecutorActor.java | 3 ++ .../stop/MapReduceTaskStopHandler.java | 37 +++++++++++++++++++ 4 files changed, 54 insertions(+), 14 deletions(-) create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/MapReduceTaskStopHandler.java diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheRegisterTable.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheRegisterTable.java index 4d1a853d0..ec518a8b1 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheRegisterTable.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheRegisterTable.java @@ -82,7 +82,6 @@ public class CacheRegisterTable implements Lifecycle { ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class); List serverNodes = serverNodeMapper.selectList( new LambdaQueryWrapper() - .eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType()) .eq(ServerNode::getNamespaceId, namespaceId) .eq(ServerNode::getGroupName, groupName) .eq(ServerNode::getHostId, hostId) @@ -115,7 +114,6 @@ public class CacheRegisterTable implements Lifecycle { ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class); List serverNodes = serverNodeMapper.selectList( new LambdaQueryWrapper() - .eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType()) .eq(ServerNode::getNamespaceId, namespaceId) .eq(ServerNode::getGroupName, groupName)); for (final ServerNode node : serverNodes) { diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ServerNodeBalance.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ServerNodeBalance.java index 0d1a787c9..7d558f8cf 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ServerNodeBalance.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ServerNodeBalance.java @@ -169,7 +169,10 @@ public class ServerNodeBalance implements Lifecycle, Runnable { // 节点数量不一致触发 || isNodeSizeNotEqual(concurrentMap.size(), remotePods.size()) // 判断远程节点是不是和本地节点一致的,如果不一致则重新分配 - || isNodeNotMatch(remoteHostIds, localHostIds)) { + || isNodeNotMatch(remoteHostIds, localHostIds) + // 检查当前节点的消费桶是否为空,为空则重新负载 + || checkConsumerBucket(remoteHostIds) + ) { // 删除本地缓存以下线的节点信息 removeNode(concurrentMap, remoteHostIds, localHostIds); @@ -226,9 +229,19 @@ public class ServerNodeBalance implements Lifecycle, Runnable { localHostIds, remoteHostIds); } + + // 若在线节点小于总的Bucket数量且当前节点无任何分桶,则需要重新负载 + if (CollUtil.isEmpty(DistributeInstance.INSTANCE.getConsumerBucket()) && remoteHostIds.size() <= systemProperties.getBucketTotal()) { + return true; + } + return b; } + public boolean checkConsumerBucket(Set remoteHostIds) { + return CollUtil.isEmpty(DistributeInstance.INSTANCE.getConsumerBucket()) && remoteHostIds.size() <= systemProperties.getBucketTotal(); + } + private boolean isNodeSizeNotEqual(int localNodeSize, int remoteNodeSize) { boolean b = localNodeSize != remoteNodeSize; if (b) { @@ -239,15 +252,4 @@ public class ServerNodeBalance implements Lifecycle, Runnable { return b; } - private boolean isGroupSizeNotEqual(List removeGroupConfig, Set allGroup) { - boolean b = allGroup.size() != removeGroupConfig.size(); - if (b) { - SnailJobLog.LOCAL.info("若存在远程和本地缓存的组的数量不一致则触发rebalance. localGroupSize:[{}] remoteGroupSize:[{}]", - allGroup.size(), - removeGroupConfig.size()); - } - return b; - } - - } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 996e10d35..8ffe492d9 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -16,6 +16,7 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; +import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum; import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; @@ -132,6 +133,8 @@ public class JobExecutorActor extends AbstractActor { JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId()); instanceGenerateContext.setMapName("ROOT_TASK"); + // TODO 此处需要判断任务类型 + instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP); List taskList = taskInstance.generate(instanceGenerateContext); if (CollUtil.isEmpty(taskList)) { return; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/MapReduceTaskStopHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/MapReduceTaskStopHandler.java new file mode 100644 index 000000000..250031220 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/MapReduceTaskStopHandler.java @@ -0,0 +1,37 @@ +package com.aizuda.snailjob.server.job.task.support.stop; + +import akka.actor.ActorRef; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.common.util.ClientInfoUtils; +import com.aizuda.snailjob.server.job.task.dto.RealStopTaskInstanceDTO; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import org.springframework.stereotype.Component; + +/** + * @author: opensnail + * @date : 2024-06-13 + * @since : sj_1.1.0 + */ +@Component +public class MapReduceTaskStopHandler extends AbstractJobTaskStopHandler { + + @Override + public JobTaskTypeEnum getTaskType() { + return JobTaskTypeEnum.MAP_REDUCE; + } + + @Override + protected void doStop(final TaskStopJobContext context) { + for (final JobTask jobTask : context.getJobTasks()) { + RealStopTaskInstanceDTO taskInstanceDTO = JobTaskConverter.INSTANCE.toRealStopTaskInstanceDTO(context); + taskInstanceDTO.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); + + ActorRef actorRef = ActorGenerator.jobRealStopTaskInstanceActor(); + actorRef.tell(taskInstanceDTO, actorRef); + } + } + + +}