feat(1.4.0-beta1): 1. 新增单个节点支持的最大调度量配置 2. 优化重试完成和最大次数合并逻辑

This commit is contained in:
opensnail 2025-02-20 22:00:23 +08:00
parent d5599fdf16
commit 30bc414248
12 changed files with 285 additions and 547 deletions

View File

@ -3,7 +3,6 @@ package com.aizuda.snailjob.template.datasource.access.task;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
import com.aizuda.snailjob.template.datasource.utils.DbUtils;
import com.aizuda.snailjob.template.datasource.utils.RequestDataHelper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;

View File

@ -1,116 +0,0 @@
package com.aizuda.snailjob.template.datasource.utils;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.template.datasource.exception.SnailJobDatasourceException;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Group partition helper.
 * <p>
 * Stashes per-request routing data (currently the group partition index) in a
 * {@link ThreadLocal} so downstream data-access code can resolve which physical
 * table partition to target. Callers MUST invoke {@link #remove()} when request
 * processing finishes, otherwise the value leaks across pooled threads.
 *
 * @author: opensnail
 * @date : 2022-02-28 17:18
 * @since 1.0.0
 */
public class RequestDataHelper {

    /**
     * Per-thread request parameter store.
     */
    private static final ThreadLocal<Map<String, Object>> REQUEST_DATA = new ThreadLocal<>();

    /** Map key under which the partition index is stored. */
    private static final String PARTITION = "group-partition";

    /**
     * Bind the given request parameters to the current thread, replacing any
     * previously bound map.
     *
     * @param requestData request parameter map
     */
    public static void setRequestData(Map<String, Object> requestData) {
        REQUEST_DATA.set(requestData);
    }

    /**
     * Bind a partition index to the current thread.
     *
     * @param partition partition index
     */
    public static void setPartition(int partition) {
        Map<String, Object> map = new HashMap<>();
        map.put(PARTITION, partition);
        RequestDataHelper.setRequestData(map);
    }

    /**
     * Resolve the partition configured for the given group and bind it to the
     * current thread.
     *
     * @param groupName   group name (must not be blank)
     * @param namespaceId namespace id (must not be blank)
     * @throws SnailJobDatasourceException if either argument is blank or the group does not exist
     */
    public static void setPartition(String groupName, String namespaceId) {
        // Bugfix: both values are required by the lookup below, so reject the
        // call when EITHER is blank. The old check (isBlank(groupName) &&
        // isNotBlank(namespaceId)) only fired for one specific combination and
        // let a blank namespaceId slip through to the query.
        if (StrUtil.isBlank(groupName) || StrUtil.isBlank(namespaceId)) {
            throw new SnailJobDatasourceException("组名称或者命名空间ID不能为空");
        }

        GroupConfigMapper groupConfigMapper = SnailSpringContext.getBeanByType(GroupConfigMapper.class);
        GroupConfig groupConfig = groupConfigMapper.selectOne(
                new LambdaQueryWrapper<GroupConfig>()
                        .select(GroupConfig::getGroupPartition)
                        .eq(GroupConfig::getNamespaceId, namespaceId)
                        .eq(GroupConfig::getGroupName, groupName));

        if (Objects.isNull(groupConfig)) {
            throw new SnailJobDatasourceException("groupName:[{}]不存在", groupName);
        }

        setPartition(groupConfig.getGroupPartition());
    }

    /**
     * Read a single request parameter bound to the current thread.
     *
     * @param param parameter key
     * @param <T>   expected value type (unchecked cast — caller must know the actual type)
     * @return the value, or {@code null} if no data is bound or the key is absent
     */
    @SuppressWarnings("unchecked")
    public static <T> T getRequestData(String param) {
        Map<String, Object> dataMap = getRequestData();
        if (CollectionUtils.isNotEmpty(dataMap)) {
            return (T) dataMap.get(param);
        }
        return null;
    }

    /**
     * @return the full request parameter map bound to the current thread; may be {@code null}
     */
    public static Map<String, Object> getRequestData() {
        return REQUEST_DATA.get();
    }

    /**
     * @return the partition index bound to the current thread, or {@code null} if none is set
     */
    public static Integer getPartition() {
        Map<String, Object> requestData = getRequestData();
        if (CollUtil.isEmpty(requestData)) {
            return null;
        }
        return (Integer) requestData.get(PARTITION);
    }

    /**
     * Clear the thread-local state — always call this when request processing ends.
     */
    public static void remove() {
        REQUEST_DATA.remove();
    }
}

View File

@ -30,16 +30,16 @@ public class SystemProperties {
*/
private int retryPullPageSize = 1000;
/**
* 重试任务拉取的并行度
*/
private int retryMaxPullParallel = 10;
/**
* 任务调度每次拉取的条数
*/
private int jobPullPageSize = 1000;
/**
* 重试每次拉取的次数
*/
private int retryMaxPullCount = 10;
/**
* netty 端口
* see: serverPort
@ -64,10 +64,11 @@ public class SystemProperties {
* server token
*/
private String serverToken = "SJ_H9HGGmrX3QBVTfsAAG2mcKH3SR7bCLsK";
/**
* 一个客户端每秒最多接收的重试数量指令
* 单个节点支持的最大调度量
*/
private int limiter = 100;
private int maxDispatchCapacity = 5000;
/**
* 号段模式下步长配置 默认100
@ -77,7 +78,7 @@ public class SystemProperties {
/**
* 日志默认保存天数
*/
private int logStorage = 90;
private int logStorage = 7;
/**
* 合并日志默认保存天数

View File

@ -30,4 +30,20 @@ public class RetryPartitionTask extends PartitionTask {
private Integer retryCount;
private String idempotentId;
private String bizNo;
private String argsStr;
private String extAttrs;
private String executorName;
private Integer retryStatus;
private Long parentId;
private Integer bucketIndex;
}

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.retry.task.service;
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import org.mapstruct.*;
@ -22,9 +23,9 @@ public interface RetryDeadLetterConverter {
@Mapping(target = "id", ignore = true),
@Mapping(target = "createDt", ignore = true)
})
RetryDeadLetter toRetryDeadLetter(Retry retryTasks);
RetryDeadLetter toRetryDeadLetter(RetryPartitionTask retryTasks);
@IterableMapping(qualifiedByName = "ignoreId")
List<RetryDeadLetter> toRetryDeadLetter(List<Retry> retries);
List<RetryDeadLetter> toRetryDeadLetter(List<RetryPartitionTask> retries);
}

View File

@ -1,160 +0,0 @@
package com.aizuda.snailjob.server.retry.task.service.impl;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.retry.task.service.RetryDeadLetterConverter;
import com.aizuda.snailjob.server.retry.task.service.RetryService;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.utils.RequestDataHelper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Retry service implementation.
 * <p>
 * Migrates retries that reached their maximum attempt count to the dead-letter
 * table and deletes retries that finished successfully.
 * todo 重新设计
 *
 * @author: opensnail
 * @date : 2021-11-26 15:19
 */
@Service
@Slf4j
public class RetryServiceImpl implements RetryService {

    @Autowired
    private AccessTemplate accessTemplate;
    @Autowired
    private ApplicationContext context;

    /**
     * For one group: move MAX_COUNT retries (regular + callback) to the
     * dead-letter table and delete FINISH retries.
     *
     * @param groupName   group name
     * @param namespaceId namespace id
     * @return always {@code Boolean.TRUE}; failures raise {@link SnailJobServerException}
     */
    @Transactional
    @Override
    public Boolean moveDeadLetterAndDelFinish(String groupName, String namespaceId) {
        TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
        // Route subsequent mapper calls to this group's partition.
        RequestDataHelper.setPartition(groupName, namespaceId);

        // One page (100 rows) of terminated CALLBACK retries for the group.
        List<Retry> callbackRetries = retryTaskAccess.listPage(new PageDTO<>(0, 100),
                new LambdaQueryWrapper<Retry>()
                        .eq(Retry::getNamespaceId, namespaceId)
                        .in(Retry::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus(),
                                RetryStatusEnum.FINISH.getStatus())
                        .eq(Retry::getTaskType, SyetemTaskTypeEnum.CALLBACK.getType())
                        .eq(Retry::getGroupName, groupName)
                        .orderByDesc(Retry::getId)).getRecords();

        if (CollUtil.isEmpty(callbackRetries)) {
            return Boolean.TRUE;
        }

        // NOTE(review): the uniqueId filter below is commented out, so this
        // query loads ALL RETRY-type rows in the namespace, not only the ones
        // related to the callback page above — confirm this is intentional.
        Set<Long> uniqueIdSet = StreamUtils.toSet(callbackRetries, Retry::getId);
        List<Retry> retries = accessTemplate.getRetryAccess()
                .list(new LambdaQueryWrapper<Retry>()
                        .eq(Retry::getNamespaceId, namespaceId)
                        .eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType())
                        // .in(Retry::getUniqueId, uniqueIdSet)
                );

        // Collect rows that exhausted their retry budget for dead-letter migration.
        List<Retry> waitMoveDeadLetters = new ArrayList<>();
        List<Retry> maxCountRetryList = filterByStatus(retries, RetryStatusEnum.MAX_COUNT);
        if (CollUtil.isNotEmpty(maxCountRetryList)) {
            waitMoveDeadLetters.addAll(maxCountRetryList);
        }

        List<Retry> maxCountCallbackRetryList = filterByStatus(callbackRetries, RetryStatusEnum.MAX_COUNT);
        // Bugfix: the guard previously re-checked maxCountRetryList, so
        // exhausted CALLBACK retries were skipped whenever the regular list
        // happened to be empty.
        if (CollUtil.isNotEmpty(maxCountCallbackRetryList)) {
            waitMoveDeadLetters.addAll(maxCountCallbackRetryList);
        }

        moveDeadLetters(groupName, namespaceId, waitMoveDeadLetters);

        // Collect ids of successfully finished retries for deletion.
        Set<Long> waitDelRetryFinishSet = new HashSet<>();
        Set<Long> finishRetryIdList = StreamUtils.toSet(
                filterByStatus(retries, RetryStatusEnum.FINISH), Retry::getId);
        if (CollUtil.isNotEmpty(finishRetryIdList)) {
            waitDelRetryFinishSet.addAll(finishRetryIdList);
        }

        Set<Long> finishCallbackRetryIdList = StreamUtils.toSet(
                filterByStatus(callbackRetries, RetryStatusEnum.FINISH), Retry::getId);
        if (CollUtil.isNotEmpty(finishCallbackRetryIdList)) {
            waitDelRetryFinishSet.addAll(finishCallbackRetryIdList);
        }

        if (CollUtil.isEmpty(waitDelRetryFinishSet)) {
            return Boolean.TRUE;
        }

        Assert.isTrue(waitDelRetryFinishSet.size() == accessTemplate.getRetryAccess()
                        .delete(new LambdaQueryWrapper<Retry>()
                                .eq(Retry::getNamespaceId, namespaceId)
                                .eq(Retry::getGroupName, groupName)
                                .in(Retry::getId, waitDelRetryFinishSet)),
                () -> new SnailJobServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retries)));
        return Boolean.TRUE;
    }

    /**
     * Select the retries whose status equals the given one.
     *
     * @param retries source rows
     * @param status  status to match
     * @return matching rows (possibly empty)
     */
    private static List<Retry> filterByStatus(List<Retry> retries, RetryStatusEnum status) {
        return retries.stream()
                .filter(retryTask -> retryTask.getRetryStatus().equals(status.getStatus()))
                .collect(Collectors.toList());
    }

    /**
     * 迁移死信队列数据
     *
     * @param groupName   组id
     * @param namespaceId namespace id
     * @param retries     待迁移数据
     */
    private void moveDeadLetters(String groupName, String namespaceId, List<Retry> retries) {
        if (CollUtil.isEmpty(retries)) {
            return;
        }

        List<RetryDeadLetter> retryDeadLetters = RetryDeadLetterConverter.INSTANCE.toRetryDeadLetter(retries);
        // Stamp all dead letters with one consistent timestamp.
        LocalDateTime now = LocalDateTime.now();
        for (RetryDeadLetter retryDeadLetter : retryDeadLetters) {
            retryDeadLetter.setCreateDt(now);
        }

        Assert.isTrue(retryDeadLetters.size() == accessTemplate
                        .getRetryDeadLetterAccess().insertBatch(retryDeadLetters),
                () -> new SnailJobServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters)));

        TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
        Assert.isTrue(retries.size() == retryTaskAccess.delete(new LambdaQueryWrapper<Retry>()
                        .eq(Retry::getNamespaceId, namespaceId)
                        .eq(Retry::getGroupName, groupName)
                        .in(Retry::getId, StreamUtils.toList(retries, Retry::getId))),
                () -> new SnailJobServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retries)));

        // Notify listeners (alarms) about the newly dead-lettered tasks.
        context.publishEvent(new RetryTaskFailDeadLetterAlarmEvent(retryDeadLetters));
    }
}

