fix:2.5.0

1. 修复分布式id生成器异常
2. 修复页面节点数据量没有区分空间
3. 修复重试任务场景没有添加空间区分的问题
4. 更改前端的slogan
This commit is contained in:
byteblogs168 2023-12-03 20:00:06 +08:00
parent 8da0d1b993
commit a8021e6690
27 changed files with 101 additions and 294 deletions

View File

@ -13,26 +13,6 @@ import java.util.List;
public interface RetryTaskLogMapper extends BaseMapper<RetryTaskLog> {
RetryTaskLog selectByPrimaryKey(Long id);
long countTaskTotal();
long countTaskByRetryStatus(@Param("retryStatus") Integer retryStatus);
List<SceneQuantityRankResponseDO> rankSceneQuantity(@Param("groupName") String groupName,
@Param("startTime") LocalDateTime startTime,
@Param("endTime")LocalDateTime endTime
);
@Deprecated
List<DispatchQuantityResponseDO> lineDispatchQuantity(@Param("groupName") String groupName,
@Param("retryStatus") Integer retryStatus,
@Param("type") String type,
@Param("startTime")LocalDateTime startTime,
@Param("endTime")LocalDateTime endTime
);
int batchInsert(List<RetryTaskLog> list);
List<DashboardRetryResponseDO> retrySummaryRetryTaskLogList(@Param("from") LocalDateTime from, @Param("to") LocalDateTime to);

View File

@ -20,11 +20,11 @@ public interface SequenceAllocMapper extends BaseMapper<SequenceAlloc> {
/**
* 更新业务类型下的最大id
*
* @param step 步长
* @param step 步长
* @param groupName
* @return 更新结果
*/
Integer updateMaxIdByCustomStep(@Param("step") Integer step, @Param("groupName") String groupName);
Integer updateMaxIdByCustomStep(@Param("step") Integer step, @Param("groupName") String groupName, @Param("namespaceId") String namespaceId);
/**
* 更新最大id
@ -32,5 +32,5 @@ public interface SequenceAllocMapper extends BaseMapper<SequenceAlloc> {
* @param groupName 组名称
* @return 更新结果
*/
Integer updateMaxId(@Param("groupName") String groupName);
Integer updateMaxId(@Param("groupName") String groupName, @Param("namespaceId") String namespaceId);
}

View File

@ -17,6 +17,6 @@ public interface ServerNodeMapper extends BaseMapper<ServerNode> {
int deleteByExpireAt(@Param("endTime") LocalDateTime endTime);
List<ActivePodQuantityResponseDO> countActivePod(@Param("namespaceId") String namespaceId);
List<ActivePodQuantityResponseDO> countActivePod(@Param("namespaceIds") List<String> namespaceIds);
}

View File

@ -4,6 +4,7 @@
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="unique_id" jdbcType="VARCHAR" property="uniqueId"/>
<result column="namespace_id" jdbcType="VARCHAR" property="namespaceId"/>
<result column="group_name" jdbcType="VARCHAR" property="groupName"/>
<result column="scene_name" jdbcType="VARCHAR" property="sceneName"/>
<result column="idempotent_id" jdbcType="VARCHAR" property="idempotentId"/>
@ -18,81 +19,17 @@
<sql id="Base_Column_List">
id
, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, retry_status,
create_dt, task_type
create_dt, task_type, namespace_id
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from retry_task_log
where id = #{id,jdbcType=BIGINT}
</select>
<select id="countTaskTotal" resultType="java.lang.Long">
select count(*)
from retry_task_log
</select>
<select id="countTaskByRetryStatus" resultType="java.lang.Long">
select count(*)
from retry_task_log
where retry_status = #{retryStatus}
</select>
<select id="rankSceneQuantity"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.SceneQuantityRankResponseDO">
select group_name, scene_name, count(*) as total
from retry_task_log
<where>
<if test="groupName != '' and groupName != null">
and group_name = #{groupName}
</if>
and create_dt >= #{startTime} and create_dt &lt;= #{endTime}
</where>
group by group_name, scene_name
order by total desc
</select>
<select id="lineDispatchQuantity"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.DispatchQuantityResponseDO">
select
<choose>
<when test="type == 'day'">
DATE_FORMAT(create_dt,'%H')
</when>
<when test="type == 'week'">
DATE_FORMAT(create_dt,'%Y-%m-%d')
</when>
<when test="type =='month'">
DATE_FORMAT(create_dt,'%Y-%m-%d')
</when>
<when test="type == 'year'">
DATE_FORMAT(create_dt,'%Y-%m')
</when>
<otherwise>
DATE_FORMAT(create_dt,'%Y-%m-%d')
</otherwise>
</choose>
as createDt, count(*) as total
from retry_task_log
<where>
<if test="groupName != '' and groupName != null">
group_name = #{groupName}
</if>
<if test="retryStatus!=null ">
and retry_status = #{retryStatus}
</if>
and create_dt >= #{startTime} and create_dt &lt;= #{endTime}
</where>
group by createDt
order by total desc
</select>
<!-- 定义批量新增的 SQL 映射 -->
<insert id="batchInsert" parameterType="java.util.List">
INSERT INTO retry_task_log (unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, task_type, create_dt)
INSERT INTO retry_task_log (unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name,
args_str, ext_attrs, task_type, create_dt, namespace_id)
VALUES
<foreach collection="list" item="item" separator=",">
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId}, #{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs}, #{item.taskType}, #{item.createDt})
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId},
#{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs},
#{item.taskType}, #{item.createDt}, #{namespaceId})
</foreach>
</insert>
</mapper>

