From bfadfe61626b4d2f5b7ca3de753b8b4c3f47cb22 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Fri, 28 Jun 2024 15:45:47 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0-beta2):=20=E4=BF=AE=E5=A4=8Dredu?= =?UTF-8?q?ce=E5=88=86=E7=89=87=E7=AE=97=E6=B3=95=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../task/MapReduceTaskGenerator.java | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 7de1fd40..548899e0 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -136,10 +136,8 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { } // 这里需要判断是否是map - // 平均分配map集合, 若reduceParallel > allMapJobTasks.size(), 则取allMapJobTasks.size()作为分片数 List allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage); - int size = (allMapJobTasks.size() + reduceParallel - 1) / reduceParallel; - List> partition = Lists.partition(allMapJobTasks, size); + List> partition = averageAlgorithm(allMapJobTasks, reduceParallel); jobTasks = new ArrayList<>(partition.size()); final List finalJobTasks = jobTasks; @@ -238,4 +236,28 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { return jobTasks; } + private List> averageAlgorithm(List allMapJobTasks, int shard) { + + // 最多分片数为allMapJobTasks.size() + shard = Math.min(allMapJobTasks.size(), shard); + int totalSize = allMapJobTasks.size(); + List partitionSizes = new ArrayList<>(); + int quotient = totalSize / shard; + int remainder = totalSize % shard; + + for (int i = 0; i < shard; i++) { + partitionSizes.add(quotient + (i < remainder ? 1 : 0)); + } + + List> partitions = new ArrayList<>(); + int currentIndex = 0; + + for (int size : partitionSizes) { + int endIndex = Math.min(currentIndex + size, totalSize); + partitions.add(new ArrayList<>(allMapJobTasks.subList(currentIndex, endIndex))); + currentIndex = endIndex; + } + + return partitions; + } }