View File

@ -25,6 +25,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.util.concurrent.RateLimiter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -32,6 +33,7 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* <p>
@ -49,10 +51,16 @@ public class ScanRetryActor extends AbstractActor {
private final SystemProperties systemProperties;
private final AccessTemplate accessTemplate;
private final RetryMapper retryMapper;
private final static RateLimiter rateLimiter = RateLimiter.create(1000, 1, TimeUnit.SECONDS);
@Override
public Receive createReceive() {
return receiveBuilder().match(ScanTask.class, config -> {
// 覆盖每秒产生多少个令牌的值
double permitsPerSecond = systemProperties.getMaxDispatchCapacity();
if (permitsPerSecond >= 1 && permitsPerSecond != rateLimiter.getRate()) {
rateLimiter.setRate(permitsPerSecond);
}
try {
doScan(config);
@ -68,7 +76,13 @@ public class ScanRetryActor extends AbstractActor {
PartitionTaskUtils.process(startId ->
listAvailableTasks(startId, scanTask.getBuckets()),
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask),
0);
partitionTasks -> {
if (!rateLimiter.tryAcquire(partitionTasks.size())) {
log.warn("当前节点触发限流 [{}]", rateLimiter.getRate());
return false;
}
return true;
}, 0);
}

View File

@ -0,0 +1,229 @@
package com.aizuda.snailjob.server.retry.task.support.schedule;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.snailjob.server.retry.task.service.RetryDeadLetterConverter;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;

/**
 * Retry清理线程
 * 1. 删除日志信息
 * 2. 删除重试已完成的数据
 * 3. 删除回调任务数据
 * 4. 删除调度日志 sj_retry_task
 * 5. 迁移到达最大重试的数据
 *
 * @author: opensnail
 * @date : 2023-07-21 13:32
 * @since 2.1.0
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class CleanerSchedule extends AbstractSchedule implements Lifecycle {

    private final RetryMapper retryMapper;
    private final RetryTaskMapper retryTaskMapper;
    private final SystemProperties systemProperties;
    private final RetryTaskLogMessageMapper retryTaskLogMessageMapper;
    private final TransactionTemplate transactionTemplate;
    private final AccessTemplate accessTemplate;

    @Override
    public String lockName() {
        return "clearLog";
    }

    @Override
    public String lockAtMost() {
        return "PT4H";
    }

    @Override
    public String lockAtLeast() {
        return "PT1M";
    }

    /**
     * Scan expired retries in pages and clean them up. Retention below 1 day
     * is rejected so that at least the most recent day of data survives.
     */
    @Override
    protected void doExecute() {
        try {
            // 清除日志默认保存天数大于零最少保留最近一天的日志数据
            if (systemProperties.getLogStorage() <= 1) {
                // Bugfix: the message lacked a "{}" placeholder, so the
                // configured value was silently dropped from the log output.
                SnailJobLog.LOCAL.error("retry clear log storage error [{}]", systemProperties.getLogStorage());
                return;
            }
            // clean retry log
            LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage());
            long total = PartitionTaskUtils.process(startId -> retryTaskBatchList(startId, endTime),
                    this::processRetryLogPartitionTasks, 0);
            SnailJobLog.LOCAL.debug("Retry clear success total:[{}]", total);
        } catch (Exception e) {
            SnailJobLog.LOCAL.error("clear log error", e);
        }
    }

    /**
     * Load one page (500 rows) of expired, logically-deleted RETRY-type rows
     * starting after {@code startId}.
     *
     * @param startId id cursor (exclusive lower bound for the next page)
     * @param endTime only rows created before this time are eligible
     * @return page converted to partition tasks; empty list ends the scan
     */
    private List<RetryPartitionTask> retryTaskBatchList(Long startId, LocalDateTime endTime) {
        List<Retry> retryTaskList = retryMapper.selectPage(
                        new Page<>(0, 500),
                        new LambdaUpdateWrapper<Retry>()
                                .ge(Retry::getId, startId)
                                .le(Retry::getCreateDt, endTime)
                                .eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType())
                                .ne(Retry::getDeleted, StatusEnum.NO.getStatus())
                                .orderByAsc(Retry::getId))
                .getRecords();
        return RetryTaskConverter.INSTANCE.toRetryTaskLogPartitionTasks(retryTaskList);
    }

    /**
     * clean table RetryTaskLog & RetryTaskLogMessage
     *
     * @param partitionTasks one page of expired retries
     */
    public void processRetryLogPartitionTasks(List<? extends PartitionTask> partitionTasks) {
        List<Long> retryIds = StreamUtils.toList(partitionTasks, PartitionTask::getId);
        if (CollectionUtils.isEmpty(retryIds)) {
            return;
        }

        // 查询回调数据 — callback rows reference their parent retry via parentId.
        List<Retry> cbRetries = retryMapper.selectList(new LambdaQueryWrapper<Retry>()
                .select(Retry::getId).in(Retry::getParentId, retryIds));

        List<Long> totalWaitRetryIds = Lists.newArrayList(retryIds);
        List<Long> cbRetryIds = Lists.newArrayList();
        if (!CollectionUtils.isEmpty(cbRetries)) {
            cbRetryIds = StreamUtils.toList(cbRetries, Retry::getId);
            totalWaitRetryIds.addAll(cbRetryIds);
        }

        List<RetryPartitionTask> retryPartitionTasks = (List<RetryPartitionTask>) partitionTasks;
        List<Long> finishRetryIds = retryPartitionTasks.stream().filter(retryPartitionTask ->
                        RetryStatusEnum.FINISH.getStatus().equals(retryPartitionTask.getRetryStatus()))
                .map(PartitionTask::getId).toList();

        // 删除重试任务
        // NOTE(review): this matches RetryTask::getId against RETRY ids —
        // verify the intended column is not RetryTask::getRetryId.
        List<RetryTask> retryTaskList = retryTaskMapper.selectList(new LambdaQueryWrapper<RetryTask>()
                .in(RetryTask::getId, totalWaitRetryIds));

        // 删除重试日志信息
        List<RetryTaskLogMessage> retryTaskLogMessageList = retryTaskLogMessageMapper.selectList(
                new LambdaQueryWrapper<RetryTaskLogMessage>().in(RetryTaskLogMessage::getRetryId, retryIds));

        List<Long> finalCbRetryIds = cbRetryIds;
        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
            @Override
            protected void doInTransactionWithoutResult(final TransactionStatus status) {
                // 删除回调数据 — guard: deleteByIds with an empty collection is
                // rejected by MyBatis-Plus.
                if (!CollectionUtils.isEmpty(finalCbRetryIds)) {
                    retryMapper.deleteByIds(finalCbRetryIds);
                }
                // 删除重试完成的数据
                if (!CollectionUtils.isEmpty(finishRetryIds)) {
                    retryMapper.deleteByIds(finishRetryIds);
                }
                // 删除重试任务
                if (!CollectionUtils.isEmpty(retryTaskList)) {
                    List<Long> retryTaskIds = StreamUtils.toList(retryTaskList, RetryTask::getId);
                    Lists.partition(retryTaskIds, 500).forEach(retryTaskMapper::deleteByIds);
                }
                if (!CollectionUtils.isEmpty(retryTaskLogMessageList)) {
                    List<Long> retryTaskLogMessageIds = StreamUtils.toList(retryTaskLogMessageList, RetryTaskLogMessage::getId);
                    Lists.partition(retryTaskLogMessageIds, 500).forEach(retryTaskLogMessageMapper::deleteByIds);
                }
            }
        });

        // 重试最大次数迁移死信表
        List<RetryPartitionTask> maxCountRetries = retryPartitionTasks.stream().filter(retryPartitionTask ->
                RetryStatusEnum.MAX_COUNT.getStatus().equals(retryPartitionTask.getRetryStatus())).toList();
        moveDeadLetters(maxCountRetries);
    }

    /**
     * 迁移死信队列数据
     *
     * @param retries 待迁移数据
     */
    private void moveDeadLetters(List<RetryPartitionTask> retries) {
        if (CollUtil.isEmpty(retries)) {
            return;
        }

        List<RetryDeadLetter> retryDeadLetters = RetryDeadLetterConverter.INSTANCE.toRetryDeadLetter(retries);
        // Stamp all dead letters with one consistent timestamp.
        LocalDateTime now = LocalDateTime.now();
        for (RetryDeadLetter retryDeadLetter : retryDeadLetters) {
            retryDeadLetter.setCreateDt(now);
        }

        Assert.isTrue(retryDeadLetters.size() == accessTemplate
                        .getRetryDeadLetterAccess().insertBatch(retryDeadLetters),
                () -> new SnailJobServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters)));

        TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
        Assert.isTrue(retries.size() == retryTaskAccess.delete(new LambdaQueryWrapper<Retry>()
                        .in(Retry::getId, StreamUtils.toList(retries, RetryPartitionTask::getId))),
                () -> new SnailJobServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retries)));
    }

    @Override
    public void start() {
        // Run the cleanup every 4 hours.
        taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT4H"));
    }

    @Override
    public void close() {
    }
}