View File

@ -14,12 +14,12 @@
<update id="updateMaxIdByCustomStep">
UPDATE sequence_alloc
SET max_id = max_id + #{step}, update_dt = now()
WHERE group_name = #{groupName}
WHERE group_name = #{groupName} and namespace_id = #{namespaceId}
</update>
<update id="updateMaxId">
UPDATE sequence_alloc
SET max_id = max_id + step, update_dt = now()
WHERE group_name = #{groupName}
WHERE group_name = #{groupName} and namespace_id = #{namespaceId}
</update>
</mapper>

View File

@ -35,7 +35,10 @@
</delete>
<select id="countActivePod" resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.ActivePodQuantityResponseDO">
SELECT node_type as nodeType, count(*) as total
from server_node
from server_node where namespace_id in
<foreach collection="namespaceIds" item="namespaceId" separator="," open="(" close=")">
#{namespaceId}
</foreach>
GROUP BY node_type
</select>
</mapper>

View File

@ -4,6 +4,7 @@
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="unique_id" jdbcType="VARCHAR" property="uniqueId"/>
<result column="namespace_id" jdbcType="VARCHAR" property="namespaceId"/>
<result column="group_name" jdbcType="VARCHAR" property="groupName"/>
<result column="scene_name" jdbcType="VARCHAR" property="sceneName"/>
<result column="idempotent_id" jdbcType="VARCHAR" property="idempotentId"/>
@ -18,81 +19,18 @@
<sql id="Base_Column_List">
id
, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, retry_status,
create_dt, task_type
create_dt, task_type, namespace_id
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from retry_task_log
where id = #{id,jdbcType=BIGINT}
</select>
<select id="countTaskTotal" resultType="java.lang.Long">
select count(*)
from retry_task_log
</select>
<select id="countTaskByRetryStatus" resultType="java.lang.Long">
select count(*)
from retry_task_log
where retry_status = #{retryStatus}
</select>
<select id="rankSceneQuantity"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.SceneQuantityRankResponseDO">
select group_name, scene_name, count(*) as total
from retry_task_log
<where>
<if test="groupName != '' and groupName != null">
and group_name = #{groupName}
</if>
and create_dt >= #{startTime} and create_dt &lt;= #{endTime}
</where>
group by group_name, scene_name
order by total desc
</select>
<select id="lineDispatchQuantity"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.DispatchQuantityResponseDO">
select
<choose>
<when test="type == 'day'">
DATE_FORMAT(create_dt,'%H')
</when>
<when test="type == 'week'">
DATE_FORMAT(create_dt,'%Y-%m-%d')
</when>
<when test="type =='month'">
DATE_FORMAT(create_dt,'%Y-%m-%d')
</when>
<when test="type == 'year'">
DATE_FORMAT(create_dt,'%Y-%m')
</when>
<otherwise>
DATE_FORMAT(create_dt,'%Y-%m-%d')
</otherwise>
</choose>
as createDt, count(*) as total
from retry_task_log
<where>
<if test="groupName != '' and groupName != null">
group_name = #{groupName}
</if>
<if test="retryStatus!=null ">
and retry_status = #{retryStatus}
</if>
and create_dt >= #{startTime} and create_dt &lt;= #{endTime}
</where>
group by createDt
order by total desc
</select>
<!-- 定义批量新增的 SQL 映射 -->
<insert id="batchInsert" parameterType="java.util.List">
INSERT INTO retry_task_log (unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, task_type, create_dt)
INSERT INTO retry_task_log (unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name,
args_str, ext_attrs, task_type, create_dt, namespace_id)
VALUES
<foreach collection="list" item="item" separator=",">
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId}, #{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs}, #{item.taskType}, #{item.createDt})
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId},
#{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs},
#{item.taskType}, #{item.createDt}, #{namespaceId})
</foreach>
</insert>

