fix(sj_1.1.0-beta2): 修复reduce分片算法错误问题
This commit is contained in:
parent
d03ac24715
commit
6dae8aace5
@ -131,10 +131,15 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
|
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
|
||||||
);
|
);
|
||||||
|
|
||||||
// 这里需要判断是否是map
|
if (CollUtil.isEmpty(jobTasks)) {
|
||||||
List<String> allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage);
|
return Lists.newArrayList();
|
||||||
|
}
|
||||||
|
|
||||||
List<List<String>> partition = Lists.partition(allMapJobTasks, reduceParallel);
|
// 这里需要判断是否是map
|
||||||
|
// 平均分配map集合, 若reduceParallel > allMapJobTasks.size(), 则取allMapJobTasks.size()作为分片数
|
||||||
|
List<String> allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage);
|
||||||
|
int size = (allMapJobTasks.size() + reduceParallel - 1) / reduceParallel;
|
||||||
|
List<List<String>> partition = Lists.partition(allMapJobTasks, size);
|
||||||
|
|
||||||
jobTasks = new ArrayList<>(partition.size());
|
jobTasks = new ArrayList<>(partition.size());
|
||||||
final List<JobTask> finalJobTasks = jobTasks;
|
final List<JobTask> finalJobTasks = jobTasks;
|
||||||
|
Loading…
Reference in New Issue
Block a user