feat(1.4.0-beta1): 1.调试死信表迁移 2.优化重试并行扫描 3.单节点限流优化
This commit is contained in:
parent
fe9285ede2
commit
ed6e6c861f
@ -19,8 +19,6 @@ public class RetryDeadLetter extends CreateDt {
|
||||
|
||||
private String namespaceId;
|
||||
|
||||
private String uniqueId;
|
||||
|
||||
private String groupName;
|
||||
|
||||
private String sceneName;
|
||||
|
@ -3,13 +3,12 @@
|
||||
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.RetryDeadLetterMapper">
|
||||
|
||||
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
|
||||
INSERT INTO sj_retry_dead_letter (namespace_id, unique_id, group_name, scene_name, idempotent_id, biz_no,
|
||||
INSERT INTO sj_retry_dead_letter (namespace_id, group_name, scene_name, idempotent_id, biz_no,
|
||||
executor_name, args_str, ext_attrs, create_dt)
|
||||
VALUES
|
||||
<foreach collection="list" item="item" separator=",">
|
||||
(
|
||||
#{item.namespaceId,jdbcType=VARCHAR},
|
||||
#{item.uniqueId,jdbcType=VARCHAR},
|
||||
#{item.groupName,jdbcType=VARCHAR},
|
||||
#{item.sceneName,jdbcType=VARCHAR},
|
||||
#{item.idempotentId,jdbcType=VARCHAR},
|
||||
|
@ -1,6 +1,7 @@
|
||||
package com.aizuda.snailjob.server.common.config;
|
||||
|
||||
import com.aizuda.snailjob.common.core.alarm.email.SnailJobMailProperties;
|
||||
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
@ -33,7 +34,7 @@ public class SystemProperties {
|
||||
/**
|
||||
* 重试任务拉取的并行度
|
||||
*/
|
||||
private int retryMaxPullParallel = 10;
|
||||
private int retryMaxPullParallel = 2;
|
||||
|
||||
/**
|
||||
* 任务调度每次拉取的条数
|
||||
@ -68,7 +69,7 @@ public class SystemProperties {
|
||||
/**
|
||||
* 单个节点支持的最大调度量
|
||||
*/
|
||||
private int maxDispatchCapacity = 5000;
|
||||
private int maxDispatchCapacity = 10000;
|
||||
|
||||
/**
|
||||
* 号段模式下步长配置 默认100
|
||||
|
@ -38,6 +38,7 @@ public interface RetryTaskConverter {
|
||||
|
||||
/**
 * Converts one {@code Retry} into another {@code Retry}.
 * The {@code id} and {@code deleted} targets are marked {@code ignore = true},
 * so the mapping does not populate them on the result.
 * NOTE(review): presumably implemented by an annotation-processor-generated mapper — confirm.
 */
@Mappings({
|
||||
@Mapping(target = "id", ignore = true),
|
||||
@Mapping(target = "deleted", ignore = true),
|
||||
})
|
||||
Retry toRetryTask(Retry retry);
|
||||
|
||||
|
@ -6,6 +6,7 @@ import cn.hutool.core.collection.CollUtil;
|
||||
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.common.core.util.StreamUtils;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.server.common.WaitStrategy;
|
||||
@ -36,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -75,20 +77,32 @@ public class ScanRetryActor extends AbstractActor {
|
||||
}
|
||||
|
||||
private void doScan(final ScanTask scanTask) {
|
||||
PartitionTaskUtils.process(startId ->
|
||||
listAvailableTasks(startId, scanTask.getBuckets()),
|
||||
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask),
|
||||
partitionTasks -> {
|
||||
if (CollUtil.isNotEmpty(partitionTasks) && !rateLimiterHandler.tryAcquire(partitionTasks.size())) {
|
||||
log.warn("当前节点触发限流");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}, 0);
|
||||
|
||||
PartitionTaskUtils.process(startId -> listAvailableTasks(startId, scanTask.getBuckets()),
|
||||
this::processRetryPartitionTasks, this::stopCondition, 0);
|
||||
}
|
||||
|
||||
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks, final ScanTask scanTask) {
|
||||
/**
|
||||
* 拉取任务停止判断
|
||||
*
|
||||
* @param partitionTasks RetryPartitionTask
|
||||
* @return true-停止拉取 false-继续拉取
|
||||
*/
|
||||
private boolean stopCondition(List<? extends PartitionTask> partitionTasks) {
|
||||
if (CollectionUtils.isEmpty(partitionTasks)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
boolean b = rateLimiterHandler.tryAcquire(partitionTasks.size());
|
||||
log.warn("获取令牌, b:[{}] size:[{}]", b, partitionTasks.size());
|
||||
if (!b) {
|
||||
log.warn("当前节点触发限流");
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks) {
|
||||
if (CollUtil.isEmpty(partitionTasks)) {
|
||||
return;
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.lang.Pair;
|
||||
import cn.hutool.core.util.HashUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||
@ -49,12 +50,9 @@ public abstract class AbstractGenerator implements TaskGenerator {
|
||||
@Autowired
|
||||
protected AccessTemplate accessTemplate;
|
||||
@Autowired
|
||||
private List<IdGenerator> idGeneratorList;
|
||||
@Autowired
|
||||
private SystemProperties systemProperties;
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void taskGenerator(TaskContext taskContext) {
|
||||
SnailJobLog.LOCAL.debug("received report data. {}", JsonUtil.toJsonString(taskContext));
|
||||
|
||||
@ -127,7 +125,13 @@ public abstract class AbstractGenerator implements TaskGenerator {
|
||||
retry.setGroupName(taskContext.getGroupName());
|
||||
retry.setSceneName(taskContext.getSceneName());
|
||||
retry.setRetryStatus(initStatus(taskContext));
|
||||
retry.setBizNo(Optional.ofNullable(retry.getBizNo()).orElse(StrUtil.EMPTY));
|
||||
if (StrUtil.isBlank(retry.getBizNo())) {
|
||||
// 默认生成一个业务单据号方便用户查询
|
||||
retry.setBizNo(IdUtil.fastSimpleUUID());
|
||||
} else {
|
||||
retry.setBizNo(retry.getBizNo());
|
||||
}
|
||||
|
||||
// 计算分桶逻辑
|
||||
retry.setBucketIndex(
|
||||
HashUtil.bkdrHash(taskContext.getGroupName() + taskContext.getSceneName() + taskInfo.getIdempotentId())
|
||||
@ -208,21 +212,4 @@ public abstract class AbstractGenerator implements TaskGenerator {
|
||||
return retrySceneConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取分布式id
|
||||
*
|
||||
* @param groupName 组id
|
||||
* @return 分布式id
|
||||
*/
|
||||
private String getIdGenerator(String groupName, String namespaceId) {
|
||||
|
||||
GroupConfig groupConfig = accessTemplate.getGroupConfigAccess().getGroupConfigByGroupName(groupName, namespaceId);
|
||||
for (final IdGenerator idGenerator : idGeneratorList) {
|
||||
if (idGenerator.supports(groupConfig.getIdGeneratorMode())) {
|
||||
return idGenerator.idGenerator(groupName, namespaceId);
|
||||
}
|
||||
}
|
||||
|
||||
throw new SnailJobServerException("id generator mode not configured. [{}]", groupName);
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ public class RateLimiterHandler implements InitializingBean {
|
||||
private RateLimiter rateLimiter;
|
||||
|
||||
public boolean tryAcquire(int permits) {
|
||||
return rateLimiter.tryAcquire(permits, 0L, TimeUnit.MICROSECONDS);
|
||||
return rateLimiter.tryAcquire(permits, 500L, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
|
||||
@ -44,6 +44,7 @@ public class RateLimiterHandler implements InitializingBean {
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
rateLimiter = RateLimiter.create(systemProperties.getMaxDispatchCapacity(), 1, TimeUnit.SECONDS);
|
||||
rateLimiter = RateLimiter.create(systemProperties.getMaxDispatchCapacity());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -152,11 +152,12 @@ public class CleanerSchedule extends AbstractSchedule implements Lifecycle {
|
||||
|
||||
// 删除重试任务
|
||||
List<RetryTask> retryTaskList = retryTaskMapper.selectList(new LambdaQueryWrapper<RetryTask>()
|
||||
.in(RetryTask::getId, totalWaitRetryIds));
|
||||
.in(RetryTask::getRetryId, totalWaitRetryIds));
|
||||
|
||||
// 删除重试日志信息
|
||||
List<RetryTaskLogMessage> retryTaskLogMessageList = retryTaskLogMessageMapper.selectList(
|
||||
new LambdaQueryWrapper<RetryTaskLogMessage>().in(RetryTaskLogMessage::getRetryId, retryIds));
|
||||
new LambdaQueryWrapper<RetryTaskLogMessage>()
|
||||
.in(RetryTaskLogMessage::getRetryId, totalWaitRetryIds));
|
||||
|
||||
List<Long> finalCbRetryIds = cbRetryIds;
|
||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||
@ -184,8 +185,10 @@ public class CleanerSchedule extends AbstractSchedule implements Lifecycle {
|
||||
});
|
||||
|
||||
// 重试最大次数迁移死信表
|
||||
List<RetryPartitionTask> maxCountRetries = retryPartitionTasks.stream().filter(retryPartitionTask ->
|
||||
RetryStatusEnum.MAX_COUNT.getStatus().equals(retryPartitionTask.getRetryStatus())).toList();
|
||||
List<RetryPartitionTask> maxCountRetries = retryPartitionTasks.stream()
|
||||
.filter(retryPartitionTask ->
|
||||
RetryStatusEnum.MAX_COUNT.getStatus().equals(retryPartitionTask.getRetryStatus()))
|
||||
.toList();
|
||||
moveDeadLetters(maxCountRetries);
|
||||
}
|
||||
|
||||
|
@ -21,7 +21,7 @@ public abstract class AbstractTimerTask implements TimerTask<String> {
|
||||
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]",
|
||||
log.debug("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]",
|
||||
LocalDateTime.now(), groupName, retryId, retryTaskId, namespaceId);
|
||||
try {
|
||||
doRun(timeout);
|
||||
|
@ -16,10 +16,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 消费当前节点分配的bucket并生成扫描任务
|
||||
@ -68,12 +65,15 @@ public class ConsumerBucketActor extends AbstractActor {
|
||||
// 刷新最新的配置
|
||||
rateLimiterHandler.refreshRate();
|
||||
|
||||
ScanTask scanTask = new ScanTask();
|
||||
// 通过并行度配置计算拉取范围
|
||||
List<List<Integer>> partitions = Lists.partition(new ArrayList<>(consumerBucket.getBuckets()), systemProperties.getRetryMaxPullParallel());
|
||||
Set<Integer> totalBuckets = consumerBucket.getBuckets();
|
||||
int retryMaxPullParallel = systemProperties.getRetryMaxPullParallel();
|
||||
List<List<Integer>> partitions = Lists.partition(new ArrayList<>(totalBuckets),
|
||||
(totalBuckets.size() + retryMaxPullParallel - 1) / retryMaxPullParallel);
|
||||
for (List<Integer> buckets : partitions) {
|
||||
ScanTask scanTask = new ScanTask();
|
||||
scanTask.setBuckets(new HashSet<>(buckets));
|
||||
ActorRef scanRetryActorRef = SyetemTaskTypeEnum.RETRY.getActorRef().get();
|
||||
ActorRef scanRetryActorRef = ActorGenerator.scanRetryActor();
|
||||
scanRetryActorRef.tell(scanTask, scanRetryActorRef);
|
||||
}
|
||||
}
|
||||
|
@ -74,7 +74,6 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
|
||||
.eq(StrUtil.isNotBlank(queryVO.getSceneName()), RetryDeadLetter::getSceneName, queryVO.getSceneName())
|
||||
.eq(StrUtil.isNotBlank(queryVO.getBizNo()), RetryDeadLetter::getBizNo, queryVO.getBizNo())
|
||||
.eq(StrUtil.isNotBlank(queryVO.getIdempotentId()), RetryDeadLetter::getIdempotentId, queryVO.getIdempotentId())
|
||||
.eq(StrUtil.isNotBlank(queryVO.getUniqueId()), RetryDeadLetter::getUniqueId, queryVO.getUniqueId())
|
||||
.between(ObjUtil.isAllNotEmpty(queryVO.getStartDt(), queryVO.getEndDt()),
|
||||
RetryDeadLetter::getCreateDt, queryVO.getStartDt(), queryVO.getEndDt())
|
||||
.orderByDesc(RetryDeadLetter::getId));
|
||||
@ -177,7 +176,7 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
|
||||
|
||||
List<RetryDeadLetter> tasks = retryDeadLetterAccess.list(
|
||||
new LambdaQueryWrapper<RetryDeadLetter>()
|
||||
.select(RetryDeadLetter::getUniqueId)
|
||||
.select(RetryDeadLetter::getId)
|
||||
.eq(RetryDeadLetter::getNamespaceId, namespaceId)
|
||||
.eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName())
|
||||
.in(RetryDeadLetter::getId, deadLetterVO.getIds())
|
||||
|
Loading…
Reference in New Issue
Block a user