feat(1.5.0-beta1): 修复重试存在场景名称相同时计算下次触发时间错误问题

This commit is contained in:
opensnail 2025-04-07 23:06:06 +08:00
parent 4de55b5fd3
commit 97b563a3f4
5 changed files with 26 additions and 18 deletions

View File

@ -111,7 +111,9 @@ CREATE TABLE `sj_retry`
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称', `group_name` varchar(64) NOT NULL COMMENT '组名称',
`group_id` bigint(20) NOT NULL COMMENT '组Id',
`scene_name` varchar(64) NOT NULL COMMENT '场景名称', `scene_name` varchar(64) NOT NULL COMMENT '场景名称',
`scene_id` bigint(20) NOT NULL COMMENT '场景ID',
`idempotent_id` varchar(64) NOT NULL COMMENT '幂等id', `idempotent_id` varchar(64) NOT NULL COMMENT '幂等id',
`biz_no` varchar(64) NOT NULL DEFAULT '' COMMENT '业务编号', `biz_no` varchar(64) NOT NULL DEFAULT '' COMMENT '业务编号',
`executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称', `executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称',
@ -134,7 +136,7 @@ CREATE TABLE `sj_retry`
KEY `idx_retry_status_bucket_index` (`retry_status`, `bucket_index`), KEY `idx_retry_status_bucket_index` (`retry_status`, `bucket_index`),
KEY `idx_parent_id` (`parent_id`), KEY `idx_parent_id` (`parent_id`),
KEY `idx_create_dt` (`create_dt`), KEY `idx_create_dt` (`create_dt`),
UNIQUE KEY `uk_name_task_type_idempotent_id_deleted` (`namespace_id`, `group_name`, `task_type`, `idempotent_id`, `deleted`) UNIQUE KEY `uk_scene_id_task_type_idempotent_id_deleted` (`scene_id`, `task_type`, `idempotent_id`, `deleted`)
) ENGINE = InnoDB ) ENGINE = InnoDB
AUTO_INCREMENT = 0 AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='重试信息表' DEFAULT CHARSET = utf8mb4 COMMENT ='重试信息表'

View File

@ -23,8 +23,12 @@ public class Retry extends CreateUpdateDt {
private String groupName; private String groupName;
private String groupId;
private String sceneName; private String sceneName;
private String sceneId;
private String idempotentId; private String idempotentId;
private String bizNo; private String bizNo;

View File

@ -10,7 +10,4 @@ import lombok.Data;
public class PartitionTask { public class PartitionTask {
protected Long id; protected Long id;
// protected String uniqueId;
} }

View File

@ -19,8 +19,12 @@ public class RetryPartitionTask extends PartitionTask {
private String groupName; private String groupName;
private Long groupId;
private String sceneName; private String sceneName;
private Long sceneId;
private Integer taskType; private Integer taskType;
/** /**

View File

@ -94,7 +94,7 @@ public class ScanRetryActor extends AbstractActor {
} }
if (!rateLimiterHandler.tryAcquire(partitionTasks.size())) { if (!rateLimiterHandler.tryAcquire(partitionTasks.size())) {
log.warn("当前节点触发限流"); log.warn("Current node triggers current limit");
return true; return true;
} }
@ -107,13 +107,13 @@ public class ScanRetryActor extends AbstractActor {
} }
// 批次查询场景 // 批次查询场景
Map<String, RetrySceneConfig> sceneConfigMap = getSceneConfigMap(partitionTasks); Map<Long, RetrySceneConfig> sceneConfigMap = getSceneConfigMap(partitionTasks);
List<Retry> waitUpdateRetries = new ArrayList<>(); List<Retry> waitUpdateRetries = new ArrayList<>();
List<RetryTaskPrepareDTO> waitExecRetries = new ArrayList<>(); List<RetryTaskPrepareDTO> waitExecRetries = new ArrayList<>();
for (PartitionTask task : partitionTasks) { for (PartitionTask task : partitionTasks) {
RetryPartitionTask retryPartitionTask = (RetryPartitionTask) task; RetryPartitionTask retryPartitionTask = (RetryPartitionTask) task;
RetrySceneConfig retrySceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneName()); RetrySceneConfig retrySceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneId());
if (Objects.isNull(retrySceneConfig)) { if (Objects.isNull(retrySceneConfig)) {
continue; continue;
} }
@ -141,11 +141,11 @@ public class ScanRetryActor extends AbstractActor {
* 查询场景配置或者退避策略 * 查询场景配置或者退避策略
* *
* @param partitionTasks 待处理任务列表 * @param partitionTasks 待处理任务列表
* @return <SceneName, RetrySceneConfig> * @return <SceneId, RetrySceneConfig>
*/ */
private Map<String, RetrySceneConfig> getSceneConfigMap(final List<? extends PartitionTask> partitionTasks) { private Map<Long, RetrySceneConfig> getSceneConfigMap(final List<? extends PartitionTask> partitionTasks) {
Set<String> sceneNameSet = StreamUtils.toSet(partitionTasks, Set<Long> sceneIdSet = StreamUtils.toSet(partitionTasks,
partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName()); partitionTask -> ((RetryPartitionTask) partitionTask).getSceneId());
List<RetrySceneConfig> retrySceneConfigs = accessTemplate.getSceneConfigAccess() List<RetrySceneConfig> retrySceneConfigs = accessTemplate.getSceneConfigAccess()
.list(new LambdaQueryWrapper<RetrySceneConfig>() .list(new LambdaQueryWrapper<RetrySceneConfig>()
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, .select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval,
@ -153,8 +153,8 @@ public class ScanRetryActor extends AbstractActor {
RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval,
RetrySceneConfig::getExecutorTimeout) RetrySceneConfig::getExecutorTimeout)
.eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus()) .eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus())
.in(RetrySceneConfig::getSceneName, sceneNameSet)); .in(RetrySceneConfig::getId, sceneIdSet));
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName); return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getId);
} }
private void processRetry(RetryPartitionTask partitionTask, RetrySceneConfig retrySceneConfig, List<RetryTaskPrepareDTO> waitExecRetries, List<Retry> waitUpdateRetries) { private void processRetry(RetryPartitionTask partitionTask, RetrySceneConfig retrySceneConfig, List<RetryTaskPrepareDTO> waitExecRetries, List<Retry> waitUpdateRetries) {
@ -213,12 +213,13 @@ public class ScanRetryActor extends AbstractActor {
// 过滤已关闭的组 // 过滤已关闭的组
if (CollUtil.isNotEmpty(retries)) { if (CollUtil.isNotEmpty(retries)) {
List<String> groupConfigs = StreamUtils.toList(groupConfigMapper.selectList(new LambdaQueryWrapper<GroupConfig>() List<Long> groupConfigs = StreamUtils.toList(
.select(GroupConfig::getGroupName) groupConfigMapper.selectList(new LambdaQueryWrapper<GroupConfig>()
.select(GroupConfig::getId)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
.in(GroupConfig::getGroupName, StreamUtils.toSet(retries, Retry::getGroupName))), .in(GroupConfig::getId, StreamUtils.toSet(retries, Retry::getGroupId))),
GroupConfig::getGroupName); GroupConfig::getId);
retries = retries.stream().filter(retry -> groupConfigs.contains(retry.getGroupName())).collect(Collectors.toList()); retries = retries.stream().filter(retry -> groupConfigs.contains(retry.getId())).collect(Collectors.toList());
} }
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retries); return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retries);