View File

@ -1,147 +0,0 @@
package com.aizuda.snailjob.server.retry.task.support.schedule;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
/**
 * Retry log cleanup schedule (runs periodically; actual cadence is the
 * PT4H fixed rate configured in {@link #start()}).
 *
 * Scans expired retries in pages and deletes their associated
 * RetryTaskLogMessage rows inside a transaction.
 *
 * @author: opensnail
 * @date : 2023-07-21 13:32
 * @since 2.1.0
 */
@Component
@Slf4j
public class ClearLogSchedule extends AbstractSchedule implements Lifecycle {
// Mapper for the sj_retry table (paged scans of expired rows).
@Autowired
private RetryMapper retryMapper;
// Source of the configurable log retention window (logStorage, in days).
@Autowired
private SystemProperties systemProperties;
// Mapper for retry log message rows that get deleted here.
@Autowired
private RetryTaskLogMessageMapper retryTaskLogMessageMapper;
// Wraps each page's deletions in a single transaction.
@Autowired
private TransactionTemplate transactionTemplate;
// Distributed-lock name shared via the scheduling framework.
@Override
public String lockName() {
return "clearLog";
}
// Hold the lock for at most 4 hours.
@Override
public String lockAtMost() {
return "PT4H";
}
// Hold the lock for at least 1 minute.
@Override
public String lockAtLeast() {
return "PT1M";
}
/**
 * Entry point: validates the retention setting, then pages through expired
 * retries and cleans their log messages. All exceptions are caught and
 * logged so the schedule keeps running.
 */
@Override
protected void doExecute() {
try {
// Retention must be greater than one day so that at least the most
// recent day of log data always survives.
if (systemProperties.getLogStorage() <= 1) {
// NOTE(review): this message has no "{}" placeholder, so the
// configured value is dropped from the log output — confirm and fix.
SnailJobLog.LOCAL.error("retry clear log storage error", systemProperties.getLogStorage());
return;
}
// clean retry log: everything created before (now - logStorage days).
LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage());
long total = PartitionTaskUtils.process(startId -> retryTaskBatchList(startId, endTime),
this::processRetryLogPartitionTasks, 0);
SnailJobLog.LOCAL.debug("Retry clear success total:[{}]", total);
} catch (Exception e) {
SnailJobLog.LOCAL.error("clear log error", e);
}
}
/**
 * Load one page (1000 rows) of retries created before {@code endTime},
 * starting at the {@code startId} cursor, converted to partition tasks.
 *
 * @param startId id cursor; the scan advances it page by page
 * @param endTime creation-time upper bound for eligible rows
 * @return converted page; an empty list terminates the scan
 */
