feat(1.4.0-beta1): 修复回调失败问题

This commit is contained in:
opensnail 2025-02-19 23:32:51 +08:00
parent b28bc2afde
commit 4b45378d07
16 changed files with 69 additions and 40 deletions

View File

@ -88,7 +88,6 @@ CREATE TABLE `sj_retry_dead_letter`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`scene_name` varchar(64) NOT NULL COMMENT '场景名称',
`idempotent_id` varchar(64) NOT NULL COMMENT '幂等id',
@ -96,14 +95,12 @@ CREATE TABLE `sj_retry_dead_letter`
`executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称',
`args_str` text NOT NULL COMMENT '执行方法参数',
`ext_attrs` text NOT NULL COMMENT '扩展字段',
`task_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '任务类型 1、重试数据 2、回调数据',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`),
KEY `idx_idempotent_id` (`idempotent_id`),
KEY `idx_biz_no` (`biz_no`),
KEY `idx_create_dt` (`create_dt`),
UNIQUE KEY `uk_namespace_id_group_name_unique_id` (`namespace_id`, `group_name`, `unique_id`)
KEY `idx_create_dt` (`create_dt`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='死信队列表'
@ -126,7 +123,7 @@ CREATE TABLE `sj_retry`
`task_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '任务类型 1、重试数据 2、回调数据',
`bucket_index` int(11) NOT NULL DEFAULT 0 COMMENT 'bucket',
`parent_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '父节点id',
`deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除',
`deleted` bigint(20) NOT NULL DEFAULT 0 COMMENT '逻辑删除',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),

View File

@ -3,12 +3,16 @@ package com.aizuda.snailjob.client.core.callback.future;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.client.core.context.CallbackContext;
import com.aizuda.snailjob.client.core.client.RetryClient;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest;
import com.aizuda.snailjob.client.model.request.RetryCallbackResultRequest;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.google.common.util.concurrent.FutureCallback;
import java.util.Objects;
import java.util.concurrent.CancellationException;
/**
@ -40,11 +44,9 @@ public class CallbackTaskExecutorFutureCallback implements FutureCallback<Boolea
@Override
public void onSuccess(Boolean result) {
try {
RetryCallbackResultRequest request = new RetryCallbackResultRequest();
request.setRetryTaskId(context.getRetryTaskId());
request.setRetryId(context.getRetryId());
request.setResult(result);
CLIENT.callbaclResult(request);
DispatchRetryResultRequest request = buildDispatchRetryResultRequest();
request.setStatusCode(RetryResultStatusEnum.SUCCESS.getStatus());
CLIENT.dispatchResult(request);
} catch (Exception e) {
SnailJobLog.REMOTE.error("回调执行结果上报异常.[{}]", context.getRetryTaskId(), e);
@ -59,13 +61,23 @@ public class CallbackTaskExecutorFutureCallback implements FutureCallback<Boolea
return;
}
try {
RetryCallbackResultRequest request = new RetryCallbackResultRequest();
request.setRetryTaskId(context.getRetryTaskId());
request.setRetryId(context.getRetryId());
request.setResult(Boolean.FALSE);
CLIENT.callbaclResult(request);
DispatchRetryResultRequest request = buildDispatchRetryResultRequest();
request.setStatusCode(RetryResultStatusEnum.FAILURE.getStatus());
request.setExceptionMsg(t.getMessage());
CLIENT.dispatchResult(request);
} catch (Exception e) {
SnailJobLog.REMOTE.error("回调执行结果上报异常.[{}]", context.getRetryTaskId(), e);
}
}
/**
 * Builds the result-report request shared by the success and failure paths.
 *
 * <p>Copies the identifying fields (namespace id, group name, scene name,
 * retry id and retry task id) from the callback context. The status code
 * and, on failure, the exception message are not set here; the caller
 * fills them in before dispatching the request.
 *
 * @return a new request populated with the identifiers held by {@code context}
 */
