feat(1.4.0-beta1): 1. 调试死信列表

This commit is contained in:
opensnail 2025-02-23 21:31:09 +08:00
parent 5b05f6fe0d
commit 3f95f49e16
9 changed files with 88 additions and 101 deletions

View File

@ -5,6 +5,7 @@ import com.aizuda.snailjob.client.common.Lifecycle;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.annotation.SnailEndPoint;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
import com.aizuda.snailjob.client.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.client.core.IdempotentIdGenerate;
import com.aizuda.snailjob.client.core.RetryArgSerializer;
@ -18,6 +19,7 @@ import com.aizuda.snailjob.client.core.exception.SnailRetryClientException;
import com.aizuda.snailjob.client.core.executor.RemoteCallbackExecutor;
import com.aizuda.snailjob.client.core.executor.RemoteRetryExecutor;
import com.aizuda.snailjob.client.core.loader.SnailRetrySpiLoader;
import com.aizuda.snailjob.client.core.log.RetryLogMeta;
import com.aizuda.snailjob.client.core.retryer.RetryerInfo;
import com.aizuda.snailjob.client.core.serializer.JacksonSerializer;
import com.aizuda.snailjob.client.core.timer.StopTaskTimerTask;
@ -31,6 +33,7 @@ import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.IdempotentIdContext;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -72,12 +75,17 @@ public class SnailRetryEndPoint implements Lifecycle {
@Mapping(path = RETRY_DISPATCH, method = RequestMethod.POST)
public Result<Boolean> dispatch(@Valid DispatchRetryRequest request) {
RemoteRetryContext retryContext = bulidRemoteRetryContext(request);
RetryerInfo retryerInfo = RetryerInfoCache.get(request.getSceneName(), request.getExecutorName());
if (Objects.isNull(retryerInfo)) {
SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", request.getSceneName());
return new Result<>(StatusEnum.NO.getStatus(), MessageFormat.format("场景:[{0}]配置不存在, 请检查您的场景和执行器是否存在", request.getSceneName()));
}
// 初始化实时日志上下文
initLogContext(retryContext);
RetryArgSerializer retryArgSerializer = SnailRetrySpiLoader.loadRetryArgSerializer();
Object[] deSerialize;
@ -89,21 +97,10 @@ public class SnailRetryEndPoint implements Lifecycle {
return new Result<>(StatusEnum.NO.getStatus(), MessageFormat.format("参数解析异常 args:[{0}]", request.getArgsStr()));
}
RemoteRetryContext retryContext = new RemoteRetryContext();
retryContext.setDeSerialize(deSerialize);
retryContext.setRetryTaskId(request.getRetryTaskId());
retryContext.setRetryId(request.getRetryId());
retryContext.setRetryCount(request.getRetryCount());
retryContext.setArgsStr(request.getArgsStr());
retryContext.setGroupName(request.getGroupName());
retryContext.setNamespaceId(request.getNamespaceId());
retryContext.setScene(request.getSceneName());
retryContext.setExecutorName(request.getExecutorName());
ListeningExecutorService decorator = MoreExecutors.listeningDecorator(dispatcherThreadPool);
ListenableFuture<DispatchRetryResultDTO> submit = decorator.submit(() -> {
return remoteRetryExecutor.doRetry(retryContext);
});
ListenableFuture<DispatchRetryResultDTO> submit = decorator.submit(() -> remoteRetryExecutor.doRetry(retryContext));
FutureCache.addFuture(request.getRetryTaskId(), submit);
Futures.addCallback(submit, new RetryTaskExecutorFutureCallback(retryContext), decorator);
@ -116,11 +113,35 @@ public class SnailRetryEndPoint implements Lifecycle {
return new Result<>(Boolean.TRUE);
}
private static RemoteRetryContext bulidRemoteRetryContext(DispatchRetryRequest request) {
RemoteRetryContext retryContext = new RemoteRetryContext();
retryContext.setRetryTaskId(request.getRetryTaskId());
retryContext.setRetryId(request.getRetryId());
retryContext.setRetryCount(request.getRetryCount());
retryContext.setArgsStr(request.getArgsStr());
retryContext.setGroupName(request.getGroupName());
retryContext.setNamespaceId(request.getNamespaceId());
retryContext.setScene(request.getSceneName());
retryContext.setExecutorName(request.getExecutorName());
return retryContext;
}
private static void initLogContext(RemoteRetryContext context) {
RetryLogMeta retryLogMeta = new RetryLogMeta();
retryLogMeta.setGroupName(context.getGroupName());
retryLogMeta.setNamespaceId(context.getNamespaceId());
retryLogMeta.setRetryId(context.getRetryId());
retryLogMeta.setRetryTaskId(context.getRetryTaskId());
SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY);
}
@Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST)
public Result<Boolean> callback(@Valid RetryCallbackRequest callbackDTO) {
CallbackContext callbackContext = new CallbackContext();
CallbackContext callbackContext = buildCallbackContext(callbackDTO);
try {
initLogContext(callbackContext);
RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getSceneName(), callbackDTO.getExecutorName());
if (Objects.isNull(retryerInfo)) {
SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", callbackDTO.getSceneName());
@ -138,13 +159,6 @@ public class SnailRetryEndPoint implements Lifecycle {
return new Result<>(0, "回调失败", Boolean.FALSE);
}
callbackContext.setRetryTaskId(callbackDTO.getRetryTaskId());
callbackContext.setRetryId(callbackDTO.getRetryId());
callbackContext.setGroupName(callbackDTO.getGroupName());
callbackContext.setNamespaceId(callbackDTO.getNamespaceId());
callbackContext.setSceneName(callbackDTO.getSceneName());
callbackContext.setRetryStatus(callbackDTO.getRetryStatus());
ListeningExecutorService decorator = MoreExecutors.listeningDecorator(dispatcherThreadPool);
ListenableFuture<Boolean> submit = decorator.submit(() -> {
remoteCallbackExecutor.doRetryCallback(callbackContext);
@ -161,6 +175,27 @@ public class SnailRetryEndPoint implements Lifecycle {
return new Result<>(Boolean.TRUE);
}
private static CallbackContext buildCallbackContext(RetryCallbackRequest callbackDTO) {
CallbackContext callbackContext = new CallbackContext();
callbackContext.setRetryTaskId(callbackDTO.getRetryTaskId());
callbackContext.setRetryId(callbackDTO.getRetryId());
callbackContext.setGroupName(callbackDTO.getGroupName());
callbackContext.setNamespaceId(callbackDTO.getNamespaceId());
callbackContext.setSceneName(callbackDTO.getSceneName());
callbackContext.setRetryStatus(callbackDTO.getRetryStatus());
return callbackContext;
}
private static void initLogContext(CallbackContext context) {
// 初始化实时日志上下文
RetryLogMeta retryLogMeta = new RetryLogMeta();
retryLogMeta.setGroupName(context.getGroupName());
retryLogMeta.setNamespaceId(context.getNamespaceId());
retryLogMeta.setRetryTaskId(context.getRetryTaskId());
retryLogMeta.setRetryId(context.getRetryId());
SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY);
}
/**
* 手动新增重试数据模拟生成idempotentId

View File

@ -41,13 +41,8 @@ public class RemoteCallbackExecutor {
public void doRetryCallback(CallbackContext context) throws NoSuchMethodException, InstantiationException,
IllegalAccessException {
try {
// 初始化实时日志上下文
RetryLogMeta retryLogMeta = new RetryLogMeta();
retryLogMeta.setGroupName(context.getGroupName());
retryLogMeta.setNamespaceId(context.getNamespaceId());
retryLogMeta.setRetryTaskId(context.getRetryTaskId());
retryLogMeta.setRetryId(context.getRetryId());
SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY);
initLogContext(context);
// 以Spring Bean模式回调
doCallbackForSpringBean(context);
@ -60,6 +55,16 @@ public class RemoteCallbackExecutor {
}
}
private static void initLogContext(CallbackContext context) {
// 初始化实时日志上下文
RetryLogMeta retryLogMeta = new RetryLogMeta();
retryLogMeta.setGroupName(context.getGroupName());
retryLogMeta.setNamespaceId(context.getNamespaceId());
retryLogMeta.setRetryTaskId(context.getRetryTaskId());
retryLogMeta.setRetryId(context.getRetryId());
SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY);
}
/**
* 以Spring Bean模式回调
*

View File

@ -47,12 +47,7 @@ public class RemoteRetryExecutor {
RetrySiteSnapshot.setAttemptNumber(context.getRetryCount());
// 初始化实时日志上下文
RetryLogMeta retryLogMeta = new RetryLogMeta();
retryLogMeta.setGroupName(context.getGroupName());
retryLogMeta.setNamespaceId(context.getNamespaceId());
retryLogMeta.setRetryId(context.getRetryId());
retryLogMeta.setRetryTaskId(context.getRetryTaskId());
SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY);
initLogContext(context);
RetryerResultContext retryerResultContext = remoteRetryStrategies.openRetry(context.getScene(),
context.getExecutorName(), context.getDeSerialize());
@ -96,4 +91,13 @@ public class RemoteRetryExecutor {
return executeRespDto;
}
private static void initLogContext(RemoteRetryContext context) {
RetryLogMeta retryLogMeta = new RetryLogMeta();
retryLogMeta.setGroupName(context.getGroupName());
retryLogMeta.setNamespaceId(context.getNamespaceId());
retryLogMeta.setRetryId(context.getRetryId());
retryLogMeta.setRetryTaskId(context.getRetryTaskId());
SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY);
}
}

View File

@ -151,8 +151,9 @@ public class ScanRetryActor extends AbstractActor {
List<RetrySceneConfig> retrySceneConfigs = accessTemplate.getSceneConfigAccess()
.list(new LambdaQueryWrapper<RetrySceneConfig>()
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval,
RetrySceneConfig::getSceneName, RetrySceneConfig::getCbTriggerType,
RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getExecutorTimeout)
RetrySceneConfig::getBlockStrategy, RetrySceneConfig::getSceneName,
RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval,
RetrySceneConfig::getExecutorTimeout)
.eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus())
.in(RetrySceneConfig::getSceneName, sceneNameSet));
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName);

View File

@ -84,13 +84,13 @@ public class CleanerSchedule extends AbstractSchedule implements Lifecycle {
protected void doExecute() {
try {
// 清除日志默认保存天数大于零最少保留最近一天的日志数据
if (systemProperties.getLogStorage() <= 1) {
SnailJobLog.LOCAL.error("retry clear log storage error", systemProperties.getLogStorage());
return;
}
// if (systemProperties.getLogStorage() <= 1) {
// SnailJobLog.LOCAL.error("retry clear log storage error", systemProperties.getLogStorage());
// return;
// }
// clean retry log
LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage());
LocalDateTime endTime = LocalDateTime.now();
long total = PartitionTaskUtils.process(startId -> retryTaskBatchList(startId, endTime),
this::processRetryLogPartitionTasks, 0);

View File

@ -7,6 +7,7 @@ import com.aizuda.snailjob.server.web.model.request.BatchRollBackRetryDeadLetter
import com.aizuda.snailjob.server.web.model.request.RetryDeadLetterQueryVO;
import com.aizuda.snailjob.server.web.model.response.RetryDeadLetterResponseVO;
import com.aizuda.snailjob.server.web.service.RetryDeadLetterService;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
@ -21,10 +22,9 @@ import java.util.List;
*/
@RestController
@RequestMapping("/retry-dead-letter")
@RequiredArgsConstructor
public class RetryDeadLetterController {
@Autowired
private RetryDeadLetterService retryDeadLetterService;
private final RetryDeadLetterService retryDeadLetterService;
@LoginRequired
@GetMapping("list")

View File

@ -16,13 +16,6 @@ import java.util.List;
@Data
public class BatchDeleteRetryDeadLetterVO {
/**
* 组名称
*/
@NotBlank(message = "groupName 不能为空")
@Pattern(regexp = "^[A-Za-z0-9_-]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母、下划线和短横线")
private String groupName;
/**
* 重试表id
*/

View File

@ -1,8 +1,6 @@
package com.aizuda.snailjob.server.web.model.request;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Pattern;
import lombok.Data;
import java.util.List;
@ -16,13 +14,6 @@ import java.util.List;
@Data
public class BatchRollBackRetryDeadLetterVO {
/**
* 组名称
*/
@NotBlank(message = "groupName 不能为空")
@Pattern(regexp = "^[A-Za-z0-9_-]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母、下划线和短横线")
private String groupName;
/**
* 重试表id
*/

View File

@ -60,10 +60,6 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
public PageResult<List<RetryDeadLetterResponseVO>> getRetryDeadLetterPage(RetryDeadLetterQueryVO queryVO) {
PageDTO<RetryDeadLetter> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
if (StrUtil.isBlank(queryVO.getGroupName())) {
return new PageResult<>(pageDTO, new ArrayList<>());
}
List<String> groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName());
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
@ -97,7 +93,6 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
String groupName = rollBackRetryDeadLetterVO.getGroupName();
List<Long> ids = rollBackRetryDeadLetterVO.getIds();
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
List<RetryDeadLetter> retryDeadLetterList = retryDeadLetterAccess.list(
@ -143,22 +138,12 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
Set<Long> waitDelRetryDeadLetterIdSet = StreamUtils.toSet(retryDeadLetterList, RetryDeadLetter::getId);
Assert.isTrue(waitDelRetryDeadLetterIdSet.size() == retryDeadLetterAccess.delete(
new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getGroupName, groupName)
.in(RetryDeadLetter::getId, waitDelRetryDeadLetterIdSet)),
() -> new SnailJobServerException("删除死信队列数据失败"));
// 变更日志的状态
RetryTask retryTask = new RetryTask();
retryTask.setTaskStatus(RetryStatusEnum.RUNNING.getStatus());
// Set<String> uniqueIdSet = StreamUtils.toSet(waitRollbackList, Retry::getId);
// int update = retryTaskMapper.update(retryTask, new LambdaUpdateWrapper<RetryTask>()
// .eq(RetryTask::getNamespaceId, namespaceId)
// .in(RetryTask::getUniqueId, uniqueIdSet)
// .eq(RetryTask::getGroupName, groupName));
// Assert.isTrue(update == uniqueIdSet.size(),
// () -> new SnailJobServerException("回滚日志状态失败, 可能原因: 日志信息缺失或存在多个相同uniqueId"));
return 1;
}
@ -170,36 +155,9 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
Assert.isTrue(deadLetterVO.getIds().size() == retryDeadLetterAccess.delete(
new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getNamespaceId, namespaceId)
.eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName())
.in(RetryDeadLetter::getId, deadLetterVO.getIds())),
() -> new SnailJobServerException("删除死信任务失败"));
List<RetryDeadLetter> tasks = retryDeadLetterAccess.list(
new LambdaQueryWrapper<RetryDeadLetter>()
.select(RetryDeadLetter::getId)
.eq(RetryDeadLetter::getNamespaceId, namespaceId)
.eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName())
.in(RetryDeadLetter::getId, deadLetterVO.getIds())
);
if (CollUtil.isEmpty(tasks)) {
return Boolean.TRUE;
}
// retryTaskMapper.delete(new LambdaQueryWrapper<RetryTask>()
// .in(RetryTask::getTaskStatus, ALLOW_DELETE_STATUS)
// .eq(RetryTask::getNamespaceId, namespaceId)
// .eq(RetryTask::getGroupName, deadLetterVO.getGroupName())
// .in(RetryTask::getUniqueId, uniqueIds));
retryTaskLogMessageMapper.delete(
new LambdaQueryWrapper<RetryTaskLogMessage>()
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getGroupName, deadLetterVO.getGroupName())
.in(RetryTaskLogMessage::getRetryId, deadLetterVO.getIds()));
return Boolean.TRUE;
}
}