优化mapreduce的demo

This commit is contained in:
byteblogs168 2024-07-07 13:46:01 +08:00
parent 53306f0eac
commit 92ee029073
10 changed files with 122 additions and 38 deletions

View File

@ -1,6 +1,5 @@
package com.example.snailjob.bo; package com.example.snailjob.bo;
import com.example.snailjob.po.PhoneNumberPo;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
@ -23,18 +22,18 @@ import java.util.List;
public class PhoneNumberCheckBo { public class PhoneNumberCheckBo {
@Schema(description = "检测总条数", accessMode = Schema.AccessMode.READ_WRITE) @Schema(description = "检测总条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long checkTotalNum = 0L; private Long total = 0L;
@Schema(description = "检测失败条数", accessMode = Schema.AccessMode.READ_WRITE) @Schema(description = "检测失败条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long checkErrorNum = 0L; private Long error = 0L;
@Schema(description = "检测成功条数", accessMode = Schema.AccessMode.READ_WRITE) @Schema(description = "检测成功条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long checkSuccessNum = 0L; private Long success = 0L;
@Schema(description = "检测失败临时的数据", accessMode = Schema.AccessMode.READ_WRITE) @Schema(description = "检测失败临时的数据", accessMode = Schema.AccessMode.READ_WRITE)
private List<String> checkErrorPhoneNumberList = new ArrayList<>(); private List<String> checkErrors = new ArrayList<>();
@Schema(description = "检测成功临时的数据", accessMode = Schema.AccessMode.READ_WRITE) // @Schema(description = "检测成功临时的数据", accessMode = Schema.AccessMode.READ_WRITE)
private List<PhoneNumberPo> checkSuccessPhoneNumberList = new ArrayList<>(); // private List<PhoneNumberPo> checkSuccessPhoneNumberList = new ArrayList<>();
} }

View File

@ -11,16 +11,18 @@ import com.example.snailjob.bo.PhoneNumberBo;
import com.example.snailjob.bo.PhoneNumberCheckBo; import com.example.snailjob.bo.PhoneNumberCheckBo;
import com.example.snailjob.dao.PhoneNumberDao; import com.example.snailjob.dao.PhoneNumberDao;
import com.example.snailjob.listener.PhoneNumberExcelListener; import com.example.snailjob.listener.PhoneNumberExcelListener;
import com.example.snailjob.po.PhoneNumberPo;
import lombok.Cleanup; import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
* 解析手机号excel文件并将正确的手机号分片入库 * 解析excel文件中的手机号码并将错误的手机号分片入库
* *
* @author JiChenWang * @author JiChenWang
* @since 2024/6/30 10:37 * @since 2024/6/30 10:37
@ -44,7 +46,7 @@ public class TestExcelAnalyseMapJobExecutor {
* @since 2024/6/30 10:48 * @since 2024/6/30 10:48
*/ */
@MapExecutor @MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler<List<Long>> mapHandler) {
List<List<Long>> ranges = null; List<List<Long>> ranges = null;
// 先获取文件总行数便于分组 // 先获取文件总行数便于分组
try { try {
@ -54,11 +56,11 @@ public class TestExcelAnalyseMapJobExecutor {
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync(); EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync();
// 设置区间范围 // 设置区间范围
ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE); ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getTotal(), BATCH_SIZE);
} catch (Exception e) { } catch (Exception e) {
log.error("文件读取异常", e.getMessage()); log.error("文件读取异常", e.getMessage());
} }
return mapHandler.doMap(ranges, "MONTH_MAP"); return mapHandler.doMap(ranges, "TWO_MAP");
} }
@ -70,7 +72,7 @@ public class TestExcelAnalyseMapJobExecutor {
* @author JichenWang * @author JichenWang
* @since 2024/6/30 11:05 * @since 2024/6/30 11:05
*/ */
@MapExecutor(taskName = "MONTH_MAP") @MapExecutor(taskName = "TWO_MAP")
public ExecuteResult monthMapExecute(MapArgs mapArgs) { public ExecuteResult monthMapExecute(MapArgs mapArgs) {
// 获取本次要处理的区间 // 获取本次要处理的区间
final List<Integer> mapResult = (List<Integer>) mapArgs.getMapResult(); final List<Integer> mapResult = (List<Integer>) mapArgs.getMapResult();
@ -87,11 +89,18 @@ public class TestExcelAnalyseMapJobExecutor {
} }
// 如果正确手机号不为空则入库 // 如果正确手机号不为空则入库
if (ObjectUtil.isNotEmpty(phoneNumberCheckBo.getCheckSuccessPhoneNumberList())) { if (ObjectUtil.isNotEmpty(phoneNumberCheckBo.getCheckErrors())) {
phoneNumberDao.insertBatch(phoneNumberCheckBo.getCheckSuccessPhoneNumberList());
List<PhoneNumberPo> numberPos = new ArrayList<>();
for (String no : phoneNumberCheckBo.getCheckErrors()) {
PhoneNumberPo numberPo = new PhoneNumberPo();
numberPo.setPhoneNumber(no);
numberPos.add(numberPo);
}
phoneNumberDao.insertBatch(numberPos);
} }
return ExecuteResult.success(phoneNumberCheckBo.getCheckSuccessNum()); return ExecuteResult.success(phoneNumberCheckBo.getError());
} }
} }

