From 97b563a3f4359879f0b0bc52b8e1e811a1565adb Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Mon, 7 Apr 2025 23:06:06 +0800 Subject: [PATCH] =?UTF-8?q?feat(1.5.0-beta1):=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E5=AD=98=E5=9C=A8=E5=9C=BA=E6=99=AF=E5=90=8D?= =?UTF-8?q?=E7=A7=B0=E7=9B=B8=E5=90=8C=E6=97=B6=E8=AE=A1=E7=AE=97=E4=B8=8B?= =?UTF-8?q?=E6=AC=A1=E8=A7=A6=E5=8F=91=E6=97=B6=E9=97=B4=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/snail_job_mysql.sql | 4 ++- .../datasource/persistence/po/Retry.java | 4 +++ .../server/common/dto/PartitionTask.java | 3 -- .../retry/task/dto/RetryPartitionTask.java | 4 +++ .../task/support/dispatch/ScanRetryActor.java | 29 ++++++++++--------- 5 files changed, 26 insertions(+), 18 deletions(-) diff --git a/doc/sql/snail_job_mysql.sql b/doc/sql/snail_job_mysql.sql index b4cb1628a..f370f1cc5 100644 --- a/doc/sql/snail_job_mysql.sql +++ b/doc/sql/snail_job_mysql.sql @@ -111,7 +111,9 @@ CREATE TABLE `sj_retry` `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', `group_name` varchar(64) NOT NULL COMMENT '组名称', + `group_id` bigint(20) NOT NULL COMMENT '组Id', `scene_name` varchar(64) NOT NULL COMMENT '场景名称', + `scene_id` bigint(20) NOT NULL COMMENT '场景ID', `idempotent_id` varchar(64) NOT NULL COMMENT '幂等id', `biz_no` varchar(64) NOT NULL DEFAULT '' COMMENT '业务编号', `executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称', @@ -134,7 +136,7 @@ CREATE TABLE `sj_retry` KEY `idx_retry_status_bucket_index` (`retry_status`, `bucket_index`), KEY `idx_parent_id` (`parent_id`), KEY `idx_create_dt` (`create_dt`), - UNIQUE KEY `uk_name_task_type_idempotent_id_deleted` (`namespace_id`, `group_name`, `task_type`, `idempotent_id`, `deleted`) + UNIQUE KEY `uk_scene_id_task_type_idempotent_id_deleted` (`scene_id`, `task_type`, `idempotent_id`, `deleted`) ) ENGINE = InnoDB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT ='重试信息表' diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Retry.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Retry.java index 87fe6d266..94f0c10de 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Retry.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Retry.java @@ -23,8 +23,12 @@ public class Retry extends CreateUpdateDt { private String groupName; + private String groupId; + private String sceneName; + private String sceneId; + private String idempotentId; private String bizNo; diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/PartitionTask.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/PartitionTask.java index 0d66c851e..63e18c659 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/PartitionTask.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/PartitionTask.java @@ -10,7 +10,4 @@ import lombok.Data; public class PartitionTask { protected Long id; - -// protected String uniqueId; - } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryPartitionTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryPartitionTask.java index 9e414e3db..89a8a5e86 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryPartitionTask.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryPartitionTask.java @@ -19,8 +19,12 @@ public class RetryPartitionTask extends PartitionTask { private String groupName; + private Long groupId; + private String sceneName; + private Long sceneId; + private Integer taskType; /** diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index f5ad958f5..168f83f90 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -94,7 +94,7 @@ public class ScanRetryActor extends AbstractActor { } if (!rateLimiterHandler.tryAcquire(partitionTasks.size())) { - log.warn("当前节点触发限流"); + log.warn("Current node triggers current limit"); return true; } @@ -107,13 +107,13 @@ public class ScanRetryActor extends AbstractActor { } // 批次查询场景 - Map sceneConfigMap = getSceneConfigMap(partitionTasks); + Map sceneConfigMap = getSceneConfigMap(partitionTasks); List waitUpdateRetries = new ArrayList<>(); List waitExecRetries = new ArrayList<>(); for (PartitionTask task : partitionTasks) { RetryPartitionTask retryPartitionTask = (RetryPartitionTask) task; - RetrySceneConfig retrySceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneName()); + RetrySceneConfig retrySceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneId()); if (Objects.isNull(retrySceneConfig)) { continue; } @@ -141,11 +141,11 @@ public class ScanRetryActor extends AbstractActor { * 查询场景配置或者退避策略 * * @param partitionTasks 待处理任务列表 - * @return + * @return */ - private Map getSceneConfigMap(final List partitionTasks) { - Set sceneNameSet = StreamUtils.toSet(partitionTasks, - partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName()); + private Map getSceneConfigMap(final List partitionTasks) { + Set sceneIdSet = StreamUtils.toSet(partitionTasks, + partitionTask -> ((RetryPartitionTask) partitionTask).getSceneId()); List retrySceneConfigs = accessTemplate.getSceneConfigAccess() .list(new LambdaQueryWrapper() .select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, @@ -153,8 +153,8 @@ public class ScanRetryActor extends AbstractActor { RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getExecutorTimeout) .eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus()) - .in(RetrySceneConfig::getSceneName, sceneNameSet)); - return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName); + .in(RetrySceneConfig::getId, sceneIdSet)); + return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getId); } private void processRetry(RetryPartitionTask partitionTask, RetrySceneConfig retrySceneConfig, List waitExecRetries, List waitUpdateRetries) { @@ -213,12 +213,13 @@ public class ScanRetryActor extends AbstractActor { // 过滤已关闭的组 if (CollUtil.isNotEmpty(retries)) { - List groupConfigs = StreamUtils.toList(groupConfigMapper.selectList(new LambdaQueryWrapper() - .select(GroupConfig::getGroupName) + List groupConfigs = StreamUtils.toList( + groupConfigMapper.selectList(new LambdaQueryWrapper() + .select(GroupConfig::getId) .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) - .in(GroupConfig::getGroupName, StreamUtils.toSet(retries, Retry::getGroupName))), - GroupConfig::getGroupName); - retries = retries.stream().filter(retry -> groupConfigs.contains(retry.getGroupName())).collect(Collectors.toList()); + .in(GroupConfig::getId, StreamUtils.toSet(retries, Retry::getGroupId))), + GroupConfig::getId); + retries = retries.stream().filter(retry -> groupConfigs.contains(retry.getId())).collect(Collectors.toList()); } return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retries);