feat: 1.2.0
1. Add a distributed ID generation module
2. Save unique_id when exception data is reported
3. Pass unique_id through when dispatching retry traffic
parent a973892d6c
commit e92d3bef01
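Taken together, the three changes give every retry task a server-generated unique_id that is stored with the task, carried on the dispatch request and the easy-retry header, and echoed back by the client. A rough, self-contained sketch of that flow (the class and the maps below are illustrative placeholders, not project classes):

import java.util.HashMap;
import java.util.Map;

public class UniqueIdFlow {
    public static void main(String[] args) {
        // 1) report: the server generates a unique_id for the task (segment or snowflake, per group config).
        String uniqueId = "1024";
        Map<String, String> retryTaskRow = new HashMap<>();
        retryTaskRow.put("unique_id", uniqueId); // stored with the task, guarded by uk_name_unique_id

        // 2) dispatch: the unique_id rides on the DTO and the easy-retry header (previously a random UUID).
        Map<String, String> dispatchRequest = new HashMap<>();
        dispatchRequest.put("uniqueId", uniqueId);
        dispatchRequest.put("easy-retry-id", uniqueId);

        // 3) client response: echoed back, so server and client logs can be correlated on one id.
        Map<String, String> executeResponse = new HashMap<>();
        executeResponse.put("uniqueId", dispatchRequest.get("uniqueId"));

        System.out.println(retryTaskRow.get("unique_id").equals(executeResponse.get("uniqueId"))); // true
    }
}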
@@ -1,14 +1,15 @@
CREATE TABLE `group_config`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` varchar(64) NOT NULL DEFAULT '' COMMENT '组名称',
`description` varchar(256) NOT NULL COMMENT '组描述',
`group_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '组状态 0、未启用 1、启用',
`version` int(11) NOT NULL COMMENT '版本号',
`group_partition` int(11) NOT NULL COMMENT '分区',
`route_key` tinyint(4) NOT NULL COMMENT '路由策略',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` varchar(64) NOT NULL DEFAULT '' COMMENT '组名称',
`description` varchar(256) NOT NULL COMMENT '组描述',
`group_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '组状态 0、未启用 1、启用',
`version` int(11) NOT NULL COMMENT '版本号',
`group_partition` int(11) NOT NULL COMMENT '分区',
`route_key` tinyint(4) NOT NULL COMMENT '路由策略',
`id_generator_mode` tinyint(4) NOT NULL DEFAULT '1' COMMENT '唯一id生成模式 默认号段模式',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_name` (`group_name`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='组配置'
@@ -33,9 +34,10 @@ CREATE TABLE `notify_config`
CREATE TABLE `retry_dead_letter_0`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`scene_name` varchar(64) NOT NULL COMMENT '场景id',
`idempotent_id` varchar(64) NOT NULL COMMENT '幂等id',
`idempotent_id` varchar(64) NOT NULL COMMENT '幂等id',
`biz_no` varchar(64) NOT NULL DEFAULT '' COMMENT '业务编号',
`executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称',
`args_str` text NOT NULL COMMENT '执行方法参数',
@@ -44,16 +46,18 @@ CREATE TABLE `retry_dead_letter_0`
PRIMARY KEY (`id`),
KEY `idx_group_name_scene_name` (`group_name`, `scene_name`),
KEY `idx_idempotent_id` (`idempotent_id`),
KEY `idx_biz_no` (`biz_no`)
KEY `idx_biz_no` (`biz_no`),
UNIQUE KEY `uk_name_unique_id` (`group_name`, `unique_id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='重试死信队列'
;

CREATE TABLE `retry_task_0`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`scene_name` varchar(64) NOT NULL COMMENT '场景名称',
`idempotent_id` varchar(64) NOT NULL COMMENT '幂等id',
`idempotent_id` varchar(64) NOT NULL COMMENT '幂等id',
`biz_no` varchar(64) NOT NULL DEFAULT '' COMMENT '业务编号',
`executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称',
`args_str` text NOT NULL COMMENT '执行方法参数',
@@ -64,18 +68,20 @@ CREATE TABLE `retry_task_0`
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_group_name_scene_name` (`group_name`, `scene_name`),
KEY `idx_retry_status` (`retry_status`),
KEY `idx_idempotent_id` (`idempotent_id`)
KEY `idx_group_name_scene_name` (`group_name`, `scene_name`),
KEY `idx_retry_status` (`retry_status`),
KEY `idx_idempotent_id` (`idempotent_id`),
UNIQUE KEY `uk_name_unique_id` (`group_name`, `unique_id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='重试表'
;

CREATE TABLE `retry_task_log`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`scene_name` varchar(64) NOT NULL COMMENT '场景名称',
`idempotent_id` varchar(64) NOT NULL COMMENT '幂等id',
`idempotent_id` varchar(64) NOT NULL COMMENT '幂等id',
`biz_no` varchar(64) NOT NULL DEFAULT '' COMMENT '业务编号',
`executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称',
`args_str` text NOT NULL COMMENT '执行方法参数',
@@ -87,7 +93,8 @@ CREATE TABLE `retry_task_log`
PRIMARY KEY (`id`),
KEY `idx_group_name_scene_name` (`group_name`, `scene_name`),
KEY `idx_retry_status` (`retry_status`),
KEY `idx_idempotent_id` (`idempotent_id`)
KEY `idx_idempotent_id` (`idempotent_id`),
UNIQUE KEY `uk_name_unique_id` (`group_name`, `unique_id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='重试日志表'
;

@@ -112,15 +119,15 @@ CREATE TABLE `scene_config`
CREATE TABLE `server_node`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`host_id` varchar(64) NOT NULL COMMENT '主机id',
`host_ip` varchar(64) NOT NULL COMMENT '机器ip',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`host_id` varchar(64) NOT NULL COMMENT '主机id',
`host_ip` varchar(64) NOT NULL COMMENT '机器ip',
`context_path` varchar(256) NOT NULL DEFAULT '/' COMMENT '客户端上下文路径 server.servlet.context-path',
`host_port` int(16) NOT NULL COMMENT '机器端口',
`expire_at` datetime NOT NULL COMMENT '过期时间',
`expire_at` datetime NOT NULL COMMENT '过期时间',
`node_type` tinyint(4) NOT NULL COMMENT '节点类型 1、客户端 2、是服务端',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_host_id_host_ip` (`host_id`,`host_ip`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='服务器节点'
@@ -165,3 +172,14 @@ CREATE TABLE `system_user_permission`
PRIMARY KEY (`id`),
UNIQUE KEY `uk_group_name_system_user_id` (`group_name`, `system_user_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='系统用户权限表';

CREATE TABLE `sequence_alloc`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` varchar(64) NOT NULL DEFAULT '' COMMENT '组名称',
`max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '最大id',
`step` int(11) NOT NULL DEFAULT '100' COMMENT '步长',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_group_name` (`group_name`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='号段模式序号ID分配表';
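The new sequence_alloc table backs the number-segment ("号段") ID mode: each group gets one row, and every refill atomically advances max_id by step, giving the server a private range of step ids to hand out from memory. A minimal, self-contained sketch of that idea (an in-process stand-in for illustration only — in the real module the advance is the UPDATE in SequenceAllocMapper.xml and the buffering lives in SegmentIdGenerator further down):

import java.util.concurrent.atomic.AtomicLong;

public class SegmentSketch {
    private final AtomicLong maxId = new AtomicLong(1); // stands in for sequence_alloc.max_id (DEFAULT '1')
    private final int step = 100;                       // stands in for sequence_alloc.step (DEFAULT '100')

    private long current;  // next id to hand out
    private long limit;    // exclusive upper bound of the current segment

    /** Fetch a new segment: the DB equivalent is "UPDATE ... SET max_id = max_id + step". */
    private void refill() {
        long newMax = maxId.addAndGet(step);
        current = newMax - step;
        limit = newMax;
    }

    public synchronized long nextId() {
        if (current >= limit) {
            refill();
        }
        return current++;
    }

    public static void main(String[] args) {
        SegmentSketch ids = new SegmentSketch();
        for (int i = 0; i < 5; i++) {
            System.out.println(ids.nextId()); // 1, 2, 3, 4, 5
        }
    }
}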
@@ -88,6 +88,7 @@ public class RetryEndPoint {
}

executeRespDto.setIdempotentId(executeReqDto.getIdempotentId());
executeRespDto.setUniqueId(executeReqDto.getUniqueId());
if (Objects.nonNull(retryerResultContext.getResult())) {
executeRespDto.setResultJson(JsonUtil.toJsonString(retryerResultContext.getResult()));
}

@@ -19,4 +19,6 @@ public class DispatchRetryDTO {
private String idempotentId;
@NotBlank(message = "executorName 不能为空")
private String executorName;
@NotBlank(message = "uniqueId 不能为空")
private String uniqueId;
}

@@ -13,4 +13,5 @@ public class DispatchRetryResultDTO {
private Integer statusCode;
private String idempotentId;
private String exceptionMsg;
private String uniqueId;
}

@@ -23,4 +23,6 @@ public class RetryCallbackDTO {
private String executorName;
@NotBlank(message = "retryStatus 不能为空")
private Integer retryStatus;
@NotBlank(message = "uniqueId 不能为空")
private String uniqueId;
}

@@ -14,8 +14,8 @@ import lombok.Getter;
@Getter
public enum IdGeneratorMode {

SNOWFLAKE(1, "雪花算法模式"),
SEGMENT(2,"号段模式");
SEGMENT(1,"号段模式"),
SNOWFLAKE(2, "雪花算法模式");

private final int mode;
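The enum above swaps the mode values so that segment mode becomes 1 (matching the new id_generator_mode default in group_config) and snowflake becomes 2. Generators are matched against this mode through the IdGenerator contract used elsewhere in this diff (supports(int mode) / idGenerator(String group)). A hedged, self-contained sketch of that selection — the interface body and the toy implementations here are assumptions for illustration, not the project's classes:

import java.util.List;
import java.util.UUID;

interface IdGenerator {
    boolean supports(int mode);       // does this generator handle the configured mode?
    String idGenerator(String group); // produce the next id for the group
}

public class IdGeneratorSelection {
    public static void main(String[] args) {
        // group_config.id_generator_mode defaults to 1, i.e. segment mode after this commit.
        int configuredMode = 1;

        List<IdGenerator> generators = List.of(
                new IdGenerator() { // stands in for SegmentIdGenerator
                    public boolean supports(int mode) { return mode == 1; }
                    public String idGenerator(String group) { return "42"; }
                },
                new IdGenerator() { // stands in for SnowflakeIdGenerator
                    public boolean supports(int mode) { return mode == 2; }
                    public String idGenerator(String group) { return UUID.randomUUID().toString(); }
                });

        String uniqueId = generators.stream()
                .filter(g -> g.supports(configuredMode))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("id generator mode not configured"))
                .idGenerator("example_group");
        System.out.println(uniqueId); // 42
    }
}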
@@ -126,6 +126,11 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.perf4j</groupId>
<artifactId>perf4j</artifactId>
<version>0.9.16</version>
</dependency>
</dependencies>

<build>
@@ -3,7 +3,6 @@ package com.aizuda.easy.retry.server.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
* 系统配置
@@ -41,4 +40,10 @@ public class SystemProperties {
*/
private int limiter = 10;

/**
* 号段模式下步长配置
* 默认100
*/
private int step = 100;

}
@@ -0,0 +1,36 @@
package com.aizuda.easy.retry.server.persistence.mybatis.mapper;

import com.aizuda.easy.retry.server.persistence.mybatis.po.SequenceAlloc;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

/**
* <p>
* 号段模式序号ID分配表 Mapper 接口
* </p>
*
* @author www.byteblogs.com
* @date 2023-05-05
* @since 1.2.0
*/
@Mapper
public interface SequenceAllocMapper extends BaseMapper<SequenceAlloc> {

/**
* 更新业务类型下的最大id
*
* @param step 步长
* @param groupName
* @return 更新结果
*/
Integer updateMaxIdByCustomStep(@Param("step") Integer step, @Param("groupName") String groupName);

/**
* 更新最大id
*
* @param groupName 组名称
* @return 更新结果
*/
Integer updateMaxId(@Param("groupName") String groupName);
}
@@ -21,6 +21,8 @@ public class GroupConfig implements Serializable {

private Integer routeKey;

private Integer idGeneratorMode;

private Integer version;

private String description;
@@ -32,4 +34,4 @@ public class GroupConfig implements Serializable {
private static final long serialVersionUID = 1L;

}
}

@@ -13,6 +13,8 @@ public class RetryDeadLetter implements Serializable {
@TableId(value = "id", type = IdType.AUTO)
private Long id;

private String uniqueId;

private String groupName;

private String sceneName;

@@ -15,6 +15,8 @@ public class RetryTask implements Serializable {
@TableId(value = "id", type = IdType.AUTO)
private Long id;

private String uniqueId;

private String groupName;

private String sceneName;

@@ -13,6 +13,8 @@ public class RetryTaskLog implements Serializable {
@TableId(value = "id", type = IdType.AUTO)
private Long id;

private String uniqueId;

private String groupName;

private String sceneName;
@@ -0,0 +1,52 @@
package com.aizuda.easy.retry.server.persistence.mybatis.po;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
* <p>
* 号段模式序号ID分配表
* </p>
*
* @author www.byteblogs.com
* @since 2023-05-05
*/
@Getter
@Setter
@TableName("sequence_alloc")
public class SequenceAlloc implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;

/**
* 组名称
*/
private String groupName;

/**
* 最大id
*/
private Long maxId;

/**
* 步长
*/
private Integer step;

/**
* 更新时间
*/
private LocalDateTime updateDt;
}
@@ -0,0 +1,18 @@
package com.aizuda.easy.retry.server.service.convert;

import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

/**
* @author: shuguang.zhang
* @date : 2023-05-05 16:15
*/
@Mapper
public interface RetryTaskLogConverter {

RetryTaskLogConverter INSTANCE = Mappers.getMapper(RetryTaskLogConverter.class);

RetryTaskLog toRetryTask(RetryTask retryTask);
}
@@ -6,10 +6,12 @@ import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.GroupConfigMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.NotifyConfigMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.SceneConfigMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.SequenceAllocMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.NotifyConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SequenceAlloc;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.handler.ClientRegisterHandler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -38,8 +40,11 @@ import java.util.*;
import java.util.stream.Collectors;

/**
* CRUD 组、场景、通知
*
* @author: www.byteblogs.com
* @date : 2021-11-22 14:54
* @since 1.0.0
*/
@Service
public class GroupConfigServiceImpl implements GroupConfigService {
@@ -53,10 +58,14 @@ public class GroupConfigServiceImpl implements GroupConfigService {
@Autowired
private ServerNodeMapper serverNodeMapper;
@Autowired
private SequenceAllocMapper sequenceAllocMapper;
@Autowired
private ClientRegisterHandler clientRegisterHandler;

@Value("${easy-retry.total-partition:32}")
private Integer totalPartition;
@Value("${easy-retry.step:100}")
private Integer step;

private GroupConfigConverter groupConfigConverter = new GroupConfigConverter();
private NotifyConfigConverter notifyConfigConverter = new NotifyConfigConverter();
@@ -71,15 +80,32 @@ public class GroupConfigServiceImpl implements GroupConfigService {
.eq(GroupConfig::getGroupName, groupConfigRequestVO.getGroupName())) == 0,
() -> new EasyRetryServerException("GroupName已经存在 {}", groupConfigRequestVO.getGroupName()));

// 保存组配置
doSaveGroupConfig(groupConfigRequestVO);

// 保存生成唯一id配置
doSaveSequenceAlloc(groupConfigRequestVO);

// 保存通知配置
doSaveNotifyConfig(groupConfigRequestVO);

// 保存场景配置
doSaveSceneConfig(groupConfigRequestVO.getSceneList(), groupConfigRequestVO.getGroupName());

return Boolean.TRUE;
}

/**
* 保存序号生成规则配置失败
* @param groupConfigRequestVO 组、场景、通知配置类
*/
private void doSaveSequenceAlloc(final GroupConfigRequestVO groupConfigRequestVO) {
SequenceAlloc sequenceAlloc = new SequenceAlloc();
sequenceAlloc.setGroupName(groupConfigRequestVO.getGroupName());
sequenceAlloc.setStep(step);
sequenceAlloc.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == sequenceAllocMapper.insert(sequenceAlloc), () -> new EasyRetryServerException("failed to save sequence generation rule configuration [{}].", groupConfigRequestVO.getGroupName()));
}

@Override
@Transactional
@@ -96,7 +122,7 @@ public class GroupConfigServiceImpl implements GroupConfigService {

Assert.isTrue(1 == groupConfigMapper.update(groupConfig,
new LambdaUpdateWrapper<GroupConfig>().eq(GroupConfig::getGroupName, groupConfigRequestVO.getGroupName())),
() -> new EasyRetryServerException("新增组异常异常 groupConfigVO[{}]", groupConfigRequestVO));
() -> new EasyRetryServerException("exception occurred while adding group. groupConfigVO[{}]", groupConfigRequestVO));

doUpdateNotifyConfig(groupConfigRequestVO);

@@ -182,7 +208,7 @@ public class GroupConfigServiceImpl implements GroupConfigService {
notifyConfig.setCreateDt(LocalDateTime.now());

Assert.isTrue(1 == notifyConfigMapper.insert(notifyConfig),
() -> new EasyRetryServerException("插入通知配置失败 sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfig)));
() -> new EasyRetryServerException("failed to insert notify. sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfig)));
}

private void doSaveSceneConfig(List<GroupConfigRequestVO.SceneConfigVO> sceneList, String groupName) {
@@ -195,7 +221,7 @@ public class GroupConfigServiceImpl implements GroupConfigService {
sceneConfig.setGroupName(groupName);

Assert.isTrue(1 == sceneConfigMapper.insert(sceneConfig),
() -> new EasyRetryServerException("插入场景配置失败 sceneConfig:[{}]", JsonUtil.toJsonString(sceneConfig)));
() -> new EasyRetryServerException("failed to insert scene. sceneConfig:[{}]", JsonUtil.toJsonString(sceneConfig)));
}
}
}
@@ -218,7 +244,7 @@ public class GroupConfigServiceImpl implements GroupConfigService {
} else if (sceneConfigVO.getIsDeleted() == 1) {
Assert.isTrue(
1 == sceneConfigMapper.deleteById(oldSceneConfig.getId()),
() -> new EasyRetryServerException("删除场景失败 [{}]", sceneConfigVO.getSceneName()));
() -> new EasyRetryServerException("failed to delete scene. [{}]", sceneConfigVO.getSceneName()));
} else {
SceneConfig sceneConfig = sceneConfigConverter.convert(sceneConfigVO);
sceneConfig.setGroupName(groupConfigRequestVO.getGroupName());
@@ -227,7 +253,7 @@ public class GroupConfigServiceImpl implements GroupConfigService {
new LambdaQueryWrapper<SceneConfig>()
.eq(SceneConfig::getGroupName, sceneConfig.getGroupName())
.eq(SceneConfig::getSceneName, sceneConfig.getSceneName())),
() -> new EasyRetryServerException("插入场景配置失败 sceneConfig:[{}]", JsonUtil.toJsonString(sceneConfig)));
() -> new EasyRetryServerException("failed to update scene. sceneConfig:[{}]", JsonUtil.toJsonString(sceneConfig)));
}

iterator.remove();
@@ -253,7 +279,7 @@ public class GroupConfigServiceImpl implements GroupConfigService {
} else if (Objects.nonNull(notifyConfigVO.getId()) && notifyConfigVO.getIsDeleted() == 1) {
// delete
Assert.isTrue(1 == notifyConfigMapper.deleteById(notifyConfigVO.getId()),
() -> new EasyRetryServerException("删除通知配置失败 sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfigVO)));
() -> new EasyRetryServerException("failed to delete notify. sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfigVO)));
} else {
// update
Assert.isTrue(1 == notifyConfigMapper.update(notifyConfig,
@@ -261,7 +287,7 @@ public class GroupConfigServiceImpl implements GroupConfigService {
.eq(NotifyConfig::getId, notifyConfigVO.getId())
.eq(NotifyConfig::getGroupName, notifyConfig.getGroupName())
.eq(NotifyConfig::getNotifyScene, notifyConfig.getNotifyScene())),
() -> new EasyRetryServerException("更新通知配置失败 sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfig)));
() -> new EasyRetryServerException("failed to update notify. sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfig)));
}
}
}
@@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.support.generator.IdGenerator;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
@@ -53,6 +54,9 @@ public class RetryServiceImpl implements RetryService {
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;

@Autowired
private List<IdGenerator> idGeneratorList;

@Autowired
private RetryTaskMapper retryTaskMapper;
@Autowired
@@ -61,11 +65,11 @@ public class RetryServiceImpl implements RetryService {
@Transactional
@Override
public Boolean reportRetry(RetryTaskDTO retryTaskDTO) {
LogUtils.warn(log, "接收上报数据 [{}]", JsonUtil.toJsonString(retryTaskDTO));
LogUtils.warn(log, "received report data [{}]", JsonUtil.toJsonString(retryTaskDTO));

SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName());
if (Objects.isNull(sceneConfig)) {
throw new EasyRetryServerException("上报数据失败, 未查到场景配置 [{}]", retryTaskDTO);
throw new EasyRetryServerException("failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]", retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName());
}

RequestDataHelper.setPartition(retryTaskDTO.getGroupName());
@@ -77,11 +81,12 @@ public class RetryServiceImpl implements RetryService {
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
);
if (0 < count) {
LogUtils.warn(log, "存在重试中的任务中断上报 [{}]", JsonUtil.toJsonString(retryTaskDTO));
LogUtils.warn(log, "interrupted reporting in retrying task. [{}]", JsonUtil.toJsonString(retryTaskDTO));
return Boolean.TRUE;
}

RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTaskDTO);
retryTask.setUniqueId(getIdGenerator(retryTaskDTO.getGroupName()));
retryTask.setCreateDt(LocalDateTime.now());
retryTask.setUpdateDt(LocalDateTime.now());

@@ -91,7 +96,7 @@ public class RetryServiceImpl implements RetryService {

retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));

Assert.isTrue(1 == retryTaskAccess.saveRetryTask(retryTask), () -> new EasyRetryServerException("上报数据失败"));
Assert.isTrue(1 == retryTaskAccess.saveRetryTask(retryTask), () -> new EasyRetryServerException("failed to report data"));
return Boolean.TRUE;
}

@@ -156,4 +161,22 @@ public class RetryServiceImpl implements RetryService {
// 将已经重试完成的数据删除
retryTaskAccess.deleteByDelayLevel(groupId, RetryStatusEnum.FINISH.getStatus());
}

/**
* 获取分布式id
*
* @param groupName 组id
* @return 分布式id
*/
private String getIdGenerator(String groupName) {

GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName);
for (final IdGenerator idGenerator : idGeneratorList) {
if (idGenerator.supports(groupConfig.getIdGeneratorMode())) {
return idGenerator.idGenerator(groupName);
}
}

throw new EasyRetryServerException("id generator mode not configured. [{}]", groupName);
}
}
@@ -59,12 +59,13 @@ public class CallbackRetryResultActor extends AbstractActor {
retryCallbackDTO.setScene(retryTask.getSceneName());
retryCallbackDTO.setGroup(retryTask.getGroupName());
retryCallbackDTO.setExecutorName(retryTask.getExecutorName());
retryCallbackDTO.setUniqueId(retryTask.getUniqueId());

// 设置header
HttpHeaders requestHeaders = new HttpHeaders();
EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders();
easyRetryHeaders.setEasyRetry(Boolean.TRUE);
easyRetryHeaders.setEasyRetryId(IdUtil.simpleUUID());
easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId());
requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders));

HttpEntity<RetryCallbackDTO> requestEntity = new HttpEntity<>(retryCallbackDTO, requestHeaders);
@@ -2,7 +2,6 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.exec;

import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.IdUtil;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
@@ -15,6 +14,7 @@ import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMappe
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
@@ -62,13 +62,13 @@ public class ExecUnitActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder().match(RetryExecutor.class, retryExecutor -> {

RetryTaskLog retryTaskLog = new RetryTaskLog();
retryTaskLog.setErrorMessage(StringUtils.EMPTY);

MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext();
RetryTask retryTask = context.getRetryTask();
ServerNode serverNode = context.getServerNode();

RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask);
retryTaskLog.setErrorMessage(StringUtils.EMPTY);

try {

if (Objects.nonNull(serverNode)) {
@@ -90,7 +90,6 @@ public class ExecUnitActor extends AbstractActor {
getContext().stop(getSelf());

// 记录重试日志
BeanUtils.copyProperties(retryTask, retryTaskLog);
retryTaskLog.setCreateDt(LocalDateTime.now());
retryTaskLog.setId(null);
Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
@@ -113,12 +112,13 @@ public class ExecUnitActor extends AbstractActor {
dispatchRetryDTO.setScene(retryTask.getSceneName());
dispatchRetryDTO.setExecutorName(retryTask.getExecutorName());
dispatchRetryDTO.setArgsStr(retryTask.getArgsStr());
dispatchRetryDTO.setUniqueId(retryTask.getUniqueId());

// 设置header
HttpHeaders requestHeaders = new HttpHeaders();
EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders();
easyRetryHeaders.setEasyRetry(Boolean.TRUE);
easyRetryHeaders.setEasyRetryId(IdUtil.simpleUUID());
easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId());
requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders));

HttpEntity<DispatchRetryDTO> requestEntity = new HttpEntity<>(dispatchRetryDTO, requestHeaders);
@@ -0,0 +1,59 @@
package com.aizuda.easy.retry.server.support.generator.id;

import java.util.concurrent.atomic.AtomicLong;

public class Segment {
private AtomicLong value = new AtomicLong(0);
private volatile long max;
private volatile int step;
private SegmentBuffer buffer;

public Segment(SegmentBuffer buffer) {
this.buffer = buffer;
}

public AtomicLong getValue() {
return value;
}

public void setValue(AtomicLong value) {
this.value = value;
}

public long getMax() {
return max;
}

public void setMax(long max) {
this.max = max;
}

public int getStep() {
return step;
}

public void setStep(int step) {
this.step = step;
}

public SegmentBuffer getBuffer() {
return buffer;
}

public long getIdle() {
return this.getMax() - getValue().get();
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder("Segment(");
sb.append("value:");
sb.append(value);
sb.append(",max:");
sb.append(max);
sb.append(",step:");
sb.append(step);
sb.append(")");
return sb.toString();
}
}
@@ -0,0 +1,132 @@
package com.aizuda.easy.retry.server.support.generator.id;

import lombok.Data;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* 双buffer
*/
@Data
public class SegmentBuffer {
private String key;
private Segment[] segments; //双buffer
private volatile int currentPos; //当前的使用的segment的index
private volatile boolean nextReady; //下一个segment是否处于可切换状态
private volatile boolean initOk; //是否初始化完成
private final AtomicBoolean threadRunning; //线程是否在运行中
private final ReadWriteLock lock;

private volatile int step;
private volatile int minStep;
private volatile long updateTimestamp;

public SegmentBuffer() {
segments = new Segment[]{new Segment(this), new Segment(this)};
currentPos = 0;
nextReady = false;
initOk = false;
threadRunning = new AtomicBoolean(false);
lock = new ReentrantReadWriteLock();
}

public String getKey() {
return key;
}

public void setKey(String key) {
this.key = key;
}

public Segment[] getSegments() {
return segments;
}

public Segment getCurrent() {
return segments[currentPos];
}

public int getCurrentPos() {
return currentPos;
}

public int nextPos() {
return (currentPos + 1) % 2;
}

public void switchPos() {
currentPos = nextPos();
}

public boolean isInitOk() {
return initOk;
}

public void setInitOk(boolean initOk) {
this.initOk = initOk;
}

public boolean isNextReady() {
return nextReady;
}

public void setNextReady(boolean nextReady) {
this.nextReady = nextReady;
}

public AtomicBoolean getThreadRunning() {
return threadRunning;
}

public Lock rLock() {
return lock.readLock();
}

public Lock wLock() {
return lock.writeLock();
}

public int getStep() {
return step;
}

public void setStep(int step) {
this.step = step;
}

public int getMinStep() {
return minStep;
}

public void setMinStep(int minStep) {
this.minStep = minStep;
}

public long getUpdateTimestamp() {
return updateTimestamp;
}

public void setUpdateTimestamp(long updateTimestamp) {
this.updateTimestamp = updateTimestamp;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("SegmentBuffer{");
sb.append("key='").append(key).append('\'');
sb.append(", segments=").append(Arrays.toString(segments));
sb.append(", currentPos=").append(currentPos);
sb.append(", nextReady=").append(nextReady);
sb.append(", initOk=").append(initOk);
sb.append(", threadRunning=").append(threadRunning);
sb.append(", step=").append(step);
sb.append(", minStep=").append(minStep);
sb.append(", updateTimestamp=").append(updateTimestamp);
sb.append('}');
return sb.toString();
}
}
@@ -1,18 +1,307 @@
package com.aizuda.easy.retry.server.support.generator.id;

import com.aizuda.easy.retry.common.core.enums.IdGeneratorMode;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.SequenceAllocMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SequenceAlloc;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.aizuda.easy.retry.server.support.generator.IdGenerator;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.perf4j.StopWatch;
import org.perf4j.slf4j.Slf4JStopWatch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
* 号段模式
* 此算法来自美团的leaf号段模式
*
* @author www.byteblogs.com
* @date 2023-05-04
* @since 2.0
* @since 1.2.0
*/
@Component
public class SegmentIdGenerator implements IdGenerator {
@Slf4j
public class SegmentIdGenerator implements IdGenerator, Lifecycle {

/**
* IDCache未初始化成功时的异常码
*/
private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;
/**
* key不存在时的异常码
*/
private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2;
/**
* SegmentBuffer中的两个Segment均未从DB中装载时的异常码
*/
private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3;
/**
* 最大步长不超过100,0000
*/
private static final int MAX_STEP = 1000000;
/**
* 一个Segment维持时间为15分钟
*/
private static final long SEGMENT_DURATION = 15 * 60 * 1000L;

private ThreadPoolExecutor service = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(5000), new UpdateThreadFactory());

private volatile boolean initOK = false;
private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<>();

@Autowired
private SequenceAllocMapper sequenceAllocMapper;

@Override
public void start() {
LogUtils.info(log, "SegmentIdGenerator start");
// 确保加载到kv后才初始化成功
updateCacheFromDb();
initOK = true;
updateCacheFromDbAtEveryMinute();
LogUtils.info(log, "SegmentIdGenerator start end");
}

@Override
public void close() {
LogUtils.info(log, "SegmentIdGenerator close");
}

public static class UpdateThreadFactory implements ThreadFactory {

private static int threadInitNumber = 0;

private static synchronized int nextThreadNum() {
return threadInitNumber++;
}

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());
}
}

private void updateCacheFromDbAtEveryMinute() {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r);
t.setName("check-id-cache-thread");
t.setDaemon(true);
return t;
});
service.scheduleWithFixedDelay(() -> updateCacheFromDb(), 60, 60, TimeUnit.SECONDS);
}

private void updateCacheFromDb() {
LogUtils.info(log, "update cache from db");
StopWatch sw = new Slf4JStopWatch();
try {
List<SequenceAlloc> sequenceAllocs = sequenceAllocMapper
.selectList(new LambdaQueryWrapper<SequenceAlloc>().select(SequenceAlloc::getGroupName));
if (CollectionUtils.isEmpty(sequenceAllocs)) {
return;
}

List<String> dbTags = sequenceAllocs.stream().map(SequenceAlloc::getGroupName).collect(Collectors.toList());

List<String> cacheTags = new ArrayList<>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
//db中新加的tags灌进cache
for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
}
for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
LogUtils.info(log, "Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
}
//cache中已失效的tags从cache删除
for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
cache.remove(tag);
LogUtils.info(log, "Remove tag {} from IdCache", tag);
}
} catch (Exception e) {
LogUtils.warn(log, "update cache from db exception", e);
} finally {
sw.stop("updateCacheFromDb");
}
}

public String get(final String key) {
if (!initOK) {
return Long.toString(EXCEPTION_ID_IDCACHE_INIT_FALSE);
}

if (cache.containsKey(key)) {
SegmentBuffer buffer = cache.get(key);
if (!buffer.isInitOk()) {
synchronized (buffer) {
if (!buffer.isInitOk()) {
try {
updateSegmentFromDb(key, buffer.getCurrent());
LogUtils.info(log, "Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
buffer.setInitOk(true);
} catch (Exception e) {
LogUtils.warn(log, "Init buffer {} exception", buffer.getCurrent(), e);
}
}
}
}
return getIdFromSegmentBuffer(cache.get(key));
}
return Long.toString(EXCEPTION_ID_KEY_NOT_EXISTS);
}

public void updateSegmentFromDb(String key, Segment segment) {
StopWatch sw = new Slf4JStopWatch();
SegmentBuffer buffer = segment.getBuffer();
SequenceAlloc sequenceAlloc;
if (!buffer.isInitOk()) {
sequenceAllocMapper.updateMaxId(key);
sequenceAlloc = sequenceAllocMapper.selectOne(new LambdaQueryWrapper<SequenceAlloc>().eq(SequenceAlloc::getGroupName, key));
buffer.setStep(sequenceAlloc.getStep());
buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc中的step为DB中的step
} else if (buffer.getUpdateTimestamp() == 0) {
sequenceAllocMapper.updateMaxId(key);
sequenceAlloc = sequenceAllocMapper.selectOne(new LambdaQueryWrapper<SequenceAlloc>().eq(SequenceAlloc::getGroupName, key));
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(sequenceAlloc.getStep());
buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc中的step为DB中的step
} else {
long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
int nextStep = buffer.getStep();
if (duration < SEGMENT_DURATION) {
if (nextStep * 2 > MAX_STEP) {
//do nothing
} else {
nextStep = nextStep * 2;
}
} else if (duration < SEGMENT_DURATION * 2) {
//do nothing with nextStep
} else {
nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
}
LogUtils.info(log,"leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);

sequenceAllocMapper.updateMaxIdByCustomStep(nextStep, key);
sequenceAlloc = sequenceAllocMapper
.selectOne(new LambdaQueryWrapper<SequenceAlloc>().eq(SequenceAlloc::getGroupName, key));
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(nextStep);
buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc的step为DB中的step
}
// must set value before set max
long value = sequenceAlloc.getMaxId() - buffer.getStep();
segment.getValue().set(value);
segment.setMax(sequenceAlloc.getMaxId());
segment.setStep(buffer.getStep());
sw.stop("updateSegmentFromDb", key + " " + segment);
}

public String getIdFromSegmentBuffer(final SegmentBuffer buffer) {
while (true) {
buffer.rLock().lock();
try {
final Segment segment = buffer.getCurrent();
if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
service.execute(() -> {
Segment next = buffer.getSegments()[buffer.nextPos()];
boolean updateOk = false;
try {
updateSegmentFromDb(buffer.getKey(), next);
updateOk = true;
LogUtils.info(log,"update segment {} from db {}", buffer.getKey(), next);
} catch (Exception e) {
LogUtils.warn(log, buffer.getKey() + " updateSegmentFromDb exception", e);
} finally {
if (updateOk) {
buffer.wLock().lock();
buffer.setNextReady(true);
buffer.getThreadRunning().set(false);
buffer.wLock().unlock();
} else {
buffer.getThreadRunning().set(false);
}
}
});
}
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return Long.toString(value);
}
} finally {
buffer.rLock().unlock();
}
waitAndSleep(buffer);
buffer.wLock().lock();
try {
final Segment segment = buffer.getCurrent();
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return Long.toString(value);
}
if (buffer.isNextReady()) {
buffer.switchPos();
buffer.setNextReady(false);
} else {
LogUtils.error(log,"Both two segments in {} are not ready!", buffer);
return Long.toString(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL);
}
} finally {
buffer.wLock().unlock();
}
}
}

private void waitAndSleep(SegmentBuffer buffer) {
int roll = 0;
while (buffer.getThreadRunning().get()) {
roll += 1;
if(roll > 10000) {
try {
TimeUnit.MILLISECONDS.sleep(2);
break;
} catch (InterruptedException e) {
LogUtils.warn(log,"Thread {} Interrupted",Thread.currentThread().getName());
break;
}
}
}
}

@Override
public boolean supports(int mode) {
@@ -21,6 +310,7 @@ public class SegmentIdGenerator implements IdGenerator {

@Override
public String idGenerator(String group) {
return null;
return get(group);
}

}
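For clarity, the dynamic step sizing buried in updateSegmentFromDb above can be restated on its own: a segment exhausted in under 15 minutes doubles the step (capped at MAX_STEP), one that lasted 15–30 minutes keeps it, and a slower one halves it but never below the DB-configured minimum. A self-contained restatement of that rule:

public class StepAdjustment {
    static final int MAX_STEP = 1000000;
    static final long SEGMENT_DURATION = 15 * 60 * 1000L;

    // Mirrors the nextStep branch in SegmentIdGenerator#updateSegmentFromDb.
    static int nextStep(long durationMillis, int step, int minStep) {
        if (durationMillis < SEGMENT_DURATION) {
            return step * 2 > MAX_STEP ? step : step * 2;
        } else if (durationMillis < SEGMENT_DURATION * 2) {
            return step;
        } else {
            return step / 2 >= minStep ? step / 2 : step;
        }
    }

    public static void main(String[] args) {
        // A group that burned through its segment in 5 minutes gets a bigger segment next time.
        System.out.println(nextStep(5 * 60 * 1000L, 100, 100));  // 200
        // A quiet group that took an hour drops back toward the minimum step.
        System.out.println(nextStep(60 * 60 * 1000L, 200, 100)); // 100
    }
}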
@@ -1,19 +1,24 @@
package com.aizuda.easy.retry.server.support.generator.id;

import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;
import com.aizuda.easy.retry.common.core.enums.IdGeneratorMode;
import com.aizuda.easy.retry.server.support.generator.IdGenerator;
import org.springframework.stereotype.Component;

/**
* 雪花算法
* 使用hutool自带的雪花算法生成id
* 若出现时间回拨问题则直接报错 {@link Snowflake#tilNextMillis(long)}
*
* @author www.byteblogs.com
* @date 2023-05-04
* @since 2.0
* @since 1.2.0
*/
@Component
public class SnowflakeIdGenerator implements IdGenerator {

private static final Snowflake SNOWFLAKE = IdUtil.getSnowflake();

@Override
public boolean supports(int mode) {
return IdGeneratorMode.SNOWFLAKE.getMode() == mode;
@@ -21,6 +26,6 @@ public class SnowflakeIdGenerator implements IdGenerator {

@Override
public String idGenerator(String group) {
return null;
return SNOWFLAKE.nextIdStr();
}
}
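The snowflake implementation above delegates entirely to Hutool's Snowflake, obtained through the same no-argument IdUtil.getSnowflake() factory shown in the diff; ids come back as time-ordered numeric strings that are unique regardless of group. A small usage sketch:

import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;

public class SnowflakeDemo {
    public static void main(String[] args) {
        // Same factory call as SnowflakeIdGenerator above.
        Snowflake snowflake = IdUtil.getSnowflake();
        System.out.println(snowflake.nextIdStr());
        System.out.println(snowflake.nextIdStr());
    }
}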
@@ -2,9 +2,6 @@ package com.aizuda.easy.retry.server.web.model.request;

import lombok.Data;
import org.hibernate.validator.constraints.NotBlank;
import org.hibernate.validator.constraints.NotEmpty;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
@@ -12,8 +9,11 @@ import javax.validation.constraints.NotNull;
import java.util.List;

/**
* 组、场景、通知配置类
*
* @author: www.byteblogs.com
* @date : 2021-11-22 13:45
* @since 1.0.0
*/
@Data
public class GroupConfigRequestVO {
@@ -8,12 +8,13 @@
<result column="version" jdbcType="TINYINT" property="version" />
<result column="group_partition" jdbcType="TINYINT" property="groupPartition" />
<result column="route_key" jdbcType="TINYINT" property="routeKey" />
<result column="id_generator_mode" jdbcType="TINYINT" property="idGeneratorMode" />
<result column="description" jdbcType="TINYINT" property="description" />
<result column="create_dt" jdbcType="TIMESTAMP" property="createDt" />
<result column="update_dt" jdbcType="TIMESTAMP" property="updateDt" />
</resultMap>
<sql id="Base_Column_List">
id, `group_name`, group_status, version, `group_partition`, route_key, description, create_dt, update_dt
id, `group_name`, group_status, version, `group_partition`, route_key, id_generator_mode, description, create_dt, update_dt
</sql>

</mapper>
</mapper>
@@ -3,6 +3,7 @@
<mapper namespace="com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryDeadLetterMapper">
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.server.persistence.mybatis.po.RetryDeadLetter">
<id column="id" jdbcType="BIGINT" property="id" />
<result column="unique_id" jdbcType="VARCHAR" property="uniqueId"/>
<result column="group_name" jdbcType="VARCHAR" property="groupName" />
<result column="scene_name" jdbcType="VARCHAR" property="sceneName" />
<result column="idempotent_id" jdbcType="VARCHAR" property="idempotentId" />
@@ -13,16 +14,16 @@
<result column="create_dt" jdbcType="TIMESTAMP" property="createDt" />
</resultMap>
<sql id="Base_Column_List">
id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, create_dt
id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, create_dt
</sql>
<insert id="insertBatch">
insert into retry_dead_letter_${partition} (id, group_name, scene_name,
insert into retry_dead_letter_${partition} (id, unique_id, group_name, scene_name,
idempotent_id, biz_no, executor_name, args_str,
ext_attrs, create_dt
)
values
<foreach collection="retryDeadLetters" item="retryDeadLetter" separator=",">
(#{retryDeadLetter.id,jdbcType=BIGINT}, #{retryDeadLetter.groupName,jdbcType=VARCHAR}, #{retryDeadLetter.sceneName,jdbcType=VARCHAR},
(#{retryDeadLetter.id,jdbcType=BIGINT}, #{retryDeadLetter.uniqueId,jdbcType=VARCHAR}, #{retryDeadLetter.groupName,jdbcType=VARCHAR}, #{retryDeadLetter.sceneName,jdbcType=VARCHAR},
#{retryDeadLetter.idempotentId,jdbcType=VARCHAR}, #{retryDeadLetter.bizNo,jdbcType=VARCHAR}, #{retryDeadLetter.executorName,jdbcType=VARCHAR}, #{retryDeadLetter.argsStr,jdbcType=VARCHAR},
#{retryDeadLetter.extAttrs,jdbcType=VARCHAR}, #{retryDeadLetter.createDt,jdbcType=TIMESTAMP})
</foreach>
@@ -3,6 +3,7 @@
<mapper namespace="com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper">
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="unique_id" jdbcType="VARCHAR" property="uniqueId"/>
<result column="group_name" jdbcType="VARCHAR" property="groupName"/>
<result column="scene_name" jdbcType="VARCHAR" property="sceneName"/>
<result column="idempotent_id" jdbcType="VARCHAR" property="idempotentId"/>
@@ -15,8 +16,7 @@
<result column="create_dt" jdbcType="TIMESTAMP" property="createDt"/>
</resultMap>
<sql id="Base_Column_List">
id
, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, retry_status, error_message,
id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, retry_status, error_message,
create_dt
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">
@@ -3,6 +3,7 @@
<mapper namespace="com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskMapper">
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask">
<id column="id" jdbcType="BIGINT" property="id" />
<result column="unique_id" jdbcType="VARCHAR" property="uniqueId"/>
<result column="group_name" jdbcType="VARCHAR" property="groupName" />
<result column="scene_name" jdbcType="VARCHAR" property="sceneName" />
<result column="idempotent_id" jdbcType="VARCHAR" property="idempotentId" />
@@ -17,7 +18,7 @@
<result column="update_dt" jdbcType="TIMESTAMP" property="updateDt" />
</resultMap>
<sql id="Base_Column_List">
id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, next_trigger_at, retry_count, retry_status,
id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, next_trigger_at, retry_count, retry_status,
create_dt, update_dt
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.server.persistence.mybatis.mapper.SequenceAllocMapper">

<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.server.persistence.mybatis.po.SequenceAlloc">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="max_id" property="maxId" />
<result column="step" property="step" />
<result column="update_dt" property="updateDt" />
</resultMap>

<update id="updateMaxIdByCustomStep">
UPDATE sequence_alloc
SET max_id = max_id + #{step}, update_dt = now()
WHERE group_name = #{groupName}
</update>

<update id="updateMaxId">
UPDATE sequence_alloc
SET max_id = max_id + step, update_dt = now()
WHERE group_name = #{groupName}
</update>
</mapper>