diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java index 4c137932c..74dbc519a 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java @@ -5,6 +5,7 @@ import com.aizuda.snailjob.client.common.Lifecycle; import com.aizuda.snailjob.client.common.annotation.Mapping; import com.aizuda.snailjob.client.common.annotation.SnailEndPoint; import com.aizuda.snailjob.client.common.config.SnailJobProperties; +import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager; import com.aizuda.snailjob.client.common.rpc.client.RequestMethod; import com.aizuda.snailjob.client.core.IdempotentIdGenerate; import com.aizuda.snailjob.client.core.RetryArgSerializer; @@ -18,6 +19,7 @@ import com.aizuda.snailjob.client.core.exception.SnailRetryClientException; import com.aizuda.snailjob.client.core.executor.RemoteCallbackExecutor; import com.aizuda.snailjob.client.core.executor.RemoteRetryExecutor; import com.aizuda.snailjob.client.core.loader.SnailRetrySpiLoader; +import com.aizuda.snailjob.client.core.log.RetryLogMeta; import com.aizuda.snailjob.client.core.retryer.RetryerInfo; import com.aizuda.snailjob.client.core.serializer.JacksonSerializer; import com.aizuda.snailjob.client.core.timer.StopTaskTimerTask; @@ -31,6 +33,7 @@ import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.IdempotentIdContext; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.common.log.enums.LogTypeEnum; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -72,12 +75,17 @@ public class SnailRetryEndPoint implements Lifecycle { @Mapping(path = RETRY_DISPATCH, method = RequestMethod.POST) public Result dispatch(@Valid DispatchRetryRequest request) { + RemoteRetryContext retryContext = bulidRemoteRetryContext(request); + RetryerInfo retryerInfo = RetryerInfoCache.get(request.getSceneName(), request.getExecutorName()); if (Objects.isNull(retryerInfo)) { SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", request.getSceneName()); return new Result<>(StatusEnum.NO.getStatus(), MessageFormat.format("场景:[{0}]配置不存在, 请检查您的场景和执行器是否存在", request.getSceneName())); } + // 初始化实时日志上下文 + initLogContext(retryContext); + RetryArgSerializer retryArgSerializer = SnailRetrySpiLoader.loadRetryArgSerializer(); Object[] deSerialize; @@ -89,21 +97,10 @@ public class SnailRetryEndPoint implements Lifecycle { return new Result<>(StatusEnum.NO.getStatus(), MessageFormat.format("参数解析异常 args:[{0}]", request.getArgsStr())); } - RemoteRetryContext retryContext = new RemoteRetryContext(); retryContext.setDeSerialize(deSerialize); - retryContext.setRetryTaskId(request.getRetryTaskId()); - retryContext.setRetryId(request.getRetryId()); - retryContext.setRetryCount(request.getRetryCount()); - retryContext.setArgsStr(request.getArgsStr()); - retryContext.setGroupName(request.getGroupName()); - retryContext.setNamespaceId(request.getNamespaceId()); - retryContext.setScene(request.getSceneName()); - retryContext.setExecutorName(request.getExecutorName()); ListeningExecutorService decorator = MoreExecutors.listeningDecorator(dispatcherThreadPool); - ListenableFuture submit = decorator.submit(() -> { - return remoteRetryExecutor.doRetry(retryContext); - }); + ListenableFuture submit = decorator.submit(() -> remoteRetryExecutor.doRetry(retryContext)); FutureCache.addFuture(request.getRetryTaskId(), submit); Futures.addCallback(submit, new RetryTaskExecutorFutureCallback(retryContext), decorator); @@ -116,11 +113,35 @@ public class SnailRetryEndPoint implements Lifecycle { return new Result<>(Boolean.TRUE); } + private static RemoteRetryContext bulidRemoteRetryContext(DispatchRetryRequest request) { + RemoteRetryContext retryContext = new RemoteRetryContext(); + retryContext.setRetryTaskId(request.getRetryTaskId()); + retryContext.setRetryId(request.getRetryId()); + retryContext.setRetryCount(request.getRetryCount()); + retryContext.setArgsStr(request.getArgsStr()); + retryContext.setGroupName(request.getGroupName()); + retryContext.setNamespaceId(request.getNamespaceId()); + retryContext.setScene(request.getSceneName()); + retryContext.setExecutorName(request.getExecutorName()); + return retryContext; + } + + private static void initLogContext(RemoteRetryContext context) { + RetryLogMeta retryLogMeta = new RetryLogMeta(); + retryLogMeta.setGroupName(context.getGroupName()); + retryLogMeta.setNamespaceId(context.getNamespaceId()); + retryLogMeta.setRetryId(context.getRetryId()); + retryLogMeta.setRetryTaskId(context.getRetryTaskId()); + SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY); + } @Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST) public Result callback(@Valid RetryCallbackRequest callbackDTO) { - CallbackContext callbackContext = new CallbackContext(); + CallbackContext callbackContext = buildCallbackContext(callbackDTO); + try { + initLogContext(callbackContext); + RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getSceneName(), callbackDTO.getExecutorName()); if (Objects.isNull(retryerInfo)) { SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", callbackDTO.getSceneName()); @@ -138,13 +159,6 @@ public class SnailRetryEndPoint implements Lifecycle { return new Result<>(0, "回调失败", Boolean.FALSE); } - callbackContext.setRetryTaskId(callbackDTO.getRetryTaskId()); - callbackContext.setRetryId(callbackDTO.getRetryId()); - callbackContext.setGroupName(callbackDTO.getGroupName()); - callbackContext.setNamespaceId(callbackDTO.getNamespaceId()); - callbackContext.setSceneName(callbackDTO.getSceneName()); - callbackContext.setRetryStatus(callbackDTO.getRetryStatus()); - ListeningExecutorService decorator = MoreExecutors.listeningDecorator(dispatcherThreadPool); ListenableFuture submit = decorator.submit(() -> { remoteCallbackExecutor.doRetryCallback(callbackContext); @@ -161,6 +175,27 @@ public class SnailRetryEndPoint implements Lifecycle { return new Result<>(Boolean.TRUE); } + private static CallbackContext buildCallbackContext(RetryCallbackRequest callbackDTO) { + CallbackContext callbackContext = new CallbackContext(); + callbackContext.setRetryTaskId(callbackDTO.getRetryTaskId()); + callbackContext.setRetryId(callbackDTO.getRetryId()); + callbackContext.setGroupName(callbackDTO.getGroupName()); + callbackContext.setNamespaceId(callbackDTO.getNamespaceId()); + callbackContext.setSceneName(callbackDTO.getSceneName()); + callbackContext.setRetryStatus(callbackDTO.getRetryStatus()); + return callbackContext; + } + + private static void initLogContext(CallbackContext context) { + // 初始化实时日志上下文 + RetryLogMeta retryLogMeta = new RetryLogMeta(); + retryLogMeta.setGroupName(context.getGroupName()); + retryLogMeta.setNamespaceId(context.getNamespaceId()); + retryLogMeta.setRetryTaskId(context.getRetryTaskId()); + retryLogMeta.setRetryId(context.getRetryId()); + SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY); + } + /** * 手动新增重试数据,模拟生成idempotentId diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/executor/RemoteCallbackExecutor.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/executor/RemoteCallbackExecutor.java index e9a81a110..9fa75aa7f 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/executor/RemoteCallbackExecutor.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/executor/RemoteCallbackExecutor.java @@ -41,13 +41,8 @@ public class RemoteCallbackExecutor { public void doRetryCallback(CallbackContext context) throws NoSuchMethodException, InstantiationException, IllegalAccessException { try { - // 初始化实时日志上下文 - RetryLogMeta retryLogMeta = new RetryLogMeta(); - retryLogMeta.setGroupName(context.getGroupName()); - retryLogMeta.setNamespaceId(context.getNamespaceId()); - retryLogMeta.setRetryTaskId(context.getRetryTaskId()); - retryLogMeta.setRetryId(context.getRetryId()); - SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY); + + initLogContext(context); // 以Spring Bean模式回调 doCallbackForSpringBean(context); @@ -60,6 +55,16 @@ public class RemoteCallbackExecutor { } } + private static void initLogContext(CallbackContext context) { + // 初始化实时日志上下文 + RetryLogMeta retryLogMeta = new RetryLogMeta(); + retryLogMeta.setGroupName(context.getGroupName()); + retryLogMeta.setNamespaceId(context.getNamespaceId()); + retryLogMeta.setRetryTaskId(context.getRetryTaskId()); + retryLogMeta.setRetryId(context.getRetryId()); + SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY); + } + /** * 以Spring Bean模式回调 * diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/executor/RemoteRetryExecutor.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/executor/RemoteRetryExecutor.java index bc1f3960b..3a3e618f4 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/executor/RemoteRetryExecutor.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/executor/RemoteRetryExecutor.java @@ -47,12 +47,7 @@ public class RemoteRetryExecutor { RetrySiteSnapshot.setAttemptNumber(context.getRetryCount()); // 初始化实时日志上下文 - RetryLogMeta retryLogMeta = new RetryLogMeta(); - retryLogMeta.setGroupName(context.getGroupName()); - retryLogMeta.setNamespaceId(context.getNamespaceId()); - retryLogMeta.setRetryId(context.getRetryId()); - retryLogMeta.setRetryTaskId(context.getRetryTaskId()); - SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY); + initLogContext(context); RetryerResultContext retryerResultContext = remoteRetryStrategies.openRetry(context.getScene(), context.getExecutorName(), context.getDeSerialize()); @@ -96,4 +91,13 @@ public class RemoteRetryExecutor { return executeRespDto; } + + private static void initLogContext(RemoteRetryContext context) { + RetryLogMeta retryLogMeta = new RetryLogMeta(); + retryLogMeta.setGroupName(context.getGroupName()); + retryLogMeta.setNamespaceId(context.getNamespaceId()); + retryLogMeta.setRetryId(context.getRetryId()); + retryLogMeta.setRetryTaskId(context.getRetryTaskId()); + SnailJobLogManager.initLogInfo(retryLogMeta, LogTypeEnum.RETRY); + } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index 0a2bcf131..c5935553b 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -151,8 +151,9 @@ public class ScanRetryActor extends AbstractActor { List retrySceneConfigs = accessTemplate.getSceneConfigAccess() .list(new LambdaQueryWrapper() .select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, - RetrySceneConfig::getSceneName, RetrySceneConfig::getCbTriggerType, - RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getExecutorTimeout) + RetrySceneConfig::getBlockStrategy, RetrySceneConfig::getSceneName, + RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval, + RetrySceneConfig::getExecutorTimeout) .eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus()) .in(RetrySceneConfig::getSceneName, sceneNameSet)); return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java index 13899ee7c..0bffe846b 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java @@ -84,13 +84,13 @@ public class CleanerSchedule extends AbstractSchedule implements Lifecycle { protected void doExecute() { try { // 清除日志默认保存天数大于零、最少保留最近一天的日志数据 - if (systemProperties.getLogStorage() <= 1) { - SnailJobLog.LOCAL.error("retry clear log storage error", systemProperties.getLogStorage()); - return; - } +// if (systemProperties.getLogStorage() <= 1) { +// SnailJobLog.LOCAL.error("retry clear log storage error", systemProperties.getLogStorage()); +// return; +// } // clean retry log - LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage()); + LocalDateTime endTime = LocalDateTime.now(); long total = PartitionTaskUtils.process(startId -> retryTaskBatchList(startId, endTime), this::processRetryLogPartitionTasks, 0); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryDeadLetterController.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryDeadLetterController.java index 594111ce0..47062db10 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryDeadLetterController.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryDeadLetterController.java @@ -7,6 +7,7 @@ import com.aizuda.snailjob.server.web.model.request.BatchRollBackRetryDeadLetter import com.aizuda.snailjob.server.web.model.request.RetryDeadLetterQueryVO; import com.aizuda.snailjob.server.web.model.response.RetryDeadLetterResponseVO; import com.aizuda.snailjob.server.web.service.RetryDeadLetterService; +import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; @@ -21,10 +22,9 @@ import java.util.List; */ @RestController @RequestMapping("/retry-dead-letter") +@RequiredArgsConstructor public class RetryDeadLetterController { - - @Autowired - private RetryDeadLetterService retryDeadLetterService; + private final RetryDeadLetterService retryDeadLetterService; @LoginRequired @GetMapping("list") diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/BatchDeleteRetryDeadLetterVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/BatchDeleteRetryDeadLetterVO.java index f4deb8890..04b13786a 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/BatchDeleteRetryDeadLetterVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/BatchDeleteRetryDeadLetterVO.java @@ -16,13 +16,6 @@ import java.util.List; @Data public class BatchDeleteRetryDeadLetterVO { - /** - * 组名称 - */ - @NotBlank(message = "groupName 不能为空") - @Pattern(regexp = "^[A-Za-z0-9_-]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母、下划线和短横线") - private String groupName; - /** * 重试表id */ diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/BatchRollBackRetryDeadLetterVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/BatchRollBackRetryDeadLetterVO.java index 8134b9564..132f6c3cf 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/BatchRollBackRetryDeadLetterVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/BatchRollBackRetryDeadLetterVO.java @@ -1,8 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.Pattern; import lombok.Data; import java.util.List; @@ -16,13 +14,6 @@ import java.util.List; @Data public class BatchRollBackRetryDeadLetterVO { - /** - * 组名称 - */ - @NotBlank(message = "groupName 不能为空") - @Pattern(regexp = "^[A-Za-z0-9_-]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母、下划线和短横线") - private String groupName; - /** * 重试表id */ diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java index d130948f1..8d75d8ef1 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java @@ -60,10 +60,6 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService { public PageResult> getRetryDeadLetterPage(RetryDeadLetterQueryVO queryVO) { PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); - if (StrUtil.isBlank(queryVO.getGroupName())) { - return new PageResult<>(pageDTO, new ArrayList<>()); - } - List groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName()); String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); @@ -97,7 +93,6 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService { String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - String groupName = rollBackRetryDeadLetterVO.getGroupName(); List ids = rollBackRetryDeadLetterVO.getIds(); TaskAccess retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess(); List retryDeadLetterList = retryDeadLetterAccess.list( @@ -143,22 +138,12 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService { Set waitDelRetryDeadLetterIdSet = StreamUtils.toSet(retryDeadLetterList, RetryDeadLetter::getId); Assert.isTrue(waitDelRetryDeadLetterIdSet.size() == retryDeadLetterAccess.delete( new LambdaQueryWrapper() - .eq(RetryDeadLetter::getGroupName, groupName) .in(RetryDeadLetter::getId, waitDelRetryDeadLetterIdSet)), () -> new SnailJobServerException("删除死信队列数据失败")); // 变更日志的状态 RetryTask retryTask = new RetryTask(); retryTask.setTaskStatus(RetryStatusEnum.RUNNING.getStatus()); - -// Set uniqueIdSet = StreamUtils.toSet(waitRollbackList, Retry::getId); -// int update = retryTaskMapper.update(retryTask, new LambdaUpdateWrapper() -// .eq(RetryTask::getNamespaceId, namespaceId) -// .in(RetryTask::getUniqueId, uniqueIdSet) -// .eq(RetryTask::getGroupName, groupName)); -// Assert.isTrue(update == uniqueIdSet.size(), -// () -> new SnailJobServerException("回滚日志状态失败, 可能原因: 日志信息缺失或存在多个相同uniqueId")); - return 1; } @@ -170,36 +155,9 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService { Assert.isTrue(deadLetterVO.getIds().size() == retryDeadLetterAccess.delete( new LambdaQueryWrapper() .eq(RetryDeadLetter::getNamespaceId, namespaceId) - .eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName()) .in(RetryDeadLetter::getId, deadLetterVO.getIds())), () -> new SnailJobServerException("删除死信任务失败")); - List tasks = retryDeadLetterAccess.list( - new LambdaQueryWrapper() - .select(RetryDeadLetter::getId) - .eq(RetryDeadLetter::getNamespaceId, namespaceId) - .eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName()) - .in(RetryDeadLetter::getId, deadLetterVO.getIds()) - ); - - if (CollUtil.isEmpty(tasks)) { - return Boolean.TRUE; - } - - -// retryTaskMapper.delete(new LambdaQueryWrapper() -// .in(RetryTask::getTaskStatus, ALLOW_DELETE_STATUS) -// .eq(RetryTask::getNamespaceId, namespaceId) -// .eq(RetryTask::getGroupName, deadLetterVO.getGroupName()) -// .in(RetryTask::getUniqueId, uniqueIds)); - - retryTaskLogMessageMapper.delete( - new LambdaQueryWrapper() - .eq(RetryTaskLogMessage::getNamespaceId, namespaceId) - .eq(RetryTaskLogMessage::getGroupName, deadLetterVO.getGroupName()) - .in(RetryTaskLogMessage::getRetryId, deadLetterVO.getIds())); - return Boolean.TRUE; - } }