From e92d3bef01815bf271d9f94f54942089a7170cb7 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 5 May 2023 16:23:21 +0800 Subject: [PATCH] =?UTF-8?q?feat:=201.2.0=201.=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E5=88=86=E5=B8=83=E5=BC=8Fid=E7=94=9F=E6=88=90=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=202.=20=E4=B8=8A=E6=8A=A5=E5=BC=82=E5=B8=B8=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E4=BF=9D=E5=AD=98unique=5Fid=203.=20=E4=B8=8B?= =?UTF-8?q?=E5=8F=91=E9=87=8D=E8=AF=95=E6=B5=81=E9=87=8F=E6=97=B6=E9=80=8F?= =?UTF-8?q?=E4=BC=A0unique=5Fid?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/easy_retry.sql | 64 ++-- .../client/core/client/RetryEndPoint.java | 1 + .../retry/client/model/DispatchRetryDTO.java | 2 + .../client/model/DispatchRetryResultDTO.java | 1 + .../retry/client/model/RetryCallbackDTO.java | 2 + .../common/core/enums/IdGeneratorMode.java | 4 +- easy-retry-server/pom.xml | 5 + .../retry/server/config/SystemProperties.java | 7 +- .../mybatis/mapper/SequenceAllocMapper.java | 36 +++ .../persistence/mybatis/po/GroupConfig.java | 4 +- .../mybatis/po/RetryDeadLetter.java | 2 + .../persistence/mybatis/po/RetryTask.java | 2 + .../persistence/mybatis/po/RetryTaskLog.java | 2 + .../persistence/mybatis/po/SequenceAlloc.java | 52 +++ .../convert/RetryTaskLogConverter.java | 18 ++ .../service/impl/GroupConfigServiceImpl.java | 40 ++- .../server/service/impl/RetryServiceImpl.java | 31 +- .../callback/CallbackRetryResultActor.java | 3 +- .../dispatch/actor/exec/ExecUnitActor.java | 12 +- .../server/support/generator/id/Segment.java | 59 ++++ .../support/generator/id/SegmentBuffer.java | 132 ++++++++ .../generator/id/SegmentIdGenerator.java | 298 +++++++++++++++++- .../generator/id/SnowflakeIdGenerator.java | 11 +- .../model/request/GroupConfigRequestVO.java | 6 +- .../resources/mapper/GroupConfigMapper.xml | 5 +- .../mapper/RetryDeadLetterMapper.xml | 7 +- .../resources/mapper/RetryTaskLogMapper.xml | 4 +- .../main/resources/mapper/RetryTaskMapper.xml | 3 +- .../resources/mapper/SequenceAllocMapper.xml | 25 ++ 29 files changed, 775 insertions(+), 63 deletions(-) create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/mapper/SequenceAllocMapper.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/SequenceAlloc.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskLogConverter.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java create mode 100644 easy-retry-server/src/main/resources/mapper/SequenceAllocMapper.xml diff --git a/doc/sql/easy_retry.sql b/doc/sql/easy_retry.sql index 61b986d3..4d13b8d1 100644 --- a/doc/sql/easy_retry.sql +++ b/doc/sql/easy_retry.sql @@ -1,14 +1,15 @@ CREATE TABLE `group_config` ( - `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', - `group_name` varchar(64) NOT NULL DEFAULT '' COMMENT '组名称', - `description` varchar(256) NOT NULL COMMENT '组描述', - `group_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '组状态 0、未启用 1、启用', - `version` int(11) NOT NULL COMMENT '版本号', - `group_partition` int(11) NOT NULL COMMENT '分区', - `route_key` tinyint(4) NOT NULL COMMENT '路由策略', - `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', - `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', + `group_name` varchar(64) NOT NULL DEFAULT '' COMMENT '组名称', + `description` varchar(256) NOT NULL COMMENT '组描述', + `group_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '组状态 0、未启用 1、启用', + `version` int(11) NOT NULL COMMENT '版本号', + `group_partition` int(11) NOT NULL COMMENT '分区', + `route_key` tinyint(4) NOT NULL COMMENT '路由策略', + `id_generator_mode` tinyint(4) NOT NULL DEFAULT '1' COMMENT '唯一id生成模式 默认号段模式', + `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), UNIQUE KEY `uk_name` (`group_name`) ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='组配置' @@ -33,9 +34,10 @@ CREATE TABLE `notify_config` CREATE TABLE `retry_dead_letter_0` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', + `unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一', `group_name` varchar(64) NOT NULL COMMENT '组名称', `scene_name` varchar(64) NOT NULL COMMENT '场景id', - `idempotent_id` varchar(64) NOT NULL COMMENT '幂等id', + `idempotent_id` varchar(64) NOT NULL COMMENT '幂等id', `biz_no` varchar(64) NOT NULL DEFAULT '' COMMENT '业务编号', `executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称', `args_str` text NOT NULL COMMENT '执行方法参数', @@ -44,16 +46,18 @@ CREATE TABLE `retry_dead_letter_0` PRIMARY KEY (`id`), KEY `idx_group_name_scene_name` (`group_name`, `scene_name`), KEY `idx_idempotent_id` (`idempotent_id`), - KEY `idx_biz_no` (`biz_no`) + KEY `idx_biz_no` (`biz_no`), + UNIQUE KEY `uk_name_unique_id` (`group_name`, `unique_id`) ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='重试死信队列' ; CREATE TABLE `retry_task_0` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', + `unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一', `group_name` varchar(64) NOT NULL COMMENT '组名称', `scene_name` varchar(64) NOT NULL COMMENT '场景名称', - `idempotent_id` varchar(64) NOT NULL COMMENT '幂等id', + `idempotent_id` varchar(64) NOT NULL COMMENT '幂等id', `biz_no` varchar(64) NOT NULL DEFAULT '' COMMENT '业务编号', `executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称', `args_str` text NOT NULL COMMENT '执行方法参数', @@ -64,18 +68,20 @@ CREATE TABLE `retry_task_0` `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), - KEY `idx_group_name_scene_name` (`group_name`, `scene_name`), - KEY `idx_retry_status` (`retry_status`), - KEY `idx_idempotent_id` (`idempotent_id`) + KEY `idx_group_name_scene_name` (`group_name`, `scene_name`), + KEY `idx_retry_status` (`retry_status`), + KEY `idx_idempotent_id` (`idempotent_id`), + UNIQUE KEY `uk_name_unique_id` (`group_name`, `unique_id`) ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='重试表' ; CREATE TABLE `retry_task_log` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', + `unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一', `group_name` varchar(64) NOT NULL COMMENT '组名称', `scene_name` varchar(64) NOT NULL COMMENT '场景名称', - `idempotent_id` varchar(64) NOT NULL COMMENT '幂等id', + `idempotent_id` varchar(64) NOT NULL COMMENT '幂等id', `biz_no` varchar(64) NOT NULL DEFAULT '' COMMENT '业务编号', `executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称', `args_str` text NOT NULL COMMENT '执行方法参数', @@ -87,7 +93,8 @@ CREATE TABLE `retry_task_log` PRIMARY KEY (`id`), KEY `idx_group_name_scene_name` (`group_name`, `scene_name`), KEY `idx_retry_status` (`retry_status`), - KEY `idx_idempotent_id` (`idempotent_id`) + KEY `idx_idempotent_id` (`idempotent_id`), + UNIQUE KEY `uk_name_unique_id` (`group_name`, `unique_id`) ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='重试日志表' ; @@ -112,15 +119,15 @@ CREATE TABLE `scene_config` CREATE TABLE `server_node` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', - `group_name` varchar(64) NOT NULL COMMENT '组名称', - `host_id` varchar(64) NOT NULL COMMENT '主机id', - `host_ip` varchar(64) NOT NULL COMMENT '机器ip', + `group_name` varchar(64) NOT NULL COMMENT '组名称', + `host_id` varchar(64) NOT NULL COMMENT '主机id', + `host_ip` varchar(64) NOT NULL COMMENT '机器ip', `context_path` varchar(256) NOT NULL DEFAULT '/' COMMENT '客户端上下文路径 server.servlet.context-path', `host_port` int(16) NOT NULL COMMENT '机器端口', - `expire_at` datetime NOT NULL COMMENT '过期时间', + `expire_at` datetime NOT NULL COMMENT '过期时间', `node_type` tinyint(4) NOT NULL COMMENT '节点类型 1、客户端 2、是服务端', - `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', - `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), UNIQUE KEY `uk_host_id_host_ip` (`host_id`,`host_ip`) ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='服务器节点' @@ -165,3 +172,14 @@ CREATE TABLE `system_user_permission` PRIMARY KEY (`id`), UNIQUE KEY `uk_group_name_system_user_id` (`group_name`, `system_user_id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='系统用户权限表'; + +CREATE TABLE `sequence_alloc` +( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', + `group_name` varchar(64) NOT NULL DEFAULT '' COMMENT '组名称', + `max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '最大id', + `step` int(11) NOT NULL DEFAULT '100' COMMENT '步长', + `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uk_group_name` (`group_name`) USING BTREE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='号段模式序号ID分配表'; diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java index d81d4e11..f87964bb 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java @@ -88,6 +88,7 @@ public class RetryEndPoint { } executeRespDto.setIdempotentId(executeReqDto.getIdempotentId()); + executeRespDto.setUniqueId(executeReqDto.getUniqueId()); if (Objects.nonNull(retryerResultContext.getResult())) { executeRespDto.setResultJson(JsonUtil.toJsonString(retryerResultContext.getResult())); } diff --git a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/DispatchRetryDTO.java b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/DispatchRetryDTO.java index a9a6bcd2..8a0a7e15 100644 --- a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/DispatchRetryDTO.java +++ b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/DispatchRetryDTO.java @@ -19,4 +19,6 @@ public class DispatchRetryDTO { private String idempotentId; @NotBlank(message = "executorName 不能为空") private String executorName; + @NotBlank(message = "uniqueId 不能为空") + private String uniqueId; } diff --git a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/DispatchRetryResultDTO.java b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/DispatchRetryResultDTO.java index ec545e34..dcc0e440 100644 --- a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/DispatchRetryResultDTO.java +++ b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/DispatchRetryResultDTO.java @@ -13,4 +13,5 @@ public class DispatchRetryResultDTO { private Integer statusCode; private String idempotentId; private String exceptionMsg; + private String uniqueId; } diff --git a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/RetryCallbackDTO.java b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/RetryCallbackDTO.java index 4fb50370..6abd28c4 100644 --- a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/RetryCallbackDTO.java +++ b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/RetryCallbackDTO.java @@ -23,4 +23,6 @@ public class RetryCallbackDTO { private String executorName; @NotBlank(message = "retryStatus 不能为空") private Integer retryStatus; + @NotBlank(message = "uniqueId 不能为空") + private String uniqueId; } diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/IdGeneratorMode.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/IdGeneratorMode.java index 0df915f4..cb9ed3a8 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/IdGeneratorMode.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/IdGeneratorMode.java @@ -14,8 +14,8 @@ import lombok.Getter; @Getter public enum IdGeneratorMode { - SNOWFLAKE(1, "雪花算法模式"), - SEGMENT(2,"号段模式"); + SEGMENT(1,"号段模式"), + SNOWFLAKE(2, "雪花算法模式"); private final int mode; diff --git a/easy-retry-server/pom.xml b/easy-retry-server/pom.xml index dab5de11..af42f56e 100644 --- a/easy-retry-server/pom.xml +++ b/easy-retry-server/pom.xml @@ -126,6 +126,11 @@ spring-boot-configuration-processor true + + org.perf4j + perf4j + 0.9.16 + diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java index 69082993..182fb297 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java @@ -3,7 +3,6 @@ package com.aizuda.easy.retry.server.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; -import org.springframework.stereotype.Component; /** * 系统配置 @@ -41,4 +40,10 @@ public class SystemProperties { */ private int limiter = 10; + /** + * 号段模式下步长配置 + * 默认100 + */ + private int step = 100; + } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/mapper/SequenceAllocMapper.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/mapper/SequenceAllocMapper.java new file mode 100644 index 00000000..982c032f --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/mapper/SequenceAllocMapper.java @@ -0,0 +1,36 @@ +package com.aizuda.easy.retry.server.persistence.mybatis.mapper; + +import com.aizuda.easy.retry.server.persistence.mybatis.po.SequenceAlloc; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +/** + *

+ * 号段模式序号ID分配表 Mapper 接口 + *

+ * + * @author www.byteblogs.com + * @date 2023-05-05 + * @since 1.2.0 + */ +@Mapper +public interface SequenceAllocMapper extends BaseMapper { + + /** + * 更新业务类型下的最大id + * + * @param step 步长 + * @param groupName + * @return 更新结果 + */ + Integer updateMaxIdByCustomStep(@Param("step") Integer step, @Param("groupName") String groupName); + + /** + * 更新最大id + * + * @param groupName 组名称 + * @return 更新结果 + */ + Integer updateMaxId(@Param("groupName") String groupName); +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/GroupConfig.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/GroupConfig.java index e05132ef..debc43bc 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/GroupConfig.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/GroupConfig.java @@ -21,6 +21,8 @@ public class GroupConfig implements Serializable { private Integer routeKey; + private Integer idGeneratorMode; + private Integer version; private String description; @@ -32,4 +34,4 @@ public class GroupConfig implements Serializable { private static final long serialVersionUID = 1L; -} \ No newline at end of file +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryDeadLetter.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryDeadLetter.java index 6c3261ef..3599b793 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryDeadLetter.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryDeadLetter.java @@ -13,6 +13,8 @@ public class RetryDeadLetter implements Serializable { @TableId(value = "id", type = IdType.AUTO) private Long id; + private String uniqueId; + private String groupName; private String sceneName; diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTask.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTask.java index 50103395..460d754d 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTask.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTask.java @@ -15,6 +15,8 @@ public class RetryTask implements Serializable { @TableId(value = "id", type = IdType.AUTO) private Long id; + private String uniqueId; + private String groupName; private String sceneName; diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTaskLog.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTaskLog.java index 09a942fb..3121da53 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTaskLog.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/RetryTaskLog.java @@ -13,6 +13,8 @@ public class RetryTaskLog implements Serializable { @TableId(value = "id", type = IdType.AUTO) private Long id; + private String uniqueId; + private String groupName; private String sceneName; diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/SequenceAlloc.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/SequenceAlloc.java new file mode 100644 index 00000000..3b780252 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/mybatis/po/SequenceAlloc.java @@ -0,0 +1,52 @@ +package com.aizuda.easy.retry.server.persistence.mybatis.po; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + *

+ * 号段模式序号ID分配表 + *

+ * + * @author www.byteblogs.com + * @since 2023-05-05 + */ +@Getter +@Setter +@TableName("sequence_alloc") +public class SequenceAlloc implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 主键 + */ + @TableId(value = "id", type = IdType.AUTO) + private Long id; + + /** + * 组名称 + */ + private String groupName; + + /** + * 最大id + */ + private Long maxId; + + /** + * 步长 + */ + private Integer step; + + /** + * 更新时间 + */ + private LocalDateTime updateDt; +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskLogConverter.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskLogConverter.java new file mode 100644 index 00000000..22d14042 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/convert/RetryTaskLogConverter.java @@ -0,0 +1,18 @@ +package com.aizuda.easy.retry.server.service.convert; + +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog; +import org.mapstruct.Mapper; +import org.mapstruct.factory.Mappers; + +/** + * @author: shuguang.zhang + * @date : 2023-05-05 16:15 + */ +@Mapper +public interface RetryTaskLogConverter { + + RetryTaskLogConverter INSTANCE = Mappers.getMapper(RetryTaskLogConverter.class); + + RetryTaskLog toRetryTask(RetryTask retryTask); +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/GroupConfigServiceImpl.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/GroupConfigServiceImpl.java index 032f41e5..c5d922a6 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/GroupConfigServiceImpl.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/GroupConfigServiceImpl.java @@ -6,10 +6,12 @@ import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.GroupConfigMapper; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.NotifyConfigMapper; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.SceneConfigMapper; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.SequenceAllocMapper; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; import com.aizuda.easy.retry.server.persistence.mybatis.po.NotifyConfig; import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig; +import com.aizuda.easy.retry.server.persistence.mybatis.po.SequenceAlloc; import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; import com.aizuda.easy.retry.server.support.handler.ClientRegisterHandler; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -38,8 +40,11 @@ import java.util.*; import java.util.stream.Collectors; /** + * CRUD 组、场景、通知 + * * @author: www.byteblogs.com * @date : 2021-11-22 14:54 + * @since 1.0.0 */ @Service public class GroupConfigServiceImpl implements GroupConfigService { @@ -53,10 +58,14 @@ public class GroupConfigServiceImpl implements GroupConfigService { @Autowired private ServerNodeMapper serverNodeMapper; @Autowired + private SequenceAllocMapper sequenceAllocMapper; + @Autowired private ClientRegisterHandler clientRegisterHandler; @Value("${easy-retry.total-partition:32}") private Integer totalPartition; + @Value("${easy-retry.step:100}") + private Integer step; private GroupConfigConverter groupConfigConverter = new GroupConfigConverter(); private NotifyConfigConverter notifyConfigConverter = new NotifyConfigConverter(); @@ -71,15 +80,32 @@ public class GroupConfigServiceImpl implements GroupConfigService { .eq(GroupConfig::getGroupName, groupConfigRequestVO.getGroupName())) == 0, () -> new EasyRetryServerException("GroupName已经存在 {}", groupConfigRequestVO.getGroupName())); + // 保存组配置 doSaveGroupConfig(groupConfigRequestVO); + // 保存生成唯一id配置 + doSaveSequenceAlloc(groupConfigRequestVO); + + // 保存通知配置 doSaveNotifyConfig(groupConfigRequestVO); + // 保存场景配置 doSaveSceneConfig(groupConfigRequestVO.getSceneList(), groupConfigRequestVO.getGroupName()); return Boolean.TRUE; } + /** + * 保存序号生成规则配置失败 + * @param groupConfigRequestVO 组、场景、通知配置类 + */ + private void doSaveSequenceAlloc(final GroupConfigRequestVO groupConfigRequestVO) { + SequenceAlloc sequenceAlloc = new SequenceAlloc(); + sequenceAlloc.setGroupName(groupConfigRequestVO.getGroupName()); + sequenceAlloc.setStep(step); + sequenceAlloc.setUpdateDt(LocalDateTime.now()); + Assert.isTrue(1 == sequenceAllocMapper.insert(sequenceAlloc), () -> new EasyRetryServerException("failed to save sequence generation rule configuration [{}].", groupConfigRequestVO.getGroupName())); + } @Override @Transactional @@ -96,7 +122,7 @@ public class GroupConfigServiceImpl implements GroupConfigService { Assert.isTrue(1 == groupConfigMapper.update(groupConfig, new LambdaUpdateWrapper().eq(GroupConfig::getGroupName, groupConfigRequestVO.getGroupName())), - () -> new EasyRetryServerException("新增组异常异常 groupConfigVO[{}]", groupConfigRequestVO)); + () -> new EasyRetryServerException("exception occurred while adding group. groupConfigVO[{}]", groupConfigRequestVO)); doUpdateNotifyConfig(groupConfigRequestVO); @@ -182,7 +208,7 @@ public class GroupConfigServiceImpl implements GroupConfigService { notifyConfig.setCreateDt(LocalDateTime.now()); Assert.isTrue(1 == notifyConfigMapper.insert(notifyConfig), - () -> new EasyRetryServerException("插入通知配置失败 sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfig))); + () -> new EasyRetryServerException("failed to insert notify. sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfig))); } private void doSaveSceneConfig(List sceneList, String groupName) { @@ -195,7 +221,7 @@ public class GroupConfigServiceImpl implements GroupConfigService { sceneConfig.setGroupName(groupName); Assert.isTrue(1 == sceneConfigMapper.insert(sceneConfig), - () -> new EasyRetryServerException("插入场景配置失败 sceneConfig:[{}]", JsonUtil.toJsonString(sceneConfig))); + () -> new EasyRetryServerException("failed to insert scene. sceneConfig:[{}]", JsonUtil.toJsonString(sceneConfig))); } } } @@ -218,7 +244,7 @@ public class GroupConfigServiceImpl implements GroupConfigService { } else if (sceneConfigVO.getIsDeleted() == 1) { Assert.isTrue( 1 == sceneConfigMapper.deleteById(oldSceneConfig.getId()), - () -> new EasyRetryServerException("删除场景失败 [{}]", sceneConfigVO.getSceneName())); + () -> new EasyRetryServerException("failed to delete scene. [{}]", sceneConfigVO.getSceneName())); } else { SceneConfig sceneConfig = sceneConfigConverter.convert(sceneConfigVO); sceneConfig.setGroupName(groupConfigRequestVO.getGroupName()); @@ -227,7 +253,7 @@ public class GroupConfigServiceImpl implements GroupConfigService { new LambdaQueryWrapper() .eq(SceneConfig::getGroupName, sceneConfig.getGroupName()) .eq(SceneConfig::getSceneName, sceneConfig.getSceneName())), - () -> new EasyRetryServerException("插入场景配置失败 sceneConfig:[{}]", JsonUtil.toJsonString(sceneConfig))); + () -> new EasyRetryServerException("failed to update scene. sceneConfig:[{}]", JsonUtil.toJsonString(sceneConfig))); } iterator.remove(); @@ -253,7 +279,7 @@ public class GroupConfigServiceImpl implements GroupConfigService { } else if (Objects.nonNull(notifyConfigVO.getId()) && notifyConfigVO.getIsDeleted() == 1) { // delete Assert.isTrue(1 == notifyConfigMapper.deleteById(notifyConfigVO.getId()), - () -> new EasyRetryServerException("删除通知配置失败 sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfigVO))); + () -> new EasyRetryServerException("failed to delete notify. sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfigVO))); } else { // update Assert.isTrue(1 == notifyConfigMapper.update(notifyConfig, @@ -261,7 +287,7 @@ public class GroupConfigServiceImpl implements GroupConfigService { .eq(NotifyConfig::getId, notifyConfigVO.getId()) .eq(NotifyConfig::getGroupName, notifyConfig.getGroupName()) .eq(NotifyConfig::getNotifyScene, notifyConfig.getNotifyScene())), - () -> new EasyRetryServerException("更新通知配置失败 sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfig))); + () -> new EasyRetryServerException("failed to update notify. sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfig))); } } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java index a7e145a4..ddecc2f0 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/service/impl/RetryServiceImpl.java @@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig; import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess; +import com.aizuda.easy.retry.server.support.generator.IdGenerator; import com.aizuda.easy.retry.server.support.strategy.WaitStrategies; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; @@ -53,6 +54,9 @@ public class RetryServiceImpl implements RetryService { @Qualifier("configAccessProcessor") private ConfigAccess configAccess; + @Autowired + private List idGeneratorList; + @Autowired private RetryTaskMapper retryTaskMapper; @Autowired @@ -61,11 +65,11 @@ public class RetryServiceImpl implements RetryService { @Transactional @Override public Boolean reportRetry(RetryTaskDTO retryTaskDTO) { - LogUtils.warn(log, "接收上报数据 [{}]", JsonUtil.toJsonString(retryTaskDTO)); + LogUtils.warn(log, "received report data [{}]", JsonUtil.toJsonString(retryTaskDTO)); SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName()); if (Objects.isNull(sceneConfig)) { - throw new EasyRetryServerException("上报数据失败, 未查到场景配置 [{}]", retryTaskDTO); + throw new EasyRetryServerException("failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]", retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName()); } RequestDataHelper.setPartition(retryTaskDTO.getGroupName()); @@ -77,11 +81,12 @@ public class RetryServiceImpl implements RetryService { .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) ); if (0 < count) { - LogUtils.warn(log, "存在重试中的任务中断上报 [{}]", JsonUtil.toJsonString(retryTaskDTO)); + LogUtils.warn(log, "interrupted reporting in retrying task. [{}]", JsonUtil.toJsonString(retryTaskDTO)); return Boolean.TRUE; } RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTaskDTO); + retryTask.setUniqueId(getIdGenerator(retryTaskDTO.getGroupName())); retryTask.setCreateDt(LocalDateTime.now()); retryTask.setUpdateDt(LocalDateTime.now()); @@ -91,7 +96,7 @@ public class RetryServiceImpl implements RetryService { retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); - Assert.isTrue(1 == retryTaskAccess.saveRetryTask(retryTask), () -> new EasyRetryServerException("上报数据失败")); + Assert.isTrue(1 == retryTaskAccess.saveRetryTask(retryTask), () -> new EasyRetryServerException("failed to report data")); return Boolean.TRUE; } @@ -156,4 +161,22 @@ public class RetryServiceImpl implements RetryService { // 将已经重试完成的数据删除 retryTaskAccess.deleteByDelayLevel(groupId, RetryStatusEnum.FINISH.getStatus()); } + + /** + * 获取分布式id + * + * @param groupName 组id + * @return 分布式id + */ + private String getIdGenerator(String groupName) { + + GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName); + for (final IdGenerator idGenerator : idGeneratorList) { + if (idGenerator.supports(groupConfig.getIdGeneratorMode())) { + return idGenerator.idGenerator(groupName); + } + } + + throw new EasyRetryServerException("id generator mode not configured. [{}]", groupName); + } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/callback/CallbackRetryResultActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/callback/CallbackRetryResultActor.java index bf876e98..490dc149 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/callback/CallbackRetryResultActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/callback/CallbackRetryResultActor.java @@ -59,12 +59,13 @@ public class CallbackRetryResultActor extends AbstractActor { retryCallbackDTO.setScene(retryTask.getSceneName()); retryCallbackDTO.setGroup(retryTask.getGroupName()); retryCallbackDTO.setExecutorName(retryTask.getExecutorName()); + retryCallbackDTO.setUniqueId(retryTask.getUniqueId()); // 设置header HttpHeaders requestHeaders = new HttpHeaders(); EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders(); easyRetryHeaders.setEasyRetry(Boolean.TRUE); - easyRetryHeaders.setEasyRetryId(IdUtil.simpleUUID()); + easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId()); requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders)); HttpEntity requestEntity = new HttpEntity<>(retryCallbackDTO, requestHeaders); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecUnitActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecUnitActor.java index 0f546cbf..002a51da 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecUnitActor.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/dispatch/actor/exec/ExecUnitActor.java @@ -2,7 +2,6 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.exec; import akka.actor.AbstractActor; import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.IdUtil; import com.aizuda.easy.retry.client.model.DispatchRetryDTO; import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; import com.aizuda.easy.retry.common.core.constant.SystemConstants; @@ -15,6 +14,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMappe import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask; import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog; import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; +import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter; import com.aizuda.easy.retry.server.support.IdempotentStrategy; import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext; import com.aizuda.easy.retry.server.support.retry.RetryExecutor; @@ -62,13 +62,13 @@ public class ExecUnitActor extends AbstractActor { public Receive createReceive() { return receiveBuilder().match(RetryExecutor.class, retryExecutor -> { - RetryTaskLog retryTaskLog = new RetryTaskLog(); - retryTaskLog.setErrorMessage(StringUtils.EMPTY); - MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext(); RetryTask retryTask = context.getRetryTask(); ServerNode serverNode = context.getServerNode(); + RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask); + retryTaskLog.setErrorMessage(StringUtils.EMPTY); + try { if (Objects.nonNull(serverNode)) { @@ -90,7 +90,6 @@ public class ExecUnitActor extends AbstractActor { getContext().stop(getSelf()); // 记录重试日志 - BeanUtils.copyProperties(retryTask, retryTaskLog); retryTaskLog.setCreateDt(LocalDateTime.now()); retryTaskLog.setId(null); Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog), @@ -113,12 +112,13 @@ public class ExecUnitActor extends AbstractActor { dispatchRetryDTO.setScene(retryTask.getSceneName()); dispatchRetryDTO.setExecutorName(retryTask.getExecutorName()); dispatchRetryDTO.setArgsStr(retryTask.getArgsStr()); + dispatchRetryDTO.setUniqueId(retryTask.getUniqueId()); // 设置header HttpHeaders requestHeaders = new HttpHeaders(); EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders(); easyRetryHeaders.setEasyRetry(Boolean.TRUE); - easyRetryHeaders.setEasyRetryId(IdUtil.simpleUUID()); + easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId()); requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders)); HttpEntity requestEntity = new HttpEntity<>(dispatchRetryDTO, requestHeaders); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java new file mode 100644 index 00000000..0cf2626f --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/Segment.java @@ -0,0 +1,59 @@ +package com.aizuda.easy.retry.server.support.generator.id; + +import java.util.concurrent.atomic.AtomicLong; + +public class Segment { + private AtomicLong value = new AtomicLong(0); + private volatile long max; + private volatile int step; + private SegmentBuffer buffer; + + public Segment(SegmentBuffer buffer) { + this.buffer = buffer; + } + + public AtomicLong getValue() { + return value; + } + + public void setValue(AtomicLong value) { + this.value = value; + } + + public long getMax() { + return max; + } + + public void setMax(long max) { + this.max = max; + } + + public int getStep() { + return step; + } + + public void setStep(int step) { + this.step = step; + } + + public SegmentBuffer getBuffer() { + return buffer; + } + + public long getIdle() { + return this.getMax() - getValue().get(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("Segment("); + sb.append("value:"); + sb.append(value); + sb.append(",max:"); + sb.append(max); + sb.append(",step:"); + sb.append(step); + sb.append(")"); + return sb.toString(); + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java new file mode 100644 index 00000000..adf5cabc --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentBuffer.java @@ -0,0 +1,132 @@ +package com.aizuda.easy.retry.server.support.generator.id; + +import lombok.Data; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * 双buffer + */ +@Data +public class SegmentBuffer { + private String key; + private Segment[] segments; //双buffer + private volatile int currentPos; //当前的使用的segment的index + private volatile boolean nextReady; //下一个segment是否处于可切换状态 + private volatile boolean initOk; //是否初始化完成 + private final AtomicBoolean threadRunning; //线程是否在运行中 + private final ReadWriteLock lock; + + private volatile int step; + private volatile int minStep; + private volatile long updateTimestamp; + + public SegmentBuffer() { + segments = new Segment[]{new Segment(this), new Segment(this)}; + currentPos = 0; + nextReady = false; + initOk = false; + threadRunning = new AtomicBoolean(false); + lock = new ReentrantReadWriteLock(); + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public Segment[] getSegments() { + return segments; + } + + public Segment getCurrent() { + return segments[currentPos]; + } + + public int getCurrentPos() { + return currentPos; + } + + public int nextPos() { + return (currentPos + 1) % 2; + } + + public void switchPos() { + currentPos = nextPos(); + } + + public boolean isInitOk() { + return initOk; + } + + public void setInitOk(boolean initOk) { + this.initOk = initOk; + } + + public boolean isNextReady() { + return nextReady; + } + + public void setNextReady(boolean nextReady) { + this.nextReady = nextReady; + } + + public AtomicBoolean getThreadRunning() { + return threadRunning; + } + + public Lock rLock() { + return lock.readLock(); + } + + public Lock wLock() { + return lock.writeLock(); + } + + public int getStep() { + return step; + } + + public void setStep(int step) { + this.step = step; + } + + public int getMinStep() { + return minStep; + } + + public void setMinStep(int minStep) { + this.minStep = minStep; + } + + public long getUpdateTimestamp() { + return updateTimestamp; + } + + public void setUpdateTimestamp(long updateTimestamp) { + this.updateTimestamp = updateTimestamp; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("SegmentBuffer{"); + sb.append("key='").append(key).append('\''); + sb.append(", segments=").append(Arrays.toString(segments)); + sb.append(", currentPos=").append(currentPos); + sb.append(", nextReady=").append(nextReady); + sb.append(", initOk=").append(initOk); + sb.append(", threadRunning=").append(threadRunning); + sb.append(", step=").append(step); + sb.append(", minStep=").append(minStep); + sb.append(", updateTimestamp=").append(updateTimestamp); + sb.append('}'); + return sb.toString(); + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java index ab6f909c..d13c02f7 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java @@ -1,18 +1,307 @@ package com.aizuda.easy.retry.server.support.generator.id; import com.aizuda.easy.retry.common.core.enums.IdGeneratorMode; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.SequenceAllocMapper; +import com.aizuda.easy.retry.server.persistence.mybatis.po.SequenceAlloc; +import com.aizuda.easy.retry.server.support.Lifecycle; import com.aizuda.easy.retry.server.support.generator.IdGenerator; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.extern.slf4j.Slf4j; +import org.perf4j.StopWatch; +import org.perf4j.slf4j.Slf4JStopWatch; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; /** - * 号段模式 + * 此算法来自美团的leaf号段模式 * * @author www.byteblogs.com * @date 2023-05-04 - * @since 2.0 + * @since 1.2.0 */ @Component -public class SegmentIdGenerator implements IdGenerator { +@Slf4j +public class SegmentIdGenerator implements IdGenerator, Lifecycle { + + /** + * IDCache未初始化成功时的异常码 + */ + private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1; + /** + * key不存在时的异常码 + */ + private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2; + /** + * SegmentBuffer中的两个Segment均未从DB中装载时的异常码 + */ + private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3; + /** + * 最大步长不超过100,0000 + */ + private static final int MAX_STEP = 1000000; + /** + * 一个Segment维持时间为15分钟 + */ + private static final long SEGMENT_DURATION = 15 * 60 * 1000L; + + private ThreadPoolExecutor service = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, + new LinkedBlockingDeque<>(5000), new UpdateThreadFactory()); + + private volatile boolean initOK = false; + private Map cache = new ConcurrentHashMap<>(); + + @Autowired + private SequenceAllocMapper sequenceAllocMapper; + + @Override + public void start() { + LogUtils.info(log, "SegmentIdGenerator start"); + // 确保加载到kv后才初始化成功 + updateCacheFromDb(); + initOK = true; + updateCacheFromDbAtEveryMinute(); + LogUtils.info(log, "SegmentIdGenerator start end"); + } + + @Override + public void close() { + LogUtils.info(log, "SegmentIdGenerator close"); + } + + public static class UpdateThreadFactory implements ThreadFactory { + + private static int threadInitNumber = 0; + + private static synchronized int nextThreadNum() { + return threadInitNumber++; + } + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "Thread-Segment-Update-" + nextThreadNum()); + } + } + + private void updateCacheFromDbAtEveryMinute() { + ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r); + t.setName("check-id-cache-thread"); + t.setDaemon(true); + return t; + }); + service.scheduleWithFixedDelay(() -> updateCacheFromDb(), 60, 60, TimeUnit.SECONDS); + } + + private void updateCacheFromDb() { + LogUtils.info(log, "update cache from db"); + StopWatch sw = new Slf4JStopWatch(); + try { + List sequenceAllocs = sequenceAllocMapper + .selectList(new LambdaQueryWrapper().select(SequenceAlloc::getGroupName)); + if (CollectionUtils.isEmpty(sequenceAllocs)) { + return; + } + + List dbTags = sequenceAllocs.stream().map(SequenceAlloc::getGroupName).collect(Collectors.toList()); + + List cacheTags = new ArrayList<>(cache.keySet()); + Set insertTagsSet = new HashSet<>(dbTags); + Set removeTagsSet = new HashSet<>(cacheTags); + //db中新加的tags灌进cache + for(int i = 0; i < cacheTags.size(); i++){ + String tmp = cacheTags.get(i); + if(insertTagsSet.contains(tmp)){ + insertTagsSet.remove(tmp); + } + } + for (String tag : insertTagsSet) { + SegmentBuffer buffer = new SegmentBuffer(); + buffer.setKey(tag); + Segment segment = buffer.getCurrent(); + segment.setValue(new AtomicLong(0)); + segment.setMax(0); + segment.setStep(0); + cache.put(tag, buffer); + LogUtils.info(log, "Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer); + } + //cache中已失效的tags从cache删除 + for(int i = 0; i < dbTags.size(); i++){ + String tmp = dbTags.get(i); + if(removeTagsSet.contains(tmp)){ + removeTagsSet.remove(tmp); + } + } + for (String tag : removeTagsSet) { + cache.remove(tag); + LogUtils.info(log, "Remove tag {} from IdCache", tag); + } + } catch (Exception e) { + LogUtils.warn(log, "update cache from db exception", e); + } finally { + sw.stop("updateCacheFromDb"); + } + } + + public String get(final String key) { + if (!initOK) { + return Long.toString(EXCEPTION_ID_IDCACHE_INIT_FALSE); + } + + if (cache.containsKey(key)) { + SegmentBuffer buffer = cache.get(key); + if (!buffer.isInitOk()) { + synchronized (buffer) { + if (!buffer.isInitOk()) { + try { + updateSegmentFromDb(key, buffer.getCurrent()); + LogUtils.info(log, "Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent()); + buffer.setInitOk(true); + } catch (Exception e) { + LogUtils.warn(log, "Init buffer {} exception", buffer.getCurrent(), e); + } + } + } + } + return getIdFromSegmentBuffer(cache.get(key)); + } + return Long.toString(EXCEPTION_ID_KEY_NOT_EXISTS); + } + + public void updateSegmentFromDb(String key, Segment segment) { + StopWatch sw = new Slf4JStopWatch(); + SegmentBuffer buffer = segment.getBuffer(); + SequenceAlloc sequenceAlloc; + if (!buffer.isInitOk()) { + sequenceAllocMapper.updateMaxId(key); + sequenceAlloc = sequenceAllocMapper.selectOne(new LambdaQueryWrapper().eq(SequenceAlloc::getGroupName, key)); + buffer.setStep(sequenceAlloc.getStep()); + buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc中的step为DB中的step + } else if (buffer.getUpdateTimestamp() == 0) { + sequenceAllocMapper.updateMaxId(key); + sequenceAlloc = sequenceAllocMapper.selectOne(new LambdaQueryWrapper().eq(SequenceAlloc::getGroupName, key)); + buffer.setUpdateTimestamp(System.currentTimeMillis()); + buffer.setStep(sequenceAlloc.getStep()); + buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc中的step为DB中的step + } else { + long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp(); + int nextStep = buffer.getStep(); + if (duration < SEGMENT_DURATION) { + if (nextStep * 2 > MAX_STEP) { + //do nothing + } else { + nextStep = nextStep * 2; + } + } else if (duration < SEGMENT_DURATION * 2) { + //do nothing with nextStep + } else { + nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep; + } + LogUtils.info(log,"leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep); + + sequenceAllocMapper.updateMaxIdByCustomStep(nextStep, key); + sequenceAlloc = sequenceAllocMapper + .selectOne(new LambdaQueryWrapper().eq(SequenceAlloc::getGroupName, key)); + buffer.setUpdateTimestamp(System.currentTimeMillis()); + buffer.setStep(nextStep); + buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc的step为DB中的step + } + // must set value before set max + long value = sequenceAlloc.getMaxId() - buffer.getStep(); + segment.getValue().set(value); + segment.setMax(sequenceAlloc.getMaxId()); + segment.setStep(buffer.getStep()); + sw.stop("updateSegmentFromDb", key + " " + segment); + } + + public String getIdFromSegmentBuffer(final SegmentBuffer buffer) { + while (true) { + buffer.rLock().lock(); + try { + final Segment segment = buffer.getCurrent(); + if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) { + service.execute(() -> { + Segment next = buffer.getSegments()[buffer.nextPos()]; + boolean updateOk = false; + try { + updateSegmentFromDb(buffer.getKey(), next); + updateOk = true; + LogUtils.info(log,"update segment {} from db {}", buffer.getKey(), next); + } catch (Exception e) { + LogUtils.warn(log, buffer.getKey() + " updateSegmentFromDb exception", e); + } finally { + if (updateOk) { + buffer.wLock().lock(); + buffer.setNextReady(true); + buffer.getThreadRunning().set(false); + buffer.wLock().unlock(); + } else { + buffer.getThreadRunning().set(false); + } + } + }); + } + long value = segment.getValue().getAndIncrement(); + if (value < segment.getMax()) { + return Long.toString(value); + } + } finally { + buffer.rLock().unlock(); + } + waitAndSleep(buffer); + buffer.wLock().lock(); + try { + final Segment segment = buffer.getCurrent(); + long value = segment.getValue().getAndIncrement(); + if (value < segment.getMax()) { + return Long.toString(value); + } + if (buffer.isNextReady()) { + buffer.switchPos(); + buffer.setNextReady(false); + } else { + LogUtils.error(log,"Both two segments in {} are not ready!", buffer); + return Long.toString(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL); + } + } finally { + buffer.wLock().unlock(); + } + } + } + + private void waitAndSleep(SegmentBuffer buffer) { + int roll = 0; + while (buffer.getThreadRunning().get()) { + roll += 1; + if(roll > 10000) { + try { + TimeUnit.MILLISECONDS.sleep(2); + break; + } catch (InterruptedException e) { + LogUtils.warn(log,"Thread {} Interrupted",Thread.currentThread().getName()); + break; + } + } + } + } @Override public boolean supports(int mode) { @@ -21,6 +310,7 @@ public class SegmentIdGenerator implements IdGenerator { @Override public String idGenerator(String group) { - return null; + return get(group); } + } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SnowflakeIdGenerator.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SnowflakeIdGenerator.java index 32e498f7..ec94a220 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SnowflakeIdGenerator.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SnowflakeIdGenerator.java @@ -1,19 +1,24 @@ package com.aizuda.easy.retry.server.support.generator.id; +import cn.hutool.core.lang.Snowflake; +import cn.hutool.core.util.IdUtil; import com.aizuda.easy.retry.common.core.enums.IdGeneratorMode; import com.aizuda.easy.retry.server.support.generator.IdGenerator; import org.springframework.stereotype.Component; /** - * 雪花算法 + * 使用hutool自带的雪花算法生成id + * 若出现时间回拨问题则直接报错 {@link Snowflake#tilNextMillis(long)} * * @author www.byteblogs.com * @date 2023-05-04 - * @since 2.0 + * @since 1.2.0 */ @Component public class SnowflakeIdGenerator implements IdGenerator { + private static final Snowflake SNOWFLAKE = IdUtil.getSnowflake(); + @Override public boolean supports(int mode) { return IdGeneratorMode.SNOWFLAKE.getMode() == mode; @@ -21,6 +26,6 @@ public class SnowflakeIdGenerator implements IdGenerator { @Override public String idGenerator(String group) { - return null; + return SNOWFLAKE.nextIdStr(); } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/model/request/GroupConfigRequestVO.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/model/request/GroupConfigRequestVO.java index 304f4207..6b1de34f 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/model/request/GroupConfigRequestVO.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/web/model/request/GroupConfigRequestVO.java @@ -2,9 +2,6 @@ package com.aizuda.easy.retry.server.web.model.request; import lombok.Data; import org.hibernate.validator.constraints.NotBlank; -import org.hibernate.validator.constraints.NotEmpty; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.PutMapping; import javax.validation.constraints.Max; import javax.validation.constraints.Min; @@ -12,8 +9,11 @@ import javax.validation.constraints.NotNull; import java.util.List; /** + * 组、场景、通知配置类 + * * @author: www.byteblogs.com * @date : 2021-11-22 13:45 + * @since 1.0.0 */ @Data public class GroupConfigRequestVO { diff --git a/easy-retry-server/src/main/resources/mapper/GroupConfigMapper.xml b/easy-retry-server/src/main/resources/mapper/GroupConfigMapper.xml index eed3952e..7cbac039 100644 --- a/easy-retry-server/src/main/resources/mapper/GroupConfigMapper.xml +++ b/easy-retry-server/src/main/resources/mapper/GroupConfigMapper.xml @@ -8,12 +8,13 @@ + - id, `group_name`, group_status, version, `group_partition`, route_key, description, create_dt, update_dt + id, `group_name`, group_status, version, `group_partition`, route_key, id_generator_mode, description, create_dt, update_dt - \ No newline at end of file + diff --git a/easy-retry-server/src/main/resources/mapper/RetryDeadLetterMapper.xml b/easy-retry-server/src/main/resources/mapper/RetryDeadLetterMapper.xml index 0d36b7bf..3dd8acd4 100644 --- a/easy-retry-server/src/main/resources/mapper/RetryDeadLetterMapper.xml +++ b/easy-retry-server/src/main/resources/mapper/RetryDeadLetterMapper.xml @@ -3,6 +3,7 @@ + @@ -13,16 +14,16 @@ - id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, create_dt + id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, create_dt - insert into retry_dead_letter_${partition} (id, group_name, scene_name, + insert into retry_dead_letter_${partition} (id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, create_dt ) values - (#{retryDeadLetter.id,jdbcType=BIGINT}, #{retryDeadLetter.groupName,jdbcType=VARCHAR}, #{retryDeadLetter.sceneName,jdbcType=VARCHAR}, + (#{retryDeadLetter.id,jdbcType=BIGINT}, #{retryDeadLetter.uniqueId,jdbcType=VARCHAR}, #{retryDeadLetter.groupName,jdbcType=VARCHAR}, #{retryDeadLetter.sceneName,jdbcType=VARCHAR}, #{retryDeadLetter.idempotentId,jdbcType=VARCHAR}, #{retryDeadLetter.bizNo,jdbcType=VARCHAR}, #{retryDeadLetter.executorName,jdbcType=VARCHAR}, #{retryDeadLetter.argsStr,jdbcType=VARCHAR}, #{retryDeadLetter.extAttrs,jdbcType=VARCHAR}, #{retryDeadLetter.createDt,jdbcType=TIMESTAMP}) diff --git a/easy-retry-server/src/main/resources/mapper/RetryTaskLogMapper.xml b/easy-retry-server/src/main/resources/mapper/RetryTaskLogMapper.xml index 9109a58d..60f0d3f6 100644 --- a/easy-retry-server/src/main/resources/mapper/RetryTaskLogMapper.xml +++ b/easy-retry-server/src/main/resources/mapper/RetryTaskLogMapper.xml @@ -3,6 +3,7 @@ + @@ -15,8 +16,7 @@ - id - , group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, retry_status, error_message, + id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, retry_status, error_message, create_dt diff --git a/easy-retry-server/src/main/resources/mapper/SequenceAllocMapper.xml b/easy-retry-server/src/main/resources/mapper/SequenceAllocMapper.xml new file mode 100644 index 00000000..4be216e4 --- /dev/null +++ b/easy-retry-server/src/main/resources/mapper/SequenceAllocMapper.xml @@ -0,0 +1,25 @@ + + + + + + + + + + + + + + + UPDATE sequence_alloc + SET max_id = max_id + #{step}, update_dt = now() + WHERE group_name = #{groupName} + + + + UPDATE sequence_alloc + SET max_id = max_id + step, update_dt = now() + WHERE group_name = #{groupName} + +