From f8bd156675f06f3c2d643629683eb93034c18169 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 21 Jun 2024 16:33:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96MapReduce=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/TestMapReduceJobExecutor.java | 36 +++++++++++++------ .../job/TestWorkflowAnnoJobExecutor.java | 2 +- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java index 66db437..44e0086 100644 --- a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java @@ -1,15 +1,18 @@ package com.example.snailjob.job; import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.example.snailjob.job.TestMapReduceJobExecutor.MonthMap.SubTask; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.example.snailjob.job.TestMapReduceJobExecutor.QuarterMap.SubTask; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; +import lombok.NoArgsConstructor; import org.springframework.stereotype.Component; import java.util.ArrayList; @@ -31,14 +34,20 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { @Override public ExecuteResult doJobMapExecute(MapArgs mapArgs) { MapEnum mapEnum = MapEnum.ofMap(mapArgs.getTaskName()); - if (Objects.nonNull(mapEnum)) { + if (Objects.nonNull(mapEnum) && Objects.nonNull(mapEnum.getMap())) { Map map = mapEnum.getMap(); - return doMap(map.map(mapArgs), mapEnum.name()); + MapEnum nextMap = mapEnum.getNextMap(); + String nextName = null; + if (Objects.nonNull(nextMap)) { + nextName = nextMap.name(); + } + + return doMap(map.map(mapArgs), nextName); } // 未找到map的任务,则说明当前需要进行处理 String mapResult = mapArgs.getMapResult(); - + SnailJobLog.LOCAL.info("LAST_MAP 开始执行 mapResult:{}", mapResult); // 获取最后一次map的信息. SubTask subTask = JsonUtil.parseObject(mapResult, SubTask.class); // 此处可以统计数据或者做其他的事情 @@ -54,20 +63,24 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { } @Override - protected ExecuteResult doMergeReduceExecute(ReduceArgs reduceArgs) { - return ExecuteResult.success(reduceArgs.getMapResult()); + protected ExecuteResult doMergeReduceExecute(MergeReduceArgs reduceArgs) { + List reduces = reduceArgs.getReduces(); + SnailJobLog.LOCAL.info("merge reduce {}", reduces); + return ExecuteResult.success(111); } @Getter private enum MapEnum { - MAP_ROOT(new RootMap()), - MONTH_MAP(new MonthMap()) + LAST_MAP(null, null), + MONTH_MAP(new QuarterMap(), LAST_MAP), + MAP_ROOT(new RootMap(), MONTH_MAP), ; private final Map map; - - MapEnum(Map map) { + private final MapEnum nextMap; + MapEnum(Map map, MapEnum nextMap) { this.map = map; + this.nextMap = nextMap; } public static MapEnum ofMap(String taskName) { @@ -93,7 +106,7 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { } } - public static class MonthMap implements Map { + public static class QuarterMap implements Map { @Override public List map(MapArgs args) { @@ -113,6 +126,7 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { @Data @AllArgsConstructor + @NoArgsConstructor public static class SubTask { // 商家id private Long id; diff --git a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java index 4ddc70e..95a82af 100644 --- a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java @@ -25,7 +25,7 @@ public class TestWorkflowAnnoJobExecutor { // 测试上下文传递 int i = new Random().nextInt(1000); jobArgs.appendContext("name" + i, "小蜗牛" + i); - jobArgs.appendContext("age", 18); + jobArgs.appendContext("age", i); return ExecuteResult.success(failOrderPo); }