diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/RetryDeadLetter.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/RetryDeadLetter.java index a85d52466..e62f89c72 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/RetryDeadLetter.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/RetryDeadLetter.java @@ -19,8 +19,6 @@ public class RetryDeadLetter extends CreateDt { private String namespaceId; - private String uniqueId; - private String groupName; private String sceneName; diff --git a/snail-job-datasource/snail-job-mysql-datasource/src/main/resources/mysql/mapper/RetryDeadLetterMapper.xml b/snail-job-datasource/snail-job-mysql-datasource/src/main/resources/mysql/mapper/RetryDeadLetterMapper.xml index a15916ef4..dadc23ddd 100644 --- a/snail-job-datasource/snail-job-mysql-datasource/src/main/resources/mysql/mapper/RetryDeadLetterMapper.xml +++ b/snail-job-datasource/snail-job-mysql-datasource/src/main/resources/mysql/mapper/RetryDeadLetterMapper.xml @@ -3,13 +3,12 @@ - INSERT INTO sj_retry_dead_letter (namespace_id, unique_id, group_name, scene_name, idempotent_id, biz_no, + INSERT INTO sj_retry_dead_letter (namespace_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, create_dt) VALUES ( #{item.namespaceId,jdbcType=VARCHAR}, - #{item.uniqueId,jdbcType=VARCHAR}, #{item.groupName,jdbcType=VARCHAR}, #{item.sceneName,jdbcType=VARCHAR}, #{item.idempotentId,jdbcType=VARCHAR}, diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java index c62abaddb..27c78adc5 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.common.config; import com.aizuda.snailjob.common.core.alarm.email.SnailJobMailProperties; +import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.enums.RpcTypeEnum; import lombok.AllArgsConstructor; import lombok.Data; @@ -33,7 +34,7 @@ public class SystemProperties { /** * 重试任务拉取的并行度 */ - private int retryMaxPullParallel = 10; + private int retryMaxPullParallel = 2; /** * 任务调度每次拉取的条数 @@ -68,7 +69,7 @@ public class SystemProperties { /** * 单个节点支持的最大调度量 */ - private int maxDispatchCapacity = 5000; + private int maxDispatchCapacity = 10000; /** * 号段模式下步长配置 默认100 diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java index a4dbf306d..36b22702b 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java @@ -38,6 +38,7 @@ public interface RetryTaskConverter { @Mappings({ @Mapping(target = "id", ignore = true), + @Mapping(target = "deleted", ignore = true), }) Retry toRetryTask(Retry retry); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index 02d0dbcaf..bf03ceeec 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -6,6 +6,7 @@ import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.WaitStrategy; @@ -36,6 +37,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.util.*; import java.util.concurrent.TimeUnit; @@ -75,20 +77,32 @@ public class ScanRetryActor extends AbstractActor { } private void doScan(final ScanTask scanTask) { - PartitionTaskUtils.process(startId -> - listAvailableTasks(startId, scanTask.getBuckets()), - partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), - partitionTasks -> { - if (CollUtil.isNotEmpty(partitionTasks) && !rateLimiterHandler.tryAcquire(partitionTasks.size())) { - log.warn("当前节点触发限流"); - return false; - } - return true; - }, 0); - + PartitionTaskUtils.process(startId -> listAvailableTasks(startId, scanTask.getBuckets()), + this::processRetryPartitionTasks, this::stopCondition, 0); } - private void processRetryPartitionTasks(List partitionTasks, final ScanTask scanTask) { + /** + * 拉取任务停止判断 + * + * @param partitionTasks RetryPartitionTask + * @return true-停止拉取 false-继续拉取 + */ + private boolean stopCondition(List partitionTasks) { + if (CollectionUtils.isEmpty(partitionTasks)) { + return true; + } + + boolean b = rateLimiterHandler.tryAcquire(partitionTasks.size()); + log.warn("获取令牌, b:[{}] size:[{}]", b, partitionTasks.size()); + if (!b) { + log.warn("当前节点触发限流"); + return true; + } + + return false; + } + + private void processRetryPartitionTasks(List partitionTasks) { if (CollUtil.isEmpty(partitionTasks)) { return; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/retry/AbstractGenerator.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/retry/AbstractGenerator.java index 5f9140126..4b27c45ec 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/retry/AbstractGenerator.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/generator/retry/AbstractGenerator.java @@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Pair; import cn.hutool.core.util.HashUtil; +import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; @@ -49,12 +50,9 @@ public abstract class AbstractGenerator implements TaskGenerator { @Autowired protected AccessTemplate accessTemplate; @Autowired - private List idGeneratorList; - @Autowired private SystemProperties systemProperties; @Override - @Transactional public void taskGenerator(TaskContext taskContext) { SnailJobLog.LOCAL.debug("received report data. {}", JsonUtil.toJsonString(taskContext)); @@ -127,7 +125,13 @@ public abstract class AbstractGenerator implements TaskGenerator { retry.setGroupName(taskContext.getGroupName()); retry.setSceneName(taskContext.getSceneName()); retry.setRetryStatus(initStatus(taskContext)); - retry.setBizNo(Optional.ofNullable(retry.getBizNo()).orElse(StrUtil.EMPTY)); + if (StrUtil.isBlank(retry.getBizNo())) { + // 默认生成一个业务单据号方便用户查询 + retry.setBizNo(IdUtil.fastSimpleUUID()); + } else { + retry.setBizNo(retry.getBizNo()); + } + // 计算分桶逻辑 retry.setBucketIndex( HashUtil.bkdrHash(taskContext.getGroupName() + taskContext.getSceneName() + taskInfo.getIdempotentId()) @@ -208,21 +212,4 @@ public abstract class AbstractGenerator implements TaskGenerator { return retrySceneConfig; } - /** - * 获取分布式id - * - * @param groupName 组id - * @return 分布式id - */ - private String getIdGenerator(String groupName, String namespaceId) { - - GroupConfig groupConfig = accessTemplate.getGroupConfigAccess().getGroupConfigByGroupName(groupName, namespaceId); - for (final IdGenerator idGenerator : idGeneratorList) { - if (idGenerator.supports(groupConfig.getIdGeneratorMode())) { - return idGenerator.idGenerator(groupName, namespaceId); - } - } - - throw new SnailJobServerException("id generator mode not configured. [{}]", groupName); - } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RateLimiterHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RateLimiterHandler.java index 8f7a98fbb..a64c83061 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RateLimiterHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/RateLimiterHandler.java @@ -23,7 +23,7 @@ public class RateLimiterHandler implements InitializingBean { private RateLimiter rateLimiter; public boolean tryAcquire(int permits) { - return rateLimiter.tryAcquire(permits, 0L, TimeUnit.MICROSECONDS); + return rateLimiter.tryAcquire(permits, 500L, TimeUnit.MILLISECONDS); } @@ -44,6 +44,7 @@ public class RateLimiterHandler implements InitializingBean { @Override public void afterPropertiesSet() throws Exception { - rateLimiter = RateLimiter.create(systemProperties.getMaxDispatchCapacity(), 1, TimeUnit.SECONDS); + rateLimiter = RateLimiter.create(systemProperties.getMaxDispatchCapacity()); } + } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java index 3150213a9..13899ee7c 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java @@ -152,11 +152,12 @@ public class CleanerSchedule extends AbstractSchedule implements Lifecycle { // 删除重试任务 List retryTaskList = retryTaskMapper.selectList(new LambdaQueryWrapper() - .in(RetryTask::getId, totalWaitRetryIds)); + .in(RetryTask::getRetryId, totalWaitRetryIds)); // 删除重试日志信息 List retryTaskLogMessageList = retryTaskLogMessageMapper.selectList( - new LambdaQueryWrapper().in(RetryTaskLogMessage::getRetryId, retryIds)); + new LambdaQueryWrapper() + .in(RetryTaskLogMessage::getRetryId, totalWaitRetryIds)); List finalCbRetryIds = cbRetryIds; transactionTemplate.execute(new TransactionCallbackWithoutResult() { @@ -184,8 +185,10 @@ public class CleanerSchedule extends AbstractSchedule implements Lifecycle { }); // 重试最大次数迁移死信表 - List maxCountRetries = retryPartitionTasks.stream().filter(retryPartitionTask -> - RetryStatusEnum.MAX_COUNT.getStatus().equals(retryPartitionTask.getRetryStatus())).toList(); + List maxCountRetries = retryPartitionTasks.stream() + .filter(retryPartitionTask -> + RetryStatusEnum.MAX_COUNT.getStatus().equals(retryPartitionTask.getRetryStatus())) + .toList(); moveDeadLetters(maxCountRetries); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java index 42d313940..7fba1c560 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/timer/AbstractTimerTask.java @@ -21,7 +21,7 @@ public abstract class AbstractTimerTask implements TimerTask { @Override public void run(Timeout timeout) throws Exception { - log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]", + log.debug("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]", LocalDateTime.now(), groupName, retryId, retryTaskId, namespaceId); try { doRun(timeout); diff --git a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java index 1b8a88a7d..f2cfd61fc 100644 --- a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java +++ b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java @@ -16,10 +16,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; +import java.util.*; /** * 消费当前节点分配的bucket并生成扫描任务 @@ -68,12 +65,15 @@ public class ConsumerBucketActor extends AbstractActor { // 刷新最新的配置 rateLimiterHandler.refreshRate(); - ScanTask scanTask = new ScanTask(); // 通过并行度配置计算拉取范围 - List> partitions = Lists.partition(new ArrayList<>(consumerBucket.getBuckets()), systemProperties.getRetryMaxPullParallel()); + Set totalBuckets = consumerBucket.getBuckets(); + int retryMaxPullParallel = systemProperties.getRetryMaxPullParallel(); + List> partitions = Lists.partition(new ArrayList<>(totalBuckets), + (totalBuckets.size() + retryMaxPullParallel - 1) / retryMaxPullParallel); for (List buckets : partitions) { + ScanTask scanTask = new ScanTask(); scanTask.setBuckets(new HashSet<>(buckets)); - ActorRef scanRetryActorRef = SyetemTaskTypeEnum.RETRY.getActorRef().get(); + ActorRef scanRetryActorRef = ActorGenerator.scanRetryActor(); scanRetryActorRef.tell(scanTask, scanRetryActorRef); } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java index a99b43095..d130948f1 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java @@ -74,7 +74,6 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService { .eq(StrUtil.isNotBlank(queryVO.getSceneName()), RetryDeadLetter::getSceneName, queryVO.getSceneName()) .eq(StrUtil.isNotBlank(queryVO.getBizNo()), RetryDeadLetter::getBizNo, queryVO.getBizNo()) .eq(StrUtil.isNotBlank(queryVO.getIdempotentId()), RetryDeadLetter::getIdempotentId, queryVO.getIdempotentId()) - .eq(StrUtil.isNotBlank(queryVO.getUniqueId()), RetryDeadLetter::getUniqueId, queryVO.getUniqueId()) .between(ObjUtil.isAllNotEmpty(queryVO.getStartDt(), queryVO.getEndDt()), RetryDeadLetter::getCreateDt, queryVO.getStartDt(), queryVO.getEndDt()) .orderByDesc(RetryDeadLetter::getId)); @@ -177,7 +176,7 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService { List tasks = retryDeadLetterAccess.list( new LambdaQueryWrapper() - .select(RetryDeadLetter::getUniqueId) + .select(RetryDeadLetter::getId) .eq(RetryDeadLetter::getNamespaceId, namespaceId) .eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName()) .in(RetryDeadLetter::getId, deadLetterVO.getIds())