private List<RetryPartitionTask> retryTaskBatchList(Long startId, LocalDateTime endTime) {
List<Retry> retryTaskList = retryMapper.selectPage(
new Page<>(0, 1000),
new LambdaUpdateWrapper<Retry>()
.ge(Retry::getId, startId)
.le(Retry::getCreateDt, endTime)
.orderByAsc(Retry::getId))
.getRecords();
return RetryTaskConverter.INSTANCE.toRetryTaskLogPartitionTasks(retryTaskList);
}
/**
 * clean table RetryTaskLog & RetryTaskLogMessage
 *
 * Deletes, inside one transaction, the RetryTaskLogMessage rows whose
 * retryId matches the expired retries in this page.
 *
 * @param partitionTasks one page of expired retries
 */
public void processRetryLogPartitionTasks(List<? extends PartitionTask> partitionTasks) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
List<Long> uniqueIdIds = StreamUtils.toList(partitionTasks, PartitionTask::getId);
if (uniqueIdIds == null || uniqueIdIds.isEmpty()) {
return;
}
// Waiting for deletion RetryLog
// List<RetryTask> retryTaskList = retryMapper.selectList(new LambdaQueryWrapper<RetryTask>().in(RetryTask::getId, uniqueIdIds));
// if (retryTaskList != null && !retryTaskList.isEmpty()) {
// List<Long> retryTaskListIds = StreamUtils.toList(retryTaskList, RetryTask::getId);
// retryTaskMapper.deleteByIds(retryTaskListIds);
// }
// Waiting for deletion RetryTaskLogMessage
List<RetryTaskLogMessage> retryTaskLogMessageList = retryTaskLogMessageMapper.selectList(new LambdaQueryWrapper<RetryTaskLogMessage>()
.in(RetryTaskLogMessage::getRetryId, uniqueIdIds));
if (retryTaskLogMessageList != null && !retryTaskLogMessageList.isEmpty()) {
List<Long> retryTaskListIds = StreamUtils.toList(retryTaskLogMessageList, RetryTaskLogMessage::getId);
retryTaskLogMessageMapper.deleteByIds(retryTaskListIds);
}
}
});
}
// Schedule the cleanup to run every 4 hours.
@Override
public void start() {
taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT4H"));
}
@Override
public void close() {
}
}

