Merge branch 'refs/heads/dev_map_reduce'

# Conflicts:
#	pom.xml
This commit is contained in:
byteblogs168 2024-07-07 22:48:42 +08:00
commit 18e02accea
20 changed files with 1013 additions and 9 deletions

View File

@ -12,4 +12,12 @@ CREATE TABLE fail_order
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`)
);
);
-- 手机号表
CREATE TABLE `phone_number` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
`phone_number` varchar(20) NOT NULL DEFAULT '' COMMENT '手机号',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='手机号表'

16
pom.xml
View File

@ -44,17 +44,17 @@
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-starter</artifactId>
<version>1.1.0-beta1</version>
<version>1.1.0-beta2</version>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-retry-core</artifactId>
<version>1.1.0-beta1</version>
<version>1.1.0-beta2</version>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-job-core</artifactId>
<version>1.1.0-beta1</version>
<version>1.1.0-beta2</version>
</dependency>
<dependency>
<groupId>com.googlecode.aviator</groupId>
@ -111,6 +111,16 @@
<artifactId>okhttp</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,18 @@
package com.example.snailjob.bo;
import com.alibaba.excel.annotation.ExcelProperty;
import lombok.Data;
/**
* excel表格手机号BO
*
* @author JiChenWang
* @since 2024/6/27 20:28
*/
@Data
public class PhoneNumberBo {
@ExcelProperty(value = "手机号码", index = 0)
private String phoneNumber;
}

View File

@ -0,0 +1,39 @@
package com.example.snailjob.bo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* 手机号检测BO
*
* @author JiChenWang
* @since 2024/6/27 20:50
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PhoneNumberCheckBo {
@Schema(description = "检测总条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long total = 0L;
@Schema(description = "检测失败条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long error = 0L;
@Schema(description = "检测成功条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long success = 0L;
@Schema(description = "检测失败临时的数据", accessMode = Schema.AccessMode.READ_WRITE)
private List<String> checkErrors = new ArrayList<>();
// @Schema(description = "检测成功临时的数据", accessMode = Schema.AccessMode.READ_WRITE)
// private List<PhoneNumberPo> checkSuccessPhoneNumberList = new ArrayList<>();
}

View File

@ -0,0 +1,15 @@
package com.example.snailjob.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.snailjob.po.PhoneNumberPo;
import org.springframework.stereotype.Repository;
/**
* 手机号mapper
*
* @author JiChenWang
* @since 2024/6/30 11:55
*/
@Repository
public interface PhoneNumberBaseMapper extends BaseMapper<PhoneNumberPo> {
}

View File

@ -0,0 +1,34 @@
package com.example.snailjob.dao;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.snailjob.po.PhoneNumberPo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* TODO
*
* @author JiChenWang
* @since 2024/6/30 11:58
*/
@Service
public class PhoneNumberDao extends ServiceImpl<PhoneNumberBaseMapper, PhoneNumberPo> {
@Autowired
private PhoneNumberBaseMapper phoneNumberBaseMapper;
/**
* 批量保存手机号信息
*
* @param phoneNumberPoList 手机号po列表
* @return Boolean 保存成功标识true-成功false-失败
* @author JichenWang
* @since 2024/6/30 12:03
*/
public Boolean insertBatch (List<PhoneNumberPo> phoneNumberPoList) {
return this.saveBatch(phoneNumberPoList);
}
}

View File

@ -0,0 +1,40 @@
package com.example.snailjob.job;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
/**
* @author: opensnail
* @date : 2024-06-26
*/
@Component
@JobExecutor(name = "testAnnoMapJobExecutor")
public class TestAnnoMapJobExecutor {
@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
System.out.println(mapArgs);
System.out.println(mapArgs.getWfContext());
return mapHandler.doMap(Lists.newArrayList("1", "2", "3"), "MONTH_MAP");
}
@MapExecutor(taskName = "MONTH_MAP")
public ExecuteResult monthMapExecute(MapArgs mapArgs) {
System.out.println(mapArgs);
System.out.println(mapArgs.getWfContext());
return ExecuteResult.success(123);
}
@MapExecutor(taskName = "LAST_MAP")
public ExecuteResult lastMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
System.out.println(mapArgs);
System.out.println(mapArgs.getWfContext());
return ExecuteResult.success();
}
}

View File

@ -0,0 +1,69 @@
package com.example.snailjob.job;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor;
import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author: opensnail
* @date : 2024-06-26
*/
@Component
@JobExecutor(name = "testAnnoMapReduceJobExecutor")
public class TestAnnoMapReduceJobExecutor {
@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
System.out.println(mapArgs);
System.out.println(mapArgs.getWfContext());
mapArgs.appendContext("name", "zsg");
return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4", "5", "6"), "MONTH_MAP");
}
@MapExecutor(taskName = "MONTH_MAP")
public ExecuteResult monthMapExecute(MapArgs mapArgs) {
System.out.println(mapArgs);
System.out.println(mapArgs.getWfContext());
mapArgs.appendContext("age", "111");
return ExecuteResult.success(Integer.parseInt((String) mapArgs.getMapResult()) * 2);
}
@ReduceExecutor
public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) {
System.out.println(mapReduceArgs);
System.out.println(mapReduceArgs.getWfContext());
return ExecuteResult.success(
mapReduceArgs.getMapResult()
.stream()
.map(String::valueOf)
.map(Integer::parseInt)
.mapToInt(Integer::intValue).sum());
}
/**
* 当只有一个reduce任务时无此执行器
*/
@MergeReduceExecutor
public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
System.out.println(mergeReduceArgs);
System.out.println(mergeReduceArgs.getWfContext());
return ExecuteResult.success(
mergeReduceArgs.getReduces()
.stream()
.map(String::valueOf)
.map(Integer::parseInt)
.mapToInt(Integer::intValue).sum());
}
}

View File

@ -0,0 +1,106 @@
package com.example.snailjob.job;
import cn.hutool.core.util.ObjectUtil;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.alibaba.excel.EasyExcel;
import com.example.snailjob.bo.PhoneNumberBo;
import com.example.snailjob.bo.PhoneNumberCheckBo;
import com.example.snailjob.dao.PhoneNumberDao;
import com.example.snailjob.listener.PhoneNumberExcelListener;
import com.example.snailjob.po.PhoneNumberPo;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
/**
* 解析excel文件中的手机号码并将错误的手机号分片入库
*
* @author JiChenWang
* @since 2024/6/30 10:37
*/
@Slf4j
@Component
@JobExecutor(name = "testExcelAnalyseMapJobExecutor")
public class TestExcelAnalyseMapJobExecutor {
private final Integer BATCH_SIZE = 100;
@Autowired
private PhoneNumberDao phoneNumberDao;
/**
* 读取手机号文件总行数并进行分组
* 比如文档中的手机号总量为307条每100条一个分组分组结果为[{0,99}, {100, 199}, {200,299}, {300, 307}]
*
* @return ExecuteResult
* @author JichenWang
* @since 2024/6/30 10:48
*/
@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler<List<Long>> mapHandler) {
List<List<Long>> ranges = null;
// 先获取文件总行数便于分组
try {
@Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx");
final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, true, BATCH_SIZE);
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync();
// 设置区间范围
ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getTotal(), BATCH_SIZE);
} catch (Exception e) {
log.error("文件读取异常", e.getMessage());
}
return mapHandler.doMap(ranges, "TWO_MAP");
}
/**
*
*
* @param mapArgs
* @return ExecuteResult
* @author JichenWang
* @since 2024/6/30 11:05
*/
@MapExecutor(taskName = "TWO_MAP")
public ExecuteResult monthMapExecute(MapArgs mapArgs) {
// 获取本次要处理的区间
final List<Integer> mapResult = (List<Integer>) mapArgs.getMapResult();
log.info("本次要处理的区间为:{}", mapResult);
// 按照处理区间去读取数据
final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
try {
@Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx");
PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, false, BATCH_SIZE);
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(mapResult.get(0) + 1).doReadSync();
} catch (Exception e) {
log.error("文件读取异常:", e.getMessage());
}
// 如果正确手机号不为空则入库
if (ObjectUtil.isNotEmpty(phoneNumberCheckBo.getCheckErrors())) {
List<PhoneNumberPo> numberPos = new ArrayList<>();
for (String no : phoneNumberCheckBo.getCheckErrors()) {
PhoneNumberPo numberPo = new PhoneNumberPo();
numberPo.setPhoneNumber(no);
numberPos.add(numberPo);
}
phoneNumberDao.insertBatch(numberPos);
}
return ExecuteResult.success(phoneNumberCheckBo.getError());
}
}

View File

@ -0,0 +1,132 @@
package com.example.snailjob.job;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor;
import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.alibaba.excel.EasyExcel;
import com.alibaba.fastjson.JSONArray;
import com.example.snailjob.bo.PhoneNumberBo;
import com.example.snailjob.bo.PhoneNumberCheckBo;
import com.example.snailjob.listener.PhoneNumberExcelListener;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
/**
* 解析校验Excel中的手机号统计出错手机号数量并返回错误手机号详情
*
* @author JichenWang
* @since 2024/6/27 19:52
*/
@Slf4j
@Component
@JobExecutor(name = "TestExcelAnalyseMapReduceJobExecutor")
public class TestExcelAnalyseMapReduceJobExecutor {
private final Integer BATCH_SIZE = 100;
/**
* 处理手机号文件信息将文档中的手机号进行分组
* 比如文档中的手机号总量为307条每100条一个分组分组结果为[{0,99}, {100, 199}, {200,299}, {300, 307}]
*
* @return ExecuteResult
* @author JichenWang
* @since 2024/6/29 14:03
*/
@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler<List<Long>> mapHandler) {
List<List<Long>> ranges = null;
// 先获取文件总行数便于分组
try {
@Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx");
final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, true, BATCH_SIZE);
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync();
// 设置区间范围
ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getTotal(), BATCH_SIZE);
} catch (Exception e) {
log.error("文件读取异常", e.getMessage());
}
return mapHandler.doMap(ranges, "TWO_MAP");
}
/**
* 处理每个分组内容如读取{0,99}区间的手机号并解析
*
* @return ExecuteResult
* @author JichenWang
* @since 2024/6/29 14:04
*/
@MapExecutor(taskName = "TWO_MAP")
public ExecuteResult monthMapExecute(MapArgs mapArgs) {
// 获取本次要处理的区间
final List<Integer> mapResult = (List<Integer>) mapArgs.getMapResult();
log.info("本次要处理的区间为:{}", mapResult);
// 按照处理区间去读取数据
final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
try {
@Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx");
PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, false, BATCH_SIZE);
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(mapResult.get(0) + 1).doReadSync();
} catch (Exception e) {
log.error("文件读取异常:", e.getMessage());
}
return ExecuteResult.success(phoneNumberCheckBo);
}
@ReduceExecutor
public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) {
log.info("WJC Test reduceExecute, 参数为:{}", mapReduceArgs.getMapResult());
final PhoneNumberCheckBo phoneNumberCheckBo = this.buildGatherPhoneNumberCheckBo(mapReduceArgs.getMapResult().toString());
return ExecuteResult.success(phoneNumberCheckBo);
}
/**
* 当只有一个reduce任务时无此执行器
*/
@MergeReduceExecutor
public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
final PhoneNumberCheckBo phoneNumberCheckBo = this.buildGatherPhoneNumberCheckBo(mergeReduceArgs.getReduces().toString());
log.info("WJC 最终检测结果为:{}", phoneNumberCheckBo);
return ExecuteResult.success(phoneNumberCheckBo);
}
/**
* 构造汇总手机号校验结果BO
*
* @param phoneNumberCheckBoStr 手机号校验BO字符串
* @return PhoneNumberCheckBo 汇总手机号校验结果BO
* @author JichenWang
* @since 2024/6/29 14:24
*/
private PhoneNumberCheckBo buildGatherPhoneNumberCheckBo(String phoneNumberCheckBoStr) {
final List<PhoneNumberCheckBo> phoneNumberCheckBoList = JSONArray.parseArray(phoneNumberCheckBoStr, PhoneNumberCheckBo.class);
// 获取校验总数
final long checkTotalNum = phoneNumberCheckBoList.get(0).getTotal();
// 汇总校验失败数量
final long checkErrorNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getError).sum();
// 汇总校验成功数量
final long checkSuccessNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getSuccess).sum();
// 汇总错误手机号
final List<String> errorPhoneNumberList = new ArrayList<>();
phoneNumberCheckBoList.forEach(item -> errorPhoneNumberList.addAll(item.getCheckErrors()));
// 汇总手机号校验结果
return PhoneNumberCheckBo.builder().total(checkTotalNum).error(checkErrorNum).success(checkSuccessNum).checkErrors(errorPhoneNumberList).build();
}
}

View File

@ -0,0 +1,161 @@
package com.example.snailjob.job;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.example.snailjob.job.TestMapReduceJobExecutor.QuarterMap.SubTask;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;
/**
* 以下是一个统计某电商公司商家的一年的营业额的计算过程
*
* @author: opensnail
* @date : 2024-06-13
* @since : sj_1.1.0
*/
@Component
public class TestMapJobExecutor extends AbstractMapExecutor {
@Override
public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
MapEnum mapEnum = MapEnum.ofMap(mapArgs.getTaskName());
if (Objects.nonNull(mapEnum) && Objects.nonNull(mapEnum.getMap())) {
Map map = mapEnum.getMap();
MapEnum nextMap = mapEnum.getNextMap();
String nextName = null;
if (Objects.nonNull(nextMap)) {
nextName = nextMap.name();
}
return mapHandler.doMap(map.map(mapArgs), nextName);
}
// 未找到map的任务则说明当前需要进行处理
JsonNode json = JsonUtil.toJson(mapArgs.getMapResult());
SnailJobLog.LOCAL.info("LAST_MAP 开始执行 mapResult:{}", json);
// 获取最后一次map的信息.
SubTask subTask = JsonUtil.parseObject(json.toString(), SubTask.class);
// 此处可以统计数据或者做其他的事情
// 模拟统计营业额
int turnover = new Random().nextInt(1000000);
return ExecuteResult.success(turnover);
}
@Getter
private enum MapEnum {
LAST_MAP(null, null),
QUARTER_MAP(new QuarterMap(), LAST_MAP),
MAP_ROOT(new RootMap(), QUARTER_MAP),
;
private final Map map;
private final MapEnum nextMap;
MapEnum(Map map, MapEnum nextMap) {
this.map = map;
this.nextMap = nextMap;
}
public static MapEnum ofMap(String taskName) {
for (final MapEnum value : MapEnum.values()) {
if (value.name().equals(taskName)) {
return value;
}
}
return null;
}
}
private static class RootMap implements Map {
@Override
public List map(MapArgs args) {
// 第一层按照商家ID分片
// 假设总共有一百万商家 每个分片处理10万商家
List<List<Long>> ranges = doSharding(1L, 1000000L, 100000);
return ranges;
}
}
public static class QuarterMap implements Map {
@Override
public List map(MapArgs args) {
// 第二层按照月分片
// 4个季度
JsonNode json = JsonUtil.toJson(args.getMapResult());
List<TestMapReduceJobExecutor.QuarterMap.SubTask> list = new ArrayList<>();
for (JsonNode jsonNode : json) {
long id = jsonNode.asLong();
for (int i = 1; i <= 4; i++) {
list.add(new TestMapReduceJobExecutor.QuarterMap.SubTask(id, i));
}
}
return list;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class SubTask {
// 商家id
private Long id;
// 需要处理的月份
private Integer quarter;
}
}
interface Map{
List<Object> map(MapArgs args);
}
public static List<List<Long>> doSharding(Long min, Long max, int interval) {
if (max.equals(min)) {
return new ArrayList<>();
}
// 总数量
long total = max - min + 1;
// 计算分页总页数
Long totalPages = total / interval;
if (total % interval != 0) {
totalPages++;
}
List<List<Long>> partitions = new ArrayList<>();
for (Long index = 0L; index < totalPages; index++) {
// 计算起始点 因为是从min开始所以每次需要加上一个min
Long start = index * interval + min;
// 结算结束点 若最后一个 start + interval - 1 > max 取max
// 减一是保证 [start, end] 都是闭区间
Long end = Math.min(start + interval - 1, max);
partitions.add(Lists.newArrayList(start, end));
}
return partitions;
}
}

View File

@ -0,0 +1,175 @@
package com.example.snailjob.job;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.example.snailjob.job.TestMapReduceJobExecutor.QuarterMap.SubTask;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;
/**
* 以下是一个统计某电商公司商家的一年的营业额的计算过程
*
* @author: opensnail
* @date : 2024-06-13
* @since : sj_1.1.0
*/
@Component
public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor {
@Override
public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
MapEnum mapEnum = MapEnum.ofMap(mapArgs.getTaskName());
if (Objects.nonNull(mapEnum) && Objects.nonNull(mapEnum.getMap())) {
Map map = mapEnum.getMap();
MapEnum nextMap = mapEnum.getNextMap();
String nextName = null;
if (Objects.nonNull(nextMap)) {
nextName = nextMap.name();
}
return mapHandler.doMap(map.map(mapArgs), nextName);
}
// 未找到map的任务则说明当前需要进行处理
JsonNode json = JsonUtil.toJson(mapArgs.getMapResult());
SnailJobLog.LOCAL.info("LAST_MAP 开始执行 mapResult:{}", json);
// 获取最后一次map的信息.
SubTask subTask = JsonUtil.parseObject(json.toString(), SubTask.class);
// 此处可以统计数据或者做其他的事情
// 模拟统计营业额
int turnover = new Random().nextInt(1000000);
return ExecuteResult.success(turnover);
}
@Override
protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) {
return ExecuteResult.success(reduceArgs.getMapResult());
}
@Override
protected ExecuteResult doMergeReduceExecute(MergeReduceArgs reduceArgs) {
List<?> reduces = reduceArgs.getReduces();
SnailJobLog.LOCAL.info("merge reduce {}", reduces);
return ExecuteResult.success(111);
}
@Getter
private enum MapEnum {
LAST_MAP(null, null),
QUARTER_MAP(new QuarterMap(), LAST_MAP),
MAP_ROOT(new RootMap(), QUARTER_MAP),
;
private final Map map;
private final MapEnum nextMap;
MapEnum(Map map, MapEnum nextMap) {
this.map = map;
this.nextMap = nextMap;
}
public static MapEnum ofMap(String taskName) {
for (final MapEnum value : MapEnum.values()) {
if (value.name().equals(taskName)) {
return value;
}
}
return null;
}
}
private static class RootMap implements Map {
@Override
public List map(MapArgs args) {
// 第一层按照商家ID分片
// 假设总共有一百万商家 每个分片处理10万商家
List<List<Long>> ranges = doSharding(1L, 1000000L, 100000);
return ranges;
}
}
public static class QuarterMap implements Map {
@Override
public List map(MapArgs args) {
// 第二层按照月分片
// 4个季度
JsonNode json = JsonUtil.toJson(args.getMapResult());
List<SubTask> list = new ArrayList<>();
for (JsonNode jsonNode : json) {
long id = jsonNode.asLong();
for (int i = 1; i <= 4; i++) {
list.add(new SubTask(id, i));
}
}
return list;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class SubTask {
// 商家id
private Long id;
// 需要处理的月份
private Integer quarter;
}
}
interface Map{
List<Object> map(MapArgs args);
}
public static List<List<Long>> doSharding(Long min, Long max, int interval) {
if (max.equals(min)) {
return new ArrayList<>();
}
// 总数量
long total = max - min + 1;
// 计算分页总页数
Long totalPages = total / interval;
if (total % interval != 0) {
totalPages++;
}
List<List<Long>> partitions = new ArrayList<>();
for (Long index = 0L; index < totalPages; index++) {
// 计算起始点 因为是从min开始所以每次需要加上一个min
Long start = index * interval + min;
// 结算结束点 若最后一个 start + interval - 1 > max 取max
// 减一是保证 [start, end] 都是闭区间
Long end = Math.min(start + interval - 1, max);
partitions.add(Lists.newArrayList(start, end));
}
return partitions;
}
}

View File

@ -0,0 +1,33 @@
package com.example.snailjob.job;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor;
import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author opensnail
* @date 2024-07-07 12:06:57
* @since sj_1.1.0
*/
@Component
public class TestMyMapExecutor extends AbstractMapExecutor {
@Override
public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
if (SystemConstants.ROOT_MAP.equals(mapArgs.getTaskName())) {
return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4"), "TWO_MAP");
}
return ExecuteResult.success(mapArgs.getMapResult());
}
}

View File

@ -0,0 +1,45 @@
package com.example.snailjob.job;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author opensnail
* @date 2024-07-07 12:06:57
* @since sj_1.1.0
*/
@Component
public class TestMyMapReduceExecutor extends AbstractMapReduceExecutor {
@Override
public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
if (SystemConstants.ROOT_MAP.equals(mapArgs.getTaskName())) {
return mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4"), "TWO_MAP");
}
return ExecuteResult.success(mapArgs.getMapResult());
}
@Override
protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) {
List<String> mapResult = (List<String>) reduceArgs.getMapResult();
return ExecuteResult.success(mapResult.stream().mapToInt(Integer::parseInt).sum());
}
@Override
protected ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
List<String> reduces = (List<String>) mergeReduceArgs.getReduces();
return ExecuteResult.success(reduces.stream().mapToInt(Integer::parseInt).sum());
}
}

View File

@ -17,7 +17,7 @@ import org.springframework.stereotype.Component;
public class TestPartitionJobExecutor {
public ExecuteResult jobExecute(JobArgs jobArgs) {
if (jobArgs.getArgsStr().equals("1")) {
if (jobArgs.getJobParams().equals("1")) {
throw new NullPointerException("分区空指针抛异常了");
}
FailOrderPo failOrderPo = new FailOrderPo();

View File

@ -8,6 +8,8 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
import com.example.snailjob.po.FailOrderPo;
import org.springframework.stereotype.Component;
import java.util.Random;
/**
* @author www.byteblogs.com
* @date 2023-09-28 22:54:07
@ -18,11 +20,13 @@ import org.springframework.stereotype.Component;
public class TestWorkflowAnnoJobExecutor {
public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException {
// for (int i = 0; i < 30; i++) {
// SnailJobLog.REMOTE.info("任务执行开始. [{}]", i + "" + JsonUtil.toJsonString(jobArgs));
// }
FailOrderPo failOrderPo = new FailOrderPo();
failOrderPo.setOrderId("xiaowoniu");
// 测试上下文传递
int i = new Random().nextInt(1000);
jobArgs.appendContext("name" + i, "小蜗牛" + i);
jobArgs.appendContext("age", i);
SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext());
return ExecuteResult.success(failOrderPo);
}

View File

@ -0,0 +1,76 @@
package com.example.snailjob.listener;
import cn.hutool.core.lang.Validator;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.example.snailjob.bo.PhoneNumberBo;
import com.example.snailjob.bo.PhoneNumberCheckBo;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* 手机号excel解析 Listener
*
* @author JiChenWang
* @since 2024/6/27 20:38
*/
@Slf4j
public class PhoneNumberExcelListener extends AnalysisEventListener<PhoneNumberBo> {
/** 手机号校验BO **/
private PhoneNumberCheckBo phoneNumberCheckBo;
/** 是否第一次读取Excel **/
private Boolean firstReadStatus = false;
/** 读取批次大小 **/
private Integer batchSize = 100;
/** 已读取的数据数量 **/
private Integer cacheSize = 0;
@Override
public void invokeHeadMap(Map<Integer, String> headMap, AnalysisContext context) {
this.phoneNumberCheckBo.setTotal(Long.parseLong(String.valueOf(context.readSheetHolder().getApproximateTotalRowNumber() - 1)));
}
@Override
public void invoke(PhoneNumberBo phoneNumberBo, AnalysisContext context) {
// 如果是第一次读该文件已读取的数量已超过读取批次大小直接返回
if (firstReadStatus || cacheSize >= batchSize) {
return;
}
cacheSize++;
if (ObjectUtil.isEmpty(phoneNumberBo.getPhoneNumber())) {
return;
}
// 校验手机号
log.info("本次校验的手机号为: {}", phoneNumberBo.getPhoneNumber());
Boolean validateStatus = Validator.isMobile(phoneNumberBo.getPhoneNumber());
if (validateStatus) {
this.phoneNumberCheckBo.setSuccess(this.phoneNumberCheckBo.getSuccess() + 1);
// final PhoneNumberPo phoneNumberPo = PhoneNumberPo.builder().phoneNumber(phoneNumberBo.getPhoneNumber()).createTime(LocalDateTime.now()).build();
// this.phoneNumberCheckBo.getCheckSuccessPhoneNumberList().add(phoneNumberPo);
} else {
this.phoneNumberCheckBo.setError(this.phoneNumberCheckBo.getError() + 1);
this.phoneNumberCheckBo.getCheckErrors().add(phoneNumberBo.getPhoneNumber());
}
}
@Override
public void doAfterAllAnalysed(AnalysisContext context) {
}
public PhoneNumberExcelListener(PhoneNumberCheckBo phoneNumberCheckBo, Boolean firstReadStatus, Integer batchSize) {
this.phoneNumberCheckBo = phoneNumberCheckBo;
this.firstReadStatus = firstReadStatus;
this.batchSize = batchSize;
}
}

View File

@ -0,0 +1,39 @@
package com.example.snailjob.po;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* phone_number
*
* @author JiChenWang
* @since 2024/6/30 11:48
*/
@TableName("phone_number")
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class PhoneNumberPo {
/**
* 主键
*/
private Long id;
/**
* 手机号
*/
private String phoneNumber;
/**
* 创建时间
*/
private LocalDateTime createTime;
}

View File

@ -9,7 +9,7 @@ spring:
active: dev
datasource:
name: snail_job
url: jdbc:mysql://localhost:3306/snail_job_260?useSSL=false&characterEncoding=utf8&useUnicode=true
url: jdbc:mysql://localhost:3306/snail_job?useSSL=false&characterEncoding=utf8&useUnicode=true
username: root
password: root
type: com.zaxxer.hikari.HikariDataSource

Binary file not shown.