feat(1.4.0-beta1): 1.调试死信表迁移 2.优化重试并行扫描 3.单节点限流优化

This commit is contained in:
opensnail 2025-02-22 00:11:34 +08:00
parent 9a144c28a7
commit b0fa9e9e38
11 changed files with 58 additions and 55 deletions

View File

@ -19,8 +19,6 @@ public class RetryDeadLetter extends CreateDt {
private String namespaceId;
private String uniqueId;
private String groupName;
private String sceneName;

View File

@ -3,13 +3,12 @@
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.RetryDeadLetterMapper">
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO sj_retry_dead_letter (namespace_id, unique_id, group_name, scene_name, idempotent_id, biz_no,
INSERT INTO sj_retry_dead_letter (namespace_id, group_name, scene_name, idempotent_id, biz_no,
executor_name, args_str, ext_attrs, create_dt)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.namespaceId,jdbcType=VARCHAR},
#{item.uniqueId,jdbcType=VARCHAR},
#{item.groupName,jdbcType=VARCHAR},
#{item.sceneName,jdbcType=VARCHAR},
#{item.idempotentId,jdbcType=VARCHAR},

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.common.config;
import com.aizuda.snailjob.common.core.alarm.email.SnailJobMailProperties;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
@ -33,7 +34,7 @@ public class SystemProperties {
/**
* 重试任务拉取的并行度
*/
private int retryMaxPullParallel = 10;
private int retryMaxPullParallel = 2;
/**
* 任务调度每次拉取的条数
@ -68,7 +69,7 @@ public class SystemProperties {
/**
* 单个节点支持的最大调度量
*/
private int maxDispatchCapacity = 5000;
private int maxDispatchCapacity = 10000;
/**
* 号段模式下步长配置 默认100

View File

@ -38,6 +38,7 @@ public interface RetryTaskConverter {
@Mappings({
@Mapping(target = "id", ignore = true),
@Mapping(target = "deleted", ignore = true),
})
Retry toRetryTask(Retry retry);

View File

@ -6,6 +6,7 @@ import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
@ -36,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.concurrent.TimeUnit;
@ -75,20 +77,32 @@ public class ScanRetryActor extends AbstractActor {
}
private void doScan(final ScanTask scanTask) {
PartitionTaskUtils.process(startId ->
listAvailableTasks(startId, scanTask.getBuckets()),
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask),
partitionTasks -> {
if (CollUtil.isNotEmpty(partitionTasks) && !rateLimiterHandler.tryAcquire(partitionTasks.size())) {
log.warn("当前节点触发限流");
return false;
}
return true;
}, 0);
PartitionTaskUtils.process(startId -> listAvailableTasks(startId, scanTask.getBuckets()),
this::processRetryPartitionTasks, this::stopCondition, 0);
}
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks, final ScanTask scanTask) {
/**
* 拉取任务停止判断
*
* @param partitionTasks RetryPartitionTask
* @return true-停止拉取 false-继续拉取
*/
private boolean stopCondition(List<? extends PartitionTask> partitionTasks) {
if (CollectionUtils.isEmpty(partitionTasks)) {
return true;
}
boolean b = rateLimiterHandler.tryAcquire(partitionTasks.size());
log.warn("获取令牌, b:[{}] size:[{}]", b, partitionTasks.size());
if (!b) {
log.warn("当前节点触发限流");
return true;
}
return false;
}
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks) {
if (CollUtil.isEmpty(partitionTasks)) {
return;
}

View File

@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
@ -49,12 +50,9 @@ public abstract class AbstractGenerator implements TaskGenerator {
@Autowired
protected AccessTemplate accessTemplate;
@Autowired
private List<IdGenerator> idGeneratorList;
@Autowired
private SystemProperties systemProperties;
@Override
@Transactional
public void taskGenerator(TaskContext taskContext) {
SnailJobLog.LOCAL.debug("received report data. {}", JsonUtil.toJsonString(taskContext));
@ -127,7 +125,13 @@ public abstract class AbstractGenerator implements TaskGenerator {
retry.setGroupName(taskContext.getGroupName());
retry.setSceneName(taskContext.getSceneName());
retry.setRetryStatus(initStatus(taskContext));
retry.setBizNo(Optional.ofNullable(retry.getBizNo()).orElse(StrUtil.EMPTY));
if (StrUtil.isBlank(retry.getBizNo())) {
// 默认生成一个业务单据号方便用户查询
retry.setBizNo(IdUtil.fastSimpleUUID());
} else {
retry.setBizNo(retry.getBizNo());
}
// 计算分桶逻辑
retry.setBucketIndex(
HashUtil.bkdrHash(taskContext.getGroupName() + taskContext.getSceneName() + taskInfo.getIdempotentId())
@ -208,21 +212,4 @@ public abstract class AbstractGenerator implements TaskGenerator {
return retrySceneConfig;
}
/**
* 获取分布式id
*
* @param groupName 组id
* @return 分布式id
*/
private String getIdGenerator(String groupName, String namespaceId) {
GroupConfig groupConfig = accessTemplate.getGroupConfigAccess().getGroupConfigByGroupName(groupName, namespaceId);
for (final IdGenerator idGenerator : idGeneratorList) {
if (idGenerator.supports(groupConfig.getIdGeneratorMode())) {
return idGenerator.idGenerator(groupName, namespaceId);
}
}
throw new SnailJobServerException("id generator mode not configured. [{}]", groupName);
}
}

View File

@ -23,7 +23,7 @@ public class RateLimiterHandler implements InitializingBean {
private RateLimiter rateLimiter;
public boolean tryAcquire(int permits) {
return rateLimiter.tryAcquire(permits, 0L, TimeUnit.MICROSECONDS);
return rateLimiter.tryAcquire(permits, 500L, TimeUnit.MILLISECONDS);
}
@ -44,6 +44,7 @@ public class RateLimiterHandler implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
rateLimiter = RateLimiter.create(systemProperties.getMaxDispatchCapacity(), 1, TimeUnit.SECONDS);
rateLimiter = RateLimiter.create(systemProperties.getMaxDispatchCapacity());
}
}

View File

@ -152,11 +152,12 @@ public class CleanerSchedule extends AbstractSchedule implements Lifecycle {
// 删除重试任务
List<RetryTask> retryTaskList = retryTaskMapper.selectList(new LambdaQueryWrapper<RetryTask>()
.in(RetryTask::getId, totalWaitRetryIds));
.in(RetryTask::getRetryId, totalWaitRetryIds));
// 删除重试日志信息
List<RetryTaskLogMessage> retryTaskLogMessageList = retryTaskLogMessageMapper.selectList(
new LambdaQueryWrapper<RetryTaskLogMessage>().in(RetryTaskLogMessage::getRetryId, retryIds));
new LambdaQueryWrapper<RetryTaskLogMessage>()
.in(RetryTaskLogMessage::getRetryId, totalWaitRetryIds));
List<Long> finalCbRetryIds = cbRetryIds;
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@ -184,8 +185,10 @@ public class CleanerSchedule extends AbstractSchedule implements Lifecycle {
});
// 重试最大次数迁移死信表
List<RetryPartitionTask> maxCountRetries = retryPartitionTasks.stream().filter(retryPartitionTask ->
RetryStatusEnum.MAX_COUNT.getStatus().equals(retryPartitionTask.getRetryStatus())).toList();
List<RetryPartitionTask> maxCountRetries = retryPartitionTasks.stream()
.filter(retryPartitionTask ->
RetryStatusEnum.MAX_COUNT.getStatus().equals(retryPartitionTask.getRetryStatus()))
.toList();
moveDeadLetters(maxCountRetries);
}

View File

@ -21,7 +21,7 @@ public abstract class AbstractTimerTask implements TimerTask<String> {
@Override
public void run(Timeout timeout) throws Exception {
log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]",
log.debug("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]",
LocalDateTime.now(), groupName, retryId, retryTaskId, namespaceId);
try {
doRun(timeout);

View File

@ -16,10 +16,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.*;
/**
* 消费当前节点分配的bucket并生成扫描任务
@ -68,12 +65,15 @@ public class ConsumerBucketActor extends AbstractActor {
// 刷新最新的配置
rateLimiterHandler.refreshRate();
ScanTask scanTask = new ScanTask();
// 通过并行度配置计算拉取范围
List<List<Integer>> partitions = Lists.partition(new ArrayList<>(consumerBucket.getBuckets()), systemProperties.getRetryMaxPullParallel());
Set<Integer> totalBuckets = consumerBucket.getBuckets();
int retryMaxPullParallel = systemProperties.getRetryMaxPullParallel();
List<List<Integer>> partitions = Lists.partition(new ArrayList<>(totalBuckets),
(totalBuckets.size() + retryMaxPullParallel - 1) / retryMaxPullParallel);
for (List<Integer> buckets : partitions) {
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(new HashSet<>(buckets));
ActorRef scanRetryActorRef = SyetemTaskTypeEnum.RETRY.getActorRef().get();
ActorRef scanRetryActorRef = ActorGenerator.scanRetryActor();
scanRetryActorRef.tell(scanTask, scanRetryActorRef);
}
}

View File

@ -74,7 +74,6 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
.eq(StrUtil.isNotBlank(queryVO.getSceneName()), RetryDeadLetter::getSceneName, queryVO.getSceneName())
.eq(StrUtil.isNotBlank(queryVO.getBizNo()), RetryDeadLetter::getBizNo, queryVO.getBizNo())
.eq(StrUtil.isNotBlank(queryVO.getIdempotentId()), RetryDeadLetter::getIdempotentId, queryVO.getIdempotentId())
.eq(StrUtil.isNotBlank(queryVO.getUniqueId()), RetryDeadLetter::getUniqueId, queryVO.getUniqueId())
.between(ObjUtil.isAllNotEmpty(queryVO.getStartDt(), queryVO.getEndDt()),
RetryDeadLetter::getCreateDt, queryVO.getStartDt(), queryVO.getEndDt())
.orderByDesc(RetryDeadLetter::getId));
@ -177,7 +176,7 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
List<RetryDeadLetter> tasks = retryDeadLetterAccess.list(
new LambdaQueryWrapper<RetryDeadLetter>()
.select(RetryDeadLetter::getUniqueId)
.select(RetryDeadLetter::getId)
.eq(RetryDeadLetter::getNamespaceId, namespaceId)
.eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName())
.in(RetryDeadLetter::getId, deadLetterVO.getIds())