View File

@ -1,82 +0,0 @@
package com.aizuda.snailjob.server.retry.task.support.schedule;

import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.server.retry.task.service.RetryService;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Hourly housekeeping schedule: for every enabled group, deletes finished
 * retries and migrates retries that reached their maximum attempt count to
 * the dead-letter table (delegated to {@link RetryService}).
 *
 * @author: opensnail
 * @date : 2023-07-21 17:19
 * @since 2.1.0
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class RetryTaskSchedule extends AbstractSchedule implements Lifecycle {

    private final RetryService retryService;

    @Autowired
    protected AccessTemplate accessTemplate;

    /** Distributed-lock name for this schedule. */
    @Override
    public String lockName() {
        return "clearFinishAndMoveDeadLetterRetryTask";
    }

    @Override
    public String lockAtMost() {
        return "PT60s";
    }

    @Override
    public String lockAtLeast() {
        return "PT60s";
    }

    /** Begin executing immediately, then once per hour with fixed delay. */
    @Override
    public void start() {
        taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT1H"));
    }

    @Override
    public void close() {
    }

    /**
     * Collect every enabled (groupName, namespaceId) pair and run the
     * finish-cleanup / dead-letter migration for each. Any failure is logged
     * and swallowed so the schedule keeps running.
     */
    @Override
    protected void doExecute() {
        try {
            // Distinct (groupName, namespaceId) pairs of all enabled groups.
            Set<Pair<String/*groupName*/, String/*namespaceId*/>> activeGroups =
                    accessTemplate.getGroupConfigAccess()
                            .list(new LambdaQueryWrapper<GroupConfig>()
                                    .select(GroupConfig::getGroupName, GroupConfig::getNamespaceId)
                                    .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()))
                            .stream()
                            .map(config -> Pair.of(config.getGroupName(), config.getNamespaceId()))
                            .collect(Collectors.toSet());

            for (Pair<String/*groupName*/, String/*namespaceId*/> group : activeGroups) {
                retryService.moveDeadLetterAndDelFinish(group.getKey(), group.getValue());
            }
        } catch (Exception e) {
            SnailJobLog.LOCAL.error("clearFinishAndMoveDeadLetterRetryTask 失败", e);
        }
    }
}

