fix(sj_1.1.0): 优化map reduce

This commit is contained in:
opensnail 2024-06-21 17:53:11 +08:00
parent 94759361ef
commit 05f6b2ab09
15 changed files with 165 additions and 43 deletions

View File

@ -0,0 +1,20 @@
package com.aizuda.snailjob.client.job.core.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.List;
/**
* Task执行结果
*
* @author: opensnail
* @date : 2024-06-12 13:59
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class MergeReduceArgs extends JobArgs {
private List<?> reduces;
}

View File

@ -1,6 +1,5 @@
package com.aizuda.snailjob.client.job.core.dto;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -18,7 +17,4 @@ public class ReduceArgs extends JobArgs {
private List<?> mapResult;
public List<?> getMapResult() {
return JsonUtil.parseList(getArgsStr(), List.class);
}
}

View File

@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -60,8 +59,10 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
.contains(jobContext.getTaskType())) {
if (MapReduceStageEnum.MAP.getStage() == jobContext.getMrStage()) {
jobArgs = buildMapJobArgs(jobContext);
} else {
} else if (MapReduceStageEnum.REDUCE.getStage() == jobContext.getMrStage()) {
jobArgs = buildReduceJobArgs(jobContext);
} else {
jobArgs = buildMergeReduceJobArgs(jobContext);
}
} else {
@ -118,7 +119,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
MapArgs jobArgs = new MapArgs();
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
jobArgs.setJobParams(jobArgsHolder.getJobParams());
jobArgs.setMapResult(jobArgsHolder.getMapResult());
jobArgs.setMapResult(jobArgsHolder.getMaps());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskName(jobContext.getTaskName());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
@ -129,7 +130,24 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
ReduceArgs jobArgs = new ReduceArgs();
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
jobArgs.setJobParams(jobArgsHolder.getJobParams());
jobArgs.setMapResult(JsonUtil.parseList(jobArgsHolder.getMapResult(), List.class));
String maps = jobArgsHolder.getMaps();
if (StrUtil.isNotBlank(maps)) {
jobArgs.setMapResult(JsonUtil.parseList(maps, Object.class));
}
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
jobArgs.setWfContext(jobContext.getWfContext());
return jobArgs;
}
private static JobArgs buildMergeReduceJobArgs(JobContext jobContext) {
MergeReduceArgs jobArgs = new MergeReduceArgs();
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
jobArgs.setJobParams(jobArgsHolder.getJobParams());
String reduces = jobArgsHolder.getReduces();
if (StrUtil.isNotBlank(reduces)) {
jobArgs.setReduces(JsonUtil.parseList(reduces, Object.class));
}
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
jobArgs.setWfContext(jobContext.getWfContext());

View File

@ -9,6 +9,7 @@ import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.model.NettyResult;
@ -67,7 +68,7 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
// 2. 同步发送请求
Result<Boolean> result = CLIENT.batchReportMapTask(mapTaskRequest);
if (result.getData()) {
if (StatusEnum.NO.getStatus() == result.getStatus() || result.getData()) {
SnailJobLog.LOCAL.info("Map task create successfully!. taskName:[{}] TaskId:[{}] ", nextTaskName, jobContext.getTaskId());
} else {
throw new SnailJobMapReduceException("map failed for task: " + nextTaskName);

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.client.job.core.executor;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
@ -27,7 +28,7 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor {
ReduceArgs reduceArgs = (ReduceArgs) jobArgs;
return this.doReduceExecute(reduceArgs);
} else if (jobContext.getMrStage().equals(MapReduceStageEnum.MERGE_REDUCE.getStage())) {
ReduceArgs reduceArgs = (ReduceArgs) jobArgs;
MergeReduceArgs reduceArgs = (MergeReduceArgs) jobArgs;
return this.doMergeReduceExecute(reduceArgs);
}
@ -36,5 +37,5 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor {
protected abstract ExecuteResult doReduceExecute(ReduceArgs reduceArgs);
protected abstract ExecuteResult doMergeReduceExecute(ReduceArgs reduceArgs);
protected abstract ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs);
}

View File

@ -21,11 +21,11 @@ public class JobArgsHolder {
/**
* 动态分片 map节点的结果
*/
private String mapResult;
private String maps;
/**
* 动态分片 reduce执行的结果
*/
private String reduceResult;
private String reduces;
}

