diff --git a/doc/sql/snail_job_mysql.sql b/doc/sql/snail_job_mysql.sql index 9e6f34bd5..d8efb20a4 100644 --- a/doc/sql/snail_job_mysql.sql +++ b/doc/sql/snail_job_mysql.sql @@ -88,7 +88,6 @@ CREATE TABLE `sj_retry_dead_letter` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', - `unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一', `group_name` varchar(64) NOT NULL COMMENT '组名称', `scene_name` varchar(64) NOT NULL COMMENT '场景名称', `idempotent_id` varchar(64) NOT NULL COMMENT '幂等id', @@ -96,14 +95,12 @@ CREATE TABLE `sj_retry_dead_letter` `executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称', `args_str` text NOT NULL COMMENT '执行方法参数', `ext_attrs` text NOT NULL COMMENT '扩展字段', - `task_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '任务类型 1、重试数据 2、回调数据', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (`id`), KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`), KEY `idx_idempotent_id` (`idempotent_id`), KEY `idx_biz_no` (`biz_no`), - KEY `idx_create_dt` (`create_dt`), - UNIQUE KEY `uk_namespace_id_group_name_unique_id` (`namespace_id`, `group_name`, `unique_id`) + KEY `idx_create_dt` (`create_dt`) ) ENGINE = InnoDB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT ='死信队列表' @@ -126,7 +123,7 @@ CREATE TABLE `sj_retry` `task_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '任务类型 1、重试数据 2、回调数据', `bucket_index` int(11) NOT NULL DEFAULT 0 COMMENT 'bucket', `parent_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '父节点id', - `deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除', + `deleted` bigint(20) NOT NULL DEFAULT 0 COMMENT '逻辑删除', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/callback/future/CallbackTaskExecutorFutureCallback.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/callback/future/CallbackTaskExecutorFutureCallback.java index df1115d39..b6b53d265 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/callback/future/CallbackTaskExecutorFutureCallback.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/callback/future/CallbackTaskExecutorFutureCallback.java @@ -3,12 +3,16 @@ package com.aizuda.snailjob.client.core.callback.future; import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder; import com.aizuda.snailjob.client.core.context.CallbackContext; import com.aizuda.snailjob.client.core.client.RetryClient; +import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; +import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest; import com.aizuda.snailjob.client.model.request.RetryCallbackResultRequest; +import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.SnailJobRpcResult; import com.aizuda.snailjob.common.log.SnailJobLog; import com.google.common.util.concurrent.FutureCallback; +import java.util.Objects; import java.util.concurrent.CancellationException; /** @@ -40,11 +44,9 @@ public class CallbackTaskExecutorFutureCallback implements FutureCallback submit = decorator.submit(() -> { diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/context/CallbackContext.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/context/CallbackContext.java index 8e85c277c..1b722bcd5 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/context/CallbackContext.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/context/CallbackContext.java @@ -16,7 +16,7 @@ public final class CallbackContext { private String namespaceId; private String groupName; - private String scene; + private String sceneName; private Long retryTaskId; private Long retryId; private Integer retryStatus; diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/RetryCallbackRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/RetryCallbackRequest.java index 8fd1b4877..16d3f29ce 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/RetryCallbackRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/RetryCallbackRequest.java @@ -20,8 +20,6 @@ public class RetryCallbackRequest { private String sceneName; @NotBlank(message = "参数 不能为空") private String argsStr; - @NotBlank(message = "idempotentId 不能为空") - private String idempotentId; @NotBlank(message = "executorName 不能为空") private String executorName; @NotNull(message = "retryStatus 不能为空") diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/RetryCallbackResultRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/RetryCallbackResultRequest.java index ec23b6518..25745f809 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/RetryCallbackResultRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/RetryCallbackResultRequest.java @@ -15,4 +15,6 @@ public class RetryCallbackResultRequest { private Long retryId; private Long retryTaskId; private Boolean result; + private Integer statusCode; + private String message; } diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java index 40d2d353f..7dc9a87b2 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java @@ -80,11 +80,6 @@ public interface SystemConstants { */ String REPORT_RETRY_DISPATCH_RESULT = "/report/retry/dispatch/result"; - /** - * 上报retry callback的运行结果 - */ - String REPORT_RETRY_CALLBACK_RESULT = "/report/retry/callback/result"; - /** * 批量日志上报 */ diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Retry.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Retry.java index a5481c234..87fe6d266 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Retry.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/Retry.java @@ -50,4 +50,6 @@ public class Retry extends CreateUpdateDt { private Integer bucketIndex; + private Long deleted; + } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestCallbackExecutorDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestCallbackExecutorDTO.java index 41a716d4a..ee3039b2c 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestCallbackExecutorDTO.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RequestCallbackExecutorDTO.java @@ -21,12 +21,10 @@ public class RequestCallbackExecutorDTO extends BaseDTO { private Integer executorTimeout; - private Long deadlineRequest; - private String argsStr; private String executorName; - private Integer retryCount; + private Long parentId; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java index 134295584..346cd421b 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/RequestCallbackClientActor.java @@ -21,8 +21,11 @@ import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO; import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Retry; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.github.rholder.retry.Attempt; import com.github.rholder.retry.RetryException; import com.google.common.collect.Maps; @@ -47,6 +50,7 @@ import java.util.Objects; @RequiredArgsConstructor public class RequestCallbackClientActor extends AbstractActor { private final RetryTaskMapper retryTaskMapper; + private final RetryMapper retryMapper; @Override public Receive createReceive() { @@ -77,7 +81,17 @@ public class RequestCallbackClientActor extends AbstractActor { } RetryCallbackRequest retryCallbackRequest = RetryTaskConverter.INSTANCE.toRetryCallbackDTO(executorDTO); - + Retry retry = retryMapper.selectOne(new LambdaQueryWrapper() + .select(Retry::getRetryStatus, Retry::getId) + .eq(Retry::getId, executorDTO.getParentId())); + if (Objects.isNull(retry)) { + JobLogMetaDTO jobLogMetaDTO = RetryTaskConverter.INSTANCE.toJobLogDTO(executorDTO); + jobLogMetaDTO.setTimestamp(nowMilli); + SnailJobLog.REMOTE.error("retryTaskId:[{}] 任务调度失败. 失败原因: 重试任务不存在 <|>{}<|>", executorDTO.getRetryTaskId(), + jobLogMetaDTO); + return; + } + retryCallbackRequest.setRetryStatus(retry.getRetryStatus()); try { // 构建请求客户端对象 @@ -130,7 +144,7 @@ public class RequestCallbackClientActor extends AbstractActor { // 更新最新负载节点 String hostId = (String) properties.get("HOST_ID"); String hostIp = (String) properties.get("HOST_IP"); - String hostPort = (String) properties.get("HOST_PORT"); + Integer hostPort = (Integer) properties.get("HOST_PORT"); RetryTask retryTask = new RetryTask(); retryTask.setId(executorDTO.getRetryTaskId()); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java index a18cb8e3f..1e680c2fb 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java @@ -59,15 +59,13 @@ public class RetryFailureHandler extends AbstractRetryResultHandler { accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName( context.getGroupName(), context.getSceneName(), context.getNamespaceId()); - Retry retry = retryMapper.selectOne(new LambdaQueryWrapper() - .select(Retry::getId, Retry::getRetryCount) - .eq(Retry::getId, context.getRetryId())); + Retry retry = retryMapper.selectById(context.getRetryId()); transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { Integer maxRetryCount; - if (SyetemTaskTypeEnum.CALLBACK.getType().equals(context.getTaskType())) { + if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) { maxRetryCount = retrySceneConfig.getCbMaxCount(); } else { maxRetryCount = retrySceneConfig.getMaxRetryCount(); @@ -75,7 +73,9 @@ public class RetryFailureHandler extends AbstractRetryResultHandler { if (maxRetryCount <= retry.getRetryCount() + 1) { retry.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus()); + retry.setRetryCount(retry.getRetryCount() + 1); retry.setUpdateDt(LocalDateTime.now()); + retry.setDeleted(retry.getId()); Assert.isTrue(1 == retryMapper.updateById(retry), () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); // 创建一个回调任务 @@ -83,12 +83,12 @@ public class RetryFailureHandler extends AbstractRetryResultHandler { } else if (context.isIncrementRetryCount()) { retry.setRetryCount(retry.getRetryCount() + 1); retry.setUpdateDt(LocalDateTime.now()); + retry.setDeleted(retry.getId()); Assert.isTrue(1 == retryMapper.updateById(retry), () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); } - RetryTask retryTask = new RetryTask(); retryTask.setId(context.getRetryTaskId()); retryTask.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus()); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryStopHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryStopHandler.java index 1629ba487..f306acc68 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryStopHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryStopHandler.java @@ -59,6 +59,8 @@ public class RetryStopHandler extends AbstractRetryResultHandler { retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus()); retry.setUpdateDt(LocalDateTime.now()); + retry.setDeleted(retry.getId()); + retry.setRetryCount(retry.getRetryCount() + 1); Assert.isTrue(1 == retryMapper.updateById(retry), () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java index f8d365418..3eb6a9c98 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetrySuccessHandler.java @@ -56,6 +56,8 @@ public class RetrySuccessHandler extends AbstractRetryResultHandler { protected void doInTransactionWithoutResult(TransactionStatus status) { retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus()); retry.setUpdateDt(LocalDateTime.now()); + retry.setRetryCount(retry.getRetryCount() + 1); + retry.setDeleted(retry.getId()); Assert.isTrue(1 == retryMapper.updateById(retry), () -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName())); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java index aac86ddbe..b6d21a2f5 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java @@ -43,6 +43,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -50,6 +51,7 @@ import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -103,10 +105,12 @@ public class RetryTaskServiceImpl implements RetryTaskService { pageDTO = accessTemplate.getRetryAccess().listPage(pageDTO, queryWrapper); Set ids = StreamUtils.toSet(pageDTO.getRecords(), Retry::getId); - List callbackTaskList = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper() - .in(Retry::getParentId, ids)); - - Map callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId); + Map callbackMap = Maps.newHashMap(); + if (CollUtil.isNotEmpty(ids)) { + List callbackTaskList = accessTemplate.getRetryAccess() + .list(new LambdaQueryWrapper().in(Retry::getParentId, ids)); + callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId); + } List retryResponseList = RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords()); for (RetryResponseVO retryResponseVO : retryResponseList) {