feat: 2.1.0

1. 完成通过控制台解析日志页面
2. 完成后端日志解析和新增任务
3. 重构了新增任务模型,支持客户端上报、控制台手动新增和单个新增
This commit is contained in:
byteblogs168 2023-07-16 22:58:54 +08:00
parent 421c71dbbc
commit 018357cada
31 changed files with 763 additions and 74 deletions

View File

@ -44,7 +44,7 @@ public @interface ExecutorMethodRegister {
* 时刻3: 上报一个异常 idempotentId: A1 不会新增一个重试任务会被幂等处理
* 时刻4: idempotentId: A1 重试完成, 状态为已完成
* 时刻5: 上报一个异常 idempotentId: A1 状态为重试中, 新增一条重试任务
**
* <p>
* 默认的idempotentId生成器{@link SimpleIdempotentIdGenerate} 对所有参数进行MD5
*
* @return idempotentId
@ -54,14 +54,13 @@ public @interface ExecutorMethodRegister {
/**
* 服务端重试完成(重试成功重试到达最大次数)回调客户端
*
* @return
*/
Class<? extends RetryCompleteCallback> retryCompleteCallback() default SimpleRetryCompleteCallback.class;
/**
* 用于标识具有业务特点的值, 比如订单号物流编号等可以根据具体的业务场景生成生成规则采用通用成熟的Spel表达式进行解析
*
* see: https://docs.spring.io/spring-framework/docs/5.0.0.M5/spring-framework-reference/html/expressions.html
* <p>
* see: <a href="https://docs.spring.io/spring-framework/docs/5.0.0.M5/spring-framework-reference/html/expressions.html">...</a>
*/
String bizNo() default "";

View File

@ -20,8 +20,14 @@ import java.lang.annotation.Target;
@Documented
public @interface Mapping {
/**
* 请求类型
*/
RequestMethod method() default RequestMethod.GET;
/**
* 请求路径
*/
String path() default "";
}

View File

@ -45,7 +45,6 @@ public @interface Retryable {
/**
* 重试处理入口默认为原方法
*
* @return
*/
Class<? extends ExecutorMethod> retryMethod() default ExecutorAnnotationMethod.class;
@ -60,7 +59,7 @@ public @interface Retryable {
* 时刻3: 上报一个异常 idempotentId: A1 不会新增一个重试任务会被幂等处理
* 时刻4: idempotentId: A1 重试完成, 状态为已完成
* 时刻5: 上报一个异常 idempotentId: A1 状态为重试中, 新增一条重试任务
**
* <p>
* 默认的idempotentId生成器{@link SimpleIdempotentIdGenerate} 对所有参数进行MD5
*
* @return idempotentId
@ -70,14 +69,13 @@ public @interface Retryable {
/**
* 服务端重试完成(重试成功重试到达最大次数)回调客户端
*
* @return
*/
Class<? extends RetryCompleteCallback> retryCompleteCallback() default SimpleRetryCompleteCallback.class;
/**
* 用于标识具有业务特点的值, 比如订单号物流编号等可以根据具体的业务场景生成生成规则采用通用成熟的Spel表达式进行解析
*
* see: https://docs.spring.io/spring-framework/docs/5.0.0.M5/spring-framework-reference/html/expressions.html
* see: <a href="https://docs.spring.io/spring-framework/docs/5.0.0.M5/spring-framework-reference/html/expressions.html">...</a>
*/
String bizNo() default "";

View File

@ -50,7 +50,7 @@ public class NettyChannel {
public static void send(HttpMethod method, String url, String body) throws InterruptedException {
if (Objects.isNull(CHANNEL)) {
LogUtils.info(log, "send message but channel is null url:[{}] method:[{}] body:[{}] ", url, method, body);
LogUtils.error(log, "send message but channel is null url:[{}] method:[{}] body:[{}] ", url, method, body);
return;
}

View File

@ -2,13 +2,10 @@ package com.aizuda.easy.retry.client.core.report;
import com.aizuda.easy.retry.client.core.RetryExecutor;
import com.aizuda.easy.retry.client.core.RetryExecutorParameter;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.client.NettyClient;
import com.aizuda.easy.retry.client.core.client.proxy.RequestBuilder;
import com.aizuda.easy.retry.client.core.config.EasyRetryProperties;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.github.rholder.retry.*;
import com.google.common.base.Predicate;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.executor.GuavaRetryExecutor;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
@ -16,11 +13,13 @@ import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.core.window.Listener;
import com.aizuda.easy.retry.server.model.dto.ConfigDTO;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.github.rholder.retry.*;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
@ -61,13 +60,13 @@ public class ReportListener implements Listener<RetryTaskDTO> {
try {
retryExecutor.call(retryer, () -> {
LogUtils.info(log, "Batch asynchronous reporting ...");
LogUtils.info(log, "Batch asynchronous reporting ... <|>{}<|>", JsonUtil.toJsonString(list));
CLIENT.reportRetryInfo(list);
return null;
}, throwable -> {
LogUtils.info(log,"Data report failed{}", JsonUtil.toJsonString(list));
LogUtils.error(log,"Data report failed. <|>{}<|>", JsonUtil.toJsonString(list));
sendMessage(throwable);
}, o -> LogUtils.info(log,"Data report successful retry{}", JsonUtil.toJsonString(list)));
}, o -> LogUtils.info(log,"Data report successful retry<|>{}<|>", JsonUtil.toJsonString(list)));
} catch (Exception e) {
e.printStackTrace();
}
@ -91,9 +90,6 @@ public class ReportListener implements Listener<RetryTaskDTO> {
return Collections.singletonList(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
if (attempt.hasResult()) {
LogUtils.error(log,"easy-retry 上报成功,第[{}]次调度", attempt.getAttemptNumber());
}
if (attempt.hasException()) {
LogUtils.error(log,"easy-retry 上报失败,第[{}]次调度 ", attempt.getAttemptNumber(), attempt.getExceptionCause());

View File

@ -56,8 +56,8 @@ public abstract class AbstractRetryStrategies implements RetryStrategy {
retryerResultContext.setRetryerInfo(retryerInfo);
try {
for (EasyRetryListener EasyRetryListener : easyRetryListeners) {
EasyRetryListener.beforeRetry(sceneName, executorClassName, params);
for (EasyRetryListener easyRetryListener : easyRetryListeners) {
easyRetryListener.beforeRetry(sceneName, executorClassName, params);
}
Object result = retryExecutor.call(retryer, doGetCallable(retryExecutor, params), getRetryErrorConsumer(retryerResultContext, params), getRetrySuccessConsumer(retryerResultContext));
@ -83,8 +83,8 @@ public abstract class AbstractRetryStrategies implements RetryStrategy {
Object result = retryerResultContext.getResult();
RetryerInfo retryerInfo = retryerResultContext.getRetryerInfo();
for (EasyRetryListener EasyRetryListener : easyRetryListeners) {
EasyRetryListener.successOnRetry(result, retryerInfo.getScene(), retryerInfo.getExecutorClassName());
for (EasyRetryListener easyRetryListener : easyRetryListeners) {
easyRetryListener.successOnRetry(result, retryerInfo.getScene(), retryerInfo.getExecutorClassName());
}
doRetrySuccessConsumer(retryerResultContext).accept(retryerResultContext);
@ -103,8 +103,8 @@ public abstract class AbstractRetryStrategies implements RetryStrategy {
RetryerInfo retryerInfo = context.getRetryerInfo();
try {
for (EasyRetryListener EasyRetryListener : easyRetryListeners) {
EasyRetryListener
for (EasyRetryListener easyRetryListener : easyRetryListeners) {
easyRetryListener
.failureOnRetry(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), throwable);
}
} catch (Exception e) {

View File

@ -18,7 +18,7 @@ public class RetryTaskDTO implements Serializable {
/**
* 加密的groupId
*/
@NotBlank(message = "shardingGroupId 不能为空")
@NotBlank(message = "groupName 不能为空")
@Length(max = 16, message = "组id最长为16")
private String groupName;

View File

@ -137,6 +137,12 @@
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>13.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,37 @@
package com.aizuda.easy.retry.server.enums;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* id生成模式
*
* @author www.byteblogs.com
* @date 2023-05-04
* @since 2.0
*/
@AllArgsConstructor
@Getter
public enum TaskGeneratorScene {
CLIENT_REPORT(1,"客户端匹配上报"),
MANA_BATCH(2, "控制台手动批量新增"),
MANA_SINGLE(3, "控制台手动单个新增"),
;
private final int scene;
private final String desc;
public static TaskGeneratorScene modeOf(int scene) {
for (TaskGeneratorScene value : TaskGeneratorScene.values()) {
if (value.getScene() == scene) {
return value;
}
}
throw new EasyRetryServerException("不支持的任务生成场景 [{}]", scene);
}
}

View File

@ -30,4 +30,6 @@ public interface RetryTaskLogMapper extends BaseMapper<RetryTaskLog> {
@Param("endTime")LocalDateTime endTime
);
int batchInsert(List<RetryTaskLog> list);
}

View File

@ -13,4 +13,6 @@ public interface RetryTaskMapper extends BaseMapper<RetryTask> {
int countAllRetryTaskByRetryStatus(@Param("partition") Integer partition,
@Param("retryStatus") Integer retryStatus);
int batchInsert(List<RetryTask> list);
}

View File

@ -76,7 +76,7 @@ public interface ConfigAccess {
*
* @return 黑名单列表
*/
Set<String> getBlacklist(String shardingGroupId);
Set<String> getBlacklist(String groupName);
/**
* 获取所有组配置信息

View File

@ -1,6 +1,9 @@
package com.aizuda.easy.retry.server.server.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.easy.retry.server.enums.TaskGeneratorScene;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.service.RetryService;
import com.aizuda.easy.retry.server.enums.StatusEnum;
@ -8,12 +11,20 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.service.convert.TaskContextConverter;
import com.aizuda.easy.retry.server.support.generator.TaskGenerator;
import com.aizuda.easy.retry.server.support.generator.task.TaskContext;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH.BATCH_REPORT;
/**
@ -28,7 +39,7 @@ import static com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PA
public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler {
@Autowired
private RetryService retryService;
private List<TaskGenerator> taskGenerators;
@Override
public boolean supports(String path) {
@ -42,18 +53,38 @@ public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler {
@Override
public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
LogUtils.info(log, "批量上报重试数据 content:[{}]", content);
LogUtils.info(log, "Batch Report Retry Data. content:[{}]", content);
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
try {
Object[] args = retryRequest.getArgs();
Boolean aBoolean = retryService.batchReportRetry(JsonUtil.parseList(JsonUtil.toJsonString(args[0]), RetryTaskDTO.class));
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "批量上报重试数据处理成功", aBoolean, retryRequest.getReqId()));
try {
TaskGenerator taskGenerator = taskGenerators.stream()
.filter(t -> t.supports(TaskGeneratorScene.CLIENT_REPORT.getScene()))
.findFirst().orElseThrow(() -> new EasyRetryServerException("没有匹配的任务生成器"));
Assert.notEmpty(args, () -> new EasyRetryServerException("上报的数据不能为空. reqId:[{}]", retryRequest.getReqId()));
List<RetryTaskDTO> retryTaskList = JsonUtil.parseList(JsonUtil.toJsonString(args[0]), RetryTaskDTO.class);
Set<String> set = retryTaskList.stream().map(RetryTaskDTO::getGroupName).collect(Collectors.toSet());
Assert.isTrue(set.size() <= 1, () -> new EasyRetryServerException("批量上报数据,同一批次只能是相同的组. reqId:[{}]", retryRequest.getReqId()));
Map<String, List<RetryTaskDTO>> map = retryTaskList.stream().collect(Collectors.groupingBy(RetryTaskDTO::getSceneName));
map.forEach(((sceneName, retryTaskDTOS) -> {
TaskContext taskContext = new TaskContext();
taskContext.setSceneName(sceneName);
taskContext.setGroupName(set.stream().findFirst().get());
taskContext.setTaskInfos(TaskContextConverter.INSTANCE.toTaskContextInfo(retryTaskList));
// 生成任务
taskGenerator.taskGenerator(taskContext);
}));
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "Batch Retry Data Upload Processed Successfully", Boolean.TRUE, retryRequest.getReqId()));
} catch (Exception e) {
LogUtils.error(log, "批量上报重试数据失败", e);
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), e.getMessage(), null, retryRequest.getReqId()));
LogUtils.error(log, "Batch Report Retry Data Error. <|>{}<|>", args[0], e);
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), e.getMessage(), Boolean.FALSE, retryRequest.getReqId()));
}
}
}

View File

@ -1,12 +1,7 @@
package com.aizuda.easy.retry.server.service;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.BatchDeleteRetryTaskVO;
import com.aizuda.easy.retry.server.web.model.request.GenerateRetryIdempotentIdVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskUpdateStatusRequestVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskSaveRequestVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskUpdateExecutorNameRequestVO;
import com.aizuda.easy.retry.server.web.model.request.*;
import com.aizuda.easy.retry.server.web.model.response.RetryTaskResponseVO;
import java.util.List;
@ -69,4 +64,11 @@ public interface RetryTaskService {
*/
Integer deleteRetryTask(BatchDeleteRetryTaskVO requestVO);
/**
* 解析日志
*
* @param parseLogsVO {@link ParseLogsVO} 解析参数模型
* @return
*/
Integer parseLogs(ParseLogsVO parseLogsVO);
}

View File

@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.service.convert;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryDeadLetter;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.support.generator.task.TaskContext;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskSaveRequestVO;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
@ -32,4 +33,6 @@ public interface RetryTaskConverter {
RetryTask toRetryTask(RetryTaskSaveRequestVO retryTaskSaveRequestVO);
List<RetryTask> toRetryTaskList(List<RetryTaskDTO> retryTaskDTOList);
RetryTask toRetryTask(TaskContext.TaskInfo taskInfo);
}

View File

@ -0,0 +1,23 @@
package com.aizuda.easy.retry.server.service.convert;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.support.generator.task.TaskContext;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskSaveRequestVO;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
* @author www.byteblogs.com
* @date 2023-07-16 22:09:40
* @since 2.1.0
*/
@Mapper
public interface TaskContextConverter {
TaskContextConverter INSTANCE = Mappers.getMapper(TaskContextConverter.class);
TaskContext.TaskInfo toTaskContextInfo(RetryTaskSaveRequestVO retryTaskSaveRequestVO);
List<TaskContext.TaskInfo> toTaskContextInfo(List<RetryTaskDTO> retryTasks);
}

View File

@ -74,7 +74,7 @@ public class RetryServiceImpl implements RetryService {
@Transactional
@Override
public Boolean reportRetry(RetryTaskDTO retryTaskDTO) {
LogUtils.warn(log, "received report data [{}]", JsonUtil.toJsonString(retryTaskDTO));
LogUtils.info(log, "received report data. <|>{}<|>", JsonUtil.toJsonString(retryTaskDTO));
SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName());
if (Objects.isNull(sceneConfig)) {

View File

@ -5,10 +5,13 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.config.RequestDataHelper;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.enums.TaskGeneratorScene;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMessageMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskMapper;
@ -21,17 +24,15 @@ import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.service.RetryTaskService;
import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter;
import com.aizuda.easy.retry.server.service.convert.RetryTaskResponseVOConverter;
import com.aizuda.easy.retry.server.service.convert.TaskContextConverter;
import com.aizuda.easy.retry.server.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.support.generator.IdGenerator;
import com.aizuda.easy.retry.server.support.generator.TaskGenerator;
import com.aizuda.easy.retry.server.support.generator.task.TaskContext;
import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.BatchDeleteRetryTaskVO;
import com.aizuda.easy.retry.server.web.model.request.GenerateRetryIdempotentIdVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskUpdateStatusRequestVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskSaveRequestVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskUpdateExecutorNameRequestVO;
import com.aizuda.easy.retry.server.web.model.request.*;
import com.aizuda.easy.retry.server.web.model.response.RetryTaskResponseVO;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@ -42,14 +43,16 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpEntity;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.RestTemplate;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* @author www.byteblogs.com
@ -76,6 +79,8 @@ public class RetryTaskServiceImpl implements RetryTaskService {
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Autowired
private List<TaskGenerator> taskGenerators;
@Override
public PageResult<List<RetryTaskResponseVO>> getRetryTaskPage(RetryTaskQueryVO queryVO) {
@ -172,19 +177,19 @@ public class RetryTaskServiceImpl implements RetryTaskService {
throw new EasyRetryServerException("重试状态错误");
}
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTaskRequestVO);
retryTask.setCreateDt(LocalDateTime.now());
retryTask.setUpdateDt(LocalDateTime.now());
TaskGenerator taskGenerator = taskGenerators.stream()
.filter(t -> t.supports(TaskGeneratorScene.MANA_SINGLE.getScene()))
.findFirst().orElseThrow(() -> new EasyRetryServerException("没有匹配的任务生成器"));
if (StringUtils.isBlank(retryTask.getExtAttrs())) {
retryTask.setExtAttrs(StringUtils.EMPTY);
}
TaskContext taskContext = new TaskContext();
taskContext.setSceneName(retryTaskRequestVO.getSceneName());
taskContext.setGroupName(retryTaskRequestVO.getGroupName());
taskContext.setTaskInfos(Collections.singletonList(TaskContextConverter.INSTANCE.toTaskContextInfo(retryTaskRequestVO)));
retryTask.setNextTriggerAt(
WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
retryTask.setUniqueId(getIdGenerator(retryTask.getGroupName()));
RequestDataHelper.setPartition(retryTaskRequestVO.getGroupName());
return retryTaskMapper.insert(retryTask);
// 生成任务
taskGenerator.taskGenerator(taskContext);
return 1;
}
@Override
@ -231,6 +236,47 @@ public class RetryTaskServiceImpl implements RetryTaskService {
return retryTaskMapper.deleteBatchIds(requestVO.getIds());
}
@Override
public Integer parseLogs(ParseLogsVO parseLogsVO) {
String logStr = parseLogsVO.getLogStr();
String patternString = "<\\|>(.*?)<\\|>";
Pattern pattern = Pattern.compile(patternString);
Matcher matcher = pattern.matcher(logStr);
List<RetryTaskDTO> waitInsertList = new ArrayList<>();
// 查找匹配的内容并输出
while (matcher.find()) {
String extractedData = matcher.group(1);
if (StringUtils.isBlank(extractedData)) {
continue;
}
List<RetryTaskDTO> retryTaskList = JsonUtil.parseList(extractedData, RetryTaskDTO.class);
if (!CollectionUtils.isEmpty(retryTaskList)) {
waitInsertList.addAll(retryTaskList);
}
}
Assert.isFalse(waitInsertList.isEmpty(), () -> new EasyRetryServerException("未找到匹配的数据"));
Assert.isTrue(waitInsertList.size() <= 500, () -> new EasyRetryServerException("最多只能处理500条数据"));
TaskGenerator taskGenerator = taskGenerators.stream()
.filter(t -> t.supports(TaskGeneratorScene.MANA_BATCH.getScene()))
.findFirst().orElseThrow(() -> new EasyRetryServerException("没有匹配的任务生成器"));
TaskContext taskContext = new TaskContext();
taskContext.setSceneName(parseLogsVO.getSceneName());
taskContext.setGroupName(parseLogsVO.getGroupName());
taskContext.setTaskInfos(TaskContextConverter.INSTANCE.toTaskContextInfo(waitInsertList));
// 生成任务
taskGenerator.taskGenerator(taskContext);
return waitInsertList.size();
}
/**
* 获取分布式id
*

View File

@ -0,0 +1,29 @@
package com.aizuda.easy.retry.server.support.generator;
import com.aizuda.easy.retry.server.support.generator.task.TaskContext;
/**
* 任务生成器
*
* @author www.byteblogs.com
* @date 2023-07-16 11:42:38
* @since 2.1.0
*/
public interface TaskGenerator {
/**
* 获取匹配的模式
*
* @param scene 1. 客户端上报 2.控制台新增单个任务 3.控制台批量新增任务
* @return 符合条件的生成器
*/
boolean supports(int scene);
/**
* 任务生成器
*
* @param taskContext 任务列表
* @return 成功处理的数据量
*/
void taskGenerator(TaskContext taskContext);
}

View File

@ -0,0 +1,195 @@
package com.aizuda.easy.retry.server.support.generator.task;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.config.RequestDataHelper;
import com.aizuda.easy.retry.server.enums.DelayLevelEnum;
import com.aizuda.easy.retry.server.enums.StatusEnum;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.*;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter;
import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.support.generator.IdGenerator;
import com.aizuda.easy.retry.server.support.generator.TaskGenerator;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author www.byteblogs.com
* @date 2023-07-16 11:52:39
* @since 2.1.0
*/
@Slf4j
public abstract class AbstractGenerator implements TaskGenerator {
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Autowired
private List<IdGenerator> idGeneratorList;
@Autowired
private RetryTaskMapper retryTaskMapper;
@Autowired
private SceneConfigMapper sceneConfigMapper;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Override
@Transactional
public void taskGenerator(TaskContext taskContext) {
LogUtils.info(log, "received report data. {}", JsonUtil.toJsonString(taskContext));
checkAndInitScene(taskContext);
List<TaskContext.TaskInfo> taskInfos = taskContext.getTaskInfos();
Set<String> idempotentIdSet = taskInfos.stream().map(TaskContext.TaskInfo::getIdempotentId).collect(Collectors.toSet());
// 获取相关的任务用户幂等校验
List<RetryTask> retryTasks = retryTaskMapper.selectList(new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getGroupName, taskContext.getGroupName())
.eq(RetryTask::getSceneName, taskContext.getSceneName())
.in(RetryTask::getIdempotentId, idempotentIdSet));
Map<String/*幂等ID*/, List<RetryTask>> retryTaskMap = retryTasks.stream().collect(Collectors.groupingBy(RetryTask::getIdempotentId));
List<RetryTask> waitInsertTasks = new ArrayList<>();
List<RetryTaskLog> waitInsertTaskLogs = new ArrayList<>();
LocalDateTime now = LocalDateTime.now();
for (TaskContext.TaskInfo taskInfo : taskInfos) {
Pair<List<RetryTask>, List<RetryTaskLog>> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo);
waitInsertTasks.addAll(pair.getKey());
waitInsertTaskLogs.addAll(pair.getValue());
}
RequestDataHelper.setPartition(taskContext.getGroupName());
Assert.isTrue(waitInsertTasks.size() == retryTaskMapper.batchInsert(waitInsertTasks), () -> new EasyRetryServerException("failed to report data"));
Assert.isTrue(waitInsertTaskLogs.size() == retryTaskLogMapper.batchInsert(waitInsertTaskLogs),
() -> new EasyRetryServerException("新增重试日志失败"));
}
/**
* @param retryTaskMap
* @param now
* @param taskInfo
*/
private Pair<List<RetryTask>, List<RetryTaskLog>> doConvertTask(Map<String/*幂等ID*/, List<RetryTask>> retryTaskMap,
TaskContext taskContext, LocalDateTime now,
TaskContext.TaskInfo taskInfo) {
List<RetryTask> waitInsertTasks = new ArrayList<>();
List<RetryTaskLog> waitInsertTaskLogs = new ArrayList<>();
// 判断是否存在与幂等ID相同的任务
List<RetryTask> list = retryTaskMap.get(taskInfo.getIdempotentId()).stream()
.filter(retryTask -> taskContext.getGroupName().equals(retryTask.getGroupName())
&& taskContext.getSceneName().equals(retryTask.getSceneName())).collect(Collectors.toList());
// 说明存在相同的任务
if (!CollectionUtils.isEmpty(list)) {
LogUtils.warn(log, "interrupted reporting in retrying task. [{}]", JsonUtil.toJsonString(taskInfo));
return Pair.of(waitInsertTasks, waitInsertTaskLogs);
}
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(taskInfo);
retryTask.setUniqueId(getIdGenerator(taskContext.getGroupName()));
retryTask.setTaskType(TaskTypeEnum.RETRY.getType());
retryTask.setGroupName(taskContext.getGroupName());
retryTask.setSceneName(taskContext.getSceneName());
retryTask.setCreateDt(now);
retryTask.setUpdateDt(now);
if (StringUtils.isBlank(retryTask.getExtAttrs())) {
retryTask.setExtAttrs(StringUtils.EMPTY);
}
retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
waitInsertTasks.add(retryTask);
// 初始化日志
RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask);
retryTaskLog.setTaskType(TaskTypeEnum.RETRY.getType());
retryTaskLog.setCreateDt(now);
waitInsertTaskLogs.add(retryTaskLog);
return Pair.of(waitInsertTasks, waitInsertTaskLogs);
}
private void checkAndInitScene( TaskContext taskContext) {
SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(taskContext.getGroupName(), taskContext.getSceneName());
if (Objects.isNull(sceneConfig)) {
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(taskContext.getGroupName());
if (Objects.isNull(groupConfig)) {
throw new EasyRetryServerException("failed to report data, no group configuration found. groupName:[{}]", taskContext.getGroupName());
}
if (groupConfig.getInitScene().equals(StatusEnum.NO.getStatus())) {
throw new EasyRetryServerException("failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]", taskContext.getGroupName(), taskContext.getSceneName());
} else {
// 若配置了默认初始化场景配置则发现上报数据的时候未配置场景默认生成一个场景
initScene(taskContext.getGroupName(), taskContext.getSceneName());
}
}
}
/**
* 若配置了默认初始化场景配置则发现上报数据的时候未配置场景默认生成一个场景
* backOff(退避策略): 等级策略
* maxRetryCount(最大重试次数): 26
* triggerInterval(间隔时间): see: {@link DelayLevelEnum}
*
* @param groupName 组名称
* @param sceneName 场景名称
*/
private void initScene(String groupName, String sceneName) {
SceneConfig sceneConfig;
sceneConfig = new SceneConfig();
sceneConfig.setGroupName(groupName);
sceneConfig.setSceneName(sceneName);
sceneConfig.setSceneStatus(StatusEnum.YES.getStatus());
sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getBackOff());
sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel());
sceneConfig.setDescription("自动初始化场景");
Assert.isTrue(1 == sceneConfigMapper.insert(sceneConfig), () -> new EasyRetryServerException("init scene error"));
}
/**
* 获取分布式id
*
* @param groupName 组id
* @return 分布式id
*/
private String getIdGenerator(String groupName) {
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName);
for (final IdGenerator idGenerator : idGeneratorList) {
if (idGenerator.supports(groupConfig.getIdGeneratorMode())) {
return idGenerator.idGenerator(groupName);
}
}
throw new EasyRetryServerException("id generator mode not configured. [{}]", groupName);
}
}

View File

@ -0,0 +1,19 @@
package com.aizuda.easy.retry.server.support.generator.task;
import com.aizuda.easy.retry.server.enums.TaskGeneratorScene;
import org.springframework.stereotype.Component;
/**
* 客户端上报任务生成器
*
* @author www.byteblogs.com
* @date 2023-07-16 11:51:56
* @since 2.1.0
*/
@Component
public class ClientReportRetryGenerator extends AbstractGenerator {
@Override
public boolean supports(int scene) {
return TaskGeneratorScene.CLIENT_REPORT.getScene() == scene;
}
}

View File

@ -0,0 +1,19 @@
package com.aizuda.easy.retry.server.support.generator.task;
import com.aizuda.easy.retry.server.enums.TaskGeneratorScene;
import org.springframework.stereotype.Component;
/**
* 控制台手动批量新增
*
* @author www.byteblogs.com
* @date 2023-07-16 11:51:56
* @since 2.1.0
*/
@Component
public class ManaBatchRetryGenerator extends AbstractGenerator {
@Override
public boolean supports(int scene) {
return TaskGeneratorScene.MANA_BATCH.getScene() == scene;
}
}

View File

@ -0,0 +1,19 @@
package com.aizuda.easy.retry.server.support.generator.task;
import com.aizuda.easy.retry.server.enums.TaskGeneratorScene;
import org.springframework.stereotype.Component;
/**
* 控制台手动单个新增
*
* @author www.byteblogs.com
* @date 2023-07-16 11:51:56
* @since 2.1.0
*/
@Component
public class ManaSingleRetryGenerator extends AbstractGenerator {
@Override
public boolean supports(int scene) {
return TaskGeneratorScene.MANA_SINGLE.getScene() == scene;
}
}

View File

@ -0,0 +1,57 @@
package com.aizuda.easy.retry.server.support.generator.task;
import lombok.Data;
import java.util.List;
/**
* @author www.byteblogs.com
* @date 2023-07-16 21:26:52
* @since
*/
@Data
public class TaskContext {
/**
* 加密的groupId
*/
private String groupName;
/**
* 加密的sceneId
*/
private String sceneName;
/**
* 任务信息
*/
private List<TaskInfo> taskInfos;
@Data
public static class TaskInfo {
/**
* 业务唯一id
*/
private String idempotentId;
/**
* 执行器名称
*/
private String executorName;
/**
* 业务唯一编号
*/
private String bizNo;
/**
* 客户端上报参数
*/
private String argsStr;
/**
* 额外扩展参数
*/
private String extAttrs;
}
}

View File

@ -4,12 +4,7 @@ import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.service.RetryTaskService;
import com.aizuda.easy.retry.server.web.annotation.LoginRequired;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.BatchDeleteRetryTaskVO;
import com.aizuda.easy.retry.server.web.model.request.GenerateRetryIdempotentIdVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskUpdateStatusRequestVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskSaveRequestVO;
import com.aizuda.easy.retry.server.web.model.request.RetryTaskUpdateExecutorNameRequestVO;
import com.aizuda.easy.retry.server.web.model.request.*;
import com.aizuda.easy.retry.server.web.model.response.RetryTaskResponseVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
@ -80,4 +75,10 @@ public class RetryTaskController {
public Integer deleteRetryTask(@RequestBody @Validated BatchDeleteRetryTaskVO requestVO) {
return retryTaskService.deleteRetryTask(requestVO);
}
@LoginRequired
@PostMapping("/batch")
public Integer parseLogs(@RequestBody @Validated ParseLogsVO parseLogsVO) {
return retryTaskService.parseLogs(parseLogsVO);
}
}

View File

@ -0,0 +1,37 @@
package com.aizuda.easy.retry.server.web.model.request;
import lombok.Data;
import org.hibernate.validator.constraints.NotBlank;
import javax.validation.constraints.NotNull;
/**
* 解析参数模型
*
* @author: www.byteblogs.com
* @date: 2023-07-15 23:15
*/
@Data
public class ParseLogsVO {
/**
* 客户端打印的上报日志信息
*/
@NotBlank(message = "日志信息不能为空")
private String logStr;
/**
* 组名称
*/
@NotBlank(message = "组名称不能为空")
private String groupName;
/**
* 场景名称
*/
@NotBlank(message = "场景名称不能为空")
private String sceneName;
@NotNull(message = "重试状态不能为空")
private Integer retryStatus;
}

View File

@ -86,4 +86,13 @@
</select>
<!-- 定义批量新增的 SQL 映射 -->
<insert id="batchInsert" parameterType="java.util.List">
INSERT INTO retry_task_log (unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, retry_status, task_type, create_dt)
VALUES
<foreach collection="list" item="item" separator=",">
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId}, #{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs}, #{item.retryStatus}, #{item.taskType}, #{item.createDt})
</foreach>
</insert>
</mapper>

View File

@ -68,4 +68,12 @@
<include refid="Base_Column_List" />
from retry_task_${partition}
</select>
<!-- 定义批量新增的 SQL 映射 -->
<insert id="batchInsert" parameterType="java.util.List">
INSERT INTO retry_task_${partition} (unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, next_trigger_at, retry_count, retry_status, task_type, create_dt, update_dt)
VALUES
<foreach collection="list" item="item" separator=",">
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId}, #{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs}, #{item.nextTriggerAt}, #{item.retryCount}, #{item.retryStatus}, #{item.taskType}, #{item.createDt}, #{item.updateDt})
</foreach>
</insert>
</mapper>

View File

@ -14,6 +14,7 @@ const api = {
retryTaskPage: '/retry-task/list',
retryTaskById: '/retry-task/',
saveRetryTask: '/retry-task',
batchSaveRetryTask: '/retry-task/batch',
idempotentIdGenerate: '/retry-task/generate/idempotent-id',
batchUpdate: '/retry-task/batch',
deleteRetryTask: '/retry-task/batch',
@ -115,6 +116,14 @@ export function saveRetryTask (data) {
})
}
export function batchSaveRetryTask (data) {
return request({
url: api.batchSaveRetryTask,
method: 'post',
data
})
}
export function getTotalPartition () {
return request({
url: api.totalPartition,

View File

@ -66,7 +66,8 @@
</a-form>
</div>
<div class="table-operator">
<a-button type="primary" icon="plus" @click="handleNew()">新增</a-button>
<a-button type="primary" icon="plus" @click="handleNew()">单个</a-button>
<a-button type="primary" icon="plus" @click="handleBatchNew()">批量</a-button>
<a-dropdown v-action:edit v-if="selectedRowKeys.length > 0">
<a-menu slot="overlay" @click="onClick">
<a-menu-item key="1"><a-icon type="delete" />删除</a-menu-item>
@ -115,6 +116,8 @@
<SaveRetryTask ref="saveRetryTask" @refreshTable="refreshTable"/>
<BatchUpdateRetryTaskInfo ref="batchUpdateRetryTaskInfo" @refreshTable="refreshTable"/>
<BatchSaveRetryTask ref="batchSaveRetryTask" @refreshTable="refreshTable"/>
</a-card>
</template>
@ -125,6 +128,7 @@ import { getAllGroupNameList, getRetryTaskPage, getSceneList, updateRetryTaskSta
import { STable } from '@/components'
import SaveRetryTask from './form/SaveRetryTask'
import BatchUpdateRetryTaskInfo from './form/BatchUpdateRetryTaskInfo'
import BatchSaveRetryTask from '@/views/task/form/BatchSaveRetryTask.vue'
export default {
name: 'RetryTask',
@ -133,7 +137,8 @@ export default {
ATextarea,
STable,
SaveRetryTask,
BatchUpdateRetryTaskInfo
BatchUpdateRetryTaskInfo,
BatchSaveRetryTask
},
data () {
return {
@ -285,6 +290,9 @@ export default {
handleNew () {
this.$refs.saveRetryTask.isShow(true, null)
},
handleBatchNew () {
this.$refs.batchSaveRetryTask.isShow(true, null)
},
handleChange (value) {
getSceneList({ groupName: value }).then((res) => {
this.sceneList = res.data

View File

@ -0,0 +1,128 @@
<template>
<div>
<a-modal :visible="visible" title="新增任务" @ok="handleOk" @cancel="visible = false" width="800px">
<a-form @submit="handleOk" :form="form" v-bind="formItemLayout">
<a-form-item label="组">
<a-select
placeholder="请选择组"
v-decorator="['groupName', { rules: [{ required: true, message: '请选择组' }] }]"
@change="(value) => handleChange(value)"
>
<a-select-option v-for="item in groupNameList" :value="item" :key="item">{{ item }}</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="场景名称">
<a-select
placeholder="请选择场景名称"
v-decorator="['sceneName', { rules: [{ required: true, message: '请选择场景名称' }] }]" >
<a-select-option v-for="item in sceneList" :value="item.sceneName" :key="item.sceneName">
{{ item.sceneName }}</a-select-option
>
</a-select>
</a-form-item>
<a-form-item label="重试状态">
<a-select
placeholder="请选择重试状态"
v-decorator="['retryStatus', { rules: [{ required: true, message: '请选择重试状态' }] }]"
>
<a-select-option v-for="(value, key) in retryStatus" :value="key" :key="key"> {{ value }}</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="日志信息">
<a-textarea
rows="4"
allow-clear
placeholder="请输入日志信息"
v-decorator="['logStr', { rules: [{ required: true, message: '请输入包含<|>参数<|>的日志信息.' }, { validator: handleLogStr }], validateTrigger: 'change' }]"
/>
<a href="https://www.easyretry.com/" target="_blank">获取日志信息</a>
</a-form-item>
</a-form>
</a-modal>
</div>
</template>
<script>
import { getAllGroupNameList, getSceneList, batchSaveRetryTask } from '@/api/manage'
export default {
name: 'SavRetryTask',
props: {},
data () {
return {
visible: false,
form: this.$form.createForm(this),
formItemLayout: {
labelCol: { lg: { span: 6 }, sm: { span: 7 } },
wrapperCol: { lg: { span: 14 }, sm: { span: 17 } }
},
groupNameList: [],
sceneList: [],
retryStatus: {
0: '重试中',
1: '重试完成',
2: '最大次数',
3: '暂停'
}
}
},
methods: {
handleOk (e) {
console.log(e)
e.preventDefault()
this.form.validateFields((err, values) => {
if (!err) {
console.log(values)
batchSaveRetryTask(values).then((res) => {
// this.form.resetFields()
// this.$message.success('')
// this.visible = false
// this.$emit('refreshTable', 1)
})
}
})
},
handleChange (value) {
getSceneList({ groupName: value }).then((res) => {
this.sceneList = res.data
})
},
isShow (visible, data) {
this.visible = visible
getAllGroupNameList().then((res) => {
this.groupNameList = res.data
})
},
handleLogStr (rule, value, callback) {
if (!value) {
return callback()
}
const regex = /<\|>(.*?)<\|>/g
let matchCount = 0
let result
while ((result = regex.exec(value)) !== null) {
const matchedData = result[1]
console.log(matchedData)
matchCount++
}
console.log('符合条件的数据条数:' + matchCount)
if (matchCount === 0) {
return callback(new Error('未包含<|>'))
} else if (matchCount > 500) {
return callback(new Error('最多只能提交500个有效数据'))
} else {
return callback()
}
}
}
}
</script>
<style scoped>
</style>