View File

@ -1,8 +1,12 @@
package com.aizuda.snailjob.template.datasource.persistence.mapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* <p>
@ -15,4 +19,5 @@ import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface JobTaskMapper extends BaseMapper<JobTask> {
int insertBatch(@Param("list") List<JobTask> list);
}

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper">
<!-- 定义批量新增的 SQL 映射 -->
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO sj_job_task (namespace_id, group_name, job_id, task_batch_id, parent_id, task_status,
retry_count, mr_stage, leaf, task_name, client_info, wf_context, args_str, result_message, args_type, ext_attrs,
create_dt, update_dt)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.namespaceId},
#{item.groupName},
#{item.jobId},
#{item.taskBatchId},
#{item.parentId},
#{item.taskStatus},
#{item.retryCount},
#{item.mrStage},
#{item.leaf},
#{item.taskName},
#{item.clientInfo},
#{item.wfContext},
#{item.argsStr},
#{item.resultMessage},
#{item.argsType},
#{item.extAttrs},
#{item.createDt},
#{item.updateDt}
)
</foreach>
</insert>
</mapper>

View File

@ -42,7 +42,6 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
private static final String CALLBACK_TIMEOUT = "10";
private final RestTemplate restTemplate;
private final JobTaskMapper jobTaskMapper;
@Override
public WorkflowNodeTypeEnum getWorkflowNodeType() {
@ -70,8 +69,8 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
invokeCallback(context);
}
// 执行下一个节点
workflowTaskExecutor(context);
// ToDo 执行下一个节点
// workflowTaskExecutor(context);
}

View File

@ -55,6 +55,7 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
int operationReason = JobOperationReasonEnum.NONE.getReason();
int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
String message = StrUtil.EMPTY;
String wfContext = "";
Boolean result = (Boolean) Optional.ofNullable(context.getEvaluationResult()).orElse(Boolean.FALSE);
@ -66,27 +67,30 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
} else {
DecisionConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), DecisionConfig.class);
if (StatusEnum.NO.getStatus().equals(decisionConfig.getDefaultDecision())) {
try {
// 这里重新加载一次最新的上下文
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(new LambdaQueryWrapper<WorkflowTaskBatch>()
.select(WorkflowTaskBatch::getWfContext)
.eq(WorkflowTaskBatch::getId, context.getWorkflowTaskBatchId())
);
if (Objects.isNull(workflowTaskBatch)) {
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
} else {
wfContext = workflowTaskBatch.getWfContext();
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType());
Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("表达式引擎不存在"));
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), workflowTaskBatch.getWfContext())).orElse(Boolean.FALSE);
result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), wfContext)).orElse(Boolean.FALSE);
if (!result) {
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
}
}
} catch (Exception e) {
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getWfContext(), e);
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), wfContext, e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
@ -98,16 +102,13 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
}
}
// if (JobTaskBatchStatusEnum.SUCCESS.getStatus() == taskBatchStatus && result) {
// workflowTaskExecutor(context);
// }
// 回传执行结果
context.setEvaluationResult(result);
context.setTaskBatchStatus(taskBatchStatus);
context.setOperationReason(operationReason);
context.setJobTaskStatus(jobTaskStatus);
context.setLogMessage(message);
context.setWfContext(wfContext);
}

View File

