!3 mapReduce手机号文件解析Demo提交

* 手机号excel文件校验并入库--map 测试
* 手机号excel文件校验--map reduce 测试
This commit is contained in:
jcwang812 2024-07-01 01:07:51 +00:00 committed by opensnail
parent ccfdc14f74
commit 53306f0eac
11 changed files with 472 additions and 1 deletions

View File

@ -12,4 +12,12 @@ CREATE TABLE fail_order
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
); );
-- Phone number table: stores the phone numbers that passed validation.
-- The statement is terminated with ';' (like the fail_order statement above) so that
-- further statements can be appended to this script safely.
CREATE TABLE `phone_number` (
    `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
    `phone_number` varchar(20) NOT NULL DEFAULT '' COMMENT '手机号',
    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='手机号表';

10
pom.xml
View File

@ -111,6 +111,16 @@
<artifactId>okhttp</artifactId> <artifactId>okhttp</artifactId>
<version>4.2.0</version> <version>4.2.0</version>
</dependency> </dependency>
<!-- EasyExcel: used by the Excel demo jobs to read the phone-number workbook through a row listener. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>3.1.3</version>
</dependency>
<!-- fastjson: used by the map-reduce demo job to parse map/reduce results back into PhoneNumberCheckBo objects. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,18 @@
package com.example.snailjob.bo;
import com.alibaba.excel.annotation.ExcelProperty;
import lombok.Data;
/**
 * Business object representing one row of the phone-number Excel sheet.
 *
 * <p>Used as the row model passed to EasyExcel when the workbook is read.
 *
 * @author JiChenWang
 * @since 2024/6/27 20:28
 */
@Data
public class PhoneNumberBo {
// Bound to the Excel column whose header is "手机号码", declared at column index 0.
@ExcelProperty(value = "手机号码", index = 0)
private String phoneNumber;
}

View File

@ -0,0 +1,40 @@
package com.example.snailjob.bo;
import com.example.snailjob.po.PhoneNumberPo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* 手机号检测BO
*
* @author JiChenWang
* @since 2024/6/27 20:50
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PhoneNumberCheckBo {
@Schema(description = "检测总条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long checkTotalNum = 0L;
@Schema(description = "检测失败条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long checkErrorNum = 0L;
@Schema(description = "检测成功条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long checkSuccessNum = 0L;
@Schema(description = "检测失败临时的数据", accessMode = Schema.AccessMode.READ_WRITE)
private List<String> checkErrorPhoneNumberList = new ArrayList<>();
@Schema(description = "检测成功临时的数据", accessMode = Schema.AccessMode.READ_WRITE)
private List<PhoneNumberPo> checkSuccessPhoneNumberList = new ArrayList<>();
}

View File

@ -0,0 +1,15 @@
package com.example.snailjob.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.snailjob.po.PhoneNumberPo;
import org.springframework.stereotype.Repository;
/**
 * MyBatis-Plus mapper for {@link PhoneNumberPo}.
 *
 * <p>Declares no methods of its own; everything it offers is inherited from {@link BaseMapper}.
 *
 * @author JiChenWang
 * @since 2024/6/30 11:55
 */
@Repository
public interface PhoneNumberBaseMapper extends BaseMapper<PhoneNumberPo> {
}

View File

@ -0,0 +1,34 @@
package com.example.snailjob.dao;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.snailjob.po.PhoneNumberPo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Data-access service for {@link PhoneNumberPo} records, built on the MyBatis-Plus
 * {@link ServiceImpl} base class.
 *
 * <p>The mapper is deliberately not injected a second time in this class: the field was
 * never read here, and {@link ServiceImpl} already holds the {@link PhoneNumberBaseMapper}
 * it is parameterized with.
 *
 * @author JiChenWang
 * @since 2024/6/30 11:58
 */
@Service
public class PhoneNumberDao extends ServiceImpl<PhoneNumberBaseMapper, PhoneNumberPo> {

    /**
     * Saves the given phone-number records in batch by delegating to {@code saveBatch}.
     *
     * @param phoneNumberPoList phone-number persistence objects to store
     * @return the flag returned by {@code saveBatch}: {@code true} on success, {@code false} on failure
     * @author JichenWang
     * @since 2024/6/30 12:03
     */
    public Boolean insertBatch(List<PhoneNumberPo> phoneNumberPoList) {
        return this.saveBatch(phoneNumberPoList);
    }
}

View File

@ -0,0 +1,97 @@
package com.example.snailjob.job;
import cn.hutool.core.util.ObjectUtil;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.alibaba.excel.EasyExcel;
import com.example.snailjob.bo.PhoneNumberBo;
import com.example.snailjob.bo.PhoneNumberCheckBo;
import com.example.snailjob.dao.PhoneNumberDao;
import com.example.snailjob.listener.PhoneNumberExcelListener;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.InputStream;
import java.util.List;
/**
* 解析手机号excel文件并将正确的手机号分片入库
*
* @author JiChenWang
* @since 2024/6/30 10:37
*/
@Slf4j
@Component
@JobExecutor(name = "testExcelAnalyseMapJobExecutor")
public class TestExcelAnalyseMapJobExecutor {
private final Integer BATCH_SIZE = 100;
@Autowired
private PhoneNumberDao phoneNumberDao;
/**
* 读取手机号文件总行数并进行分组
* 比如文档中的手机号总量为307条每100条一个分组分组结果为[{0,99}, {100, 199}, {200,299}, {300, 307}]
*
* @return ExecuteResult
* @author JichenWang
* @since 2024/6/30 10:48
*/
@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
List<List<Long>> ranges = null;
// 先获取文件总行数便于分组
try {
@Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx");
final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, true, BATCH_SIZE);
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync();
// 设置区间范围
ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE);
} catch (Exception e) {
log.error("文件读取异常", e.getMessage());
}
return mapHandler.doMap(ranges, "MONTH_MAP");
}
/**
*
*
* @param mapArgs
* @return ExecuteResult
* @author JichenWang
* @since 2024/6/30 11:05
*/
@MapExecutor(taskName = "MONTH_MAP")
public ExecuteResult monthMapExecute(MapArgs mapArgs) {
// 获取本次要处理的区间
final List<Integer> mapResult = (List<Integer>) mapArgs.getMapResult();
log.info("本次要处理的区间为:{}", mapResult);
// 按照处理区间去读取数据
final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
try {
@Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx");
PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, false, BATCH_SIZE);
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(mapResult.get(0) + 1).doReadSync();
} catch (Exception e) {
log.error("文件读取异常:", e.getMessage());
}
// 如果正确手机号不为空则入库
if (ObjectUtil.isNotEmpty(phoneNumberCheckBo.getCheckSuccessPhoneNumberList())) {
phoneNumberDao.insertBatch(phoneNumberCheckBo.getCheckSuccessPhoneNumberList());
}
return ExecuteResult.success(phoneNumberCheckBo.getCheckSuccessNum());
}
}

View File

@ -0,0 +1,132 @@
package com.example.snailjob.job;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor;
import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.alibaba.excel.EasyExcel;
import com.alibaba.fastjson.JSONArray;
import com.example.snailjob.bo.PhoneNumberBo;
import com.example.snailjob.bo.PhoneNumberCheckBo;
import com.example.snailjob.listener.PhoneNumberExcelListener;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
/**
 * Map-reduce job that parses and validates the phone numbers of an Excel file, counts the
 * invalid ones and returns their details.
 *
 * <p>The root map task splits the sheet into ranges, the {@code MONTH_MAP} task validates
 * one range, and the reduce / merge-reduce tasks sum up the partial results.
 *
 * <p>Read failures are logged with their stack trace and rethrown, so they reach the job
 * framework instead of being swallowed and reported as a successful run.
 *
 * @author JichenWang
 * @since 2024/6/27 19:52
 */
@Slf4j
@Component
@JobExecutor(name = "TestExcelAnalyseMapReduceJobExecutor")
public class TestExcelAnalyseMapReduceJobExecutor {

    /** Number of rows handled by one shard. */
    private static final int BATCH_SIZE = 100;

    /** Classpath location of the workbook containing the phone numbers. */
    private static final String EXCEL_RESOURCE = "doc/number.xlsx";

    /** Name of the map task that processes a single range. */
    private static final String SHARD_TASK_NAME = "MONTH_MAP";

    /**
     * Reads the total row count of the phone-number file and splits it into ranges.
     * For example, per the original author's note, 307 phone numbers grouped by 100 yield
     * {@code [{0,99}, {100,199}, {200,299}, {300,307}]}.
     *
     * @param mapArgs    arguments supplied by the job framework (not used by the root task)
     * @param mapHandler handler used to dispatch the ranges to the {@code MONTH_MAP} task
     * @return the result of dispatching the ranges
     * @throws IllegalStateException if the workbook is missing or cannot be read
     * @author JichenWang
     * @since 2024/6/29 14:03
     */
    @MapExecutor
    public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
        // First pass: the listener only records the row total, it validates nothing.
        final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
        readExcel(new PhoneNumberExcelListener(phoneNumberCheckBo, true, BATCH_SIZE), 1);

        // Build the ranges from the total that was just read.
        final List<List<Long>> ranges =
                TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE);
        return mapHandler.doMap(ranges, SHARD_TASK_NAME);
    }

    /**
     * Processes one range, e.g. reads and validates the phone numbers of range {0,99}.
     *
     * @param mapArgs arguments supplied by the job framework; its map result is expected to
     *                be the range produced by the root task, whose first element is the
     *                start index
     * @return a success result carrying the validation result of this range
     * @throws IllegalStateException if the workbook is missing or cannot be read
     * @author JichenWang
     * @since 2024/6/29 14:04
     */
    @MapExecutor(taskName = SHARD_TASK_NAME)
    public ExecuteResult monthMapExecute(MapArgs mapArgs) {
        // The range elements are read as Number: depending on how the framework transports
        // the range they may arrive as Integer or Long, and a hard cast to Integer would fail.
        final List<?> mapResult = (List<?>) mapArgs.getMapResult();
        log.info("本次要处理的区间为:{}", mapResult);
        final int rangeStart = ((Number) mapResult.get(0)).intValue();

        // Rows before the range start (plus the real header row) are treated as header rows
        // and thereby skipped; the listener then handles at most BATCH_SIZE data rows.
        final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
        readExcel(new PhoneNumberExcelListener(phoneNumberCheckBo, false, BATCH_SIZE), rangeStart + 1);
        return ExecuteResult.success(phoneNumberCheckBo);
    }

    /**
     * Sums up the validation results produced by the map tasks.
     *
     * @param mapReduceArgs arguments supplied by the job framework, holding the map results
     * @return a success result carrying the summed validation result
     */
    @ReduceExecutor
    public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) {
        log.info("WJC Test reduceExecute, 参数为:{}", mapReduceArgs.getMapResult());
        // NOTE(review): relies on toString() of the map results being a parseable JSON array - confirm.
        final PhoneNumberCheckBo phoneNumberCheckBo = this.buildGatherPhoneNumberCheckBo(mapReduceArgs.getMapResult().toString());
        return ExecuteResult.success(phoneNumberCheckBo);
    }

    /**
     * Sums up the results of the reduce tasks. Per the original author's note, this executor
     * is not needed when there is only one reduce task.
     *
     * @param mergeReduceArgs arguments supplied by the job framework, holding the reduce results
     * @return a success result carrying the final validation result
     */
    @MergeReduceExecutor
    public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
        // NOTE(review): relies on toString() of the reduce results being a parseable JSON array - confirm.
        final PhoneNumberCheckBo phoneNumberCheckBo = this.buildGatherPhoneNumberCheckBo(mergeReduceArgs.getReduces().toString());
        log.info("WJC 最终检测结果为:{}", phoneNumberCheckBo);
        return ExecuteResult.success(phoneNumberCheckBo);
    }

    /**
     * Builds the summary of several partial validation results.
     *
     * <p>The total is taken from the first partial result, the error and success counters are
     * summed, and the invalid phone numbers are concatenated. When the input contains no
     * partial result at all, an empty summary (zero counters, empty lists) is returned
     * instead of failing on {@code get(0)}.
     *
     * @param phoneNumberCheckBoStr JSON array of partial validation results
     * @return the summed validation result
     * @author JichenWang
     * @since 2024/6/29 14:24
     */
    private PhoneNumberCheckBo buildGatherPhoneNumberCheckBo(String phoneNumberCheckBoStr) {
        final List<PhoneNumberCheckBo> phoneNumberCheckBoList = JSONArray.parseArray(phoneNumberCheckBoStr, PhoneNumberCheckBo.class);
        if (phoneNumberCheckBoList == null || phoneNumberCheckBoList.isEmpty()) {
            return new PhoneNumberCheckBo();
        }
        // Total taken from the first partial result.
        final long checkTotalNum = phoneNumberCheckBoList.get(0).getCheckTotalNum();
        // Sum of failed validations.
        final long checkErrorNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckErrorNum).sum();
        // Sum of successful validations.
        final long checkSuccessNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckSuccessNum).sum();
        // Concatenate the invalid phone numbers; a partial result may carry no list at all.
        final List<String> errorPhoneNumberList = new ArrayList<>();
        for (PhoneNumberCheckBo item : phoneNumberCheckBoList) {
            if (item.getCheckErrorPhoneNumberList() != null) {
                errorPhoneNumberList.addAll(item.getCheckErrorPhoneNumberList());
            }
        }
        return PhoneNumberCheckBo.builder()
                .checkTotalNum(checkTotalNum)
                .checkErrorNum(checkErrorNum)
                .checkSuccessNum(checkSuccessNum)
                .checkErrorPhoneNumberList(errorPhoneNumberList)
                .build();
    }

    /**
     * Reads the phone-number workbook from the classpath and feeds it to the listener.
     *
     * @param listener      listener receiving the rows
     * @param headRowNumber number of leading rows EasyExcel treats as header rows
     * @throws IllegalStateException if the resource does not exist or closing it fails
     */
    private void readExcel(PhoneNumberExcelListener listener, int headRowNumber) {
        try (InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream(EXCEL_RESOURCE)) {
            if (numberInputStream == null) {
                throw new IllegalStateException("Classpath resource not found: " + EXCEL_RESOURCE);
            }
            EasyExcel.read(numberInputStream, PhoneNumberBo.class, listener).sheet().headRowNumber(headRowNumber).doReadSync();
        } catch (RuntimeException e) {
            // Pass the exception itself as the last argument so the stack trace is logged.
            log.error("文件读取异常: {}", e.getMessage(), e);
            throw e;
        } catch (Exception e) {
            // Only a checked failure while closing the stream can end up here.
            log.error("文件读取异常: {}", e.getMessage(), e);
            throw new IllegalStateException("Failed to read " + EXCEL_RESOURCE, e);
        }
    }
}

View File

@ -0,0 +1,78 @@
package com.example.snailjob.listener;
import cn.hutool.core.lang.Validator;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.example.snailjob.bo.PhoneNumberBo;
import com.example.snailjob.bo.PhoneNumberCheckBo;
import com.example.snailjob.po.PhoneNumberPo;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Map;
/**
 * EasyExcel listener that validates the phone numbers of an Excel sheet and records the
 * outcome in a {@link PhoneNumberCheckBo}.
 *
 * <p>It works in two modes. On the first read ({@code firstReadStatus == true}) it only
 * records the row total reported by the sheet and validates nothing. Otherwise it
 * validates at most {@code batchSize} data rows. In both modes {@link #hasNext} tells
 * EasyExcel to stop once nothing is left to do, so the rest of the file is not scanned.
 *
 * @author JiChenWang
 * @since 2024/6/27 20:38
 */
@Slf4j
public class PhoneNumberExcelListener extends AnalysisEventListener<PhoneNumberBo> {

    /** Batch size used when the caller passes {@code null}. */
    private static final int DEFAULT_BATCH_SIZE = 100;

    /** Validation result that is filled while reading. */
    private final PhoneNumberCheckBo phoneNumberCheckBo;

    /** Whether this is the first (count-only) read of the Excel file. */
    private final boolean firstReadStatus;

    /** Maximum number of data rows handled by this listener. */
    private final int batchSize;

    /** Number of data rows handled so far. */
    private int cacheSize = 0;

    /**
     * Creates a listener.
     *
     * @param phoneNumberCheckBo result object to fill
     * @param firstReadStatus    {@code true} for the count-only first read; {@code null} is treated as {@code false}
     * @param batchSize          maximum number of data rows to handle; {@code null} falls back to 100
     */
    public PhoneNumberExcelListener(PhoneNumberCheckBo phoneNumberCheckBo, Boolean firstReadStatus, Integer batchSize) {
        this.phoneNumberCheckBo = phoneNumberCheckBo;
        // Unbox defensively: a null Boolean/Integer used to fail later with an NPE inside invoke().
        this.firstReadStatus = Boolean.TRUE.equals(firstReadStatus);
        this.batchSize = batchSize == null ? DEFAULT_BATCH_SIZE : batchSize;
    }

    /**
     * Records the row total of the sheet, i.e. the approximate row count minus one for the
     * header row. The total is left untouched when the sheet reports no row count, and it
     * is never set to a negative value.
     */
    @Override
    public void invokeHeadMap(Map<Integer, String> headMap, AnalysisContext context) {
        final Integer approximateTotalRows = context.readSheetHolder().getApproximateTotalRowNumber();
        if (approximateTotalRows == null) {
            return;
        }
        this.phoneNumberCheckBo.setCheckTotalNum(Math.max(0L, approximateTotalRows - 1L));
    }

    /**
     * Validates the phone number of one data row and records it as success or error.
     * Rows are ignored on the first read and once the batch is full; rows with an empty
     * phone number count towards the batch but are recorded neither as success nor as error.
     */
    @Override
    public void invoke(PhoneNumberBo phoneNumberBo, AnalysisContext context) {
        if (firstReadStatus || cacheSize >= batchSize) {
            return;
        }
        cacheSize++;
        final String phoneNumber = phoneNumberBo.getPhoneNumber();
        if (ObjectUtil.isEmpty(phoneNumber)) {
            return;
        }
        log.info("本次校验的手机号为: {}", phoneNumber);
        if (Validator.isMobile(phoneNumber)) {
            this.phoneNumberCheckBo.setCheckSuccessNum(this.phoneNumberCheckBo.getCheckSuccessNum() + 1);
            final PhoneNumberPo phoneNumberPo = PhoneNumberPo.builder().phoneNumber(phoneNumber).createTime(LocalDateTime.now()).build();
            this.phoneNumberCheckBo.getCheckSuccessPhoneNumberList().add(phoneNumberPo);
        } else {
            this.phoneNumberCheckBo.setCheckErrorNum(this.phoneNumberCheckBo.getCheckErrorNum() + 1);
            this.phoneNumberCheckBo.getCheckErrorPhoneNumberList().add(phoneNumber);
        }
    }

    /**
     * Tells EasyExcel whether to keep reading: {@code false} on the count-only first read
     * (the total is already known after the header row) and once {@code batchSize} data rows
     * have been handled. Rows after that point would be ignored by {@link #invoke} anyway,
     * so stopping early does not change the result.
     */
    @Override
    public boolean hasNext(AnalysisContext context) {
        return !firstReadStatus && cacheSize < batchSize;
    }

    /** Nothing to do once the sheet has been read. */
    @Override
    public void doAfterAllAnalysed(AnalysisContext context) {
    }
}

View File

@ -0,0 +1,39 @@
package com.example.snailjob.po;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
 * Persistence object mapped to the {@code phone_number} table.
 *
 * @author JiChenWang
 * @since 2024/6/30 11:48
 */
@TableName("phone_number")
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class PhoneNumberPo {
/**
 * Primary key.
 * NOTE(review): no id strategy is declared on this field, so whether the value comes from
 * the table's AUTO_INCREMENT or from MyBatis-Plus depends on configuration not visible here - confirm.
 */
private Long id;
/**
 * Phone number.
 */
private String phoneNumber;
/**
 * Creation time.
 */
private LocalDateTime createTime;
}

Binary file not shown.