View File

@ -44,7 +44,7 @@ public class TestExcelAnalyseMapReduceJobExecutor {
* @since 2024/6/29 14:03 * @since 2024/6/29 14:03
*/ */
@MapExecutor @MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) { public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler<List<Long>> mapHandler) {
List<List<Long>> ranges = null; List<List<Long>> ranges = null;
// 先获取文件总行数便于分组 // 先获取文件总行数便于分组
try { try {
@ -54,11 +54,11 @@ public class TestExcelAnalyseMapReduceJobExecutor {
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync(); EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync();
// 设置区间范围 // 设置区间范围
ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE); ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getTotal(), BATCH_SIZE);
} catch (Exception e) { } catch (Exception e) {
log.error("文件读取异常", e.getMessage()); log.error("文件读取异常", e.getMessage());
} }
return mapHandler.doMap(ranges, "MONTH_MAP"); return mapHandler.doMap(ranges, "TWO_MAP");
} }
/** /**
@ -68,7 +68,7 @@ public class TestExcelAnalyseMapReduceJobExecutor {
* @author JichenWang * @author JichenWang
* @since 2024/6/29 14:04 * @since 2024/6/29 14:04
*/ */
@MapExecutor(taskName = "MONTH_MAP") @MapExecutor(taskName = "TWO_MAP")
public ExecuteResult monthMapExecute(MapArgs mapArgs) { public ExecuteResult monthMapExecute(MapArgs mapArgs) {
// 获取本次要处理的区间 // 获取本次要处理的区间
final List<Integer> mapResult = (List<Integer>) mapArgs.getMapResult(); final List<Integer> mapResult = (List<Integer>) mapArgs.getMapResult();
@ -116,17 +116,17 @@ public class TestExcelAnalyseMapReduceJobExecutor {
private PhoneNumberCheckBo buildGatherPhoneNumberCheckBo(String phoneNumberCheckBoStr) { private PhoneNumberCheckBo buildGatherPhoneNumberCheckBo(String phoneNumberCheckBoStr) {
final List<PhoneNumberCheckBo> phoneNumberCheckBoList = JSONArray.parseArray(phoneNumberCheckBoStr, PhoneNumberCheckBo.class); final List<PhoneNumberCheckBo> phoneNumberCheckBoList = JSONArray.parseArray(phoneNumberCheckBoStr, PhoneNumberCheckBo.class);
// 获取校验总数 // 获取校验总数
final long checkTotalNum = phoneNumberCheckBoList.get(0).getCheckTotalNum(); final long checkTotalNum = phoneNumberCheckBoList.get(0).getTotal();
// 汇总校验失败数量 // 汇总校验失败数量
final long checkErrorNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckErrorNum).sum(); final long checkErrorNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getError).sum();
// 汇总校验成功数量 // 汇总校验成功数量
final long checkSuccessNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckSuccessNum).sum(); final long checkSuccessNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getSuccess).sum();
// 汇总错误手机号 // 汇总错误手机号
final List<String> errorPhoneNumberList = new ArrayList<>(); final List<String> errorPhoneNumberList = new ArrayList<>();
phoneNumberCheckBoList.forEach(item -> errorPhoneNumberList.addAll(item.getCheckErrorPhoneNumberList())); phoneNumberCheckBoList.forEach(item -> errorPhoneNumberList.addAll(item.getCheckErrors()));
// 汇总手机号校验结果 // 汇总手机号校验结果
return PhoneNumberCheckBo.builder().checkTotalNum(checkTotalNum).checkErrorNum(checkErrorNum).checkSuccessNum(checkSuccessNum).checkErrorPhoneNumberList(errorPhoneNumberList).build(); return PhoneNumberCheckBo.builder().total(checkTotalNum).error(checkErrorNum).success(checkSuccessNum).checkErrors(errorPhoneNumberList).build();
} }
} }

