fix(sj_1.1.0): 支持工作流全局上下文

This commit is contained in:
opensnail 2024-06-17 18:32:04 +08:00
parent e79e81cacc
commit e394d65ec0

View File

@ -22,14 +22,19 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor {
public ExecuteResult doJobExecute(final JobArgs jobArgs) { public ExecuteResult doJobExecute(final JobArgs jobArgs) {
JobContext jobContext = JobContextManager.getJobContext(); JobContext jobContext = JobContextManager.getJobContext();
if (jobContext.getMrStage().equals(MapReduceStageEnum.MAP.getStage())) { if (jobContext.getMrStage().equals(MapReduceStageEnum.MAP.getStage())) {
return super.doJobExecute(jobArgs); return super.doJobExecute(jobArgs);
} else if(jobContext.getMrStage().equals(MapReduceStageEnum.REDUCE.getStage())) { } else if (jobContext.getMrStage().equals(MapReduceStageEnum.REDUCE.getStage())) {
ReduceArgs reduceArgs = (ReduceArgs) jobArgs; ReduceArgs reduceArgs = (ReduceArgs) jobArgs;
return doReduceExecute(reduceArgs); return this.doReduceExecute(reduceArgs);
} else if (jobContext.getMrStage().equals(MapReduceStageEnum.MERGE_REDUCE.getStage())) {
ReduceArgs reduceArgs = (ReduceArgs) jobArgs;
return this.doMergeReduceExecute(reduceArgs);
} }
throw new SnailJobMapReduceException("非法的MapReduceStage"); throw new SnailJobMapReduceException("非法的MapReduceStage");
} }
protected abstract ExecuteResult doReduceExecute(ReduceArgs reduceArgs); protected abstract ExecuteResult doReduceExecute(ReduceArgs reduceArgs);
protected abstract ExecuteResult doMergeReduceExecute(ReduceArgs reduceArgs);
} }