diff --git a/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java b/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java index caa8adb..b808eb7 100644 --- a/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java +++ b/src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java @@ -1,6 +1,5 @@ package com.example.snailjob.bo; -import com.example.snailjob.po.PhoneNumberPo; import io.swagger.v3.oas.annotations.media.Schema; import lombok.AllArgsConstructor; import lombok.Builder; @@ -23,18 +22,18 @@ import java.util.List; public class PhoneNumberCheckBo { @Schema(description = "检测总条数", accessMode = Schema.AccessMode.READ_WRITE) - private Long checkTotalNum = 0L; + private Long total = 0L; @Schema(description = "检测失败条数", accessMode = Schema.AccessMode.READ_WRITE) - private Long checkErrorNum = 0L; + private Long error = 0L; @Schema(description = "检测成功条数", accessMode = Schema.AccessMode.READ_WRITE) - private Long checkSuccessNum = 0L; + private Long success = 0L; @Schema(description = "检测失败临时的数据", accessMode = Schema.AccessMode.READ_WRITE) - private List checkErrorPhoneNumberList = new ArrayList<>(); + private List checkErrors = new ArrayList<>(); - @Schema(description = "检测成功临时的数据", accessMode = Schema.AccessMode.READ_WRITE) - private List checkSuccessPhoneNumberList = new ArrayList<>(); +// @Schema(description = "检测成功临时的数据", accessMode = Schema.AccessMode.READ_WRITE) +// private List checkSuccessPhoneNumberList = new ArrayList<>(); } diff --git a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java index 3b577dd..482e6c4 100644 --- a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapJobExecutor.java @@ -11,16 +11,18 @@ import com.example.snailjob.bo.PhoneNumberBo; import com.example.snailjob.bo.PhoneNumberCheckBo; import com.example.snailjob.dao.PhoneNumberDao; import com.example.snailjob.listener.PhoneNumberExcelListener; +import com.example.snailjob.po.PhoneNumberPo; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.InputStream; +import java.util.ArrayList; import java.util.List; /** - * 解析手机号excel文件,并将正确的手机号分片入库 + * 解析excel文件中的手机号码,并将错误的手机号分片入库 * * @author JiChenWang * @since 2024/6/30 10:37 @@ -44,7 +46,7 @@ public class TestExcelAnalyseMapJobExecutor { * @since 2024/6/30 10:48 */ @MapExecutor - public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler> mapHandler) { List> ranges = null; // 先获取文件总行数,便于分组 try { @@ -54,11 +56,11 @@ public class TestExcelAnalyseMapJobExecutor { EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync(); // 设置区间范围 - ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE); + ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getTotal(), BATCH_SIZE); } catch (Exception e) { log.error("文件读取异常", e.getMessage()); } - return mapHandler.doMap(ranges, "MONTH_MAP"); + return mapHandler.doMap(ranges, "TWO_MAP"); } @@ -70,7 +72,7 @@ public class TestExcelAnalyseMapJobExecutor { * @author JichenWang * @since 2024/6/30 11:05 */ - @MapExecutor(taskName = "MONTH_MAP") + @MapExecutor(taskName = "TWO_MAP") public ExecuteResult monthMapExecute(MapArgs mapArgs) { // 获取本次要处理的区间 final List mapResult = (List) mapArgs.getMapResult(); @@ -87,11 +89,18 @@ public class TestExcelAnalyseMapJobExecutor { } // 如果正确手机号不为空,则入库 - if (ObjectUtil.isNotEmpty(phoneNumberCheckBo.getCheckSuccessPhoneNumberList())) { - phoneNumberDao.insertBatch(phoneNumberCheckBo.getCheckSuccessPhoneNumberList()); + if (ObjectUtil.isNotEmpty(phoneNumberCheckBo.getCheckErrors())) { + + List numberPos = new ArrayList<>(); + for (String no : phoneNumberCheckBo.getCheckErrors()) { + PhoneNumberPo numberPo = new PhoneNumberPo(); + numberPo.setPhoneNumber(no); + numberPos.add(numberPo); + } + phoneNumberDao.insertBatch(numberPos); } - return ExecuteResult.success(phoneNumberCheckBo.getCheckSuccessNum()); + return ExecuteResult.success(phoneNumberCheckBo.getError()); } } diff --git a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java index aabf428..944f362 100644 --- a/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestExcelAnalyseMapReduceJobExecutor.java @@ -44,7 +44,7 @@ public class TestExcelAnalyseMapReduceJobExecutor { * @since 2024/6/29 14:03 */ @MapExecutor - public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler> mapHandler) { List> ranges = null; // 先获取文件总行数,便于分组 try { @@ -54,11 +54,11 @@ public class TestExcelAnalyseMapReduceJobExecutor { EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync(); // 设置区间范围 - ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE); + ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getTotal(), BATCH_SIZE); } catch (Exception e) { log.error("文件读取异常", e.getMessage()); } - return mapHandler.doMap(ranges, "MONTH_MAP"); + return mapHandler.doMap(ranges, "TWO_MAP"); } /** @@ -68,7 +68,7 @@ public class TestExcelAnalyseMapReduceJobExecutor { * @author JichenWang * @since 2024/6/29 14:04 */ - @MapExecutor(taskName = "MONTH_MAP") + @MapExecutor(taskName = "TWO_MAP") public ExecuteResult monthMapExecute(MapArgs mapArgs) { // 获取本次要处理的区间 final List mapResult = (List) mapArgs.getMapResult(); @@ -116,17 +116,17 @@ public class TestExcelAnalyseMapReduceJobExecutor { private PhoneNumberCheckBo buildGatherPhoneNumberCheckBo(String phoneNumberCheckBoStr) { final List phoneNumberCheckBoList = JSONArray.parseArray(phoneNumberCheckBoStr, PhoneNumberCheckBo.class); // 获取校验总数 - final long checkTotalNum = phoneNumberCheckBoList.get(0).getCheckTotalNum(); + final long checkTotalNum = phoneNumberCheckBoList.get(0).getTotal(); // 汇总校验失败数量 - final long checkErrorNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckErrorNum).sum(); + final long checkErrorNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getError).sum(); // 汇总校验成功数量 - final long checkSuccessNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckSuccessNum).sum(); + final long checkSuccessNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getSuccess).sum(); // 汇总错误手机号 final List errorPhoneNumberList = new ArrayList<>(); - phoneNumberCheckBoList.forEach(item -> errorPhoneNumberList.addAll(item.getCheckErrorPhoneNumberList())); + phoneNumberCheckBoList.forEach(item -> errorPhoneNumberList.addAll(item.getCheckErrors())); // 汇总手机号校验结果 - return PhoneNumberCheckBo.builder().checkTotalNum(checkTotalNum).checkErrorNum(checkErrorNum).checkSuccessNum(checkSuccessNum).checkErrorPhoneNumberList(errorPhoneNumberList).build(); + return PhoneNumberCheckBo.builder().total(checkTotalNum).error(checkErrorNum).success(checkSuccessNum).checkErrors(errorPhoneNumberList).build(); } } diff --git a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java index 40fa226..bec993c 100644 --- a/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapJobExecutor.java @@ -60,8 +60,8 @@ public class TestMapJobExecutor extends AbstractMapExecutor { @Getter private enum MapEnum { LAST_MAP(null, null), - MONTH_MAP(new QuarterMap(), LAST_MAP), - MAP_ROOT(new RootMap(), MONTH_MAP), + QUARTER_MAP(new QuarterMap(), LAST_MAP), + MAP_ROOT(new RootMap(), QUARTER_MAP), ; private final Map map; diff --git a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java index 7fc4d9e..d3fbbe9 100644 --- a/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestMapReduceJobExecutor.java @@ -1,6 +1,5 @@ package com.example.snailjob.job; -import cn.hutool.core.util.ByteUtil; import com.aizuda.snailjob.client.job.core.MapHandler; import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; @@ -75,8 +74,8 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor { @Getter private enum MapEnum { LAST_MAP(null, null), - MONTH_MAP(new QuarterMap(), LAST_MAP), - MAP_ROOT(new RootMap(), MONTH_MAP), + QUARTER_MAP(new QuarterMap(), LAST_MAP), + MAP_ROOT(new RootMap(), QUARTER_MAP), ; private final Map map; diff --git a/src/main/java/com/example/snailjob/job/TestMyMapExecutor.java b/src/main/java/com/example/snailjob/job/TestMyMapExecutor.java new file mode 100644 index 0000000..fe4d855 --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestMyMapExecutor.java @@ -0,0 +1,33 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author opensnail + * @date 2024-07-07 12:06:57 + * @since sj_1.1.0 + */ +@Component +public class TestMyMapExecutor extends AbstractMapExecutor { + + @Override + public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + if (SystemConstants.ROOT_MAP.equals(mapArgs.getTaskName())) { + return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4"), "TWO_MAP"); + } + + return ExecuteResult.success(mapArgs.getMapResult()); + } + +} diff --git a/src/main/java/com/example/snailjob/job/TestMyMapReduceExecutor.java b/src/main/java/com/example/snailjob/job/TestMyMapReduceExecutor.java new file mode 100644 index 0000000..8ba41dc --- /dev/null +++ b/src/main/java/com/example/snailjob/job/TestMyMapReduceExecutor.java @@ -0,0 +1,45 @@ +package com.example.snailjob.job; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.google.common.collect.Lists; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author opensnail + * @date 2024-07-07 12:06:57 + * @since sj_1.1.0 + */ +@Component +public class TestMyMapReduceExecutor extends AbstractMapReduceExecutor { + + @Override + public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) { + if (SystemConstants.ROOT_MAP.equals(mapArgs.getTaskName())) { + return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4"), "TWO_MAP"); + } + + return ExecuteResult.success(mapArgs.getMapResult()); + } + + @Override + protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) { + List mapResult = (List) reduceArgs.getMapResult(); + return ExecuteResult.success(mapResult.stream().mapToInt(Integer::parseInt).sum()); + } + + @Override + protected ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs) { + List reduces = (List) mergeReduceArgs.getReduces(); + return ExecuteResult.success(reduces.stream().mapToInt(Integer::parseInt).sum()); + } + + +} diff --git a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java index 95a82af..540b6ea 100644 --- a/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java +++ b/src/main/java/com/example/snailjob/job/TestWorkflowAnnoJobExecutor.java @@ -26,6 +26,7 @@ public class TestWorkflowAnnoJobExecutor { int i = new Random().nextInt(1000); jobArgs.appendContext("name" + i, "小蜗牛" + i); jobArgs.appendContext("age", i); + SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext()); return ExecuteResult.success(failOrderPo); } diff --git a/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java b/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java index 62bf08f..ddceecd 100644 --- a/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java +++ b/src/main/java/com/example/snailjob/listener/PhoneNumberExcelListener.java @@ -6,10 +6,8 @@ import com.alibaba.excel.context.AnalysisContext; import com.alibaba.excel.event.AnalysisEventListener; import com.example.snailjob.bo.PhoneNumberBo; import com.example.snailjob.bo.PhoneNumberCheckBo; -import com.example.snailjob.po.PhoneNumberPo; import lombok.extern.slf4j.Slf4j; -import java.time.LocalDateTime; import java.util.Map; /** @@ -35,7 +33,7 @@ public class PhoneNumberExcelListener extends AnalysisEventListener headMap, AnalysisContext context) { - this.phoneNumberCheckBo.setCheckTotalNum(Long.parseLong(String.valueOf(context.readSheetHolder().getApproximateTotalRowNumber() - 1))); + this.phoneNumberCheckBo.setTotal(Long.parseLong(String.valueOf(context.readSheetHolder().getApproximateTotalRowNumber() - 1))); } @Override @@ -55,12 +53,12 @@ public class PhoneNumberExcelListener extends AnalysisEventListener