View File

@ -60,8 +60,8 @@ public class TestMapJobExecutor extends AbstractMapExecutor {
@Getter @Getter
private enum MapEnum { private enum MapEnum {
LAST_MAP(null, null), LAST_MAP(null, null),
MONTH_MAP(new QuarterMap(), LAST_MAP), QUARTER_MAP(new QuarterMap(), LAST_MAP),
MAP_ROOT(new RootMap(), MONTH_MAP), MAP_ROOT(new RootMap(), QUARTER_MAP),
; ;
private final Map map; private final Map map;

View File

@ -1,6 +1,5 @@
package com.example.snailjob.job; package com.example.snailjob.job;
import cn.hutool.core.util.ByteUtil;
import com.aizuda.snailjob.client.job.core.MapHandler; import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
@ -75,8 +74,8 @@ public class TestMapReduceJobExecutor extends AbstractMapReduceExecutor {
@Getter @Getter
private enum MapEnum { private enum MapEnum {
LAST_MAP(null, null), LAST_MAP(null, null),
MONTH_MAP(new QuarterMap(), LAST_MAP), QUARTER_MAP(new QuarterMap(), LAST_MAP),
MAP_ROOT(new RootMap(), MONTH_MAP), MAP_ROOT(new RootMap(), QUARTER_MAP),
; ;
private final Map map; private final Map map;

View File

@ -0,0 +1,33 @@
package com.example.snailjob.job;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor;
import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Demo map executor with a single fan-out step.
 *
 * <p>When the incoming task is the root map task, a fixed list of four
 * string items is handed to {@code mapHandler.doMap} under the task name
 * {@code "TWO_MAP"}. For any other task name the item carried by the
 * {@link MapArgs} is returned unchanged as a successful result.
 *
 * @author opensnail
 * @date 2024-07-07 12:06:57
 * @since sj_1.1.0
 */
@Component
public class TestMyMapExecutor extends AbstractMapExecutor {

    /**
     * Dispatches on the task name carried by {@code mapArgs}.
     *
     * @param mapArgs    arguments of the current map task (task name and map result are read)
     * @param mapHandler handler used to submit the next batch of items
     * @return the result of {@code doMap} for the root task, otherwise a success
     *         result wrapping {@code mapArgs.getMapResult()}
     */
    @Override
    public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
        final boolean rootTask = SystemConstants.ROOT_MAP.equals(mapArgs.getTaskName());

        // Non-root tasks simply echo back the item they were given.
        if (!rootTask) {
            return ExecuteResult.success(mapArgs.getMapResult());
        }

        // Root task: split the work into four items for the "TWO_MAP" stage.
        final List<String> items = Lists.newArrayList("1", "2", "3", "4");
        return mapHandler.doMap(items, "TWO_MAP");
    }
}

View File

@ -0,0 +1,45 @@
package com.example.snailjob.job;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Demo map-reduce executor with three stages: map, reduce and merge-reduce.
 *
 * <p>The root map task hands a fixed list of four numeric strings to
 * {@code mapHandler.doMap} under the task name {@code "TWO_MAP"}; other map
 * tasks return their own item. Both reduce stages parse the strings they
 * receive as integers and return their sum.
 *
 * @author opensnail
 * @date 2024-07-07 12:06:57
 * @since sj_1.1.0
 */
@Component
public class TestMyMapReduceExecutor extends AbstractMapReduceExecutor {

    /**
     * Dispatches on the task name carried by {@code mapArgs}.
     *
     * @param mapArgs    arguments of the current map task (task name and map result are read)
     * @param mapHandler handler used to submit the next batch of items
     * @return the result of {@code doMap} for the root task, otherwise a success
     *         result wrapping {@code mapArgs.getMapResult()}
     */
    @Override
    public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
        final boolean rootTask = SystemConstants.ROOT_MAP.equals(mapArgs.getTaskName());
        return rootTask
                ? mapHandler.doMap(Lists.newArrayList("1", "2", "3", "4"), "TWO_MAP")
                : ExecuteResult.success(mapArgs.getMapResult());
    }

    /**
     * Sums the map results handed to this reduce task.
     *
     * @param reduceArgs reduce arguments; {@code getMapResult()} is cast to a list of strings
     * @return a success result wrapping the integer sum
     */
    @Override
    protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) {
        final List<String> mapped = (List<String>) reduceArgs.getMapResult();
        return ExecuteResult.success(sumAsInts(mapped));
    }

    /**
     * Sums the partial reduce results into the final value.
     *
     * @param mergeReduceArgs merge arguments; {@code getReduces()} is cast to a list of strings
     * @return a success result wrapping the integer sum
     */
    @Override
    protected ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
        final List<String> partials = (List<String>) mergeReduceArgs.getReduces();
        return ExecuteResult.success(sumAsInts(partials));
    }

    /** Parses every element with {@link Integer#parseInt(String)} and adds them up as an {@code int}. */
    private static int sumAsInts(List<String> values) {
        int total = 0;
        for (String value : values) {
            total += Integer.parseInt(value);
        }
        return total;
    }
}

View File

@ -26,6 +26,7 @@ public class TestWorkflowAnnoJobExecutor {
int i = new Random().nextInt(1000); int i = new Random().nextInt(1000);
jobArgs.appendContext("name" + i, "小蜗牛" + i); jobArgs.appendContext("name" + i, "小蜗牛" + i);
jobArgs.appendContext("age", i); jobArgs.appendContext("age", i);
SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext());
return ExecuteResult.success(failOrderPo); return ExecuteResult.success(failOrderPo);
} }

