From e05bc9067ffdcf6d912857ed252d61ca11100852 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 14 Jun 2024 23:32:29 +0800 Subject: [PATCH 01/13] =?UTF-8?q?=E6=B5=8B=E8=AF=95map=20reduce?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../snailjob/job/MapReduceJobExecutor.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 src/main/java/com/example/snailjob/job/MapReduceJobExecutor.java diff --git a/src/main/java/com/example/snailjob/job/MapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/MapReduceJobExecutor.java new file mode 100644 index 0000000..56b9172 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/MapReduceJobExecutor.java @@ -0,0 +1,36 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.dto.JobArgs; +import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.core.model.JobContext; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +/** + * @author: opensnail + * @date : 2024-06-13 + * @since : sj_1.1.0 + */ +@Component +public class MapReduceJobExecutor extends AbstractMapReduceExecutor { + + @Override + public ExecuteResult doJobExecute(final MapReduceArgs mapReduceArgs) { + if (mapReduceArgs.getMapName().equals(SystemConstants.MAP_ROOT)) { + doMapExecute(Lists.newArrayList(Lists.newArrayList(1, 2, 3), Lists.newArrayList(4, 5, 6)), + "TWO_MAP"); + return ExecuteResult.success(); + } else { + return ExecuteResult.success(1); + } + + } + + @Override + protected ExecuteResult doReduceExecute(final JobContext jobContext, final JobArgs jobArgs) { + return null; + } +} From b72899eafae058475402dfec78a5959d9d25932e Mon Sep 17 00:00:00 2001 From: zhengweilin Date: Sun, 16 Jun 2024 21:31:27 +0800 Subject: [PATCH 02/13] =?UTF-8?q?=E6=96=B0=E5=A2=9EMapReduce=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../snailjob/job/MapReduceJobExecutor.java | 36 ---------------- .../job/TestMapReduceJobExecutor.java | 41 +++++++++++++++++++ 2 files changed, 41 insertions(+), 36 deletions(-) delete mode 100644 src/main/java/com/example/snailjob/job/MapReduceJobExecutor.java create mode 100644 src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java diff --git a/src/main/java/com/example/snailjob/job/MapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/MapReduceJobExecutor.java deleted file mode 100644 index 56b9172..0000000 --- a/src/main/java/com/example/snailjob/job/MapReduceJobExecutor.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.example.snailjob.job; - -import com.aizuda.snailjob.client.job.core.dto.JobArgs; -import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs; -import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; -import com.aizuda.snailjob.client.model.ExecuteResult; -import com.aizuda.snailjob.common.core.constant.SystemConstants; -import com.aizuda.snailjob.common.core.model.JobContext; -import com.google.common.collect.Lists; -import org.springframework.stereotype.Component; - -/** - * @author: opensnail - * @date : 2024-06-13 - * @since : sj_1.1.0 - */ -@Component -public class MapReduceJobExecutor extends AbstractMapReduceExecutor { - - @Override - public ExecuteResult doJobExecute(final MapReduceArgs mapReduceArgs) { - if (mapReduceArgs.getMapName().equals(SystemConstants.MAP_ROOT)) { - doMapExecute(Lists.newArrayList(Lists.newArrayList(1, 2, 3), Lists.newArrayList(4, 5, 6)), - "TWO_MAP"); - return ExecuteResult.success(); - } else { - return ExecuteResult.success(1); - } - - } - - @Override - protected ExecuteResult doReduceExecute(final JobContext jobContext, final JobArgs jobArgs) { - return null; - } -} diff --git a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java new file mode 100644 index 0000000..a4eeeaf --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java @@ -0,0 +1,41 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + + +/** + * @author: opensnail + * @date : 2024-06-13 + * @since : sj_1.1.0 + */ +@Component +public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { + + @Override + public ExecuteResult doJobMapExecute(MapArgs mapArgs) { + if (mapArgs.getTaskName().equals(SystemConstants.MAP_ROOT)) { + doMap(Lists.newArrayList(Lists.newArrayList(1, 2, 3), Lists.newArrayList(4, 5, 6)), "TWO_MAP"); + return ExecuteResult.success(); + } else if (mapArgs.getTaskName().equals("TWO_MAP")) { + return ExecuteResult.success(mapArgs.getArgsStr()); + } else { + return ExecuteResult.success(); + } + } + + @Override + protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) { + return ExecuteResult.success(reduceArgs.getMapResult()); + } + + @Override + protected ExecuteResult doMergeReduceExecute(ReduceArgs reduceArgs) { + return ExecuteResult.success(reduceArgs.getMapResult()); + } +} From 7f2edf0c5ff462238878e8d19b34ea4c90c8448b Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Thu, 20 Jun 2024 17:57:39 +0800 Subject: [PATCH 03/13] =?UTF-8?q?=E8=A1=A5=E5=85=85MapReduce=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/TestMapReduceJobExecutor.java | 134 ++++++++++++++++-- .../job/TestWorkflowAnnoJobExecutor.java | 9 +- 2 files changed, 132 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java index a4eeeaf..66db437 100644 --- a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java @@ -4,12 +4,23 @@ import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; import com.aizuda.snailjob.client.model.ExecuteResult; -import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.example.snailjob.job.TestMapReduceJobExecutor.MonthMap.SubTask; import com.google.common.collect.Lists; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Random; + /** + * 以下是一个统计某电商公司商家的一年的营业额的计算过程 + * * @author: opensnail * @date : 2024-06-13 * @since : sj_1.1.0 @@ -19,14 +30,22 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { @Override public ExecuteResult doJobMapExecute(MapArgs mapArgs) { - if (mapArgs.getTaskName().equals(SystemConstants.MAP_ROOT)) { - doMap(Lists.newArrayList(Lists.newArrayList(1, 2, 3), Lists.newArrayList(4, 5, 6)), "TWO_MAP"); - return ExecuteResult.success(); - } else if (mapArgs.getTaskName().equals("TWO_MAP")) { - return ExecuteResult.success(mapArgs.getArgsStr()); - } else { - return ExecuteResult.success(); + MapEnum mapEnum = MapEnum.ofMap(mapArgs.getTaskName()); + if (Objects.nonNull(mapEnum)) { + Map map = mapEnum.getMap(); + return doMap(map.map(mapArgs), mapEnum.name()); } + + // 未找到map的任务,则说明当前需要进行处理 + String mapResult = mapArgs.getMapResult(); + + // 获取最后一次map的信息. + SubTask subTask = JsonUtil.parseObject(mapResult, SubTask.class); + // 此处可以统计数据或者做其他的事情 + // 模拟统计营业额 + int turnover = new Random().nextInt(1000000); + return ExecuteResult.success(turnover); + } @Override @@ -38,4 +57,103 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { protected ExecuteResult doMergeReduceExecute(ReduceArgs reduceArgs) { return ExecuteResult.success(reduceArgs.getMapResult()); } + + @Getter + private enum MapEnum { + MAP_ROOT(new RootMap()), + MONTH_MAP(new MonthMap()) + ; + + private final Map map; + + MapEnum(Map map) { + this.map = map; + } + + public static MapEnum ofMap(String taskName) { + for (final MapEnum value : MapEnum.values()) { + if (value.name().equals(taskName)) { + return value; + } + } + + return null; + } + + } + + private static class RootMap implements Map { + + @Override + public List map(MapArgs args) { + // 第一层按照商家ID分片 + // 假设总共有一百万商家 每个分片处理10万商家 + List> ranges = doSharding(1L, 1000000L, 100000); + return ranges; + } + } + + public static class MonthMap implements Map { + + @Override + public List map(MapArgs args) { + + // 第二层按照月分片 + // 4个季度 + List lists = JsonUtil.parseList(args.getMapResult(), Long.class); + List list = new ArrayList<>(); + for (final Long id : lists) { + for (int i = 1; i <= 4; i++) { + list.add(new SubTask(id, i)); + } + } + + return list; + } + + @Data + @AllArgsConstructor + public static class SubTask { + // 商家id + private Long id; + + // 需要处理的月份 + private Integer quarter; + + } + } + + interface Map{ + List map(MapArgs args); + } + + public static List> doSharding(Long min, Long max, int interval) { + + if (max.equals(min)) { + return new ArrayList<>(); + } + + // 总数量 + long total = max - min + 1; + + // 计算分页总页数 + Long totalPages = total / interval; + if (total % interval != 0) { + totalPages++; + } + + List> partitions = new ArrayList<>(); + for (Long index = 0L; index < totalPages; index++) { + + // 计算起始点 因为是从min开始所以每次需要加上一个min + Long start = index * interval + min; + + // 结算结束点 若最后一个 start + interval - 1 > max 取max + // 减一是保证 [start, end] 都是闭区间 + Long end = Math.min(start + interval - 1, max); + partitions.add(Lists.newArrayList(start, end)); + } + + return partitions; + } } diff --git a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java index 4914da1..4ddc70e 100644 --- a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java @@ -8,6 +8,8 @@ import com.aizuda.snailjob.common.log.SnailJobLog; import com.example.snailjob.po.FailOrderPo; import org.springframework.stereotype.Component; +import java.util.Random; + /** * @author www.byteblogs.com * @date 2023-09-28 22:54:07 @@ -18,11 +20,12 @@ import org.springframework.stereotype.Component; public class TestWorkflowAnnoJobExecutor { public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException { -// for (int i = 0; i < 30; i++) { -// SnailJobLog.REMOTE.info("任务执行开始. [{}]", i + "" + JsonUtil.toJsonString(jobArgs)); -// } FailOrderPo failOrderPo = new FailOrderPo(); failOrderPo.setOrderId("xiaowoniu"); + // 测试上下文传递 + int i = new Random().nextInt(1000); + jobArgs.appendContext("name" + i, "小蜗牛" + i); + jobArgs.appendContext("age", 18); return ExecuteResult.success(failOrderPo); } From f8bd156675f06f3c2d643629683eb93034c18169 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 21 Jun 2024 16:33:42 +0800 Subject: [PATCH 04/13] =?UTF-8?q?=E4=BC=98=E5=8C=96MapReduce=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/TestMapReduceJobExecutor.java | 36 +++++++++++++------ .../job/TestWorkflowAnnoJobExecutor.java | 2 +- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java index 66db437..44e0086 100644 --- a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java @@ -1,15 +1,18 @@ package com.example.snailjob.job; import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.example.snailjob.job.TestMapReduceJobExecutor.MonthMap.SubTask; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.example.snailjob.job.TestMapReduceJobExecutor.QuarterMap.SubTask; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; +import lombok.NoArgsConstructor; import org.springframework.stereotype.Component; import java.util.ArrayList; @@ -31,14 +34,20 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { @Override public ExecuteResult doJobMapExecute(MapArgs mapArgs) { MapEnum mapEnum = MapEnum.ofMap(mapArgs.getTaskName()); - if (Objects.nonNull(mapEnum)) { + if (Objects.nonNull(mapEnum) && Objects.nonNull(mapEnum.getMap())) { Map map = mapEnum.getMap(); - return doMap(map.map(mapArgs), mapEnum.name()); + MapEnum nextMap = mapEnum.getNextMap(); + String nextName = null; + if (Objects.nonNull(nextMap)) { + nextName = nextMap.name(); + } + + return doMap(map.map(mapArgs), nextName); } // 未找到map的任务,则说明当前需要进行处理 String mapResult = mapArgs.getMapResult(); - + SnailJobLog.LOCAL.info("LAST_MAP 开始执行 mapResult:{}", mapResult); // 获取最后一次map的信息. SubTask subTask = JsonUtil.parseObject(mapResult, SubTask.class); // 此处可以统计数据或者做其他的事情 @@ -54,20 +63,24 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { } @Override - protected ExecuteResult doMergeReduceExecute(ReduceArgs reduceArgs) { - return ExecuteResult.success(reduceArgs.getMapResult()); + protected ExecuteResult doMergeReduceExecute(MergeReduceArgs reduceArgs) { + List reduces = reduceArgs.getReduces(); + SnailJobLog.LOCAL.info("merge reduce {}", reduces); + return ExecuteResult.success(111); } @Getter private enum MapEnum { - MAP_ROOT(new RootMap()), - MONTH_MAP(new MonthMap()) + LAST_MAP(null, null), + MONTH_MAP(new QuarterMap(), LAST_MAP), + MAP_ROOT(new RootMap(), MONTH_MAP), ; private final Map map; - - MapEnum(Map map) { + private final MapEnum nextMap; + MapEnum(Map map, MapEnum nextMap) { this.map = map; + this.nextMap = nextMap; } public static MapEnum ofMap(String taskName) { @@ -93,7 +106,7 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { } } - public static class MonthMap implements Map { + public static class QuarterMap implements Map { @Override public List map(MapArgs args) { @@ -113,6 +126,7 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { @Data @AllArgsConstructor + @NoArgsConstructor public static class SubTask { // 商家id private Long id; diff --git a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java index 4ddc70e..95a82af 100644 --- a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java @@ -25,7 +25,7 @@ public class TestWorkflowAnnoJobExecutor { // 测试上下文传递 int i = new Random().nextInt(1000); jobArgs.appendContext("name" + i, "小蜗牛" + i); - jobArgs.appendContext("age", 18); + jobArgs.appendContext("age", i); return ExecuteResult.success(failOrderPo); } From 56bcff8908a554433c64b37c77cf9efd3d958e74 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Tue, 25 Jun 2024 00:01:33 +0800 Subject: [PATCH 05/13] =?UTF-8?q?=E8=B0=83=E8=AF=95map?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 6 +- .../snailjob/job/TestMapJobExecutor.java | 146 ++++++++++++++++++ 2 files changed, 149 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/example/snailjob/job/TestMapJobExecutor.java diff --git a/pom.xml b/pom.xml index d4b5773..716b2d1 100644 --- a/pom.xml +++ b/pom.xml @@ -44,17 +44,17 @@ com.aizuda snail-job-client-starter - 1.0.0 + 1.1.0-beta2 com.aizuda snail-job-client-retry-core - 1.0.0 + 1.1.0-beta2 com.aizuda snail-job-client-job-core - 1.0.0 + 1.1.0-beta2 com.googlecode.aviator diff --git a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java new file mode 100644 index 0000000..70528f3 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java @@ -0,0 +1,146 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.google.common.collect.Lists; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +/** + * 以下是一个统计某电商公司商家的一年的营业额的计算过程 + * + * @author: opensnail + * @date : 2024-06-13 + * @since : sj_1.1.0 + */ +@Component +public class TestMapJobExecutor extends AbstractMapExecutor { + + @Override + public ExecuteResult doJobMapExecute(MapArgs mapArgs) { + MapEnum mapEnum = MapEnum.ofMap(mapArgs.getTaskName()); + if (Objects.nonNull(mapEnum)) { + Map map = mapEnum.getMap(); + return doMap(map.map(mapArgs), mapEnum.name()); + } + + // 未找到map的任务,则说明当前需要进行处理 + String mapResult = mapArgs.getMapResult(); + + // 获取最后一次map的信息. + MonthMap.SubTask subTask = JsonUtil.parseObject(mapResult, MonthMap.SubTask.class); + // 此处可以统计数据或者做其他的事情 + // 模拟统计营业额 + int turnover = new Random().nextInt(1000000); + return ExecuteResult.success(turnover); + + } + + @Getter + private enum MapEnum { + MAP_ROOT(new RootMap()), + MONTH_MAP(new MonthMap()) + ; + + private final Map map; + + MapEnum(Map map) { + this.map = map; + } + + public static MapEnum ofMap(String taskName) { + for (final MapEnum value : MapEnum.values()) { + if (value.name().equals(taskName)) { + return value; + } + } + + return null; + } + + } + + private static class RootMap implements Map { + + @Override + public List map(MapArgs args) { + // 第一层按照商家ID分片 + // 假设总共有一百万商家 每个分片处理10万商家 + List> ranges = doSharding(1L, 1000000L, 100000); + return ranges; + } + } + + public static class MonthMap implements Map { + + @Override + public List map(MapArgs args) { + + // 第二层按照月分片 + // 4个季度 + List lists = JsonUtil.parseList(args.getMapResult(), Long.class); + List list = new ArrayList<>(); + for (final Long id : lists) { + for (int i = 1; i <= 4; i++) { + list.add(new SubTask(id, i)); + } + } + + return list; + } + + @Data + @AllArgsConstructor + public static class SubTask { + // 商家id + private Long id; + + // 需要处理的月份 + private Integer quarter; + + } + } + + interface Map{ + List map(MapArgs args); + } + + public static List> doSharding(Long min, Long max, int interval) { + + if (max.equals(min)) { + return new ArrayList<>(); + } + + // 总数量 + long total = max - min + 1; + + // 计算分页总页数 + Long totalPages = total / interval; + if (total % interval != 0) { + totalPages++; + } + + List> partitions = new ArrayList<>(); + for (Long index = 0L; index < totalPages; index++) { + + // 计算起始点 因为是从min开始所以每次需要加上一个min + Long start = index * interval + min; + + // 结算结束点 若最后一个 start + interval - 1 > max 取max + // 减一是保证 [start, end] 都是闭区间 + Long end = Math.min(start + interval - 1, max); + partitions.add(Lists.newArrayList(start, end)); + } + + return partitions; + } +} From 866470391755a4844b9cad5df64003eab24f3f89 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Wed, 26 Jun 2024 09:14:25 +0800 Subject: [PATCH 06/13] =?UTF-8?q?=E4=BC=98=E5=8C=96Map=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../snailjob/job/TestMapJobExecutor.java | 39 ++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java index 70528f3..f029639 100644 --- a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java @@ -1,13 +1,17 @@ package com.example.snailjob.job; import com.aizuda.snailjob.client.job.core.dto.MapArgs; + import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor; import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.example.snailjob.job.TestMapReduceJobExecutor.QuarterMap.SubTask; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; +import lombok.NoArgsConstructor; import org.springframework.stereotype.Component; import java.util.ArrayList; @@ -28,16 +32,22 @@ public class TestMapJobExecutor extends AbstractMapExecutor { @Override public ExecuteResult doJobMapExecute(MapArgs mapArgs) { MapEnum mapEnum = MapEnum.ofMap(mapArgs.getTaskName()); - if (Objects.nonNull(mapEnum)) { - Map map = mapEnum.getMap(); - return doMap(map.map(mapArgs), mapEnum.name()); + if (Objects.nonNull(mapEnum) && Objects.nonNull(mapEnum.getMap())) { + Map map = mapEnum.getMap(); + MapEnum nextMap = mapEnum.getNextMap(); + String nextName = null; + if (Objects.nonNull(nextMap)) { + nextName = nextMap.name(); + } + + return doMap(map.map(mapArgs), nextName); } // 未找到map的任务,则说明当前需要进行处理 String mapResult = mapArgs.getMapResult(); - + SnailJobLog.LOCAL.info("LAST_MAP 开始执行 mapResult:{}", mapResult); // 获取最后一次map的信息. - MonthMap.SubTask subTask = JsonUtil.parseObject(mapResult, MonthMap.SubTask.class); + SubTask subTask = JsonUtil.parseObject(mapResult, SubTask.class); // 此处可以统计数据或者做其他的事情 // 模拟统计营业额 int turnover = new Random().nextInt(1000000); @@ -47,14 +57,16 @@ public class TestMapJobExecutor extends AbstractMapExecutor { @Getter private enum MapEnum { - MAP_ROOT(new RootMap()), - MONTH_MAP(new MonthMap()) + LAST_MAP(null, null), + MONTH_MAP(new QuarterMap(), LAST_MAP), + MAP_ROOT(new RootMap(), MONTH_MAP), ; private final Map map; - - MapEnum(Map map) { + private final MapEnum nextMap; + MapEnum(Map map, MapEnum nextMap) { this.map = map; + this.nextMap = nextMap; } public static MapEnum ofMap(String taskName) { @@ -69,7 +81,7 @@ public class TestMapJobExecutor extends AbstractMapExecutor { } - private static class RootMap implements Map { + private static class RootMap implements Map { @Override public List map(MapArgs args) { @@ -80,7 +92,7 @@ public class TestMapJobExecutor extends AbstractMapExecutor { } } - public static class MonthMap implements Map { + public static class QuarterMap implements Map { @Override public List map(MapArgs args) { @@ -88,10 +100,10 @@ public class TestMapJobExecutor extends AbstractMapExecutor { // 第二层按照月分片 // 4个季度 List lists = JsonUtil.parseList(args.getMapResult(), Long.class); - List list = new ArrayList<>(); + List list = new ArrayList<>(); for (final Long id : lists) { for (int i = 1; i <= 4; i++) { - list.add(new SubTask(id, i)); + list.add(new TestMapReduceJobExecutor.QuarterMap.SubTask(id, i)); } } @@ -100,6 +112,7 @@ public class TestMapJobExecutor extends AbstractMapExecutor { @Data @AllArgsConstructor + @NoArgsConstructor public static class SubTask { // 商家id private Long id; From 2e6503f403466477e0271600fc39f56b357ef217 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Wed, 26 Jun 2024 18:11:49 +0800 Subject: [PATCH 07/13] =?UTF-8?q?=E4=BC=98=E5=8C=96Map=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../snailjob/job/TestAnnoMapJobExecutor.java | 41 ++++++++++++++ .../job/TestAnnoMapReduceJobExecutor.java | 55 +++++++++++++++++++ .../snailjob/job/TestMapJobExecutor.java | 5 +- .../job/TestMapReduceJobExecutor.java | 5 +- .../job/TestPartitionJobExecutor.java | 2 +- 5 files changed, 103 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java create mode 100644 src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java diff --git a/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java new file mode 100644 index 0000000..9c5de47 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java @@ -0,0 +1,41 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor; +import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +/** + * @author: opensnail + * @date : 2024-06-26 + */ +@Component +@JobExecutor(name = "testAnnoMapJobExecutor") +public class TestAnnoMapJobExecutor { + +// @MapExecutor +// public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { +// System.out.println(mapArgs); +// return mapHandler.doMap(Lists.newArrayList("aaa"), "MONTH_MAP"); +// } +// +// @MapExecutor(taskName = "MONTH_MAP") +// public ExecuteResult monthMapExecute(MapArgs mapArgs) { +// System.out.println(mapArgs); +// return ExecuteResult.success(123); +// } +// +// @MapExecutor(taskName = "LAST_MAP") +// public ExecuteResult lastMapExecute(MapArgs mapArgs, MapHandler mapHandler) { +// System.out.println(mapArgs); +// return ExecuteResult.success(); +// } + +} diff --git a/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java new file mode 100644 index 0000000..27a28b1 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java @@ -0,0 +1,55 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor; +import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +/** + * @author: opensnail + * @date : 2024-06-26 + */ +@Component +@JobExecutor(name = "testAnnoMapReduceJobExecutor") +public class TestAnnoMapReduceJobExecutor { + + @MapExecutor + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + System.out.println(mapArgs); + return mapHandler.doMap(Lists.newArrayList("aaa"), "MONTH_MAP"); + } + + @MapExecutor(taskName = "MONTH_MAP") + public ExecuteResult monthMapExecute(MapArgs mapArgs) { + System.out.println(mapArgs); + return ExecuteResult.success(123); + } + + @MapExecutor(taskName = "LAST_MAP") + public ExecuteResult lastMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + System.out.println(mapArgs); + return ExecuteResult.success(); + } + + @ReduceExecutor + public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) { + System.out.println(mapReduceArgs); + return ExecuteResult.success(); + } + + /** + * 当只有一个reduce任务时无此执行器 + */ + @MergeReduceExecutor + public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) { + System.out.println(mergeReduceArgs); + return ExecuteResult.success(); + } +} diff --git a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java index f029639..5bfa072 100644 --- a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java @@ -1,5 +1,6 @@ package com.example.snailjob.job; +import com.aizuda.snailjob.client.job.core.MapHandler; import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor; @@ -30,7 +31,7 @@ import java.util.Random; public class TestMapJobExecutor extends AbstractMapExecutor { @Override - public ExecuteResult doJobMapExecute(MapArgs mapArgs) { + public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { MapEnum mapEnum = MapEnum.ofMap(mapArgs.getTaskName()); if (Objects.nonNull(mapEnum) && Objects.nonNull(mapEnum.getMap())) { Map map = mapEnum.getMap(); @@ -40,7 +41,7 @@ public class TestMapJobExecutor extends AbstractMapExecutor { nextName = nextMap.name(); } - return doMap(map.map(mapArgs), nextName); + return mapHandler.doMap(map.map(mapArgs), nextName); } // 未找到map的任务,则说明当前需要进行处理 diff --git a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java index 44e0086..7092834 100644 --- a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java @@ -1,5 +1,6 @@ package com.example.snailjob.job; +import com.aizuda.snailjob.client.job.core.MapHandler; import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; @@ -32,7 +33,7 @@ import java.util.Random; public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { @Override - public ExecuteResult doJobMapExecute(MapArgs mapArgs) { + public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { MapEnum mapEnum = MapEnum.ofMap(mapArgs.getTaskName()); if (Objects.nonNull(mapEnum) && Objects.nonNull(mapEnum.getMap())) { Map map = mapEnum.getMap(); @@ -42,7 +43,7 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { nextName = nextMap.name(); } - return doMap(map.map(mapArgs), nextName); + return mapHandler.doMap(map.map(mapArgs), nextName); } // 未找到map的任务,则说明当前需要进行处理 diff --git a/src/main/java/com/example/snailjob/job/TestPartitionJobExecutor.java b/src/main/java/com/example/snailjob/job/TestPartitionJobExecutor.java index b82bb91..c1cdf84 100644 --- a/src/main/java/com/example/snailjob/job/TestPartitionJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestPartitionJobExecutor.java @@ -17,7 +17,7 @@ import org.springframework.stereotype.Component; public class TestPartitionJobExecutor { public ExecuteResult jobExecute(JobArgs jobArgs) { - if (jobArgs.getArgsStr().equals("1")) { + if (jobArgs.getJobParams().equals("1")) { throw new NullPointerException("分区空指针抛异常了"); } FailOrderPo failOrderPo = new FailOrderPo(); From 550e263d9935b5af7fe4ee852d33376c90349b38 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Wed, 26 Jun 2024 23:41:54 +0800 Subject: [PATCH 08/13] =?UTF-8?q?=E8=B0=83=E8=AF=95map?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../snailjob/job/TestAnnoMapJobExecutor.java | 38 +++++++++---------- .../job/TestAnnoMapReduceJobExecutor.java | 12 ++++-- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java index 9c5de47..d013471 100644 --- a/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java @@ -3,11 +3,7 @@ package com.example.snailjob.job; import com.aizuda.snailjob.client.job.core.MapHandler; import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; -import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor; -import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor; import com.aizuda.snailjob.client.job.core.dto.MapArgs; -import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; -import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; import com.aizuda.snailjob.client.model.ExecuteResult; import com.google.common.collect.Lists; import org.springframework.stereotype.Component; @@ -20,22 +16,22 @@ import org.springframework.stereotype.Component; @JobExecutor(name = "testAnnoMapJobExecutor") public class TestAnnoMapJobExecutor { -// @MapExecutor -// public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { -// System.out.println(mapArgs); -// return mapHandler.doMap(Lists.newArrayList("aaa"), "MONTH_MAP"); -// } -// -// @MapExecutor(taskName = "MONTH_MAP") -// public ExecuteResult monthMapExecute(MapArgs mapArgs) { -// System.out.println(mapArgs); -// return ExecuteResult.success(123); -// } -// -// @MapExecutor(taskName = "LAST_MAP") -// public ExecuteResult lastMapExecute(MapArgs mapArgs, MapHandler mapHandler) { -// System.out.println(mapArgs); -// return ExecuteResult.success(); -// } + @MapExecutor + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + System.out.println(mapArgs); + return mapHandler.doMap(Lists.newArrayList("1", "2", "3"), "MONTH_MAP"); + } + + @MapExecutor(taskName = "MONTH_MAP1") + public ExecuteResult monthMapExecute(MapArgs mapArgs) { + System.out.println(mapArgs); + return ExecuteResult.success(123); + } + + @MapExecutor(taskName = "LAST_MAP") + public ExecuteResult lastMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + System.out.println(mapArgs); + return ExecuteResult.success(); + } } diff --git a/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java index 27a28b1..34158b7 100644 --- a/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java @@ -9,9 +9,13 @@ import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.google.common.collect.Lists; import org.springframework.stereotype.Component; +import java.util.List; +import java.util.stream.Collectors; + /** * @author: opensnail * @date : 2024-06-26 @@ -23,13 +27,14 @@ public class TestAnnoMapReduceJobExecutor { @MapExecutor public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { System.out.println(mapArgs); - return mapHandler.doMap(Lists.newArrayList("aaa"), "MONTH_MAP"); + return mapHandler.doMap(Lists.newArrayList(1, 2, 3, 4, 5, 6), "MONTH_MAP"); } @MapExecutor(taskName = "MONTH_MAP") public ExecuteResult monthMapExecute(MapArgs mapArgs) { System.out.println(mapArgs); - return ExecuteResult.success(123); + String mapResult = mapArgs.getMapResult(); + return ExecuteResult.success(mapResult); } @MapExecutor(taskName = "LAST_MAP") @@ -41,7 +46,8 @@ public class TestAnnoMapReduceJobExecutor { @ReduceExecutor public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) { System.out.println(mapReduceArgs); - return ExecuteResult.success(); + List mapResult = (List) mapReduceArgs.getMapResult(); + return ExecuteResult.success(mapResult.stream().map(Integer::parseInt).mapToInt(Integer::intValue).sum()); } /** From b170277f0f0677b3bfabca6c48fe06ce5ffec3cb Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Thu, 27 Jun 2024 11:02:53 +0800 Subject: [PATCH 09/13] =?UTF-8?q?=E4=BC=98=E5=8C=96Map=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/TestAnnoMapReduceJobExecutor.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java index 34158b7..06c42cf 100644 --- a/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java @@ -27,27 +27,24 @@ public class TestAnnoMapReduceJobExecutor { @MapExecutor public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { System.out.println(mapArgs); - return mapHandler.doMap(Lists.newArrayList(1, 2, 3, 4, 5, 6), "MONTH_MAP"); + return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4", "5", "6"), "MONTH_MAP"); } @MapExecutor(taskName = "MONTH_MAP") public ExecuteResult monthMapExecute(MapArgs mapArgs) { System.out.println(mapArgs); - String mapResult = mapArgs.getMapResult(); - return ExecuteResult.success(mapResult); - } - - @MapExecutor(taskName = "LAST_MAP") - public ExecuteResult lastMapExecute(MapArgs mapArgs, MapHandler mapHandler) { - System.out.println(mapArgs); - return ExecuteResult.success(); + return ExecuteResult.success(Integer.parseInt((String) mapArgs.getMapResult()) * 2); } @ReduceExecutor public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) { System.out.println(mapReduceArgs); - List mapResult = (List) mapReduceArgs.getMapResult(); - return ExecuteResult.success(mapResult.stream().map(Integer::parseInt).mapToInt(Integer::intValue).sum()); + return ExecuteResult.success( + mapReduceArgs.getMapResult() + .stream() + .map(String::valueOf) + .map(Integer::parseInt) + .mapToInt(Integer::intValue).sum()); } /** @@ -56,6 +53,11 @@ public class TestAnnoMapReduceJobExecutor { @MergeReduceExecutor public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) { System.out.println(mergeReduceArgs); - return ExecuteResult.success(); + return ExecuteResult.success( + mergeReduceArgs.getReduces() + .stream() + .map(String::valueOf) + .map(Integer::parseInt) + .mapToInt(Integer::intValue).sum()); } } From d29337bd12ce67e2e3ef21e9522dde969268fd73 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Thu, 27 Jun 2024 22:01:37 +0800 Subject: [PATCH 10/13] =?UTF-8?q?=E8=B0=83=E8=AF=95map?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../example/snailjob/job/TestMapJobExecutor.java | 13 +++++++------ .../snailjob/job/TestMapReduceJobExecutor.java | 14 ++++++++------ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java index 5bfa072..40fa226 100644 --- a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java @@ -8,6 +8,7 @@ import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.example.snailjob.job.TestMapReduceJobExecutor.QuarterMap.SubTask; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Data; @@ -45,10 +46,10 @@ public class TestMapJobExecutor extends AbstractMapExecutor { } // 未找到map的任务,则说明当前需要进行处理 - String mapResult = mapArgs.getMapResult(); - SnailJobLog.LOCAL.info("LAST_MAP 开始执行 mapResult:{}", mapResult); + JsonNode json = JsonUtil.toJson(mapArgs.getMapResult()); + SnailJobLog.LOCAL.info("LAST_MAP 开始执行 mapResult:{}", json); // 获取最后一次map的信息. - SubTask subTask = JsonUtil.parseObject(mapResult, SubTask.class); + SubTask subTask = JsonUtil.parseObject(json.toString(), SubTask.class); // 此处可以统计数据或者做其他的事情 // 模拟统计营业额 int turnover = new Random().nextInt(1000000); @@ -100,14 +101,14 @@ public class TestMapJobExecutor extends AbstractMapExecutor { // 第二层按照月分片 // 4个季度 - List lists = JsonUtil.parseList(args.getMapResult(), Long.class); + JsonNode json = JsonUtil.toJson(args.getMapResult()); List list = new ArrayList<>(); - for (final Long id : lists) { + for (JsonNode jsonNode : json) { + long id = jsonNode.asLong(); for (int i = 1; i <= 4; i++) { list.add(new TestMapReduceJobExecutor.QuarterMap.SubTask(id, i)); } } - return list; } diff --git a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java index 7092834..7fc4d9e 100644 --- a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java @@ -1,5 +1,6 @@ package com.example.snailjob.job; +import cn.hutool.core.util.ByteUtil; import com.aizuda.snailjob.client.job.core.MapHandler; import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; @@ -9,6 +10,7 @@ import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.example.snailjob.job.TestMapReduceJobExecutor.QuarterMap.SubTask; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Data; @@ -47,10 +49,10 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { } // 未找到map的任务,则说明当前需要进行处理 - String mapResult = mapArgs.getMapResult(); - SnailJobLog.LOCAL.info("LAST_MAP 开始执行 mapResult:{}", mapResult); + JsonNode json = JsonUtil.toJson(mapArgs.getMapResult()); + SnailJobLog.LOCAL.info("LAST_MAP 开始执行 mapResult:{}", json); // 获取最后一次map的信息. - SubTask subTask = JsonUtil.parseObject(mapResult, SubTask.class); + SubTask subTask = JsonUtil.parseObject(json.toString(), SubTask.class); // 此处可以统计数据或者做其他的事情 // 模拟统计营业额 int turnover = new Random().nextInt(1000000); @@ -114,14 +116,14 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { // 第二层按照月分片 // 4个季度 - List lists = JsonUtil.parseList(args.getMapResult(), Long.class); + JsonNode json = JsonUtil.toJson(args.getMapResult()); List list = new ArrayList<>(); - for (final Long id : lists) { + for (JsonNode jsonNode : json) { + long id = jsonNode.asLong(); for (int i = 1; i <= 4; i++) { list.add(new SubTask(id, i)); } } - return list; } From ccfdc14f74b6eb343e6df71e561d2a541634658a Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 28 Jun 2024 15:02:15 +0800 Subject: [PATCH 11/13] =?UTF-8?q?=E4=BC=98=E5=8C=96Map=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/example/snailjob/job/TestAnnoMapJobExecutor.java | 5 ++++- .../example/snailjob/job/TestAnnoMapReduceJobExecutor.java | 6 ++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java index d013471..b275ad9 100644 --- a/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java @@ -19,18 +19,21 @@ public class TestAnnoMapJobExecutor { @MapExecutor public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { System.out.println(mapArgs); + System.out.println(mapArgs.getWfContext()); return mapHandler.doMap(Lists.newArrayList("1", "2", "3"), "MONTH_MAP"); } - @MapExecutor(taskName = "MONTH_MAP1") + @MapExecutor(taskName = "MONTH_MAP") public ExecuteResult monthMapExecute(MapArgs mapArgs) { System.out.println(mapArgs); + System.out.println(mapArgs.getWfContext()); return ExecuteResult.success(123); } @MapExecutor(taskName = "LAST_MAP") public ExecuteResult lastMapExecute(MapArgs mapArgs, MapHandler mapHandler) { System.out.println(mapArgs); + System.out.println(mapArgs.getWfContext()); return ExecuteResult.success(); } diff --git a/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java index 06c42cf..75aeb12 100644 --- a/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java @@ -27,18 +27,23 @@ public class TestAnnoMapReduceJobExecutor { @MapExecutor public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { System.out.println(mapArgs); + System.out.println(mapArgs.getWfContext()); + mapArgs.appendContext("name", "zsg"); return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4", "5", "6"), "MONTH_MAP"); } @MapExecutor(taskName = "MONTH_MAP") public ExecuteResult monthMapExecute(MapArgs mapArgs) { System.out.println(mapArgs); + System.out.println(mapArgs.getWfContext()); + mapArgs.appendContext("age", "111"); return ExecuteResult.success(Integer.parseInt((String) mapArgs.getMapResult()) * 2); } @ReduceExecutor public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) { System.out.println(mapReduceArgs); + System.out.println(mapReduceArgs.getWfContext()); return ExecuteResult.success( mapReduceArgs.getMapResult() .stream() @@ -53,6 +58,7 @@ public class TestAnnoMapReduceJobExecutor { @MergeReduceExecutor public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) { System.out.println(mergeReduceArgs); + System.out.println(mergeReduceArgs.getWfContext()); return ExecuteResult.success( mergeReduceArgs.getReduces() .stream() From 53306f0eac1b5a156872b17febed8d7ca12ca717 Mon Sep 17 00:00:00 2001 From: jcwang812 Date: Mon, 1 Jul 2024 01:07:51 +0000 Subject: [PATCH 12/13] =?UTF-8?q?!3=20mapReduce=E6=89=8B=E6=9C=BA=E5=8F=B7?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E8=A7=A3=E6=9E=90Demo=E6=8F=90=E4=BA=A4=20*?= =?UTF-8?q?=20=E6=89=8B=E6=9C=BA=E5=8F=B7excel=E6=96=87=E4=BB=B6=E6=A0=A1?= =?UTF-8?q?=E9=AA=8C=E5=B9=B6=E5=85=A5=E5=BA=93--map=20=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=20*=20=E6=89=8B=E6=9C=BA=E5=8F=B7excel=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E6=A0=A1=E9=AA=8C--map=20reduce=20=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/demo.sql | 10 +- pom.xml | 10 ++ .../example/snailjob/bo/PhoneNumberBo.java | 18 +++ .../snailjob/bo/PhoneNumberCheckBo.java | 40 ++++++ .../snailjob/dao/PhoneNumberBaseMapper.java | 15 ++ .../example/snailjob/dao/PhoneNumberDao.java | 34 +++++ .../job/TestExcelAnalyseMapJobExecutor.java | 97 +++++++++++++ .../TestExcelAnalyseMapReduceJobExecutor.java | 132 ++++++++++++++++++ .../listener/PhoneNumberExcelListener.java | 78 +++++++++++ .../example/snailjob/po/PhoneNumberPo.java | 39 ++++++ src/main/resources/doc/number.xlsx | Bin 0 -> 11457 bytes 11 files changed, 472 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/example/snailjob/bo/PhoneNumberBo.java create mode 100644 src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java create mode 100644 src/main/java/com/example/snailjob/dao/PhoneNumberBaseMapper.java create mode 100644 src/main/java/com/example/snailjob/dao/PhoneNumberDao.java create mode 100644 src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java create mode 100644 src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java create mode 100644 src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java create mode 100644 src/main/java/com/example/snailjob/po/PhoneNumberPo.java create mode 100644 src/main/resources/doc/number.xlsx diff --git a/docs/demo.sql b/docs/demo.sql index 1498ad0..2a38e66 100644 --- a/docs/demo.sql +++ b/docs/demo.sql @@ -12,4 +12,12 @@ CREATE TABLE fail_order `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`) -); \ No newline at end of file +); + +-- 手机号表 +CREATE TABLE `phone_number` ( + `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键', + `phone_number` varchar(20) NOT NULL DEFAULT '' COMMENT '手机号', + `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='手机号表' diff --git a/pom.xml b/pom.xml index 716b2d1..51efe03 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,16 @@ okhttp 4.2.0 + + com.alibaba + easyexcel + 3.1.3 + + + com.alibaba + fastjson + 1.2.83 + diff --git a/src/main/java/com/example/snailjob/bo/PhoneNumberBo.java b/src/main/java/com/example/snailjob/bo/PhoneNumberBo.java new file mode 100644 index 0000000..2b79e6d --- /dev/null +++ b/src/main/java/com/example/snailjob/bo/PhoneNumberBo.java @@ -0,0 +1,18 @@ +package com.example.snailjob.bo; + +import com.alibaba.excel.annotation.ExcelProperty; +import lombok.Data; + +/** + * excel表格手机号BO + * + * @author JiChenWang + * @since 2024/6/27 20:28 + */ +@Data +public class PhoneNumberBo { + + @ExcelProperty(value = "手机号码", index = 0) + private String phoneNumber; + +} diff --git a/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java b/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java new file mode 100644 index 0000000..caa8adb --- /dev/null +++ b/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java @@ -0,0 +1,40 @@ +package com.example.snailjob.bo; + +import com.example.snailjob.po.PhoneNumberPo; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +/** + * 手机号检测BO + * + * @author JiChenWang + * @since 2024/6/27 20:50 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PhoneNumberCheckBo { + + @Schema(description = "检测总条数", accessMode = Schema.AccessMode.READ_WRITE) + private Long checkTotalNum = 0L; + + @Schema(description = "检测失败条数", accessMode = Schema.AccessMode.READ_WRITE) + private Long checkErrorNum = 0L; + + @Schema(description = "检测成功条数", accessMode = Schema.AccessMode.READ_WRITE) + private Long checkSuccessNum = 0L; + + @Schema(description = "检测失败临时的数据", accessMode = Schema.AccessMode.READ_WRITE) + private List checkErrorPhoneNumberList = new ArrayList<>(); + + @Schema(description = "检测成功临时的数据", accessMode = Schema.AccessMode.READ_WRITE) + private List checkSuccessPhoneNumberList = new ArrayList<>(); + +} diff --git a/src/main/java/com/example/snailjob/dao/PhoneNumberBaseMapper.java b/src/main/java/com/example/snailjob/dao/PhoneNumberBaseMapper.java new file mode 100644 index 0000000..fe36b1d --- /dev/null +++ b/src/main/java/com/example/snailjob/dao/PhoneNumberBaseMapper.java @@ -0,0 +1,15 @@ +package com.example.snailjob.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.example.snailjob.po.PhoneNumberPo; +import org.springframework.stereotype.Repository; + +/** + * 手机号mapper + * + * @author JiChenWang + * @since 2024/6/30 11:55 + */ +@Repository +public interface PhoneNumberBaseMapper extends BaseMapper { +} diff --git a/src/main/java/com/example/snailjob/dao/PhoneNumberDao.java b/src/main/java/com/example/snailjob/dao/PhoneNumberDao.java new file mode 100644 index 0000000..18d71dd --- /dev/null +++ b/src/main/java/com/example/snailjob/dao/PhoneNumberDao.java @@ -0,0 +1,34 @@ +package com.example.snailjob.dao; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.example.snailjob.po.PhoneNumberPo; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * TODO + * + * @author JiChenWang + * @since 2024/6/30 11:58 + */ +@Service +public class PhoneNumberDao extends ServiceImpl { + + @Autowired + private PhoneNumberBaseMapper phoneNumberBaseMapper; + + /** + * 批量保存手机号信息 + * + * @param phoneNumberPoList 手机号po列表 + * @return Boolean 保存成功标识:true-成功、false-失败 + * @author JichenWang + * @since 2024/6/30 12:03 + */ + public Boolean insertBatch (List phoneNumberPoList) { + return this.saveBatch(phoneNumberPoList); + } + +} diff --git a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java new file mode 100644 index 0000000..3b577dd --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java @@ -0,0 +1,97 @@ +package com.example.snailjob.job; + +import cn.hutool.core.util.ObjectUtil; +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.alibaba.excel.EasyExcel; +import com.example.snailjob.bo.PhoneNumberBo; +import com.example.snailjob.bo.PhoneNumberCheckBo; +import com.example.snailjob.dao.PhoneNumberDao; +import com.example.snailjob.listener.PhoneNumberExcelListener; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.InputStream; +import java.util.List; + +/** + * 解析手机号excel文件,并将正确的手机号分片入库 + * + * @author JiChenWang + * @since 2024/6/30 10:37 + */ +@Slf4j +@Component +@JobExecutor(name = "testExcelAnalyseMapJobExecutor") +public class TestExcelAnalyseMapJobExecutor { + + private final Integer BATCH_SIZE = 100; + + @Autowired + private PhoneNumberDao phoneNumberDao; + + /** + * 读取手机号文件总行数,并进行分组 + * 比如文档中的手机号总量为307条,每100条一个分组,分组结果为[{0,99}, {100, 199}, {200,299}, {300, 307}] + * + * @return ExecuteResult + * @author JichenWang + * @since 2024/6/30 10:48 + */ + @MapExecutor + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + List> ranges = null; + // 先获取文件总行数,便于分组 + try { + @Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx"); + final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo(); + PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, true, BATCH_SIZE); + EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync(); + + // 设置区间范围 + ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE); + } catch (Exception e) { + log.error("文件读取异常", e.getMessage()); + } + return mapHandler.doMap(ranges, "MONTH_MAP"); + + } + + /** + * + * + * @param mapArgs + * @return ExecuteResult + * @author JichenWang + * @since 2024/6/30 11:05 + */ + @MapExecutor(taskName = "MONTH_MAP") + public ExecuteResult monthMapExecute(MapArgs mapArgs) { + // 获取本次要处理的区间 + final List mapResult = (List) mapArgs.getMapResult(); + log.info("本次要处理的区间为:{}", mapResult); + + // 按照处理区间,去读取数据 + final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo(); + try { + @Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx"); + PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, false, BATCH_SIZE); + EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(mapResult.get(0) + 1).doReadSync(); + } catch (Exception e) { + log.error("文件读取异常:", e.getMessage()); + } + + // 如果正确手机号不为空,则入库 + if (ObjectUtil.isNotEmpty(phoneNumberCheckBo.getCheckSuccessPhoneNumberList())) { + phoneNumberDao.insertBatch(phoneNumberCheckBo.getCheckSuccessPhoneNumberList()); + } + + return ExecuteResult.success(phoneNumberCheckBo.getCheckSuccessNum()); + } + +} diff --git a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java new file mode 100644 index 0000000..aabf428 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java @@ -0,0 +1,132 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor; +import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.alibaba.excel.EasyExcel; +import com.alibaba.fastjson.JSONArray; +import com.example.snailjob.bo.PhoneNumberBo; +import com.example.snailjob.bo.PhoneNumberCheckBo; +import com.example.snailjob.listener.PhoneNumberExcelListener; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * 解析校验Excel中的手机号,统计出错手机号数量,并返回错误手机号详情 + * + * @author JichenWang + * @since 2024/6/27 19:52 + */ +@Slf4j +@Component +@JobExecutor(name = "TestExcelAnalyseMapReduceJobExecutor") +public class TestExcelAnalyseMapReduceJobExecutor { + + private final Integer BATCH_SIZE = 100; + + /** + * 处理手机号文件信息,将文档中的手机号进行分组 + * 比如文档中的手机号总量为307条,每100条一个分组,分组结果为[{0,99}, {100, 199}, {200,299}, {300, 307}] + * + * @return ExecuteResult + * @author JichenWang + * @since 2024/6/29 14:03 + */ + @MapExecutor + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + List> ranges = null; + // 先获取文件总行数,便于分组 + try { + @Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx"); + final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo(); + PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, true, BATCH_SIZE); + EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync(); + + // 设置区间范围 + ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE); + } catch (Exception e) { + log.error("文件读取异常", e.getMessage()); + } + return mapHandler.doMap(ranges, "MONTH_MAP"); + } + + /** + * 处理每个分组内容,如读取{0,99}区间的手机号,并解析 + * + * @return ExecuteResult + * @author JichenWang + * @since 2024/6/29 14:04 + */ + @MapExecutor(taskName = "MONTH_MAP") + public ExecuteResult monthMapExecute(MapArgs mapArgs) { + // 获取本次要处理的区间 + final List mapResult = (List) mapArgs.getMapResult(); + log.info("本次要处理的区间为:{}", mapResult); + + // 按照处理区间,去读取数据 + final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo(); + try { + @Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx"); + PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, false, BATCH_SIZE); + EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(mapResult.get(0) + 1).doReadSync(); + } catch (Exception e) { + log.error("文件读取异常:", e.getMessage()); + } + + return ExecuteResult.success(phoneNumberCheckBo); + } + + + @ReduceExecutor + public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) { + log.info("WJC Test reduceExecute, 参数为:{}", mapReduceArgs.getMapResult()); + final PhoneNumberCheckBo phoneNumberCheckBo = this.buildGatherPhoneNumberCheckBo(mapReduceArgs.getMapResult().toString()); + return ExecuteResult.success(phoneNumberCheckBo); + } + + /** + * 当只有一个reduce任务时无此执行器 + */ + @MergeReduceExecutor + public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) { + final PhoneNumberCheckBo phoneNumberCheckBo = this.buildGatherPhoneNumberCheckBo(mergeReduceArgs.getReduces().toString()); + log.info("WJC 最终检测结果为:{}", phoneNumberCheckBo); + return ExecuteResult.success(phoneNumberCheckBo); + } + + /** + * 构造汇总手机号校验结果BO + * + * @param phoneNumberCheckBoStr 手机号校验BO字符串 + * @return PhoneNumberCheckBo 汇总手机号校验结果BO + * @author JichenWang + * @since 2024/6/29 14:24 + */ + private PhoneNumberCheckBo buildGatherPhoneNumberCheckBo(String phoneNumberCheckBoStr) { + final List phoneNumberCheckBoList = JSONArray.parseArray(phoneNumberCheckBoStr, PhoneNumberCheckBo.class); + // 获取校验总数 + final long checkTotalNum = phoneNumberCheckBoList.get(0).getCheckTotalNum(); + // 汇总校验失败数量 + final long checkErrorNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckErrorNum).sum(); + // 汇总校验成功数量 + final long checkSuccessNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckSuccessNum).sum(); + // 汇总错误手机号 + final List errorPhoneNumberList = new ArrayList<>(); + phoneNumberCheckBoList.forEach(item -> errorPhoneNumberList.addAll(item.getCheckErrorPhoneNumberList())); + + // 汇总手机号校验结果 + return PhoneNumberCheckBo.builder().checkTotalNum(checkTotalNum).checkErrorNum(checkErrorNum).checkSuccessNum(checkSuccessNum).checkErrorPhoneNumberList(errorPhoneNumberList).build(); + } + +} diff --git a/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java b/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java new file mode 100644 index 0000000..62bf08f --- /dev/null +++ b/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java @@ -0,0 +1,78 @@ +package com.example.snailjob.listener; + +import cn.hutool.core.lang.Validator; +import cn.hutool.core.util.ObjectUtil; +import com.alibaba.excel.context.AnalysisContext; +import com.alibaba.excel.event.AnalysisEventListener; +import com.example.snailjob.bo.PhoneNumberBo; +import com.example.snailjob.bo.PhoneNumberCheckBo; +import com.example.snailjob.po.PhoneNumberPo; +import lombok.extern.slf4j.Slf4j; + +import java.time.LocalDateTime; +import java.util.Map; + +/** + * 手机号excel解析 Listener + * + * @author JiChenWang + * @since 2024/6/27 20:38 + */ +@Slf4j +public class PhoneNumberExcelListener extends AnalysisEventListener { + + /** 手机号校验BO **/ + private PhoneNumberCheckBo phoneNumberCheckBo; + + /** 是否第一次读取Excel **/ + private Boolean firstReadStatus = false; + + /** 读取批次大小 **/ + private Integer batchSize = 100; + + /** 已读取的数据数量 **/ + private Integer cacheSize = 0; + + @Override + public void invokeHeadMap(Map headMap, AnalysisContext context) { + this.phoneNumberCheckBo.setCheckTotalNum(Long.parseLong(String.valueOf(context.readSheetHolder().getApproximateTotalRowNumber() - 1))); + } + + @Override + public void invoke(PhoneNumberBo phoneNumberBo, AnalysisContext context) { + // 如果是第一次读该文件,已读取的数量已超过读取批次大小,直接返回 + if (firstReadStatus || cacheSize >= batchSize) { + return; + } + + cacheSize++; + + if (ObjectUtil.isEmpty(phoneNumberBo.getPhoneNumber())) { + return; + } + + // 校验手机号 + log.info("本次校验的手机号为: {}", phoneNumberBo.getPhoneNumber()); + Boolean validateStatus = Validator.isMobile(phoneNumberBo.getPhoneNumber()); + if (validateStatus) { + this.phoneNumberCheckBo.setCheckSuccessNum(this.phoneNumberCheckBo.getCheckSuccessNum() + 1); + final PhoneNumberPo phoneNumberPo = PhoneNumberPo.builder().phoneNumber(phoneNumberBo.getPhoneNumber()).createTime(LocalDateTime.now()).build(); + this.phoneNumberCheckBo.getCheckSuccessPhoneNumberList().add(phoneNumberPo); + } else { + this.phoneNumberCheckBo.setCheckErrorNum(this.phoneNumberCheckBo.getCheckErrorNum() + 1); + this.phoneNumberCheckBo.getCheckErrorPhoneNumberList().add(phoneNumberBo.getPhoneNumber()); + } + } + + @Override + public void doAfterAllAnalysed(AnalysisContext context) { + + } + + public PhoneNumberExcelListener(PhoneNumberCheckBo phoneNumberCheckBo, Boolean firstReadStatus, Integer batchSize) { + this.phoneNumberCheckBo = phoneNumberCheckBo; + this.firstReadStatus = firstReadStatus; + this.batchSize = batchSize; + } + +} diff --git a/src/main/java/com/example/snailjob/po/PhoneNumberPo.java b/src/main/java/com/example/snailjob/po/PhoneNumberPo.java new file mode 100644 index 0000000..6372220 --- /dev/null +++ b/src/main/java/com/example/snailjob/po/PhoneNumberPo.java @@ -0,0 +1,39 @@ +package com.example.snailjob.po; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +/** + * phone_number + * + * @author JiChenWang + * @since 2024/6/30 11:48 + */ +@TableName("phone_number") +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class PhoneNumberPo { + + /** + * 主键 + */ + private Long id; + + /** + * 手机号 + */ + private String phoneNumber; + + /** + * 创建时间 + */ + private LocalDateTime createTime; + +} diff --git a/src/main/resources/doc/number.xlsx b/src/main/resources/doc/number.xlsx new file mode 100644 index 0000000000000000000000000000000000000000..4653ed0c4290157a2a9f5ee2cd07e6878be029be GIT binary patch literal 11457 zcmeHtbzD_T_x_=!yFox&q`ON+IFyt$f^>H`NFxf;4H67B;LW$ImV#mziZ| z%Eaz*Z~1rNJbW(lr8TC#;FVUInb~z(=sA`|CcZguDNg^42glD|(=uzTO8=1dS|zk_ zzs@hJx}?0UZ5^AmjfBL}_L>?y8F-{(?8JI{k;ciO1iPvhC9cN{;xdIKruYORzDuc7 z9ctL~?NGD=&3yjO>85Y;YIE3p-5-asmIiT0%*_}*boZB-rQyRLc+3zu>*gp@sVg%Y zC_(Md;=v6L4Aq>YFJHmVu?4-Y$C&M#N33n&y)VZ_QuZiIE0{x!@A$ARTMv4wr97Qa`v2d0hg?Gp46}=E$XeI zJkbNIN8fTf(jF`D^FH+V&g<-|=KG?s^@_~XI(HCh8UD%1HQo{STFw3!39PO!R5V_} zTi-g@1U)tfRWrZpm^#L%}WPz+h1=3Q_-pJB{mF4F8 zzjXXB&cVOjdSSGjLJJ#O(9W}q;LhXG*|%5{U>EUcwNxtJzS1+86_MFAq_d6mWLPRB z0dP`2_1+iV)3ZX6>z!0bi(G}_ICz58l`cg=@n?2UNDt}k1XDsL9~L#nDMlNgOxuA!3CM~LbEZWM$2#K_V@=x(6FA0t#u zCrwX|?(NlG7|Xf!Su|yzp{>A;BrUMS$E1>K2Q6&V8n(~NQiSkg8#bAjC3_o zy6p+Ca~H8Q`B-e$;;j9qcQU^2rhv?FaaQDyi3uSeH$jCwT-`dY)BUTWN%R6^H+J~pQ%E>h`DuqIu8MV1-2*EjosW)BaoRDCzvC~H$| zY>hd@5PeikZ`3pA$kfUWMH{@L+_)z*SPU}gBQ4f@l)nKnvWIR4e%MEBo_jkm`RwG$ zt^ym{)s@^!0e96NFynjmADq5VwT7gA;zry%ee2(AhaoSC^473_W@Pt^%+qv3eDabk zUN(Op(nw!Op&HJ!Ir_n7INtesKE2(bS(}`Vfzfp)E4dt!akKm(w>Iui?b}$D2E8I= zo!+rWbudg#nNd29c5nA19UoR^cx9VVE(;6sJss@CjHJK!YRHsMuwYWfk;Jh&e8Yv? zq%^Mr^jffLb0-aq7>RJ^ude^WNMdz8XnktUh|_tH7ojPedn<09AEFa1tB}>O^whB+ zyY6R0IP^uS8YL~5iThawPW_`qiCBg~LG0nBH>a2!xj9|v{>zqd6U9Dqc&$`1^(r>JG8$Z+0Ha~>1KRoG%@X+;MUED3oW?@?n-F*gc5#*{^mqGK@^}%Iob+wsm0%YZ9&eb)@LNlkb&1 zsPsMWkePZvfNEU!hP~;k)3qeBL20UwQWE3fnJ>Mh5Q&L~VYj)EC{Qw5ZokBp+M-#{bFVn;ombNHS;_uHg39yOh_>wm+v;c4vp~&R$tNsv;hq4DwEC+If?Pt5H|%f}Lvp5o!)c{+uL@ zC&>SnvmMCf$|t}hJ^%>>Lchyd2S+zcBZr&EJ4ChDd|C{vf$$8YXAb{z!rqFSvDFUE z4-Z8;bG!ml7QY>(UKE)!q{ZqLo#u{Ne_$qaMwB`l#f|h`7elM?Jm^@R{ zVBC5%Bk`;2)|QZN6(iYVgNrNUg$VYC=Hi}txmh}v zAw(bCo;W$GP890s4vm(Hp$%4H@_DADa!O5}A>$%$z7fFZpQ9Q3keA+fqd>@n(JNuQ zHMN>Xm=b1h|Lw)Vp+EI@tT)}zz^G$Sp(kIcJ*`}*QpG#Ec5#bmS$hOl$VhH*n{>|$ zs)ss#WfW=VHEFOR{C3xu8<$b(V{y1mC&i^a))rY{#aeL^w5?3djWJcW{5tL$o^ZN& z$sb)l;mW~GRQH!I5(J5&?($X-*2}IcHJ~Ixp{e6uU^_{7{6|;}X%s zH&(G9sSG^T^(J%orFamFzN#Yg;>?+5Yeb0Ze4Ug1P8j}$*rU9ZlWbrhF|Ztpi_veg ze`>IZFn78-*Xr3+SBNC!7$uq#N0wV)-BpSA8MBh;m`CpH&?x|N4ps1f=R$CnR!-#F zz)AM5l=D8igbDUplP6b-PGERRyvNB`=sL7vUe+|(laE_gvClA1HBWIK^ozK5cYgr}Nn7TlE^itmZi@%-L98Ha^j9724>^F(Bsi754%8TdCdLW9nzkR~9 z)PrO5lY82b7g>WoCrVT6h~X14vGiOdvR|}FtVHJ9xH>!;y)#7_RAw#+<|XTF^gvJ4 zk9WqK@5bzllcVEibqJy^QnRxi-iV~~*PQgFCb&x_pf*Rx=`|?s38zF%B+v}U`8e<~ zaCs^`dmn*qglv6^`sl)?a^8F6!BjN%n^L5`&eCVKc)Fu58sw?G-W99bmIJSkk!ZYaJLFmBuLc9gI_AW`Ee+o z`OC+r4M{Ls{v?7TkC+10Ls(Yku(Oyca!JdVs2Zrfq8?B!gvaR(iF(_e8(>=*NvK%A zqo#6O~3kJ@C2u7zYVN1&M*5vo5J z3wke{Mr6kk!+gzQdWhb+Bo!sUeeG(0ofm^CDg?dO;i*zH>e|XMH1EDVJk0a)`RRAO zim7*X_^sS=eb`lhG2QcgFecr{E6H8mZab_a-RF9_V~0ZYVmZp@J|CotUw~~v66e)| zPFu$uIzbC8`Oy;Gl>tu6J!~-SlZ@%i8dhplS0}%G)bXZDR2#dO#Du#;7nMdE-mR{I$PYrah8EG;c9ZX8YueXDfah!Rr8&{~F4*qQWxU)@l!5pQ!`;y9xW zP0}JfSi!3UgWb7s+e_k6?sTNF`=j7;+cfRhr9%i&!X)T z)rQ4(>YuR>n7*)Wm+TBav80C51hb^phOpyKz6zeb*T$LGOA{))S-Xg(gy%M%=-70e zP%k;c*$P8T+Ts=dl4+)&&OceKI9J-uKi}s240aDVVe;Hgoc>;|dT#HlaGmKCzVd9& zbj^?T;p|U3Oe0EN_H}|0$a%^=(@4GeVbvy^id$K3n zp&xBB%VYP$(y}imbLqX#*ZPQ>ml7lMd2lcjxlSenCD7AOU?mAe{?z`M*R1|N)yAe$ ze{o@v$^R&m6}5;UxHhjZhQd~WeHHc5Ddpm1yPRumtWtUgs{9+z&p zO}XS?7!>MHbD`3EUl&(ZCULI>jiPFO`2+kricVBzQsv&4ITU=E3_Qn-;}+&pD1OB! zJ;uLLSHS<-{Nmb6wcLy~eVk*p;;S3;lKZ(kbA_yKgP2i_i<@IV{oKS}B9_eYivOZc zW~~V9oM}58(a09VY>ajV$OPN-D~T+Db@#itpL{Vp*u+~>riewEW!5j{pX9nftRu!9 zU>|C!$qrLk8}rI@tLtbt%PZGb)9D>~4cp~ehgaO538qzcEJ}0PQCoKsbu6eY1jD`G z3@6f;H70%TMD$z_Ljo_&I8b*oQGL)+eNb^dK2!6l-3khu6+XV}%F(wc^((>RQ{q9! zWbapB;pagT>mrI#`VybI$8dg3gi$nL<)n;qRGzG{b2E1`D-f3Yz)T(CSRTnX#O6=9 z@JO&X?p&k2CNX$~Ljh_4CAH^Vj~vIgc=8~x+K-M-d7xF$`n1CQUOW>um&GaW52BV1 zS41fFh!$a%a~!U5qBaYfC3>DFPvN(qTx{$8sZH;GAbtcAJjZg$?n)e?u)0uIQYVV3^j-9a{W#P=Sep zNMSOmDRYBGD4_uEC{|(CgkrSY0nzNcNp9Q2^%2LIwGF{#^iO+|BV`YBtS{x6?Tal~ zpcpu#ZZbkkBVKRdn%J1rzjDZF)XBGmm6t!ChHyT*ho&&Xb7JN6lu!6)8NFy{YM-W) z*rN@Z)RVLCGj*fJNL7A9?zTRBc}hPNn^U=^6c2KE-!fW#A%IeslVFRuIml~X%_>b` z)Wnu(F>p6~Q{^M(NKjk8K;2?1j62tdj%=uGH#I%Tt|rT^I)bZi&IsS!){Xi+JKxcx zm4r%7K$zH&&T8Irch!roN+8jaYHTO`w=brjBbF#<@rCzgY zj`W~LKbyaL)kBJe4Dx9GuTTC>mn`0hypo@%u2P{z(&CYm6OCp)a^g8%824`0jV<#VPo zq>T?|s7YuBG8y=}(ObD0qCSOLDLAWcT6RX73_no$+(oZ1%$(VrH%LGVZwG4pdFEV{ z+fz)%9C+EtA=S#HpP(HdXW7OzzNu1tzy~k6e&vd17RJs!A_)3u!T7ByBsFL(ost0W z_{ThVnS)6ZLDe^|s%afUs6osUIu zwyPlHv&*DmY3Fu#K3SaGdm+PcQ>f;ftUB5FD7UolQD#R`eI~K3D8YpxuWK^G4Qq(n z`Hl89=EWOn$YyHUe|lb`p#&nSNHhG*Jx8fv6=#f)cd%S;Z2w`>BqriHPAtlgDzTfrtLJp z*QMKsH<*=@H?m{Y?wKsn>%kLR&7kQJxz4D|Yg-igr6z`%j(8CSnbtnH?EYonm%>Y; zK-jOZZ!eB1T4CB!SdXg=ATYC@Gx5aeM0MCy@KH|mxukV>2vj=yp5h$=a=c}6xYWVb#7j#yJ$Whx%)19bM>be z&l(%7S#25{8tN{0Pj-HGU+bkEt@Iz9?yaI`_qkqlbx`O%uUt$te{vdDiQlinH1%Th zrrNg6grW6QwqkJ?Kd;C&SJUn+tVc~~n1YFeRnK5@qmPHL;6IT)Xu_`#-coC zCT#2WjW^5i1}*^HuPV?vd*^EHt>T#~s`q$Vqh+et<~CrA9_HVLJzNdwEWx;nyrjU=7Q1Z*05R{Z-@kKB7_biRLT^p`vkLg_aYHl! zc*p7HFNSaV9-wA+%{8JhpB1}p1^^ZA0CfQbAC`~kA@#qUx?}R_hGkl@&0j44ibVWf z1X*uXC8G2f_zOisA`98&MTcb!G+`;H;SWToX<#{8DG{deZn<}1XO!?8Y*(ld^kSGs@@T?dJdzV*^fCLhvXZ#MBnCt|g0UD?+&0SY z=A<(7EFu*R^%rV#cma=4EqV$<4F-(#y)?L=qcuM&WM|?qhUvCk@t>C$M0snhU!<|+ z%Hj8hPCcwE86l)Rb|d*gn0V@ICzA>Qt-pDyd;IJP}G@)U(^_NKV}c9C^0VTEJVn!i0x5WrwJ1;eqCYVgwvKW z-h5Ue5Nd#BsgJIe>Z%m+CXByBZFvHgX1e>3nl4Pd{Pm$FAV)dXoKy+ODO&04Xr(@8 z@2o7TEb6R8_)!_#Bjt(RMmf_=QdJXTplGe1s-*+?UsW(Nf=0BTRGsKpoXr zho4vlI;^z6lm_JIo2PE%2W<2&Z{+V)mmt3Jo}v8KOj2DFs;_9PPp&=N%J!_fgrL~k zQ@JF)}U^J3d28<_HPX}>C`gOkrHMssj*z+0Rn;A>eM#^GAxBL z7WVcqO4ci)jUVR#L-zXh+BN_Y&=TWf-YlXKZFSyp(&uRQEQP5Xkqn5!gk$}OPw@gW zP%VDAx9qPvf@b+^=%ffK*VKw<0N&S{-D~I+2r1XqScc)BqZP0ee%b)&9(!Hk1kn4V zrlg@bfB{jTaBSixN}J6BwHR4MA9U0$#>MdhDp5DK1WcJ5)*ig(bdmec!doM=@Kq-U z3TAU*A`FYMR`!L0Ew+h_O?EIlx&@oUO@^|YMO*lMypw$9hG3Dj9S~2%; zoffyUBX6PL>efp8BC4>anH=sL8%;As+Ijz<)iupzZkNd8sV_Gey}Z*u%#&-U$6Tu- zS*j+dtpLz9cR=8#^#4_c{VJnx%WXUMiSEHzRByxWXqFlchjGL0`@u^WTw^obiM1X2 z)cJOK3`t*gZtl*L$8fvsM_BGHAGf|{k|QM4LcSMw&D^m7$p$&Z-)h1;RN1EnIpe+D z9!jdEbBG6cU>OGP7+`OlWa{}Hd3Uh#l7eBmpW6Fw4E~GS)ht((?v$Kw1=wOMsZhti zLJR860!ap0#E(sJYt+J9RM{s6-L;Pm$hnm`n-%HARDesIna0gb$>-K;>dKwv##oxB z0B$p#hzfNyEyfMEa0+L;ok%o;o2GIq{^-u7vWS1V1;h)25%3pl@{${D>a~!u;;t!L z^_y5jFenZxF#G3dQrg*c?XpwLri(6C&7Aj~k>`NXC27I#S__%8 zrniazrHD4iQdBkFb!!Lkk0785ys`C5{KgKU7kbpbv!GioyvM*m$WOKUpVf==Bw@c`qJeik_i&lV27^;l8mm%BVO=dmhd=?z{)ll?+BT?8 z-6BYl7lZb317f5`fyb_S+EHy<%J&3Kp7%Z;4_pF>)8s;1n$L#cRnn`sR8H4t9w#MO zT>#7B|J>XPPs+m`0KEDE{P2Juu({=6s&8*(sO)HOW^Ho2Ud|Q8E)U0!^~8f_`=y+M zv9ur7p1>2D4~}@YAPbl13dnuIw8@b7jMxZ-8Q%z=K^2<0+3t0J{(OCX?);NXsyzDv zl^yDMIU<@}{Mnaeg0%ZPpRJoh+_x_i5;P}bzBKFTs$+9=_Y5^>B<*13kTSR@=E!QN zIz1e;h&2j`Mm^OD#xcfTX5$4D1B_lL3xuM1BQem)8T}2OHQC zHne%AXm4Zd!1~I@-ssm^rvGgO14APsS`Xa9hSxTa&?wpEk$jct5L5{lE6k6#spyH? zwMAeV6aBc>%VXI2Q!L|2LW+YA|7x@&^fS{AE(BY~m;ps%+0QTx71}$HtA2{QObn*Y zi;aGp2}cC^%F3!VhLy`DWH#zs2^+EcYzS-2(A30Al8wg_iJRNc;Te6L%=k!6_S|An znM(};eYS@~_XK|H2Y~5hIy09g|a`$yYzx}&R-b{$M zR!DMhaE!w{T39`Q)cdHV9W*Ql8!yc9CBxr6%8g+5eE3*ygyn;5Zad;ogi*f?z5!gT z*-W>h>ex3UHWzgF=sY)XIy&4^|DZaQ?EgxDM69hL&HX z^rtrjT>~L@}?%*taTmks~3(Gp1^C1)ctW zhBwLf2y_v$4~&(-?!_22l#zS@lOC%!v8q_j|IlxQGS0z_!H$T_1zu({=V=#!ry1u wKlMSNJ{l0{AM@?+>VJPU{#9Lt{x9mk9+Zk;1R%N~5DM`11s=6b4{vJ!2cXw&761SM literal 0 HcmV?d00001 From 92ee029073962a3fd2875218eb1ca94376b4cb91 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sun, 7 Jul 2024 13:46:01 +0800 Subject: [PATCH 13/13] =?UTF-8?q?=E4=BC=98=E5=8C=96mapreduce=E7=9A=84demo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../snailjob/bo/PhoneNumberCheckBo.java | 13 +++--- .../job/TestExcelAnalyseMapJobExecutor.java | 25 +++++++---- .../TestExcelAnalyseMapReduceJobExecutor.java | 18 ++++---- .../snailjob/job/TestMapJobExecutor.java | 4 +- .../job/TestMapReduceJobExecutor.java | 5 +-- .../snailjob/job/TestMyMapExecutor.java | 33 ++++++++++++++ .../snailjob/job/TestMyMapReduceExecutor.java | 45 +++++++++++++++++++ .../job/TestWorkflowAnnoJobExecutor.java | 1 + .../listener/PhoneNumberExcelListener.java | 14 +++--- src/main/resources/application.yml | 2 +- 10 files changed, 122 insertions(+), 38 deletions(-) create mode 100644 src/main/java/com/example/snailjob/job/TestMyMapExecutor.java create mode 100644 src/main/java/com/example/snailjob/job/TestMyMapReduceExecutor.java diff --git a/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java b/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java index caa8adb..b808eb7 100644 --- a/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java +++ b/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java @@ -1,6 +1,5 @@ package com.example.snailjob.bo; -import com.example.snailjob.po.PhoneNumberPo; import io.swagger.v3.oas.annotations.media.Schema; import lombok.AllArgsConstructor; import lombok.Builder; @@ -23,18 +22,18 @@ import java.util.List; public class PhoneNumberCheckBo { @Schema(description = "检测总条数", accessMode = Schema.AccessMode.READ_WRITE) - private Long checkTotalNum = 0L; + private Long total = 0L; @Schema(description = "检测失败条数", accessMode = Schema.AccessMode.READ_WRITE) - private Long checkErrorNum = 0L; + private Long error = 0L; @Schema(description = "检测成功条数", accessMode = Schema.AccessMode.READ_WRITE) - private Long checkSuccessNum = 0L; + private Long success = 0L; @Schema(description = "检测失败临时的数据", accessMode = Schema.AccessMode.READ_WRITE) - private List checkErrorPhoneNumberList = new ArrayList<>(); + private List checkErrors = new ArrayList<>(); - @Schema(description = "检测成功临时的数据", accessMode = Schema.AccessMode.READ_WRITE) - private List checkSuccessPhoneNumberList = new ArrayList<>(); +// @Schema(description = "检测成功临时的数据", accessMode = Schema.AccessMode.READ_WRITE) +// private List checkSuccessPhoneNumberList = new ArrayList<>(); } diff --git a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java index 3b577dd..482e6c4 100644 --- a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java @@ -11,16 +11,18 @@ import com.example.snailjob.bo.PhoneNumberBo; import com.example.snailjob.bo.PhoneNumberCheckBo; import com.example.snailjob.dao.PhoneNumberDao; import com.example.snailjob.listener.PhoneNumberExcelListener; +import com.example.snailjob.po.PhoneNumberPo; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.InputStream; +import java.util.ArrayList; import java.util.List; /** - * 解析手机号excel文件,并将正确的手机号分片入库 + * 解析excel文件中的手机号码,并将错误的手机号分片入库 * * @author JiChenWang * @since 2024/6/30 10:37 @@ -44,7 +46,7 @@ public class TestExcelAnalyseMapJobExecutor { * @since 2024/6/30 10:48 */ @MapExecutor - public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler> mapHandler) { List> ranges = null; // 先获取文件总行数,便于分组 try { @@ -54,11 +56,11 @@ public class TestExcelAnalyseMapJobExecutor { EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync(); // 设置区间范围 - ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE); + ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getTotal(), BATCH_SIZE); } catch (Exception e) { log.error("文件读取异常", e.getMessage()); } - return mapHandler.doMap(ranges, "MONTH_MAP"); + return mapHandler.doMap(ranges, "TWO_MAP"); } @@ -70,7 +72,7 @@ public class TestExcelAnalyseMapJobExecutor { * @author JichenWang * @since 2024/6/30 11:05 */ - @MapExecutor(taskName = "MONTH_MAP") + @MapExecutor(taskName = "TWO_MAP") public ExecuteResult monthMapExecute(MapArgs mapArgs) { // 获取本次要处理的区间 final List mapResult = (List) mapArgs.getMapResult(); @@ -87,11 +89,18 @@ public class TestExcelAnalyseMapJobExecutor { } // 如果正确手机号不为空,则入库 - if (ObjectUtil.isNotEmpty(phoneNumberCheckBo.getCheckSuccessPhoneNumberList())) { - phoneNumberDao.insertBatch(phoneNumberCheckBo.getCheckSuccessPhoneNumberList()); + if (ObjectUtil.isNotEmpty(phoneNumberCheckBo.getCheckErrors())) { + + List numberPos = new ArrayList<>(); + for (String no : phoneNumberCheckBo.getCheckErrors()) { + PhoneNumberPo numberPo = new PhoneNumberPo(); + numberPo.setPhoneNumber(no); + numberPos.add(numberPo); + } + phoneNumberDao.insertBatch(numberPos); } - return ExecuteResult.success(phoneNumberCheckBo.getCheckSuccessNum()); + return ExecuteResult.success(phoneNumberCheckBo.getError()); } } diff --git a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java index aabf428..944f362 100644 --- a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java @@ -44,7 +44,7 @@ public class TestExcelAnalyseMapReduceJobExecutor { * @since 2024/6/29 14:03 */ @MapExecutor - public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler> mapHandler) { List> ranges = null; // 先获取文件总行数,便于分组 try { @@ -54,11 +54,11 @@ public class TestExcelAnalyseMapReduceJobExecutor { EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync(); // 设置区间范围 - ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE); + ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getTotal(), BATCH_SIZE); } catch (Exception e) { log.error("文件读取异常", e.getMessage()); } - return mapHandler.doMap(ranges, "MONTH_MAP"); + return mapHandler.doMap(ranges, "TWO_MAP"); } /** @@ -68,7 +68,7 @@ public class TestExcelAnalyseMapReduceJobExecutor { * @author JichenWang * @since 2024/6/29 14:04 */ - @MapExecutor(taskName = "MONTH_MAP") + @MapExecutor(taskName = "TWO_MAP") public ExecuteResult monthMapExecute(MapArgs mapArgs) { // 获取本次要处理的区间 final List mapResult = (List) mapArgs.getMapResult(); @@ -116,17 +116,17 @@ public class TestExcelAnalyseMapReduceJobExecutor { private PhoneNumberCheckBo buildGatherPhoneNumberCheckBo(String phoneNumberCheckBoStr) { final List phoneNumberCheckBoList = JSONArray.parseArray(phoneNumberCheckBoStr, PhoneNumberCheckBo.class); // 获取校验总数 - final long checkTotalNum = phoneNumberCheckBoList.get(0).getCheckTotalNum(); + final long checkTotalNum = phoneNumberCheckBoList.get(0).getTotal(); // 汇总校验失败数量 - final long checkErrorNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckErrorNum).sum(); + final long checkErrorNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getError).sum(); // 汇总校验成功数量 - final long checkSuccessNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckSuccessNum).sum(); + final long checkSuccessNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getSuccess).sum(); // 汇总错误手机号 final List errorPhoneNumberList = new ArrayList<>(); - phoneNumberCheckBoList.forEach(item -> errorPhoneNumberList.addAll(item.getCheckErrorPhoneNumberList())); + phoneNumberCheckBoList.forEach(item -> errorPhoneNumberList.addAll(item.getCheckErrors())); // 汇总手机号校验结果 - return PhoneNumberCheckBo.builder().checkTotalNum(checkTotalNum).checkErrorNum(checkErrorNum).checkSuccessNum(checkSuccessNum).checkErrorPhoneNumberList(errorPhoneNumberList).build(); + return PhoneNumberCheckBo.builder().total(checkTotalNum).error(checkErrorNum).success(checkSuccessNum).checkErrors(errorPhoneNumberList).build(); } } diff --git a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java index 40fa226..bec993c 100644 --- a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java @@ -60,8 +60,8 @@ public class TestMapJobExecutor extends AbstractMapExecutor { @Getter private enum MapEnum { LAST_MAP(null, null), - MONTH_MAP(new QuarterMap(), LAST_MAP), - MAP_ROOT(new RootMap(), MONTH_MAP), + QUARTER_MAP(new QuarterMap(), LAST_MAP), + MAP_ROOT(new RootMap(), QUARTER_MAP), ; private final Map map; diff --git a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java index 7fc4d9e..d3fbbe9 100644 --- a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java @@ -1,6 +1,5 @@ package com.example.snailjob.job; -import cn.hutool.core.util.ByteUtil; import com.aizuda.snailjob.client.job.core.MapHandler; import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; @@ -75,8 +74,8 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { @Getter private enum MapEnum { LAST_MAP(null, null), - MONTH_MAP(new QuarterMap(), LAST_MAP), - MAP_ROOT(new RootMap(), MONTH_MAP), + QUARTER_MAP(new QuarterMap(), LAST_MAP), + MAP_ROOT(new RootMap(), QUARTER_MAP), ; private final Map map; diff --git a/src/main/java/com/example/snailjob/job/TestMyMapExecutor.java b/src/main/java/com/example/snailjob/job/TestMyMapExecutor.java new file mode 100644 index 0000000..fe4d855 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestMyMapExecutor.java @@ -0,0 +1,33 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author opensnail + * @date 2024-07-07 12:06:57 + * @since sj_1.1.0 + */ +@Component +public class TestMyMapExecutor extends AbstractMapExecutor { + + @Override + public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + if (SystemConstants.ROOT_MAP.equals(mapArgs.getTaskName())) { + return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4"), "TWO_MAP"); + } + + return ExecuteResult.success(mapArgs.getMapResult()); + } + +} diff --git a/src/main/java/com/example/snailjob/job/TestMyMapReduceExecutor.java b/src/main/java/com/example/snailjob/job/TestMyMapReduceExecutor.java new file mode 100644 index 0000000..8ba41dc --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestMyMapReduceExecutor.java @@ -0,0 +1,45 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author opensnail + * @date 2024-07-07 12:06:57 + * @since sj_1.1.0 + */ +@Component +public class TestMyMapReduceExecutor extends AbstractMapReduceExecutor { + + @Override + public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + if (SystemConstants.ROOT_MAP.equals(mapArgs.getTaskName())) { + return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4"), "TWO_MAP"); + } + + return ExecuteResult.success(mapArgs.getMapResult()); + } + + @Override + protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) { + List mapResult = (List) reduceArgs.getMapResult(); + return ExecuteResult.success(mapResult.stream().mapToInt(Integer::parseInt).sum()); + } + + @Override + protected ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs) { + List reduces = (List) mergeReduceArgs.getReduces(); + return ExecuteResult.success(reduces.stream().mapToInt(Integer::parseInt).sum()); + } + + +} diff --git a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java index 95a82af..540b6ea 100644 --- a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java @@ -26,6 +26,7 @@ public class TestWorkflowAnnoJobExecutor { int i = new Random().nextInt(1000); jobArgs.appendContext("name" + i, "小蜗牛" + i); jobArgs.appendContext("age", i); + SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext()); return ExecuteResult.success(failOrderPo); } diff --git a/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java b/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java index 62bf08f..ddceecd 100644 --- a/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java +++ b/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java @@ -6,10 +6,8 @@ import com.alibaba.excel.context.AnalysisContext; import com.alibaba.excel.event.AnalysisEventListener; import com.example.snailjob.bo.PhoneNumberBo; import com.example.snailjob.bo.PhoneNumberCheckBo; -import com.example.snailjob.po.PhoneNumberPo; import lombok.extern.slf4j.Slf4j; -import java.time.LocalDateTime; import java.util.Map; /** @@ -35,7 +33,7 @@ public class PhoneNumberExcelListener extends AnalysisEventListener headMap, AnalysisContext context) { - this.phoneNumberCheckBo.setCheckTotalNum(Long.parseLong(String.valueOf(context.readSheetHolder().getApproximateTotalRowNumber() - 1))); + this.phoneNumberCheckBo.setTotal(Long.parseLong(String.valueOf(context.readSheetHolder().getApproximateTotalRowNumber() - 1))); } @Override @@ -55,12 +53,12 @@ public class PhoneNumberExcelListener extends AnalysisEventListener