From 30bc414248867677a07a5408a8ab6a3a2961b250 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Thu, 20 Feb 2025 22:00:23 +0800 Subject: [PATCH] =?UTF-8?q?feat(1.4.0-beta1):=201.=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E5=8D=95=E4=B8=AA=E8=8A=82=E7=82=B9=E6=94=AF=E6=8C=81=E7=9A=84?= =?UTF-8?q?=E6=9C=80=E5=A4=A7=E8=B0=83=E5=BA=A6=E9=87=8F=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=202.=20=E4=BC=98=E5=8C=96=E9=87=8D=E8=AF=95=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E5=92=8C=E6=9C=80=E5=A4=A7=E6=AC=A1=E6=95=B0=E5=90=88=E5=B9=B6?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/task/AbstractTaskAccess.java | 1 - .../datasource/utils/RequestDataHelper.java | 116 --------- .../common/config/SystemProperties.java | 17 +- .../retry/task/dto/RetryPartitionTask.java | 16 ++ .../service/RetryDeadLetterConverter.java | 5 +- .../task/service/impl/RetryServiceImpl.java | 160 ------------ .../task/support/dispatch/ScanRetryActor.java | 16 +- .../support/schedule/CleanerSchedule.java | 229 ++++++++++++++++++ .../support/schedule/ClearLogSchedule.java | 147 ----------- .../support/schedule/RetryTaskSchedule.java | 82 ------- .../starter/dispatch/ConsumerBucketActor.java | 17 +- .../server/mapper/RetryMapperTest.java | 26 -- 12 files changed, 285 insertions(+), 547 deletions(-) delete mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/utils/RequestDataHelper.java delete mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/service/impl/RetryServiceImpl.java create mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java delete mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/ClearLogSchedule.java delete mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskSchedule.java delete mode 100644 snail-job-server/snail-job-server-starter/src/test/java/com/aizuda/snailjob/server/mapper/RetryMapperTest.java diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/task/AbstractTaskAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/task/AbstractTaskAccess.java index 29e0017e..88ee57d0 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/task/AbstractTaskAccess.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/task/AbstractTaskAccess.java @@ -3,7 +3,6 @@ package com.aizuda.snailjob.template.datasource.access.task; import com.aizuda.snailjob.template.datasource.access.TaskAccess; import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum; import com.aizuda.snailjob.template.datasource.utils.DbUtils; -import com.aizuda.snailjob.template.datasource.utils.RequestDataHelper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/utils/RequestDataHelper.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/utils/RequestDataHelper.java deleted file mode 100644 index b352683b..00000000 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/utils/RequestDataHelper.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.aizuda.snailjob.template.datasource.utils; - -import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.StrUtil; -import com.aizuda.snailjob.common.core.context.SnailSpringContext; -import com.aizuda.snailjob.template.datasource.exception.SnailJobDatasourceException; -import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; - -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -/** - * 组分区处理类 - * - * @author: opensnail - * @date : 2022-02-28 17:18 - * @since 1.0.0 - */ -public class RequestDataHelper { - - /** - * 请求参数存取 - */ - private static final ThreadLocal> REQUEST_DATA = new ThreadLocal<>(); - private static final String PARTITION = "group-partition"; - - /** - * 设置请求参数 - * - * @param requestData 请求参数 MAP 对象 - */ - public static void setRequestData(Map requestData) { - REQUEST_DATA.set(requestData); - } - - /** - * 设置分区 - * - * @param partition - */ - public static void setPartition(int partition) { - - Map map = new HashMap<>(); - map.put(PARTITION, partition); - RequestDataHelper.setRequestData(map); - - } - - - /** - * 设置分区 - * - * @param groupName 组名称 - */ - public static void setPartition(String groupName, String namespaceId) { - - if (StrUtil.isBlank(groupName) && StrUtil.isNotBlank(namespaceId)) { - throw new SnailJobDatasourceException("组名称或者命名空间ID不能为空"); - } - - GroupConfigMapper groupConfigMapper = SnailSpringContext.getBeanByType(GroupConfigMapper.class); - - GroupConfig groupConfig = groupConfigMapper.selectOne( - new LambdaQueryWrapper() - .select(GroupConfig::getGroupPartition) - .eq(GroupConfig::getNamespaceId, namespaceId) - .eq(GroupConfig::getGroupName, groupName)); - if (Objects.isNull(groupConfig)) { - throw new SnailJobDatasourceException("groupName:[{}]不存在", groupName); - } - - setPartition(groupConfig.getGroupPartition()); - } - - /** - * 获取请求参数 - * - * @param param 请求参数 - * @return 请求参数 MAP 对象 - */ - public static T getRequestData(String param) { - Map dataMap = getRequestData(); - if (CollectionUtils.isNotEmpty(dataMap)) { - return (T) dataMap.get(param); - } - return null; - } - - /** - * 获取请求参数 - * - * @return 请求参数 MAP 对象 - */ - public static Map getRequestData() { - return REQUEST_DATA.get(); - } - - public static Integer getPartition() { - Map requestData = getRequestData(); - if (CollUtil.isEmpty(requestData)) { - return null; - } - - return (Integer) requestData.get(PARTITION); - } - - - public static void remove() { - REQUEST_DATA.remove(); - } - -} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java index a141aa78..c62abadd 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java @@ -30,16 +30,16 @@ public class SystemProperties { */ private int retryPullPageSize = 1000; + /** + * 重试任务拉取的并行度 + */ + private int retryMaxPullParallel = 10; + /** * 任务调度每次拉取的条数 */ private int jobPullPageSize = 1000; - /** - * 重试每次拉取的次数 - */ - private int retryMaxPullCount = 10; - /** * netty 端口 * see: serverPort @@ -64,10 +64,11 @@ public class SystemProperties { * server token */ private String serverToken = "SJ_H9HGGmrX3QBVTfsAAG2mcKH3SR7bCLsK"; + /** - * 一个客户端每秒最多接收的重试数量指令 + * 单个节点支持的最大调度量 */ - private int limiter = 100; + private int maxDispatchCapacity = 5000; /** * 号段模式下步长配置 默认100 @@ -77,7 +78,7 @@ public class SystemProperties { /** * 日志默认保存天数 */ - private int logStorage = 90; + private int logStorage = 7; /** * 合并日志默认保存天数 diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryPartitionTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryPartitionTask.java index 756736b3..9e414e3d 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryPartitionTask.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryPartitionTask.java @@ -30,4 +30,20 @@ public class RetryPartitionTask extends PartitionTask { private Integer retryCount; + private String idempotentId; + + private String bizNo; + + private String argsStr; + + private String extAttrs; + + private String executorName; + + private Integer retryStatus; + + private Long parentId; + + private Integer bucketIndex; + } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/service/RetryDeadLetterConverter.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/service/RetryDeadLetterConverter.java index a43da573..8c393590 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/service/RetryDeadLetterConverter.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/service/RetryDeadLetterConverter.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.server.retry.task.service; +import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; import com.aizuda.snailjob.template.datasource.persistence.po.Retry; import org.mapstruct.*; @@ -22,9 +23,9 @@ public interface RetryDeadLetterConverter { @Mapping(target = "id", ignore = true), @Mapping(target = "createDt", ignore = true) }) - RetryDeadLetter toRetryDeadLetter(Retry retryTasks); + RetryDeadLetter toRetryDeadLetter(RetryPartitionTask retryTasks); @IterableMapping(qualifiedByName = "ignoreId") - List toRetryDeadLetter(List retries); + List toRetryDeadLetter(List retries); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/service/impl/RetryServiceImpl.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/service/impl/RetryServiceImpl.java deleted file mode 100644 index ac6d26ff..00000000 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/service/impl/RetryServiceImpl.java +++ /dev/null @@ -1,160 +0,0 @@ -package com.aizuda.snailjob.server.retry.task.service.impl; - -import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.StrUtil; -import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; -import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.aizuda.snailjob.common.core.util.StreamUtils; -import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; -import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.retry.task.service.RetryDeadLetterConverter; -import com.aizuda.snailjob.server.retry.task.service.RetryService; -import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent; -import com.aizuda.snailjob.template.datasource.access.AccessTemplate; -import com.aizuda.snailjob.template.datasource.access.TaskAccess; -import com.aizuda.snailjob.template.datasource.persistence.po.Retry; -import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; -import com.aizuda.snailjob.template.datasource.utils.RequestDataHelper; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * 重试服务层实现 - * todo 重新设计 - * @author: opensnail - * @date : 2021-11-26 15:19 - */ -@Service -@Slf4j -public class RetryServiceImpl implements RetryService { - - @Autowired - private AccessTemplate accessTemplate; - - @Autowired - private ApplicationContext context; - - @Transactional - @Override - public Boolean moveDeadLetterAndDelFinish(String groupName, String namespaceId) { - - TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); - RequestDataHelper.setPartition(groupName, namespaceId); - List callbackRetries = retryTaskAccess.listPage(new PageDTO<>(0, 100), - new LambdaQueryWrapper() - .eq(Retry::getNamespaceId, namespaceId) - .in(Retry::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus(), - RetryStatusEnum.FINISH.getStatus()) - .eq(Retry::getTaskType, SyetemTaskTypeEnum.CALLBACK.getType()) - .eq(Retry::getGroupName, groupName) - .orderByDesc(Retry::getId)).getRecords(); - - if (CollUtil.isEmpty(callbackRetries)) { - return Boolean.TRUE; - } - - Set uniqueIdSet = StreamUtils.toSet(callbackRetries, Retry::getId); - - List retries = accessTemplate.getRetryAccess() - .list(new LambdaQueryWrapper() - .eq(Retry::getNamespaceId, namespaceId) - .eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType()) -// .in(Retry::getUniqueId, uniqueIdSet) - ); - - // 迁移重试失败的数据 - List waitMoveDeadLetters = new ArrayList<>(); - List maxCountRetryList = retries.stream() - .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect( - Collectors.toList()); - if (CollUtil.isNotEmpty(maxCountRetryList)) { - waitMoveDeadLetters.addAll(maxCountRetryList); - } - - List maxCountCallbackRetryList = callbackRetries.stream() - .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect( - Collectors.toList()); - - if (CollUtil.isNotEmpty(maxCountRetryList)) { - waitMoveDeadLetters.addAll(maxCountCallbackRetryList); - } - - moveDeadLetters(groupName, namespaceId, waitMoveDeadLetters); - - // 删除重试完成的数据 - Set waitDelRetryFinishSet = new HashSet<>(); - Set finishRetryIdList = retries.stream() - .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus())) - .map(Retry::getId) - .collect(Collectors.toSet()); - if (CollUtil.isNotEmpty(finishRetryIdList)) { - waitDelRetryFinishSet.addAll(finishRetryIdList); - } - - Set finishCallbackRetryIdList = callbackRetries.stream() - .filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus())) - .map(Retry::getId) - .collect(Collectors.toSet()); - - // 迁移重试失败的数据 - if (CollUtil.isNotEmpty(finishCallbackRetryIdList)) { - waitDelRetryFinishSet.addAll(finishCallbackRetryIdList); - } - - if (CollUtil.isEmpty(waitDelRetryFinishSet)) { - return Boolean.TRUE; - } - - Assert.isTrue(waitDelRetryFinishSet.size() == accessTemplate.getRetryAccess() - .delete(new LambdaQueryWrapper() - .eq(Retry::getNamespaceId, namespaceId) - .eq(Retry::getGroupName, groupName) - .in(Retry::getId, waitDelRetryFinishSet)), - () -> new SnailJobServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retries))); - return Boolean.TRUE; - } - - /** - * 迁移死信队列数据 - * - * @param groupName 组id - * @param retries 待迁移数据 - */ - private void moveDeadLetters(String groupName, String namespaceId, List retries) { - if (CollUtil.isEmpty(retries)) { - return; - } - - List retryDeadLetters = RetryDeadLetterConverter.INSTANCE.toRetryDeadLetter(retries); - LocalDateTime now = LocalDateTime.now(); - for (RetryDeadLetter retryDeadLetter : retryDeadLetters) { - retryDeadLetter.setCreateDt(now); - } - Assert.isTrue(retryDeadLetters.size() == accessTemplate - .getRetryDeadLetterAccess().insertBatch(retryDeadLetters), - () -> new SnailJobServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters))); - - TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); - Assert.isTrue(retries.size() == retryTaskAccess.delete(new LambdaQueryWrapper() - .eq(Retry::getNamespaceId, namespaceId) - .eq(Retry::getGroupName, groupName) - .in(Retry::getId, StreamUtils.toList(retries, Retry::getId))), - () -> new SnailJobServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retries))); - - context.publishEvent(new RetryTaskFailDeadLetterAlarmEvent(retryDeadLetters)); - } - -} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index ff8ee4b6..36030273 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -25,6 +25,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.Retry; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import com.google.common.util.concurrent.RateLimiter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -32,6 +33,7 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.*; +import java.util.concurrent.TimeUnit; /** *

@@ -49,10 +51,16 @@ public class ScanRetryActor extends AbstractActor { private final SystemProperties systemProperties; private final AccessTemplate accessTemplate; private final RetryMapper retryMapper; + private final static RateLimiter rateLimiter = RateLimiter.create(1000, 1, TimeUnit.SECONDS); @Override public Receive createReceive() { return receiveBuilder().match(ScanTask.class, config -> { + // 覆盖每秒产生多少个令牌的值 + double permitsPerSecond = systemProperties.getMaxDispatchCapacity(); + if (permitsPerSecond >= 1 && permitsPerSecond != rateLimiter.getRate()) { + rateLimiter.setRate(permitsPerSecond); + } try { doScan(config); @@ -68,7 +76,13 @@ public class ScanRetryActor extends AbstractActor { PartitionTaskUtils.process(startId -> listAvailableTasks(startId, scanTask.getBuckets()), partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), - 0); + partitionTasks -> { + if (!rateLimiter.tryAcquire(partitionTasks.size())) { + log.warn("当前节点触发限流 [{}]", rateLimiter.getRate()); + return false; + } + return true; + }, 0); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java new file mode 100644 index 00000000..3150213a --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java @@ -0,0 +1,229 @@ +package com.aizuda.snailjob.server.retry.task.support.schedule; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; +import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.core.util.StreamUtils; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.Lifecycle; +import com.aizuda.snailjob.server.common.config.SystemProperties; +import com.aizuda.snailjob.server.common.dto.PartitionTask; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; +import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; +import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask; +import com.aizuda.snailjob.server.retry.task.service.RetryDeadLetterConverter; +import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.template.datasource.access.TaskAccess; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Retry; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; +import org.springframework.util.CollectionUtils; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.List; + +/** + * Retry清理线程 + * 1. 删除日志信息 + * 2. 删除重试已完成的数据 + * 3. 删除回调任务数据 + * 4. 删除调度日志 sj_retry_task + * 5. 迁移到达最大重试的数据 + * + * @author: opensnail + * @date : 2023-07-21 13:32 + * @since 2.1.0 + */ +@Component +@Slf4j +@RequiredArgsConstructor +public class CleanerSchedule extends AbstractSchedule implements Lifecycle { + private final RetryMapper retryMapper; + private final RetryTaskMapper retryTaskMapper; + private final SystemProperties systemProperties; + private final RetryTaskLogMessageMapper retryTaskLogMessageMapper; + private final TransactionTemplate transactionTemplate; + private final AccessTemplate accessTemplate; + + @Override + public String lockName() { + return "clearLog"; + } + + @Override + public String lockAtMost() { + return "PT4H"; + } + + @Override + public String lockAtLeast() { + return "PT1M"; + } + + @Override + protected void doExecute() { + try { + // 清除日志默认保存天数大于零、最少保留最近一天的日志数据 + if (systemProperties.getLogStorage() <= 1) { + SnailJobLog.LOCAL.error("retry clear log storage error", systemProperties.getLogStorage()); + return; + } + + // clean retry log + LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage()); + long total = PartitionTaskUtils.process(startId -> retryTaskBatchList(startId, endTime), + this::processRetryLogPartitionTasks, 0); + + SnailJobLog.LOCAL.debug("Retry clear success total:[{}]", total); + } catch (Exception e) { + SnailJobLog.LOCAL.error("clear log error", e); + } + } + + /** + * RetryLog List + * + * @param startId + * @param endTime + * @return + */ + private List retryTaskBatchList(Long startId, LocalDateTime endTime) { + + List retryTaskList = retryMapper.selectPage( + new Page<>(0, 500), + new LambdaUpdateWrapper() + .ge(Retry::getId, startId) + .le(Retry::getCreateDt, endTime) + .eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType()) + .ne(Retry::getDeleted, StatusEnum.NO.getStatus()) + .orderByAsc(Retry::getId)) + .getRecords(); + return RetryTaskConverter.INSTANCE.toRetryTaskLogPartitionTasks(retryTaskList); + } + + /** + * clean table RetryTaskLog & RetryTaskLogMessage + * + * @param partitionTasks + */ + public void processRetryLogPartitionTasks(List partitionTasks) { + + List retryIds = StreamUtils.toList(partitionTasks, PartitionTask::getId); + if (CollectionUtils.isEmpty(retryIds)) { + return; + } + + // 查询回调数据 + List cbRetries = retryMapper.selectList(new LambdaQueryWrapper() + .select(Retry::getId).in(Retry::getParentId, retryIds)); + + List totalWaitRetryIds = Lists.newArrayList(retryIds); + List cbRetryIds = Lists.newArrayList(); + if (!CollectionUtils.isEmpty(cbRetries)) { + cbRetryIds = StreamUtils.toList(cbRetries, Retry::getId); + totalWaitRetryIds.addAll(cbRetryIds); + } + + List retryPartitionTasks = (List) partitionTasks; + + List finishRetryIds = retryPartitionTasks.stream().filter(retryPartitionTask -> + RetryStatusEnum.FINISH.getStatus().equals(retryPartitionTask.getRetryStatus())) + .map(PartitionTask::getId).toList(); + + // 删除重试任务 + List retryTaskList = retryTaskMapper.selectList(new LambdaQueryWrapper() + .in(RetryTask::getId, totalWaitRetryIds)); + + // 删除重试日志信息 + List retryTaskLogMessageList = retryTaskLogMessageMapper.selectList( + new LambdaQueryWrapper().in(RetryTaskLogMessage::getRetryId, retryIds)); + + List finalCbRetryIds = cbRetryIds; + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { + + // 删除回调数据 + retryMapper.deleteByIds(finalCbRetryIds); + + // 删除重试完成的数据 + retryMapper.deleteByIds(finishRetryIds); + + // 删除重试任务 + if (!CollectionUtils.isEmpty(retryTaskList)) { + List retryTaskIds = StreamUtils.toList(retryTaskList, RetryTask::getId); + Lists.partition(retryTaskIds, 500).forEach(retryTaskMapper::deleteByIds); + } + + if (!CollectionUtils.isEmpty(retryTaskLogMessageList)) { + List retryTaskLogMessageIds = StreamUtils.toList(retryTaskLogMessageList, RetryTaskLogMessage::getId); + Lists.partition(retryTaskLogMessageIds, 500).forEach(retryTaskLogMessageMapper::deleteByIds); + } + + } + }); + + // 重试最大次数迁移死信表 + List maxCountRetries = retryPartitionTasks.stream().filter(retryPartitionTask -> + RetryStatusEnum.MAX_COUNT.getStatus().equals(retryPartitionTask.getRetryStatus())).toList(); + moveDeadLetters(maxCountRetries); + } + + /** + * 迁移死信队列数据 + * + * @param retries 待迁移数据 + */ + private void moveDeadLetters(List retries) { + if (CollUtil.isEmpty(retries)) { + return; + } + + List retryDeadLetters = RetryDeadLetterConverter.INSTANCE.toRetryDeadLetter(retries); + LocalDateTime now = LocalDateTime.now(); + for (RetryDeadLetter retryDeadLetter : retryDeadLetters) { + retryDeadLetter.setCreateDt(now); + } + + Assert.isTrue(retryDeadLetters.size() == accessTemplate + .getRetryDeadLetterAccess().insertBatch(retryDeadLetters), + () -> new SnailJobServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters))); + + TaskAccess retryTaskAccess = accessTemplate.getRetryAccess(); + Assert.isTrue(retries.size() == retryTaskAccess.delete(new LambdaQueryWrapper() + .in(Retry::getId, StreamUtils.toList(retries, RetryPartitionTask::getId))), + () -> new SnailJobServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retries))); + + } + + + @Override + public void start() { + taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT4H")); + } + + @Override + public void close() { + + } +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/ClearLogSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/ClearLogSchedule.java deleted file mode 100644 index f1444757..00000000 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/ClearLogSchedule.java +++ /dev/null @@ -1,147 +0,0 @@ -package com.aizuda.snailjob.server.retry.task.support.schedule; - -import com.aizuda.snailjob.common.core.util.StreamUtils; -import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.Lifecycle; -import com.aizuda.snailjob.server.common.config.SystemProperties; -import com.aizuda.snailjob.server.common.dto.PartitionTask; -import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; -import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; -import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask; -import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; -import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.Retry; -import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionTemplate; - -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.List; - -/** - * Retry清理日志 一小时运行一次 - * - * @author: opensnail - * @date : 2023-07-21 13:32 - * @since 2.1.0 - */ -@Component -@Slf4j -public class ClearLogSchedule extends AbstractSchedule implements Lifecycle { - - @Autowired - private RetryMapper retryMapper; - @Autowired - private SystemProperties systemProperties; - @Autowired - private RetryTaskLogMessageMapper retryTaskLogMessageMapper; - @Autowired - private TransactionTemplate transactionTemplate; - - @Override - public String lockName() { - return "clearLog"; - } - - @Override - public String lockAtMost() { - return "PT4H"; - } - - @Override - public String lockAtLeast() { - return "PT1M"; - } - - @Override - protected void doExecute() { - try { - // 清除日志默认保存天数大于零、最少保留最近一天的日志数据 - if (systemProperties.getLogStorage() <= 1) { - SnailJobLog.LOCAL.error("retry clear log storage error", systemProperties.getLogStorage()); - return; - } - // clean retry log - LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage()); - long total = PartitionTaskUtils.process(startId -> retryTaskBatchList(startId, endTime), - this::processRetryLogPartitionTasks, 0); - - SnailJobLog.LOCAL.debug("Retry clear success total:[{}]", total); - } catch (Exception e) { - SnailJobLog.LOCAL.error("clear log error", e); - } - } - - /** - * RetryLog List - * - * @param startId - * @param endTime - * @return - */ - private List retryTaskBatchList(Long startId, LocalDateTime endTime) { - - List retryTaskList = retryMapper.selectPage( - new Page<>(0, 1000), - new LambdaUpdateWrapper() - .ge(Retry::getId, startId) - .le(Retry::getCreateDt, endTime) - .orderByAsc(Retry::getId)) - .getRecords(); - return RetryTaskConverter.INSTANCE.toRetryTaskLogPartitionTasks(retryTaskList); - } - - /** - * clean table RetryTaskLog & RetryTaskLogMessage - * - * @param partitionTasks - */ - public void processRetryLogPartitionTasks(List partitionTasks) { - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(final TransactionStatus status) { - - List uniqueIdIds = StreamUtils.toList(partitionTasks, PartitionTask::getId); - if (uniqueIdIds == null || uniqueIdIds.isEmpty()) { - return; - } - // Waiting for deletion RetryLog -// List retryTaskList = retryMapper.selectList(new LambdaQueryWrapper().in(RetryTask::getId, uniqueIdIds)); -// if (retryTaskList != null && !retryTaskList.isEmpty()) { -// List retryTaskListIds = StreamUtils.toList(retryTaskList, RetryTask::getId); -// retryTaskMapper.deleteByIds(retryTaskListIds); -// } - - // Waiting for deletion RetryTaskLogMessage - List retryTaskLogMessageList = retryTaskLogMessageMapper.selectList(new LambdaQueryWrapper() - .in(RetryTaskLogMessage::getRetryId, uniqueIdIds)); - if (retryTaskLogMessageList != null && !retryTaskLogMessageList.isEmpty()) { - List retryTaskListIds = StreamUtils.toList(retryTaskLogMessageList, RetryTaskLogMessage::getId); - retryTaskLogMessageMapper.deleteByIds(retryTaskListIds); - } - } - }); - } - - @Override - public void start() { - taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT4H")); - } - - @Override - public void close() { - - } -} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskSchedule.java deleted file mode 100644 index 73d676f4..00000000 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskSchedule.java +++ /dev/null @@ -1,82 +0,0 @@ -package com.aizuda.snailjob.server.retry.task.support.schedule; - -import cn.hutool.core.lang.Pair; -import com.aizuda.snailjob.common.core.enums.StatusEnum; -import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.Lifecycle; -import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; -import com.aizuda.snailjob.server.retry.task.service.RetryService; -import com.aizuda.snailjob.template.datasource.access.AccessTemplate; -import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.time.Duration; -import java.time.Instant; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * 删除重试完成的和重试到达最大重试次数的数据迁移到死信队列表 - * - * @author: opensnail - * @date : 2023-07-21 17:19 - * @since 2.1.0 - */ -@Component -@Slf4j -@RequiredArgsConstructor -public class RetryTaskSchedule extends AbstractSchedule implements Lifecycle { - - private final RetryService retryService; - @Autowired - protected AccessTemplate accessTemplate; - - @Override - public void start() { - taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT1H")); - } - - @Override - public void close() { - - } - - @Override - protected void doExecute() { - try { - Set> groupNameList = accessTemplate.getGroupConfigAccess() - .list(new LambdaQueryWrapper() - .select(GroupConfig::getGroupName, GroupConfig::getNamespaceId) - .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())) - .stream() - .map(groupConfig -> Pair.of(groupConfig.getGroupName(), groupConfig.getNamespaceId())) - .collect(Collectors.toSet()); - - for (Pair pair : groupNameList) { - retryService.moveDeadLetterAndDelFinish(pair.getKey(), pair.getValue()); - } - - } catch (Exception e) { - SnailJobLog.LOCAL.error("clearFinishAndMoveDeadLetterRetryTask 失败", e); - } - } - - @Override - public String lockName() { - return "clearFinishAndMoveDeadLetterRetryTask"; - } - - @Override - public String lockAtMost() { - return "PT60s"; - } - - @Override - public String lockAtLeast() { - return "PT60s"; - } -} diff --git a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java index 372bc4c2..5c38a17a 100644 --- a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java +++ b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java @@ -6,13 +6,18 @@ import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.cache.CacheGroupScanActor; +import com.aizuda.snailjob.server.common.config.SystemProperties; import com.aizuda.snailjob.server.common.dto.ScanTask; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; +import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; import java.util.Objects; /** @@ -29,7 +34,7 @@ import java.util.Objects; public class ConsumerBucketActor extends AbstractActor { private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY"; private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY"; - private static final String DEFAULT_RETRY_KEY = "DEFAULT_RETRY_KEY"; + private final SystemProperties systemProperties; @Override public Receive createReceive() { @@ -58,9 +63,13 @@ public class ConsumerBucketActor extends AbstractActor { private void doScanRetry(final ConsumerBucket consumerBucket) { ScanTask scanTask = new ScanTask(); - scanTask.setBuckets(consumerBucket.getBuckets()); - ActorRef scanRetryActorRef = cacheActorRef(DEFAULT_RETRY_KEY, SyetemTaskTypeEnum.RETRY); - scanRetryActorRef.tell(scanTask, scanRetryActorRef); + // 通过并行度配置计算拉取范围 + List> partitions = Lists.partition(new ArrayList<>(consumerBucket.getBuckets()), systemProperties.getRetryMaxPullParallel()); + for (List buckets : partitions) { + scanTask.setBuckets(new HashSet<>(buckets)); + ActorRef scanRetryActorRef = SyetemTaskTypeEnum.RETRY.getActorRef().get(); + scanRetryActorRef.tell(scanTask, scanRetryActorRef); + } } private void doScanJobAndWorkflow(final ConsumerBucket consumerBucket) { diff --git a/snail-job-server/snail-job-server-starter/src/test/java/com/aizuda/snailjob/server/mapper/RetryMapperTest.java b/snail-job-server/snail-job-server-starter/src/test/java/com/aizuda/snailjob/server/mapper/RetryMapperTest.java deleted file mode 100644 index 5bae0ee7..00000000 --- a/snail-job-server/snail-job-server-starter/src/test/java/com/aizuda/snailjob/server/mapper/RetryMapperTest.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.aizuda.snailjob.server.mapper; - -import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.Retry; -import com.aizuda.snailjob.template.datasource.utils.RequestDataHelper; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; - -/** - * @author: opensnail - * @date : 2021-11-03 18:03 - */ -@SpringBootTest -public class RetryMapperTest { - - @Autowired - private RetryMapper retryMapper; - - @Test - public void test() { - RequestDataHelper.setPartition(0); - Retry retry = retryMapper.selectById(1); - System.out.println(retry); - } -}