View File

@ -14,12 +14,12 @@
<update id="updateMaxIdByCustomStep">
UPDATE sequence_alloc
SET max_id = max_id + #{step}, update_dt = now()
WHERE group_name = #{groupName}
WHERE group_name = #{groupName} and namespace_id = #{namespaceId}
</update>
<update id="updateMaxId">
UPDATE sequence_alloc
SET max_id = max_id + step, update_dt = now()
WHERE group_name = #{groupName}
WHERE group_name = #{groupName} and namespace_id = #{namespaceId}
</update>
</mapper>

View File

@ -37,7 +37,10 @@
<select id="countActivePod"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.ActivePodQuantityResponseDO">
SELECT node_type as nodeType, count(*) as total
from server_node
from server_node where namespace_id in
<foreach collection="namespaceIds" item="namespaceId" separator="," open="(" close=")">
#{namespaceId}
</foreach>
GROUP BY node_type
</select>
</mapper>

View File

@ -4,6 +4,7 @@
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="unique_id" jdbcType="VARCHAR" property="uniqueId"/>
<result column="namespace_id" jdbcType="VARCHAR" property="namespaceId"/>
<result column="group_name" jdbcType="VARCHAR" property="groupName"/>
<result column="scene_name" jdbcType="VARCHAR" property="sceneName"/>
<result column="idempotent_id" jdbcType="VARCHAR" property="idempotentId"/>
@ -18,81 +19,17 @@
<sql id="Base_Column_List">
id
, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, retry_status,
create_dt, task_type
create_dt, task_type, namespace_id
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from retry_task_log
where id = #{id,jdbcType=BIGINT}
</select>
<select id="countTaskTotal" resultType="java.lang.Long">
select count(*)
from retry_task_log
</select>
<select id="countTaskByRetryStatus" resultType="java.lang.Long">
select count(*)
from retry_task_log
where retry_status = #{retryStatus}
</select>
<select id="rankSceneQuantity"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.SceneQuantityRankResponseDO">
select group_name, scene_name, count(*) as total
from retry_task_log
<where>
<if test="groupName != '' and groupName != null">
and group_name = #{groupName}
</if>
and create_dt >= #{startTime} and create_dt &lt;= #{endTime}
</where>
group by group_name, scene_name
order by total desc
</select>
<select id="lineDispatchQuantity"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.DispatchQuantityResponseDO">
select
<choose>
<when test="type == 'day'">
TO_CHAR(create_dt,'%H')
</when>
<when test="type == 'week'">
TO_CHAR(create_dt,'%Y-%m-%d')
</when>
<when test="type =='month'">
TO_CHAR(create_dt,'%Y-%m-%d')
</when>
<when test="type == 'year'">
TO_CHAR(create_dt,'%Y-%m')
</when>
<otherwise>
TO_CHAR(create_dt,'%Y-%m-%d')
</otherwise>
</choose>
as createDt, count(*) as total
from retry_task_log
<where>
<if test="groupName != '' and groupName != null">
group_name = #{groupName}
</if>
<if test="retryStatus!=null ">
and retry_status = #{retryStatus}
</if>
and create_dt >= #{startTime} and create_dt &lt;= #{endTime}
</where>
group by createDt
order by total desc
</select>
<!-- 定义批量新增的 SQL 映射 -->
<insert id="batchInsert" parameterType="java.util.List">
INSERT INTO retry_task_log (unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, task_type, create_dt)
INSERT INTO retry_task_log (unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name,
args_str, ext_attrs, task_type, create_dt, namespace_id)
VALUES
<foreach collection="list" item="item" separator=",">
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId}, #{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs}, #{item.taskType}, #{item.createDt})
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId},
#{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs},
#{item.taskType}, #{item.createDt}, #{namespaceId})
</foreach>
</insert>
</mapper>

