diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java index 0c801e23..4a59f00f 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java @@ -22,14 +22,19 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor { public ExecuteResult doJobExecute(final JobArgs jobArgs) { JobContext jobContext = JobContextManager.getJobContext(); if (jobContext.getMrStage().equals(MapReduceStageEnum.MAP.getStage())) { - return super.doJobExecute(jobArgs); - } else if(jobContext.getMrStage().equals(MapReduceStageEnum.REDUCE.getStage())) { + return super.doJobExecute(jobArgs); + } else if (jobContext.getMrStage().equals(MapReduceStageEnum.REDUCE.getStage())) { ReduceArgs reduceArgs = (ReduceArgs) jobArgs; - return doReduceExecute(reduceArgs); + return this.doReduceExecute(reduceArgs); + } else if (jobContext.getMrStage().equals(MapReduceStageEnum.MERGE_REDUCE.getStage())) { + ReduceArgs reduceArgs = (ReduceArgs) jobArgs; + return this.doMergeReduceExecute(reduceArgs); } throw new SnailJobMapReduceException("非法的MapReduceStage"); } protected abstract ExecuteResult doReduceExecute(ReduceArgs reduceArgs); + + protected abstract ExecuteResult doMergeReduceExecute(ReduceArgs reduceArgs); }