private DispatchRetryResultRequest buildDispatchRetryResultRequest() {
    DispatchRetryResultRequest request = new DispatchRetryResultRequest();
    request.setNamespaceId(context.getNamespaceId());
    request.setGroupName(context.getGroupName());
    request.setSceneName(context.getSceneName());
    request.setRetryId(context.getRetryId());
    request.setRetryTaskId(context.getRetryTaskId());
    return request;
}
}

View File

@ -5,6 +5,7 @@ import com.aizuda.snailjob.client.core.context.RemoteRetryContext;
import com.aizuda.snailjob.client.core.client.RetryClient;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -61,6 +62,7 @@ public class RetryTaskExecutorFutureCallback implements FutureCallback<DispatchR
try {
DispatchRetryResultRequest request = buildDispatchRetryResultRequest(null);
request.setExceptionMsg(t.getMessage());
request.setStatusCode(RetryResultStatusEnum.FAILURE.getStatus());
CLIENT.dispatchResult(request);
} catch (Exception e) {
SnailJobLog.REMOTE.error("执行结果上报异常.[{}]", retryContext.getRetryTaskId(), e);

View File

@ -21,7 +21,4 @@ public interface RetryClient {
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.REPORT_RETRY_DISPATCH_RESULT)
Result dispatchResult(DispatchRetryResultRequest request);
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.REPORT_RETRY_CALLBACK_RESULT)
Result callbaclResult(RetryCallbackResultRequest request);
}

View File