View File

@ -6,10 +6,8 @@ import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener; import com.alibaba.excel.event.AnalysisEventListener;
import com.example.snailjob.bo.PhoneNumberBo; import com.example.snailjob.bo.PhoneNumberBo;
import com.example.snailjob.bo.PhoneNumberCheckBo; import com.example.snailjob.bo.PhoneNumberCheckBo;
import com.example.snailjob.po.PhoneNumberPo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Map; import java.util.Map;
/** /**
@ -35,7 +33,7 @@ public class PhoneNumberExcelListener extends AnalysisEventListener<PhoneNumberB
@Override @Override
public void invokeHeadMap(Map<Integer, String> headMap, AnalysisContext context) { public void invokeHeadMap(Map<Integer, String> headMap, AnalysisContext context) {
this.phoneNumberCheckBo.setCheckTotalNum(Long.parseLong(String.valueOf(context.readSheetHolder().getApproximateTotalRowNumber() - 1))); this.phoneNumberCheckBo.setTotal(Long.parseLong(String.valueOf(context.readSheetHolder().getApproximateTotalRowNumber() - 1)));
} }
@Override @Override
@ -55,12 +53,12 @@ public class PhoneNumberExcelListener extends AnalysisEventListener<PhoneNumberB
log.info("本次校验的手机号为: {}", phoneNumberBo.getPhoneNumber()); log.info("本次校验的手机号为: {}", phoneNumberBo.getPhoneNumber());
Boolean validateStatus = Validator.isMobile(phoneNumberBo.getPhoneNumber()); Boolean validateStatus = Validator.isMobile(phoneNumberBo.getPhoneNumber());
if (validateStatus) { if (validateStatus) {
this.phoneNumberCheckBo.setCheckSuccessNum(this.phoneNumberCheckBo.getCheckSuccessNum() + 1); this.phoneNumberCheckBo.setSuccess(this.phoneNumberCheckBo.getSuccess() + 1);
final PhoneNumberPo phoneNumberPo = PhoneNumberPo.builder().phoneNumber(phoneNumberBo.getPhoneNumber()).createTime(LocalDateTime.now()).build(); // final PhoneNumberPo phoneNumberPo = PhoneNumberPo.builder().phoneNumber(phoneNumberBo.getPhoneNumber()).createTime(LocalDateTime.now()).build();
this.phoneNumberCheckBo.getCheckSuccessPhoneNumberList().add(phoneNumberPo); // this.phoneNumberCheckBo.getCheckSuccessPhoneNumberList().add(phoneNumberPo);
} else { } else {
this.phoneNumberCheckBo.setCheckErrorNum(this.phoneNumberCheckBo.getCheckErrorNum() + 1); this.phoneNumberCheckBo.setError(this.phoneNumberCheckBo.getError() + 1);
this.phoneNumberCheckBo.getCheckErrorPhoneNumberList().add(phoneNumberBo.getPhoneNumber()); this.phoneNumberCheckBo.getCheckErrors().add(phoneNumberBo.getPhoneNumber());
} }
} }

View File

@ -9,7 +9,7 @@ spring:
active: dev active: dev
datasource: datasource:
name: snail_job name: snail_job
url: jdbc:mysql://localhost:3306/snail_job_260?useSSL=false&characterEncoding=utf8&useUnicode=true url: jdbc:mysql://localhost:3306/snail_job?useSSL=false&characterEncoding=utf8&useUnicode=true
username: root username: root
password: root password: root
type: com.zaxxer.hikari.HikariDataSource type: com.zaxxer.hikari.HikariDataSource