diff --git a/docs/demo.sql b/docs/demo.sql index 1498ad0..2a38e66 100644 --- a/docs/demo.sql +++ b/docs/demo.sql @@ -12,4 +12,12 @@ CREATE TABLE fail_order `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`) -); \ No newline at end of file +); + +-- 手机号表 +CREATE TABLE `phone_number` ( + `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键', + `phone_number` varchar(20) NOT NULL DEFAULT '' COMMENT '手机号', + `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='手机号表' diff --git a/pom.xml b/pom.xml index 81e595a..51efe03 100644 --- a/pom.xml +++ b/pom.xml @@ -44,17 +44,17 @@ com.aizuda snail-job-client-starter - 1.1.0-beta1 + 1.1.0-beta2 com.aizuda snail-job-client-retry-core - 1.1.0-beta1 + 1.1.0-beta2 com.aizuda snail-job-client-job-core - 1.1.0-beta1 + 1.1.0-beta2 com.googlecode.aviator @@ -111,6 +111,16 @@ okhttp 4.2.0 + + com.alibaba + easyexcel + 3.1.3 + + + com.alibaba + fastjson + 1.2.83 + diff --git a/src/main/java/com/example/snailjob/bo/PhoneNumberBo.java b/src/main/java/com/example/snailjob/bo/PhoneNumberBo.java new file mode 100644 index 0000000..2b79e6d --- /dev/null +++ b/src/main/java/com/example/snailjob/bo/PhoneNumberBo.java @@ -0,0 +1,18 @@ +package com.example.snailjob.bo; + +import com.alibaba.excel.annotation.ExcelProperty; +import lombok.Data; + +/** + * excel表格手机号BO + * + * @author JiChenWang + * @since 2024/6/27 20:28 + */ +@Data +public class PhoneNumberBo { + + @ExcelProperty(value = "手机号码", index = 0) + private String phoneNumber; + +} diff --git a/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java b/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java new file mode 100644 index 0000000..b808eb7 --- /dev/null +++ b/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java @@ -0,0 +1,39 @@ +package com.example.snailjob.bo; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +/** + * 手机号检测BO + * + * @author JiChenWang + * @since 2024/6/27 20:50 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PhoneNumberCheckBo { + + @Schema(description = "检测总条数", accessMode = Schema.AccessMode.READ_WRITE) + private Long total = 0L; + + @Schema(description = "检测失败条数", accessMode = Schema.AccessMode.READ_WRITE) + private Long error = 0L; + + @Schema(description = "检测成功条数", accessMode = Schema.AccessMode.READ_WRITE) + private Long success = 0L; + + @Schema(description = "检测失败临时的数据", accessMode = Schema.AccessMode.READ_WRITE) + private List checkErrors = new ArrayList<>(); + +// @Schema(description = "检测成功临时的数据", accessMode = Schema.AccessMode.READ_WRITE) +// private List checkSuccessPhoneNumberList = new ArrayList<>(); + +} diff --git a/src/main/java/com/example/snailjob/dao/PhoneNumberBaseMapper.java b/src/main/java/com/example/snailjob/dao/PhoneNumberBaseMapper.java new file mode 100644 index 0000000..fe36b1d --- /dev/null +++ b/src/main/java/com/example/snailjob/dao/PhoneNumberBaseMapper.java @@ -0,0 +1,15 @@ +package com.example.snailjob.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.example.snailjob.po.PhoneNumberPo; +import org.springframework.stereotype.Repository; + +/** + * 手机号mapper + * + * @author JiChenWang + * @since 2024/6/30 11:55 + */ +@Repository +public interface PhoneNumberBaseMapper extends BaseMapper { +} diff --git a/src/main/java/com/example/snailjob/dao/PhoneNumberDao.java b/src/main/java/com/example/snailjob/dao/PhoneNumberDao.java new file mode 100644 index 0000000..18d71dd --- /dev/null +++ b/src/main/java/com/example/snailjob/dao/PhoneNumberDao.java @@ -0,0 +1,34 @@ +package com.example.snailjob.dao; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.example.snailjob.po.PhoneNumberPo; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * TODO + * + * @author JiChenWang + * @since 2024/6/30 11:58 + */ +@Service +public class PhoneNumberDao extends ServiceImpl { + + @Autowired + private PhoneNumberBaseMapper phoneNumberBaseMapper; + + /** + * 批量保存手机号信息 + * + * @param phoneNumberPoList 手机号po列表 + * @return Boolean 保存成功标识:true-成功、false-失败 + * @author JichenWang + * @since 2024/6/30 12:03 + */ + public Boolean insertBatch (List phoneNumberPoList) { + return this.saveBatch(phoneNumberPoList); + } + +} diff --git a/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java new file mode 100644 index 0000000..b275ad9 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestAnnoMapJobExecutor.java @@ -0,0 +1,40 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +/** + * @author: opensnail + * @date : 2024-06-26 + */ +@Component +@JobExecutor(name = "testAnnoMapJobExecutor") +public class TestAnnoMapJobExecutor { + + @MapExecutor + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + System.out.println(mapArgs); + System.out.println(mapArgs.getWfContext()); + return mapHandler.doMap(Lists.newArrayList("1", "2", "3"), "MONTH_MAP"); + } + + @MapExecutor(taskName = "MONTH_MAP") + public ExecuteResult monthMapExecute(MapArgs mapArgs) { + System.out.println(mapArgs); + System.out.println(mapArgs.getWfContext()); + return ExecuteResult.success(123); + } + + @MapExecutor(taskName = "LAST_MAP") + public ExecuteResult lastMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + System.out.println(mapArgs); + System.out.println(mapArgs.getWfContext()); + return ExecuteResult.success(); + } + +} diff --git a/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java new file mode 100644 index 0000000..75aeb12 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestAnnoMapReduceJobExecutor.java @@ -0,0 +1,69 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor; +import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * @author: opensnail + * @date : 2024-06-26 + */ +@Component +@JobExecutor(name = "testAnnoMapReduceJobExecutor") +public class TestAnnoMapReduceJobExecutor { + + @MapExecutor + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + System.out.println(mapArgs); + System.out.println(mapArgs.getWfContext()); + mapArgs.appendContext("name", "zsg"); + return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4", "5", "6"), "MONTH_MAP"); + } + + @MapExecutor(taskName = "MONTH_MAP") + public ExecuteResult monthMapExecute(MapArgs mapArgs) { + System.out.println(mapArgs); + System.out.println(mapArgs.getWfContext()); + mapArgs.appendContext("age", "111"); + return ExecuteResult.success(Integer.parseInt((String) mapArgs.getMapResult()) * 2); + } + + @ReduceExecutor + public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) { + System.out.println(mapReduceArgs); + System.out.println(mapReduceArgs.getWfContext()); + return ExecuteResult.success( + mapReduceArgs.getMapResult() + .stream() + .map(String::valueOf) + .map(Integer::parseInt) + .mapToInt(Integer::intValue).sum()); + } + + /** + * 当只有一个reduce任务时无此执行器 + */ + @MergeReduceExecutor + public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) { + System.out.println(mergeReduceArgs); + System.out.println(mergeReduceArgs.getWfContext()); + return ExecuteResult.success( + mergeReduceArgs.getReduces() + .stream() + .map(String::valueOf) + .map(Integer::parseInt) + .mapToInt(Integer::intValue).sum()); + } +} diff --git a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java new file mode 100644 index 0000000..482e6c4 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java @@ -0,0 +1,106 @@ +package com.example.snailjob.job; + +import cn.hutool.core.util.ObjectUtil; +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.alibaba.excel.EasyExcel; +import com.example.snailjob.bo.PhoneNumberBo; +import com.example.snailjob.bo.PhoneNumberCheckBo; +import com.example.snailjob.dao.PhoneNumberDao; +import com.example.snailjob.listener.PhoneNumberExcelListener; +import com.example.snailjob.po.PhoneNumberPo; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * 解析excel文件中的手机号码,并将错误的手机号分片入库 + * + * @author JiChenWang + * @since 2024/6/30 10:37 + */ +@Slf4j +@Component +@JobExecutor(name = "testExcelAnalyseMapJobExecutor") +public class TestExcelAnalyseMapJobExecutor { + + private final Integer BATCH_SIZE = 100; + + @Autowired + private PhoneNumberDao phoneNumberDao; + + /** + * 读取手机号文件总行数,并进行分组 + * 比如文档中的手机号总量为307条,每100条一个分组,分组结果为[{0,99}, {100, 199}, {200,299}, {300, 307}] + * + * @return ExecuteResult + * @author JichenWang + * @since 2024/6/30 10:48 + */ + @MapExecutor + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler> mapHandler) { + List> ranges = null; + // 先获取文件总行数,便于分组 + try { + @Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx"); + final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo(); + PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, true, BATCH_SIZE); + EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync(); + + // 设置区间范围 + ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getTotal(), BATCH_SIZE); + } catch (Exception e) { + log.error("文件读取异常", e.getMessage()); + } + return mapHandler.doMap(ranges, "TWO_MAP"); + + } + + /** + * + * + * @param mapArgs + * @return ExecuteResult + * @author JichenWang + * @since 2024/6/30 11:05 + */ + @MapExecutor(taskName = "TWO_MAP") + public ExecuteResult monthMapExecute(MapArgs mapArgs) { + // 获取本次要处理的区间 + final List mapResult = (List) mapArgs.getMapResult(); + log.info("本次要处理的区间为:{}", mapResult); + + // 按照处理区间,去读取数据 + final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo(); + try { + @Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx"); + PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, false, BATCH_SIZE); + EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(mapResult.get(0) + 1).doReadSync(); + } catch (Exception e) { + log.error("文件读取异常:", e.getMessage()); + } + + // 如果正确手机号不为空,则入库 + if (ObjectUtil.isNotEmpty(phoneNumberCheckBo.getCheckErrors())) { + + List numberPos = new ArrayList<>(); + for (String no : phoneNumberCheckBo.getCheckErrors()) { + PhoneNumberPo numberPo = new PhoneNumberPo(); + numberPo.setPhoneNumber(no); + numberPos.add(numberPo); + } + phoneNumberDao.insertBatch(numberPos); + } + + return ExecuteResult.success(phoneNumberCheckBo.getError()); + } + +} diff --git a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java new file mode 100644 index 0000000..944f362 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java @@ -0,0 +1,132 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor; +import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.alibaba.excel.EasyExcel; +import com.alibaba.fastjson.JSONArray; +import com.example.snailjob.bo.PhoneNumberBo; +import com.example.snailjob.bo.PhoneNumberCheckBo; +import com.example.snailjob.listener.PhoneNumberExcelListener; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * 解析校验Excel中的手机号,统计出错手机号数量,并返回错误手机号详情 + * + * @author JichenWang + * @since 2024/6/27 19:52 + */ +@Slf4j +@Component +@JobExecutor(name = "TestExcelAnalyseMapReduceJobExecutor") +public class TestExcelAnalyseMapReduceJobExecutor { + + private final Integer BATCH_SIZE = 100; + + /** + * 处理手机号文件信息,将文档中的手机号进行分组 + * 比如文档中的手机号总量为307条,每100条一个分组,分组结果为[{0,99}, {100, 199}, {200,299}, {300, 307}] + * + * @return ExecuteResult + * @author JichenWang + * @since 2024/6/29 14:03 + */ + @MapExecutor + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler> mapHandler) { + List> ranges = null; + // 先获取文件总行数,便于分组 + try { + @Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx"); + final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo(); + PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, true, BATCH_SIZE); + EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync(); + + // 设置区间范围 + ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getTotal(), BATCH_SIZE); + } catch (Exception e) { + log.error("文件读取异常", e.getMessage()); + } + return mapHandler.doMap(ranges, "TWO_MAP"); + } + + /** + * 处理每个分组内容,如读取{0,99}区间的手机号,并解析 + * + * @return ExecuteResult + * @author JichenWang + * @since 2024/6/29 14:04 + */ + @MapExecutor(taskName = "TWO_MAP") + public ExecuteResult monthMapExecute(MapArgs mapArgs) { + // 获取本次要处理的区间 + final List mapResult = (List) mapArgs.getMapResult(); + log.info("本次要处理的区间为:{}", mapResult); + + // 按照处理区间,去读取数据 + final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo(); + try { + @Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx"); + PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, false, BATCH_SIZE); + EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(mapResult.get(0) + 1).doReadSync(); + } catch (Exception e) { + log.error("文件读取异常:", e.getMessage()); + } + + return ExecuteResult.success(phoneNumberCheckBo); + } + + + @ReduceExecutor + public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) { + log.info("WJC Test reduceExecute, 参数为:{}", mapReduceArgs.getMapResult()); + final PhoneNumberCheckBo phoneNumberCheckBo = this.buildGatherPhoneNumberCheckBo(mapReduceArgs.getMapResult().toString()); + return ExecuteResult.success(phoneNumberCheckBo); + } + + /** + * 当只有一个reduce任务时无此执行器 + */ + @MergeReduceExecutor + public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) { + final PhoneNumberCheckBo phoneNumberCheckBo = this.buildGatherPhoneNumberCheckBo(mergeReduceArgs.getReduces().toString()); + log.info("WJC 最终检测结果为:{}", phoneNumberCheckBo); + return ExecuteResult.success(phoneNumberCheckBo); + } + + /** + * 构造汇总手机号校验结果BO + * + * @param phoneNumberCheckBoStr 手机号校验BO字符串 + * @return PhoneNumberCheckBo 汇总手机号校验结果BO + * @author JichenWang + * @since 2024/6/29 14:24 + */ + private PhoneNumberCheckBo buildGatherPhoneNumberCheckBo(String phoneNumberCheckBoStr) { + final List phoneNumberCheckBoList = JSONArray.parseArray(phoneNumberCheckBoStr, PhoneNumberCheckBo.class); + // 获取校验总数 + final long checkTotalNum = phoneNumberCheckBoList.get(0).getTotal(); + // 汇总校验失败数量 + final long checkErrorNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getError).sum(); + // 汇总校验成功数量 + final long checkSuccessNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getSuccess).sum(); + // 汇总错误手机号 + final List errorPhoneNumberList = new ArrayList<>(); + phoneNumberCheckBoList.forEach(item -> errorPhoneNumberList.addAll(item.getCheckErrors())); + + // 汇总手机号校验结果 + return PhoneNumberCheckBo.builder().total(checkTotalNum).error(checkErrorNum).success(checkSuccessNum).checkErrors(errorPhoneNumberList).build(); + } + +} diff --git a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java new file mode 100644 index 0000000..bec993c --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java @@ -0,0 +1,161 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; + +import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.example.snailjob.job.TestMapReduceJobExecutor.QuarterMap.SubTask; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +/** + * 以下是一个统计某电商公司商家的一年的营业额的计算过程 + * + * @author: opensnail + * @date : 2024-06-13 + * @since : sj_1.1.0 + */ +@Component +public class TestMapJobExecutor extends AbstractMapExecutor { + + @Override + public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + MapEnum mapEnum = MapEnum.ofMap(mapArgs.getTaskName()); + if (Objects.nonNull(mapEnum) && Objects.nonNull(mapEnum.getMap())) { + Map map = mapEnum.getMap(); + MapEnum nextMap = mapEnum.getNextMap(); + String nextName = null; + if (Objects.nonNull(nextMap)) { + nextName = nextMap.name(); + } + + return mapHandler.doMap(map.map(mapArgs), nextName); + } + + // 未找到map的任务,则说明当前需要进行处理 + JsonNode json = JsonUtil.toJson(mapArgs.getMapResult()); + SnailJobLog.LOCAL.info("LAST_MAP 开始执行 mapResult:{}", json); + // 获取最后一次map的信息. + SubTask subTask = JsonUtil.parseObject(json.toString(), SubTask.class); + // 此处可以统计数据或者做其他的事情 + // 模拟统计营业额 + int turnover = new Random().nextInt(1000000); + return ExecuteResult.success(turnover); + + } + + @Getter + private enum MapEnum { + LAST_MAP(null, null), + QUARTER_MAP(new QuarterMap(), LAST_MAP), + MAP_ROOT(new RootMap(), QUARTER_MAP), + ; + + private final Map map; + private final MapEnum nextMap; + MapEnum(Map map, MapEnum nextMap) { + this.map = map; + this.nextMap = nextMap; + } + + public static MapEnum ofMap(String taskName) { + for (final MapEnum value : MapEnum.values()) { + if (value.name().equals(taskName)) { + return value; + } + } + + return null; + } + + } + + private static class RootMap implements Map { + + @Override + public List map(MapArgs args) { + // 第一层按照商家ID分片 + // 假设总共有一百万商家 每个分片处理10万商家 + List> ranges = doSharding(1L, 1000000L, 100000); + return ranges; + } + } + + public static class QuarterMap implements Map { + + @Override + public List map(MapArgs args) { + + // 第二层按照月分片 + // 4个季度 + JsonNode json = JsonUtil.toJson(args.getMapResult()); + List list = new ArrayList<>(); + for (JsonNode jsonNode : json) { + long id = jsonNode.asLong(); + for (int i = 1; i <= 4; i++) { + list.add(new TestMapReduceJobExecutor.QuarterMap.SubTask(id, i)); + } + } + return list; + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class SubTask { + // 商家id + private Long id; + + // 需要处理的月份 + private Integer quarter; + + } + } + + interface Map{ + List map(MapArgs args); + } + + public static List> doSharding(Long min, Long max, int interval) { + + if (max.equals(min)) { + return new ArrayList<>(); + } + + // 总数量 + long total = max - min + 1; + + // 计算分页总页数 + Long totalPages = total / interval; + if (total % interval != 0) { + totalPages++; + } + + List> partitions = new ArrayList<>(); + for (Long index = 0L; index < totalPages; index++) { + + // 计算起始点 因为是从min开始所以每次需要加上一个min + Long start = index * interval + min; + + // 结算结束点 若最后一个 start + interval - 1 > max 取max + // 减一是保证 [start, end] 都是闭区间 + Long end = Math.min(start + interval - 1, max); + partitions.add(Lists.newArrayList(start, end)); + } + + return partitions; + } +} diff --git a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java new file mode 100644 index 0000000..d3fbbe9 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java @@ -0,0 +1,175 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.example.snailjob.job.TestMapReduceJobExecutor.QuarterMap.SubTask; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Random; + + +/** + * 以下是一个统计某电商公司商家的一年的营业额的计算过程 + * + * @author: opensnail + * @date : 2024-06-13 + * @since : sj_1.1.0 + */ +@Component +public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { + + @Override + public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + MapEnum mapEnum = MapEnum.ofMap(mapArgs.getTaskName()); + if (Objects.nonNull(mapEnum) && Objects.nonNull(mapEnum.getMap())) { + Map map = mapEnum.getMap(); + MapEnum nextMap = mapEnum.getNextMap(); + String nextName = null; + if (Objects.nonNull(nextMap)) { + nextName = nextMap.name(); + } + + return mapHandler.doMap(map.map(mapArgs), nextName); + } + + // 未找到map的任务,则说明当前需要进行处理 + JsonNode json = JsonUtil.toJson(mapArgs.getMapResult()); + SnailJobLog.LOCAL.info("LAST_MAP 开始执行 mapResult:{}", json); + // 获取最后一次map的信息. + SubTask subTask = JsonUtil.parseObject(json.toString(), SubTask.class); + // 此处可以统计数据或者做其他的事情 + // 模拟统计营业额 + int turnover = new Random().nextInt(1000000); + return ExecuteResult.success(turnover); + + } + + @Override + protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) { + return ExecuteResult.success(reduceArgs.getMapResult()); + } + + @Override + protected ExecuteResult doMergeReduceExecute(MergeReduceArgs reduceArgs) { + List reduces = reduceArgs.getReduces(); + SnailJobLog.LOCAL.info("merge reduce {}", reduces); + return ExecuteResult.success(111); + } + + @Getter + private enum MapEnum { + LAST_MAP(null, null), + QUARTER_MAP(new QuarterMap(), LAST_MAP), + MAP_ROOT(new RootMap(), QUARTER_MAP), + ; + + private final Map map; + private final MapEnum nextMap; + MapEnum(Map map, MapEnum nextMap) { + this.map = map; + this.nextMap = nextMap; + } + + public static MapEnum ofMap(String taskName) { + for (final MapEnum value : MapEnum.values()) { + if (value.name().equals(taskName)) { + return value; + } + } + + return null; + } + + } + + private static class RootMap implements Map { + + @Override + public List map(MapArgs args) { + // 第一层按照商家ID分片 + // 假设总共有一百万商家 每个分片处理10万商家 + List> ranges = doSharding(1L, 1000000L, 100000); + return ranges; + } + } + + public static class QuarterMap implements Map { + + @Override + public List map(MapArgs args) { + + // 第二层按照月分片 + // 4个季度 + JsonNode json = JsonUtil.toJson(args.getMapResult()); + List list = new ArrayList<>(); + for (JsonNode jsonNode : json) { + long id = jsonNode.asLong(); + for (int i = 1; i <= 4; i++) { + list.add(new SubTask(id, i)); + } + } + return list; + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class SubTask { + // 商家id + private Long id; + + // 需要处理的月份 + private Integer quarter; + + } + } + + interface Map{ + List map(MapArgs args); + } + + public static List> doSharding(Long min, Long max, int interval) { + + if (max.equals(min)) { + return new ArrayList<>(); + } + + // 总数量 + long total = max - min + 1; + + // 计算分页总页数 + Long totalPages = total / interval; + if (total % interval != 0) { + totalPages++; + } + + List> partitions = new ArrayList<>(); + for (Long index = 0L; index < totalPages; index++) { + + // 计算起始点 因为是从min开始所以每次需要加上一个min + Long start = index * interval + min; + + // 结算结束点 若最后一个 start + interval - 1 > max 取max + // 减一是保证 [start, end] 都是闭区间 + Long end = Math.min(start + interval - 1, max); + partitions.add(Lists.newArrayList(start, end)); + } + + return partitions; + } +} diff --git a/src/main/java/com/example/snailjob/job/TestMyMapExecutor.java b/src/main/java/com/example/snailjob/job/TestMyMapExecutor.java new file mode 100644 index 0000000..fe4d855 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestMyMapExecutor.java @@ -0,0 +1,33 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author opensnail + * @date 2024-07-07 12:06:57 + * @since sj_1.1.0 + */ +@Component +public class TestMyMapExecutor extends AbstractMapExecutor { + + @Override + public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + if (SystemConstants.ROOT_MAP.equals(mapArgs.getTaskName())) { + return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4"), "TWO_MAP"); + } + + return ExecuteResult.success(mapArgs.getMapResult()); + } + +} diff --git a/src/main/java/com/example/snailjob/job/TestMyMapReduceExecutor.java b/src/main/java/com/example/snailjob/job/TestMyMapReduceExecutor.java new file mode 100644 index 0000000..8ba41dc --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestMyMapReduceExecutor.java @@ -0,0 +1,45 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author opensnail + * @date 2024-07-07 12:06:57 + * @since sj_1.1.0 + */ +@Component +public class TestMyMapReduceExecutor extends AbstractMapReduceExecutor { + + @Override + public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + if (SystemConstants.ROOT_MAP.equals(mapArgs.getTaskName())) { + return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4"), "TWO_MAP"); + } + + return ExecuteResult.success(mapArgs.getMapResult()); + } + + @Override + protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) { + List mapResult = (List) reduceArgs.getMapResult(); + return ExecuteResult.success(mapResult.stream().mapToInt(Integer::parseInt).sum()); + } + + @Override + protected ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs) { + List reduces = (List) mergeReduceArgs.getReduces(); + return ExecuteResult.success(reduces.stream().mapToInt(Integer::parseInt).sum()); + } + + +} diff --git a/src/main/java/com/example/snailjob/job/TestPartitionJobExecutor.java b/src/main/java/com/example/snailjob/job/TestPartitionJobExecutor.java index b82bb91..c1cdf84 100644 --- a/src/main/java/com/example/snailjob/job/TestPartitionJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestPartitionJobExecutor.java @@ -17,7 +17,7 @@ import org.springframework.stereotype.Component; public class TestPartitionJobExecutor { public ExecuteResult jobExecute(JobArgs jobArgs) { - if (jobArgs.getArgsStr().equals("1")) { + if (jobArgs.getJobParams().equals("1")) { throw new NullPointerException("分区空指针抛异常了"); } FailOrderPo failOrderPo = new FailOrderPo(); diff --git a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java index 4914da1..540b6ea 100644 --- a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java @@ -8,6 +8,8 @@ import com.aizuda.snailjob.common.log.SnailJobLog; import com.example.snailjob.po.FailOrderPo; import org.springframework.stereotype.Component; +import java.util.Random; + /** * @author www.byteblogs.com * @date 2023-09-28 22:54:07 @@ -18,11 +20,13 @@ import org.springframework.stereotype.Component; public class TestWorkflowAnnoJobExecutor { public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException { -// for (int i = 0; i < 30; i++) { -// SnailJobLog.REMOTE.info("任务执行开始. [{}]", i + "" + JsonUtil.toJsonString(jobArgs)); -// } FailOrderPo failOrderPo = new FailOrderPo(); failOrderPo.setOrderId("xiaowoniu"); + // 测试上下文传递 + int i = new Random().nextInt(1000); + jobArgs.appendContext("name" + i, "小蜗牛" + i); + jobArgs.appendContext("age", i); + SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext()); return ExecuteResult.success(failOrderPo); } diff --git a/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java b/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java new file mode 100644 index 0000000..ddceecd --- /dev/null +++ b/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java @@ -0,0 +1,76 @@ +package com.example.snailjob.listener; + +import cn.hutool.core.lang.Validator; +import cn.hutool.core.util.ObjectUtil; +import com.alibaba.excel.context.AnalysisContext; +import com.alibaba.excel.event.AnalysisEventListener; +import com.example.snailjob.bo.PhoneNumberBo; +import com.example.snailjob.bo.PhoneNumberCheckBo; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + +/** + * 手机号excel解析 Listener + * + * @author JiChenWang + * @since 2024/6/27 20:38 + */ +@Slf4j +public class PhoneNumberExcelListener extends AnalysisEventListener { + + /** 手机号校验BO **/ + private PhoneNumberCheckBo phoneNumberCheckBo; + + /** 是否第一次读取Excel **/ + private Boolean firstReadStatus = false; + + /** 读取批次大小 **/ + private Integer batchSize = 100; + + /** 已读取的数据数量 **/ + private Integer cacheSize = 0; + + @Override + public void invokeHeadMap(Map headMap, AnalysisContext context) { + this.phoneNumberCheckBo.setTotal(Long.parseLong(String.valueOf(context.readSheetHolder().getApproximateTotalRowNumber() - 1))); + } + + @Override + public void invoke(PhoneNumberBo phoneNumberBo, AnalysisContext context) { + // 如果是第一次读该文件,已读取的数量已超过读取批次大小,直接返回 + if (firstReadStatus || cacheSize >= batchSize) { + return; + } + + cacheSize++; + + if (ObjectUtil.isEmpty(phoneNumberBo.getPhoneNumber())) { + return; + } + + // 校验手机号 + log.info("本次校验的手机号为: {}", phoneNumberBo.getPhoneNumber()); + Boolean validateStatus = Validator.isMobile(phoneNumberBo.getPhoneNumber()); + if (validateStatus) { + this.phoneNumberCheckBo.setSuccess(this.phoneNumberCheckBo.getSuccess() + 1); +// final PhoneNumberPo phoneNumberPo = PhoneNumberPo.builder().phoneNumber(phoneNumberBo.getPhoneNumber()).createTime(LocalDateTime.now()).build(); +// this.phoneNumberCheckBo.getCheckSuccessPhoneNumberList().add(phoneNumberPo); + } else { + this.phoneNumberCheckBo.setError(this.phoneNumberCheckBo.getError() + 1); + this.phoneNumberCheckBo.getCheckErrors().add(phoneNumberBo.getPhoneNumber()); + } + } + + @Override + public void doAfterAllAnalysed(AnalysisContext context) { + + } + + public PhoneNumberExcelListener(PhoneNumberCheckBo phoneNumberCheckBo, Boolean firstReadStatus, Integer batchSize) { + this.phoneNumberCheckBo = phoneNumberCheckBo; + this.firstReadStatus = firstReadStatus; + this.batchSize = batchSize; + } + +} diff --git a/src/main/java/com/example/snailjob/po/PhoneNumberPo.java b/src/main/java/com/example/snailjob/po/PhoneNumberPo.java new file mode 100644 index 0000000..6372220 --- /dev/null +++ b/src/main/java/com/example/snailjob/po/PhoneNumberPo.java @@ -0,0 +1,39 @@ +package com.example.snailjob.po; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +/** + * phone_number + * + * @author JiChenWang + * @since 2024/6/30 11:48 + */ +@TableName("phone_number") +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class PhoneNumberPo { + + /** + * 主键 + */ + private Long id; + + /** + * 手机号 + */ + private String phoneNumber; + + /** + * 创建时间 + */ + private LocalDateTime createTime; + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a14b5f4..e38365f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -9,7 +9,7 @@ spring: active: dev datasource: name: snail_job - url: jdbc:mysql://localhost:3306/snail_job_260?useSSL=false&characterEncoding=utf8&useUnicode=true + url: jdbc:mysql://localhost:3306/snail_job?useSSL=false&characterEncoding=utf8&useUnicode=true username: root password: root type: com.zaxxer.hikari.HikariDataSource diff --git a/src/main/resources/doc/number.xlsx b/src/main/resources/doc/number.xlsx new file mode 100644 index 0000000..4653ed0 Binary files /dev/null and b/src/main/resources/doc/number.xlsx differ