feat(1.5.0-beta1): 修复重试存在场景名称相同时计算下次触发时间错误问题
This commit is contained in:
parent
4e7ac8cdc4
commit
67c10e1431
@ -111,7 +111,9 @@ CREATE TABLE `sj_retry`
|
||||
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
|
||||
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
|
||||
`group_name` varchar(64) NOT NULL COMMENT '组名称',
|
||||
`group_id` bigint(20) NOT NULL COMMENT '组Id',
|
||||
`scene_name` varchar(64) NOT NULL COMMENT '场景名称',
|
||||
`scene_id` bigint(20) NOT NULL COMMENT '场景ID',
|
||||
`idempotent_id` varchar(64) NOT NULL COMMENT '幂等id',
|
||||
`biz_no` varchar(64) NOT NULL DEFAULT '' COMMENT '业务编号',
|
||||
`executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称',
|
||||
@ -134,7 +136,7 @@ CREATE TABLE `sj_retry`
|
||||
KEY `idx_retry_status_bucket_index` (`retry_status`, `bucket_index`),
|
||||
KEY `idx_parent_id` (`parent_id`),
|
||||
KEY `idx_create_dt` (`create_dt`),
|
||||
UNIQUE KEY `uk_name_task_type_idempotent_id_deleted` (`namespace_id`, `group_name`, `task_type`, `idempotent_id`, `deleted`)
|
||||
UNIQUE KEY `uk_scene_id_task_type_idempotent_id_deleted` (`scene_id`, `task_type`, `idempotent_id`, `deleted`)
|
||||
) ENGINE = InnoDB
|
||||
AUTO_INCREMENT = 0
|
||||
DEFAULT CHARSET = utf8mb4 COMMENT ='重试信息表'
|
||||
|
@ -23,8 +23,12 @@ public class Retry extends CreateUpdateDt {
|
||||
|
||||
private String groupName;
|
||||
|
||||
private String groupId;
|
||||
|
||||
private String sceneName;
|
||||
|
||||
private String sceneId;
|
||||
|
||||
private String idempotentId;
|
||||
|
||||
private String bizNo;
|
||||
|
@ -10,7 +10,4 @@ import lombok.Data;
|
||||
public class PartitionTask {
|
||||
|
||||
protected Long id;
|
||||
|
||||
// protected String uniqueId;
|
||||
|
||||
}
|
||||
|
@ -19,8 +19,12 @@ public class RetryPartitionTask extends PartitionTask {
|
||||
|
||||
private String groupName;
|
||||
|
||||
private Long groupId;
|
||||
|
||||
private String sceneName;
|
||||
|
||||
private Long sceneId;
|
||||
|
||||
private Integer taskType;
|
||||
|
||||
/**
|
||||
|
@ -94,7 +94,7 @@ public class ScanRetryActor extends AbstractActor {
|
||||
}
|
||||
|
||||
if (!rateLimiterHandler.tryAcquire(partitionTasks.size())) {
|
||||
log.warn("当前节点触发限流");
|
||||
log.warn("Current node triggers current limit");
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -107,13 +107,13 @@ public class ScanRetryActor extends AbstractActor {
|
||||
}
|
||||
|
||||
// 批次查询场景
|
||||
Map<String, RetrySceneConfig> sceneConfigMap = getSceneConfigMap(partitionTasks);
|
||||
Map<Long, RetrySceneConfig> sceneConfigMap = getSceneConfigMap(partitionTasks);
|
||||
|
||||
List<Retry> waitUpdateRetries = new ArrayList<>();
|
||||
List<RetryTaskPrepareDTO> waitExecRetries = new ArrayList<>();
|
||||
for (PartitionTask task : partitionTasks) {
|
||||
RetryPartitionTask retryPartitionTask = (RetryPartitionTask) task;
|
||||
RetrySceneConfig retrySceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneName());
|
||||
RetrySceneConfig retrySceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneId());
|
||||
if (Objects.isNull(retrySceneConfig)) {
|
||||
continue;
|
||||
}
|
||||
@ -141,11 +141,11 @@ public class ScanRetryActor extends AbstractActor {
|
||||
* 查询场景配置或者退避策略
|
||||
*
|
||||
* @param partitionTasks 待处理任务列表
|
||||
* @return <SceneName, RetrySceneConfig>
|
||||
* @return <SceneId, RetrySceneConfig>
|
||||
*/
|
||||
private Map<String, RetrySceneConfig> getSceneConfigMap(final List<? extends PartitionTask> partitionTasks) {
|
||||
Set<String> sceneNameSet = StreamUtils.toSet(partitionTasks,
|
||||
partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName());
|
||||
private Map<Long, RetrySceneConfig> getSceneConfigMap(final List<? extends PartitionTask> partitionTasks) {
|
||||
Set<Long> sceneIdSet = StreamUtils.toSet(partitionTasks,
|
||||
partitionTask -> ((RetryPartitionTask) partitionTask).getSceneId());
|
||||
List<RetrySceneConfig> retrySceneConfigs = accessTemplate.getSceneConfigAccess()
|
||||
.list(new LambdaQueryWrapper<RetrySceneConfig>()
|
||||
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval,
|
||||
@ -153,8 +153,8 @@ public class ScanRetryActor extends AbstractActor {
|
||||
RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval,
|
||||
RetrySceneConfig::getExecutorTimeout)
|
||||
.eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus())
|
||||
.in(RetrySceneConfig::getSceneName, sceneNameSet));
|
||||
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName);
|
||||
.in(RetrySceneConfig::getId, sceneIdSet));
|
||||
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getId);
|
||||
}
|
||||
|
||||
private void processRetry(RetryPartitionTask partitionTask, RetrySceneConfig retrySceneConfig, List<RetryTaskPrepareDTO> waitExecRetries, List<Retry> waitUpdateRetries) {
|
||||
@ -213,12 +213,13 @@ public class ScanRetryActor extends AbstractActor {
|
||||
|
||||
// 过滤已关闭的组
|
||||
if (CollUtil.isNotEmpty(retries)) {
|
||||
List<String> groupConfigs = StreamUtils.toList(groupConfigMapper.selectList(new LambdaQueryWrapper<GroupConfig>()
|
||||
.select(GroupConfig::getGroupName)
|
||||
List<Long> groupConfigs = StreamUtils.toList(
|
||||
groupConfigMapper.selectList(new LambdaQueryWrapper<GroupConfig>()
|
||||
.select(GroupConfig::getId)
|
||||
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
|
||||
.in(GroupConfig::getGroupName, StreamUtils.toSet(retries, Retry::getGroupName))),
|
||||
GroupConfig::getGroupName);
|
||||
retries = retries.stream().filter(retry -> groupConfigs.contains(retry.getGroupName())).collect(Collectors.toList());
|
||||
.in(GroupConfig::getId, StreamUtils.toSet(retries, Retry::getGroupId))),
|
||||
GroupConfig::getId);
|
||||
retries = retries.stream().filter(retry -> groupConfigs.contains(retry.getId())).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retries);
|
||||
|
Loading…
Reference in New Issue
Block a user