@ -140,6 +140,10 @@ public class SnailRetryEndPoint implements Lifecycle {
callbackContext.setRetryTaskId(callbackDTO.getRetryTaskId());
callbackContext.setRetryId(callbackDTO.getRetryId());
callbackContext.setGroupName(callbackDTO.getGroupName());
callbackContext.setNamespaceId(callbackDTO.getNamespaceId());
callbackContext.setSceneName(callbackDTO.getSceneName());
callbackContext.setRetryStatus(callbackDTO.getRetryStatus());
ListeningExecutorService decorator = MoreExecutors.listeningDecorator(dispatcherThreadPool);
ListenableFuture<Boolean> submit = decorator.submit(() -> {

View File

@ -16,7 +16,7 @@ public final class CallbackContext {
private String namespaceId;
private String groupName;
private String scene;
private String sceneName;
private Long retryTaskId;
private Long retryId;
private Integer retryStatus;

View File

@ -20,8 +20,6 @@ public class RetryCallbackRequest {
private String sceneName;
@NotBlank(message = "参数 不能为空")
private String argsStr;
@NotBlank(message = "idempotentId 不能为空")
private String idempotentId;
@NotBlank(message = "executorName 不能为空")
private String executorName;
@NotNull(message = "retryStatus 不能为空")

View File

@ -15,4 +15,6 @@ public class RetryCallbackResultRequest {
private Long retryId;
private Long retryTaskId;
private Boolean result;
private Integer statusCode;
private String message;
}

View File

@ -80,11 +80,6 @@ public interface SystemConstants {
*/
String REPORT_RETRY_DISPATCH_RESULT = "/report/retry/dispatch/result";
/**
* 上报retry callback的运行结果
*/
String REPORT_RETRY_CALLBACK_RESULT = "/report/retry/callback/result";
/**
* 批量日志上报
*/

View File

@ -50,4 +50,6 @@ public class Retry extends CreateUpdateDt {
private Integer bucketIndex;
private Long deleted;
}

View File

@ -21,12 +21,10 @@ public class RequestCallbackExecutorDTO extends BaseDTO {
private Integer executorTimeout;
private Long deadlineRequest;
private String argsStr;
private String executorName;
private Integer retryCount;
private Long parentId;
}

View File

@ -21,8 +21,11 @@ import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.google.common.collect.Maps;
@ -47,6 +50,7 @@ import java.util.Objects;
@RequiredArgsConstructor
public class RequestCallbackClientActor extends AbstractActor {
private final RetryTaskMapper retryTaskMapper;
private final RetryMapper retryMapper;
@Override
public Receive createReceive() {
@ -77,7 +81,17 @@ public class RequestCallbackClientActor extends AbstractActor {
}
RetryCallbackRequest retryCallbackRequest = RetryTaskConverter.INSTANCE.toRetryCallbackDTO(executorDTO);
Retry retry = retryMapper.selectOne(new LambdaQueryWrapper<Retry>()
.select(Retry::getRetryStatus, Retry::getId)
.eq(Retry::getId, executorDTO.getParentId()));
if (Objects.isNull(retry)) {
JobLogMetaDTO jobLogMetaDTO = RetryTaskConverter.INSTANCE.toJobLogDTO(executorDTO);
jobLogMetaDTO.setTimestamp(nowMilli);
SnailJobLog.REMOTE.error("retryTaskId:[{}] 任务调度失败. 失败原因: 重试任务不存在 <|>{}<|>", executorDTO.getRetryTaskId(),
jobLogMetaDTO);
return;
}
retryCallbackRequest.setRetryStatus(retry.getRetryStatus());
try {
// 构建请求客户端对象
@ -130,7 +144,7 @@ public class RequestCallbackClientActor extends AbstractActor {
// 更新最新负载节点
String hostId = (String) properties.get("HOST_ID");
String hostIp = (String) properties.get("HOST_IP");
String hostPort = (String) properties.get("HOST_PORT");
Integer hostPort = (Integer) properties.get("HOST_PORT");
RetryTask retryTask = new RetryTask();
retryTask.setId(executorDTO.getRetryTaskId());

View File

@ -59,15 +59,13 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(
context.getGroupName(), context.getSceneName(), context.getNamespaceId());
Retry retry = retryMapper.selectOne(new LambdaQueryWrapper<Retry>()
.select(Retry::getId, Retry::getRetryCount)
.eq(Retry::getId, context.getRetryId()));
Retry retry = retryMapper.selectById(context.getRetryId());
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
Integer maxRetryCount;
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(context.getTaskType())) {
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) {
maxRetryCount = retrySceneConfig.getCbMaxCount();
} else {
maxRetryCount = retrySceneConfig.getMaxRetryCount();
@ -75,7 +73,9 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
if (maxRetryCount <= retry.getRetryCount() + 1) {
retry.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus());
retry.setRetryCount(retry.getRetryCount() + 1);
retry.setUpdateDt(LocalDateTime.now());
retry.setDeleted(retry.getId());
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
// 创建一个回调任务
@ -83,12 +83,12 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
} else if (context.isIncrementRetryCount()) {
retry.setRetryCount(retry.getRetryCount() + 1);
retry.setUpdateDt(LocalDateTime.now());
retry.setDeleted(retry.getId());
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
}
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
retryTask.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());

View File

@ -59,6 +59,8 @@ public class RetryStopHandler extends AbstractRetryResultHandler {
retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus());
retry.setUpdateDt(LocalDateTime.now());
retry.setDeleted(retry.getId());
retry.setRetryCount(retry.getRetryCount() + 1);
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]",
retry.getGroupName()));

View File

@ -56,6 +56,8 @@ public class RetrySuccessHandler extends AbstractRetryResultHandler {
protected void doInTransactionWithoutResult(TransactionStatus status) {
retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus());
retry.setUpdateDt(LocalDateTime.now());
retry.setRetryCount(retry.getRetryCount() + 1);
retry.setDeleted(retry.getId());
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]",
retry.getGroupName()));

View File

@ -43,6 +43,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -50,6 +51,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -103,10 +105,12 @@ public class RetryTaskServiceImpl implements RetryTaskService {
pageDTO = accessTemplate.getRetryAccess().listPage(pageDTO, queryWrapper);
Set<Long> ids = StreamUtils.toSet(pageDTO.getRecords(), Retry::getId);
List<Retry> callbackTaskList = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper<Retry>()
.in(Retry::getParentId, ids));
Map<Long, Retry> callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId);
Map<Long, Retry> callbackMap = Maps.newHashMap();
if (CollUtil.isNotEmpty(ids)) {
List<Retry> callbackTaskList = accessTemplate.getRetryAccess()
.list(new LambdaQueryWrapper<Retry>().in(Retry::getParentId, ids));
callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId);
}
List<RetryResponseVO> retryResponseList = RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords());
for (RetryResponseVO retryResponseVO : retryResponseList) {