diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheRegisterTable.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheRegisterTable.java index 4d1a853d0..ec518a8b1 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheRegisterTable.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheRegisterTable.java @@ -82,7 +82,6 @@ public class CacheRegisterTable implements Lifecycle { ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class); List serverNodes = serverNodeMapper.selectList( new LambdaQueryWrapper() - .eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType()) .eq(ServerNode::getNamespaceId, namespaceId) .eq(ServerNode::getGroupName, groupName) .eq(ServerNode::getHostId, hostId) @@ -115,7 +114,6 @@ public class CacheRegisterTable implements Lifecycle { ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class); List serverNodes = serverNodeMapper.selectList( new LambdaQueryWrapper() - .eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType()) .eq(ServerNode::getNamespaceId, namespaceId) .eq(ServerNode::getGroupName, groupName)); for (final ServerNode node : serverNodes) { diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ServerNodeBalance.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ServerNodeBalance.java index 0d1a787c9..7d558f8cf 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ServerNodeBalance.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ServerNodeBalance.java @@ -169,7 +169,10 @@ public class ServerNodeBalance implements Lifecycle, Runnable { // 节点数量不一致触发 || isNodeSizeNotEqual(concurrentMap.size(), remotePods.size()) // 判断远程节点是不是和本地节点一致的,如果不一致则重新分配 - || isNodeNotMatch(remoteHostIds, localHostIds)) { + || isNodeNotMatch(remoteHostIds, localHostIds) + // 检查当前节点的消费桶是否为空,为空则重新负载 + || checkConsumerBucket(remoteHostIds) + ) { // 删除本地缓存以下线的节点信息 removeNode(concurrentMap, remoteHostIds, localHostIds); @@ -226,9 +229,19 @@ public class ServerNodeBalance implements Lifecycle, Runnable { localHostIds, remoteHostIds); } + + // 若在线节点小于总的Bucket数量且当前节点无任何分桶,则需要重新负载 + if (CollUtil.isEmpty(DistributeInstance.INSTANCE.getConsumerBucket()) && remoteHostIds.size() <= systemProperties.getBucketTotal()) { + return true; + } + return b; } + public boolean checkConsumerBucket(Set remoteHostIds) { + return CollUtil.isEmpty(DistributeInstance.INSTANCE.getConsumerBucket()) && remoteHostIds.size() <= systemProperties.getBucketTotal(); + } + private boolean isNodeSizeNotEqual(int localNodeSize, int remoteNodeSize) { boolean b = localNodeSize != remoteNodeSize; if (b) { @@ -239,15 +252,4 @@ public class ServerNodeBalance implements Lifecycle, Runnable { return b; } - private boolean isGroupSizeNotEqual(List removeGroupConfig, Set allGroup) { - boolean b = allGroup.size() != removeGroupConfig.size(); - if (b) { - SnailJobLog.LOCAL.info("若存在远程和本地缓存的组的数量不一致则触发rebalance. localGroupSize:[{}] remoteGroupSize:[{}]", - allGroup.size(), - removeGroupConfig.size()); - } - return b; - } - - } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 996e10d35..8ffe492d9 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -16,6 +16,7 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; +import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum; import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; @@ -132,6 +133,8 @@ public class JobExecutorActor extends AbstractActor { JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId()); instanceGenerateContext.setMapName("ROOT_TASK"); + // TODO 此处需要判断任务类型 + instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP); List taskList = taskInstance.generate(instanceGenerateContext); if (CollUtil.isEmpty(taskList)) { return; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/MapReduceTaskStopHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/MapReduceTaskStopHandler.java new file mode 100644 index 000000000..250031220 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/MapReduceTaskStopHandler.java @@ -0,0 +1,37 @@ +package com.aizuda.snailjob.server.job.task.support.stop; + +import akka.actor.ActorRef; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.common.util.ClientInfoUtils; +import com.aizuda.snailjob.server.job.task.dto.RealStopTaskInstanceDTO; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import org.springframework.stereotype.Component; + +/** + * @author: opensnail + * @date : 2024-06-13 + * @since : sj_1.1.0 + */ +@Component +public class MapReduceTaskStopHandler extends AbstractJobTaskStopHandler { + + @Override + public JobTaskTypeEnum getTaskType() { + return JobTaskTypeEnum.MAP_REDUCE; + } + + @Override + protected void doStop(final TaskStopJobContext context) { + for (final JobTask jobTask : context.getJobTasks()) { + RealStopTaskInstanceDTO taskInstanceDTO = JobTaskConverter.INSTANCE.toRealStopTaskInstanceDTO(context); + taskInstanceDTO.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); + + ActorRef actorRef = ActorGenerator.jobRealStopTaskInstanceActor(); + actorRef.tell(taskInstanceDTO, actorRef); + } + } + + +}