View File

@ -6,13 +6,18 @@ import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.cache.CacheGroupScanActor;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
/**
@ -29,7 +34,7 @@ import java.util.Objects;
public class ConsumerBucketActor extends AbstractActor {
private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY";
private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY";
private static final String DEFAULT_RETRY_KEY = "DEFAULT_RETRY_KEY";
private final SystemProperties systemProperties;
@Override
public Receive createReceive() {
@ -58,9 +63,13 @@ public class ConsumerBucketActor extends AbstractActor {
private void doScanRetry(final ConsumerBucket consumerBucket) {
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(consumerBucket.getBuckets());
ActorRef scanRetryActorRef = cacheActorRef(DEFAULT_RETRY_KEY, SyetemTaskTypeEnum.RETRY);
scanRetryActorRef.tell(scanTask, scanRetryActorRef);
// 通过并行度配置计算拉取范围
List<List<Integer>> partitions = Lists.partition(new ArrayList<>(consumerBucket.getBuckets()), systemProperties.getRetryMaxPullParallel());
for (List<Integer> buckets : partitions) {
scanTask.setBuckets(new HashSet<>(buckets));
ActorRef scanRetryActorRef = SyetemTaskTypeEnum.RETRY.getActorRef().get();
scanRetryActorRef.tell(scanTask, scanRetryActorRef);
}
}
private void doScanJobAndWorkflow(final ConsumerBucket consumerBucket) {

View File

@ -1,26 +0,0 @@
package com.aizuda.snailjob.server.mapper;

import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.utils.RequestDataHelper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Smoke test for {@link RetryMapper}: binds partition 0 via
 * {@link RequestDataHelper} and prints the retry row with id 1.
 *
 * @author: opensnail
 * @date : 2021-11-03 18:03
 */
@SpringBootTest
public class RetryMapperTest {

    @Autowired
    private RetryMapper retryMapper;

    @Test
    public void test() {
        // Route the mapper call to partition 0 before querying.
        RequestDataHelper.setPartition(0);
        Retry result = retryMapper.selectById(1);
        System.out.println(result);
    }
}