@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
@ -20,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*;
/**
@ -65,11 +67,16 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator {
jobTask.setArgsType(context.getArgsType());
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setParentId(0L);
jobTask.setLeaf(StatusEnum.YES.getStatus());
jobTask.setRetryCount(0);
jobTask.setCreateDt(LocalDateTime.now());
jobTask.setUpdateDt(LocalDateTime.now());
clientInfoSet.add(address);
jobTasks.add(jobTask);
}
Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败"));
return jobTasks;
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.task;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
@ -54,7 +55,7 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator {
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(serverNode));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.task;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
@ -28,6 +29,7 @@ import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.util.*;
/**
@ -98,7 +100,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobArgsHolder.setReduceResult(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage)));
jobArgsHolder.setReduces(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage)));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
@ -139,22 +141,23 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobArgsHolder.setMapResult(JsonUtil.toJsonString(partition.get(index)));
jobArgsHolder.setJobParams(StrUtil.isBlank(context.getArgsStr()) ? null : context.getArgsStr());
jobArgsHolder.setMaps(JsonUtil.toJsonString(partition.get(index)));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage());
jobTask.setTaskName("REDUCE_TASK");
// Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
// () -> new SnailJobServerException("新增任务实例失败"));
jobTask.setParentId(0L);
jobTask.setRetryCount(0);
jobTask.setLeaf(StatusEnum.YES.getStatus());
jobTask.setCreateDt(LocalDateTime.now());
jobTask.setUpdateDt(LocalDateTime.now());
finalJobTasks.add(jobTask);
}
Assert.isTrue(finalJobTasks.size() == jobTaskMapper.insert(finalJobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
Assert.isTrue(finalJobTasks.size() == jobTaskMapper.insertBatch(finalJobTasks), () -> new SnailJobServerException("新增任务实例失败"));
}
});
@ -192,19 +195,22 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobArgsHolder.setMapResult(JsonUtil.toJsonString(mapSubTask.get(index)));
jobArgsHolder.setMaps(JsonUtil.toJsonString(mapSubTask.get(index)));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setMrStage(MapReduceStageEnum.MAP.getStage());
jobTask.setTaskName(context.getTaskName());
jobTask.setLeaf(StatusEnum.YES.getStatus());
jobTask.setParentId(Objects.isNull(context.getParentId()) ? 0L : context.getParentId());
jobTask.setRetryCount(0);
jobTask.setCreateDt(LocalDateTime.now());
jobTask.setUpdateDt(LocalDateTime.now());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
// Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
// () -> new SnailJobServerException("新增任务实例失败"));
jobTasks.add(jobTask);
}
Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败"));
// 更新父节点的为非叶子节点
if (CollUtil.isNotEmpty(parentJobTasks)) {

View File

@ -3,8 +3,10 @@ package com.aizuda.snailjob.server.job.task.support.generator.task;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -22,6 +24,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@ -76,17 +79,21 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(argsStrs.get(index));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
// Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败"));
jobTask.setParentId(0L);
jobTask.setRetryCount(0);
jobTask.setLeaf(StatusEnum.YES.getStatus());
jobTask.setCreateDt(LocalDateTime.now());
jobTask.setUpdateDt(LocalDateTime.now());
jobTasks.add(jobTask);
}
Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败"));
return jobTasks;

View File

@ -135,13 +135,39 @@ public class JobTaskBatchHandler {
*/
private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List<JobTask> jobTasks) {
Integer mrStage = null;
if (isAllMapTask(jobTasks)) {
int reduceCount = 0;
int mapCount = 0;
for (final JobTask jobTask : jobTasks) {
if (Objects.isNull(jobTask.getMrStage())) {
continue;
}
// 存在MERGE_REDUCE任务了不需要生成
if (MERGE_REDUCE.getStage() == jobTask.getMrStage()) {
return false;
}
// REDUCE任务累加
if (REDUCE.getStage() == jobTask.getMrStage()) {
reduceCount++;
continue;
}
// MAP任务累加
if (MAP.getStage() == jobTask.getMrStage()) {
mapCount++;
}
}
// 若存在2个以上的reduce任务则开启merge reduce任务
if (reduceCount > 1) {
mrStage = MERGE_REDUCE.getStage();
} else if (mapCount == jobTasks.size()) {
// 若都是MAP任务则开启Reduce任务
mrStage = REDUCE.getStage();
} else if (isALeastOneReduceTask(jobTasks)) {
// 若存在2个以上的reduce任务则开启merge reduce任务
mrStage = MERGE_REDUCE.getStage();
} else {
// 若既不是MAP也是不REDUCE则是其他类型的任务直接返回即可
return false;
}