fix(sj_1.1.0-beta2): 修复reduce分片算法错误问题
This commit is contained in:
parent
6dae8aace5
commit
bfadfe6162
@ -136,10 +136,8 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 这里需要判断是否是map
|
// 这里需要判断是否是map
|
||||||
// 平均分配map集合, 若reduceParallel > allMapJobTasks.size(), 则取allMapJobTasks.size()作为分片数
|
|
||||||
List<String> allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage);
|
List<String> allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage);
|
||||||
int size = (allMapJobTasks.size() + reduceParallel - 1) / reduceParallel;
|
List<List<String>> partition = averageAlgorithm(allMapJobTasks, reduceParallel);
|
||||||
List<List<String>> partition = Lists.partition(allMapJobTasks, size);
|
|
||||||
|
|
||||||
jobTasks = new ArrayList<>(partition.size());
|
jobTasks = new ArrayList<>(partition.size());
|
||||||
final List<JobTask> finalJobTasks = jobTasks;
|
final List<JobTask> finalJobTasks = jobTasks;
|
||||||
@ -238,4 +236,28 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
|
|
||||||
return jobTasks;
|
return jobTasks;
|
||||||
}
|
}
|
||||||
|
private List<List<String>> averageAlgorithm(List<String> allMapJobTasks, int shard) {
|
||||||
|
|
||||||
|
// 最多分片数为allMapJobTasks.size()
|
||||||
|
shard = Math.min(allMapJobTasks.size(), shard);
|
||||||
|
int totalSize = allMapJobTasks.size();
|
||||||
|
List<Integer> partitionSizes = new ArrayList<>();
|
||||||
|
int quotient = totalSize / shard;
|
||||||
|
int remainder = totalSize % shard;
|
||||||
|
|
||||||
|
for (int i = 0; i < shard; i++) {
|
||||||
|
partitionSizes.add(quotient + (i < remainder ? 1 : 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
List<List<String>> partitions = new ArrayList<>();
|
||||||
|
int currentIndex = 0;
|
||||||
|
|
||||||
|
for (int size : partitionSizes) {
|
||||||
|
int endIndex = Math.min(currentIndex + size, totalSize);
|
||||||
|
partitions.add(new ArrayList<>(allMapJobTasks.subList(currentIndex, endIndex)));
|
||||||
|
currentIndex = endIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
return partitions;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user