View File

@ -14,12 +14,12 @@
<update id="updateMaxIdByCustomStep">
UPDATE sequence_alloc
SET max_id = max_id + #{step}, update_dt = now()
WHERE group_name = #{groupName}
WHERE group_name = #{groupName} and namespace_id = #{namespaceId}
</update>
<update id="updateMaxId">
UPDATE sequence_alloc
SET max_id = max_id + step, update_dt = now()
WHERE group_name = #{groupName}
WHERE group_name = #{groupName} and namespace_id = #{namespaceId}
</update>
</mapper>

View File

@ -34,7 +34,10 @@
<select id="countActivePod"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.ActivePodQuantityResponseDO">
SELECT node_type as nodeType, count(*) as total
from server_node
from server_node where namespace_id in
<foreach collection="namespaceIds" item="namespaceId" separator="," open="(" close=")">
#{namespaceId}
</foreach>
GROUP BY node_type
</select>
</mapper>

View File

@ -10,6 +10,7 @@ import com.aizuda.easy.retry.server.common.dto.AlarmInfo;
import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -61,7 +62,7 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
// 循环发送消息
waitSendAlarmInfos.forEach((key, list) -> {
List<NotifyConfigInfo> notifyConfigsList = notifyConfig.get(key);
List<NotifyConfigInfo> notifyConfigsList = notifyConfig.getOrDefault(key, Lists.newArrayList());
for (A alarmDTO : list) {
sendAlarm(notifyConfigsList, alarmDTO);
}

View File

@ -39,7 +39,7 @@ public class CacheConsumerGroup implements Lifecycle {
* @return 缓存对象
*/
public static synchronized void addOrUpdate(String groupName, String namespaceId) {
LogUtils.info(log, "add consumer cache. groupName:[{}]", groupName);
// LogUtils.info(log, "add consumer cache. groupName:[{}]", groupName);
CACHE.put(groupName, namespaceId);
}

View File

@ -20,9 +20,10 @@ public interface IdGenerator {
/**
* 获取分布式id
*
* @param group
* @param groupName
* @param namespaceId 命名空间
* @return id
*/
String idGenerator(String group);
String idGenerator(String groupName, String namespaceId);
}

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.common.generator.id;
import cn.hutool.core.lang.Pair;
import lombok.Data;
import java.util.Arrays;
@ -15,7 +16,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
@Data
public class SegmentBuffer {
private String key;
private Pair<String/*groupName*/, String/*namespaceId*/> key;
private Segment[] segments; //双buffer
private volatile int currentPos; //当前的使用的segment的index
private volatile boolean nextReady; //下一个segment是否处于可切换状态
@ -36,11 +37,11 @@ public class SegmentBuffer {
lock = new ReentrantReadWriteLock();
}
public String getKey() {
public Pair<String, String> getKey() {
return key;
}
public void setKey(String key) {
public void setKey(Pair<String, String> key) {
this.key = key;
}

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.common.generator.id;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.enums.IdGeneratorMode;
@ -14,8 +15,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -68,7 +67,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
new LinkedBlockingDeque<>(5000), new UpdateThreadFactory());
private volatile boolean initOK = false;
private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<>();
private Map<Pair<String, String>, SegmentBuffer> cache = new ConcurrentHashMap<>();
@Autowired
private SequenceAllocMapper sequenceAllocMapper;
@ -121,19 +120,21 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
return;
}
List<String> dbTags = sequenceAllocs.stream().map(SequenceAlloc::getGroupName).collect(Collectors.toList());
List<Pair<String, String>> dbTags = sequenceAllocs.stream()
.map(sequenceAlloc -> Pair.of(sequenceAlloc.getGroupName(), sequenceAlloc.getNamespaceId()))
.collect(Collectors.toList());
List<String> cacheTags = new ArrayList<>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
List<Pair<String, String>> cacheTags = new ArrayList<>(cache.keySet());
Set<Pair<String, String>> insertTagsSet = new HashSet<>(dbTags);
Set<Pair<String, String>> removeTagsSet = new HashSet<>(cacheTags);
//db中新加的tags灌进cache
for (int i = 0; i < cacheTags.size(); i++) {
String tmp = cacheTags.get(i);
Pair<String, String> tmp = cacheTags.get(i);
if (insertTagsSet.contains(tmp)) {
insertTagsSet.remove(tmp);
}
}
for (String tag : insertTagsSet) {
for (Pair<String, String> tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
@ -145,12 +146,12 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
}
//cache中已失效的tags从cache删除
for (int i = 0; i < dbTags.size(); i++) {
String tmp = dbTags.get(i);
Pair<String, String> tmp = dbTags.get(i);
if (removeTagsSet.contains(tmp)) {
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
for (Pair<String, String> tag : removeTagsSet) {
cache.remove(tag);
LogUtils.info(log, "Remove tag {} from IdCache", tag);
}
@ -161,11 +162,12 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
}
}
public String get(final String key) {
public String get(final String groupName, String namespaceId) {
if (!initOK) {
return Long.toString(EXCEPTION_ID_IDCACHE_INIT_FALSE);
}
Pair<String, String> key = Pair.of(groupName, namespaceId);
if (cache.containsKey(key)) {
SegmentBuffer buffer = cache.get(key);
if (!buffer.isInitOk()) {
@ -186,16 +188,16 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
return Long.toString(EXCEPTION_ID_KEY_NOT_EXISTS);
}
public void updateSegmentFromDb(String key, Segment segment) {
public void updateSegmentFromDb(Pair<String, String> key, Segment segment) {
SegmentBuffer buffer = segment.getBuffer();
SequenceAlloc sequenceAlloc;
if (!buffer.isInitOk()) {
sequenceAllocMapper.updateMaxId(key);
sequenceAllocMapper.updateMaxId(key.getKey(), key.getValue());
sequenceAlloc = sequenceAllocMapper.selectOne(new LambdaQueryWrapper<SequenceAlloc>().eq(SequenceAlloc::getGroupName, key));
buffer.setStep(sequenceAlloc.getStep());
buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc中的step为DB中的step
} else if (buffer.getUpdateTimestamp() == 0) {
sequenceAllocMapper.updateMaxId(key);
sequenceAllocMapper.updateMaxId(key.getKey(), key.getValue());
sequenceAlloc = sequenceAllocMapper.selectOne(new LambdaQueryWrapper<SequenceAlloc>().eq(SequenceAlloc::getGroupName, key));
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(sequenceAlloc.getStep());
@ -216,7 +218,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
}
LogUtils.info(log, "leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f", ((double) duration / (1000 * 60))), nextStep);
sequenceAllocMapper.updateMaxIdByCustomStep(nextStep, key);
sequenceAllocMapper.updateMaxIdByCustomStep(nextStep, key.getKey(), key.getValue());
sequenceAlloc = sequenceAllocMapper
.selectOne(new LambdaQueryWrapper<SequenceAlloc>().eq(SequenceAlloc::getGroupName, key));
buffer.setUpdateTimestamp(System.currentTimeMillis());
@ -307,9 +309,9 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
}
@Override
public String idGenerator(String group) {
public String idGenerator(String groupName, String namespaceId) {
String time = DateUtils.format(DateUtils.toNowLocalDateTime(), DateUtils.PURE_DATETIME_MS_PATTERN);
return time.concat(get(group));
return time.concat(get(groupName, namespaceId));
}
}

View File

@ -24,7 +24,7 @@ public class SnowflakeIdGenerator implements IdGenerator {
}
@Override
public String idGenerator(String group) {
public String idGenerator(String group, String namespaceId) {
return SNOWFLAKE.nextIdStr();
}
}

View File

@ -221,7 +221,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
GroupConfig groupConfig = accessTemplate.getGroupConfigAccess().getGroupConfigByGroupName(groupName, namespaceId);
for (final IdGenerator idGenerator : idGeneratorList) {
if (idGenerator.supports(groupConfig.getIdGeneratorMode())) {
return idGenerator.idGenerator(groupName);
return idGenerator.idGenerator(groupName, namespaceId);
}
}

View File

@ -2,7 +2,6 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.lang.Tuple;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
@ -117,7 +116,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks, final ScanTask scanTask) {
// 批次查询场景
Map<String, SceneConfig> sceneConfigMap = getSceneConfigMap(partitionTasks);
Map<String, SceneConfig> sceneConfigMap = getSceneConfigMap(partitionTasks, scanTask);
List<RetryTask> waitUpdateRetryTasks = new ArrayList<>();
for (PartitionTask task : partitionTasks) {
@ -148,12 +147,14 @@ public abstract class AbstractScanGroup extends AbstractActor {
}
private Map<String, SceneConfig> getSceneConfigMap(final List<? extends PartitionTask> partitionTasks) {
private Map<String, SceneConfig> getSceneConfigMap(final List<? extends PartitionTask> partitionTasks, ScanTask scanTask) {
Set<String> sceneNameSet = partitionTasks.stream()
.map(partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName()).collect(Collectors.toSet());
List<SceneConfig> sceneConfigs = accessTemplate.getSceneConfigAccess()
.list(new LambdaQueryWrapper<SceneConfig>()
.select(SceneConfig::getBackOff, SceneConfig::getTriggerInterval, SceneConfig::getSceneName)
.eq(SceneConfig::getNamespaceId, scanTask.getNamespaceId())
.eq(SceneConfig::getGroupName, scanTask.getGroupName())
.in(SceneConfig::getSceneName, sceneNameSet));
return sceneConfigs.stream()
.collect(Collectors.toMap(SceneConfig::getSceneName, i -> i));

View File

@ -22,26 +22,20 @@ public abstract class AbstractTimerTask implements TimerTask {
protected String uniqueId;
protected String namespaceId;
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
@Override
public void run(Timeout timeout) throws Exception {
executor.execute(() -> {
log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] uniqueId:[{}] namespaceId:[{}]", LocalDateTime.now(), groupName,
uniqueId, namespaceId);
try {
doRun(timeout);
} catch (Exception e) {
log.error("重试任务执行失败 groupName:[{}] uniqueId:[{}] namespaceId:[{}]", groupName, uniqueId, namespaceId, e);
} finally {
// 先清除时间轮的缓存
RetryTimerWheel.clearCache(Pair.of(groupName, namespaceId), uniqueId);
}
});
log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] uniqueId:[{}] namespaceId:[{}]", LocalDateTime.now(), groupName,
uniqueId, namespaceId);
try {
doRun(timeout);
} catch (Exception e) {
log.error("重试任务执行失败 groupName:[{}] uniqueId:[{}] namespaceId:[{}]", groupName, uniqueId, namespaceId, e);
} finally {
// 先清除时间轮的缓存
RetryTimerWheel.clearCache(Pair.of(groupName, namespaceId), uniqueId);
}
}
protected abstract void doRun(Timeout timeout);

View File

@ -28,6 +28,7 @@ public class RetryTimerTask extends AbstractTimerTask {
this.context = context;
super.groupName = context.getGroupName();
super.uniqueId = context.getUniqueId();
super.namespaceId = context.getNamespaceId();
}
@Override

View File

@ -11,6 +11,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@ -24,13 +26,15 @@ public class RetryTimerWheel implements Lifecycle {
private static final int TICK_DURATION = 500;
private static final String THREAD_NAME_PREFIX = "retry-task-timer-wheel-";
private static HashedWheelTimer timer = null;
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
private static final TimerIdempotent idempotent = new TimerIdempotent();
@Override
public void start() {
timer = new HashedWheelTimer(
new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, TimeUnit.MILLISECONDS);
new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, TimeUnit.MILLISECONDS, 512,
true, -1, executor);
timer.start();
}

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.support.generator.id;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.generator.id.IdGenerator;
@ -42,7 +43,7 @@ public class SegmentIdGeneratorTest {
public void run() {
count.countDown();
String id = idGenerator.idGenerator("example_group");
String id = idGenerator.idGenerator("example_group", SystemConstants.DEFAULT_NAMESPACE);
LogUtils.info(log, "id:[{}]", id);
if (Long.parseLong(id) < 0) {
throw new EasyRetryServerException("exception id");

View File

@ -80,7 +80,7 @@ public class DashBoardServiceImpl implements DashBoardService {
// 重试任务柱状图
dashboardCardResponseVO.setRetryTaskBarList(RetrySummaryResponseVOConverter.INSTANCE.toRetryTaskBar(retrySummaryMapper.retryTaskBarList(namespaceId)));
// 在线Pods
List<ActivePodQuantityResponseDO> activePodQuantityDO = serverNodeMapper.countActivePod(namespaceId);
List<ActivePodQuantityResponseDO> activePodQuantityDO = serverNodeMapper.countActivePod(Lists.newArrayList(namespaceId, ServerRegister.NAMESPACE_ID));
Map<Integer, Long> map = activePodQuantityDO.stream().collect(Collectors.toMap(ActivePodQuantityResponseDO::getNodeType, ActivePodQuantityResponseDO::getTotal));
Long clientTotal = map.getOrDefault(NodeTypeEnum.CLIENT.getType(), 0L);
Long serverTotal = map.getOrDefault(NodeTypeEnum.SERVER.getType(), 0L);

View File

@ -23,7 +23,7 @@
<div class="loading-wrp">
<span class="dot dot-spin"><i></i><i></i><i></i><i></i></span>
</div>
<div style="display: flex; justify-content: center; align-items: center;">一款为了提高分布式业务系统一致性的分布式任务重试和分布式任务调度平台</div>
<div style="display: flex; justify-content: center; align-items: center;">灵活,可靠和快速的分布式任务重试和分布式任务调度平台</div>
</div>
</div>
<!-- require cdn assets js -->

View File

@ -21,7 +21,7 @@ export default {
'layouts.usermenu.dialog.title': '信息',
'layouts.usermenu.dialog.content': '您确定要注销吗?',
'layouts.userLayout.title': '一款为了提高分布式业务系统一致性的分布式任务重试和分布式任务调度平台',
'layouts.userLayout.title': '灵活,可靠和快速的分布式任务重试和分布式任务调度平台',
...components,
...global,
...menu,