feat(1.4.0-beta1): 修复回调失败问题
This commit is contained in:
parent
8b05c6f43a
commit
d5599fdf16
@ -88,7 +88,6 @@ CREATE TABLE `sj_retry_dead_letter`
|
||||
(
|
||||
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
|
||||
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
|
||||
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
|
||||
`group_name` varchar(64) NOT NULL COMMENT '组名称',
|
||||
`scene_name` varchar(64) NOT NULL COMMENT '场景名称',
|
||||
`idempotent_id` varchar(64) NOT NULL COMMENT '幂等id',
|
||||
@ -96,14 +95,12 @@ CREATE TABLE `sj_retry_dead_letter`
|
||||
`executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称',
|
||||
`args_str` text NOT NULL COMMENT '执行方法参数',
|
||||
`ext_attrs` text NOT NULL COMMENT '扩展字段',
|
||||
`task_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '任务类型 1、重试数据 2、回调数据',
|
||||
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`),
|
||||
KEY `idx_idempotent_id` (`idempotent_id`),
|
||||
KEY `idx_biz_no` (`biz_no`),
|
||||
KEY `idx_create_dt` (`create_dt`),
|
||||
UNIQUE KEY `uk_namespace_id_group_name_unique_id` (`namespace_id`, `group_name`, `unique_id`)
|
||||
KEY `idx_create_dt` (`create_dt`)
|
||||
) ENGINE = InnoDB
|
||||
AUTO_INCREMENT = 0
|
||||
DEFAULT CHARSET = utf8mb4 COMMENT ='死信队列表'
|
||||
@ -126,7 +123,7 @@ CREATE TABLE `sj_retry`
|
||||
`task_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '任务类型 1、重试数据 2、回调数据',
|
||||
`bucket_index` int(11) NOT NULL DEFAULT 0 COMMENT 'bucket',
|
||||
`parent_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '父节点id',
|
||||
`deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除',
|
||||
`deleted` bigint(20) NOT NULL DEFAULT 0 COMMENT '逻辑删除',
|
||||
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`),
|
||||
|
@ -3,12 +3,16 @@ package com.aizuda.snailjob.client.core.callback.future;
|
||||
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
|
||||
import com.aizuda.snailjob.client.core.context.CallbackContext;
|
||||
import com.aizuda.snailjob.client.core.client.RetryClient;
|
||||
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
|
||||
import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest;
|
||||
import com.aizuda.snailjob.client.model.request.RetryCallbackResultRequest;
|
||||
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CancellationException;
|
||||
|
||||
/**
|
||||
@ -40,11 +44,9 @@ public class CallbackTaskExecutorFutureCallback implements FutureCallback<Boolea
|
||||
@Override
|
||||
public void onSuccess(Boolean result) {
|
||||
try {
|
||||
RetryCallbackResultRequest request = new RetryCallbackResultRequest();
|
||||
request.setRetryTaskId(context.getRetryTaskId());
|
||||
request.setRetryId(context.getRetryId());
|
||||
request.setResult(result);
|
||||
CLIENT.callbaclResult(request);
|
||||
DispatchRetryResultRequest request = buildDispatchRetryResultRequest();
|
||||
request.setStatusCode(RetryResultStatusEnum.SUCCESS.getStatus());
|
||||
CLIENT.dispatchResult(request);
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.REMOTE.error("回调执行结果上报异常.[{}]", context.getRetryTaskId(), e);
|
||||
|
||||
@ -59,13 +61,23 @@ public class CallbackTaskExecutorFutureCallback implements FutureCallback<Boolea
|
||||
return;
|
||||
}
|
||||
try {
|
||||
RetryCallbackResultRequest request = new RetryCallbackResultRequest();
|
||||
request.setRetryTaskId(context.getRetryTaskId());
|
||||
request.setRetryId(context.getRetryId());
|
||||
request.setResult(Boolean.FALSE);
|
||||
CLIENT.callbaclResult(request);
|
||||
DispatchRetryResultRequest request = buildDispatchRetryResultRequest();
|
||||
request.setStatusCode(RetryResultStatusEnum.FAILURE.getStatus());
|
||||
request.setExceptionMsg(t.getMessage());
|
||||
CLIENT.dispatchResult(request);
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.REMOTE.error("回调执行结果上报异常.[{}]", context.getRetryTaskId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private DispatchRetryResultRequest buildDispatchRetryResultRequest() {
|
||||
DispatchRetryResultRequest request = new DispatchRetryResultRequest();
|
||||
request.setRetryTaskId(context.getRetryTaskId());
|
||||
request.setNamespaceId(context.getNamespaceId());
|
||||
request.setGroupName(context.getGroupName());
|
||||
request.setSceneName(context.getSceneName());
|
||||
request.setRetryId(context.getRetryId());
|
||||
request.setRetryTaskId(context.getRetryTaskId());
|
||||
return request;
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import com.aizuda.snailjob.client.core.context.RemoteRetryContext;
|
||||
import com.aizuda.snailjob.client.core.client.RetryClient;
|
||||
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
|
||||
import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest;
|
||||
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
@ -61,6 +62,7 @@ public class RetryTaskExecutorFutureCallback implements FutureCallback<DispatchR
|
||||
try {
|
||||
DispatchRetryResultRequest request = buildDispatchRetryResultRequest(null);
|
||||
request.setExceptionMsg(t.getMessage());
|
||||
request.setStatusCode(RetryResultStatusEnum.FAILURE.getStatus());
|
||||
CLIENT.dispatchResult(request);
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.REMOTE.error("执行结果上报异常.[{}]", retryContext.getRetryTaskId(), e);
|
||||
|
@ -21,7 +21,4 @@ public interface RetryClient {
|
||||
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.REPORT_RETRY_DISPATCH_RESULT)
|
||||
Result dispatchResult(DispatchRetryResultRequest request);
|
||||
|
||||
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.REPORT_RETRY_CALLBACK_RESULT)
|
||||
Result callbaclResult(RetryCallbackResultRequest request);
|
||||
|
||||
}
|
||||
|
@ -140,6 +140,10 @@ public class SnailRetryEndPoint implements Lifecycle {
|
||||
|
||||
callbackContext.setRetryTaskId(callbackDTO.getRetryTaskId());
|
||||
callbackContext.setRetryId(callbackDTO.getRetryId());
|
||||
callbackContext.setGroupName(callbackDTO.getGroupName());
|
||||
callbackContext.setNamespaceId(callbackDTO.getNamespaceId());
|
||||
callbackContext.setSceneName(callbackDTO.getSceneName());
|
||||
callbackContext.setRetryStatus(callbackDTO.getRetryStatus());
|
||||
|
||||
ListeningExecutorService decorator = MoreExecutors.listeningDecorator(dispatcherThreadPool);
|
||||
ListenableFuture<Boolean> submit = decorator.submit(() -> {
|
||||
|
@ -16,7 +16,7 @@ public final class CallbackContext {
|
||||
|
||||
private String namespaceId;
|
||||
private String groupName;
|
||||
private String scene;
|
||||
private String sceneName;
|
||||
private Long retryTaskId;
|
||||
private Long retryId;
|
||||
private Integer retryStatus;
|
||||
|
@ -20,8 +20,6 @@ public class RetryCallbackRequest {
|
||||
private String sceneName;
|
||||
@NotBlank(message = "参数 不能为空")
|
||||
private String argsStr;
|
||||
@NotBlank(message = "idempotentId 不能为空")
|
||||
private String idempotentId;
|
||||
@NotBlank(message = "executorName 不能为空")
|
||||
private String executorName;
|
||||
@NotNull(message = "retryStatus 不能为空")
|
||||
|
@ -15,4 +15,6 @@ public class RetryCallbackResultRequest {
|
||||
private Long retryId;
|
||||
private Long retryTaskId;
|
||||
private Boolean result;
|
||||
private Integer statusCode;
|
||||
private String message;
|
||||
}
|
||||
|
@ -80,11 +80,6 @@ public interface SystemConstants {
|
||||
*/
|
||||
String REPORT_RETRY_DISPATCH_RESULT = "/report/retry/dispatch/result";
|
||||
|
||||
/**
|
||||
* 上报retry callback的运行结果
|
||||
*/
|
||||
String REPORT_RETRY_CALLBACK_RESULT = "/report/retry/callback/result";
|
||||
|
||||
/**
|
||||
* 批量日志上报
|
||||
*/
|
||||
|
@ -50,4 +50,6 @@ public class Retry extends CreateUpdateDt {
|
||||
|
||||
private Integer bucketIndex;
|
||||
|
||||
private Long deleted;
|
||||
|
||||
}
|
||||
|
@ -21,12 +21,10 @@ public class RequestCallbackExecutorDTO extends BaseDTO {
|
||||
|
||||
private Integer executorTimeout;
|
||||
|
||||
private Long deadlineRequest;
|
||||
|
||||
private String argsStr;
|
||||
|
||||
private String executorName;
|
||||
|
||||
private Integer retryCount;
|
||||
private Long parentId;
|
||||
|
||||
}
|
||||
|
@ -21,8 +21,11 @@ import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO;
|
||||
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
|
||||
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
||||
import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.github.rholder.retry.Attempt;
|
||||
import com.github.rholder.retry.RetryException;
|
||||
import com.google.common.collect.Maps;
|
||||
@ -47,6 +50,7 @@ import java.util.Objects;
|
||||
@RequiredArgsConstructor
|
||||
public class RequestCallbackClientActor extends AbstractActor {
|
||||
private final RetryTaskMapper retryTaskMapper;
|
||||
private final RetryMapper retryMapper;
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
@ -77,7 +81,17 @@ public class RequestCallbackClientActor extends AbstractActor {
|
||||
}
|
||||
|
||||
RetryCallbackRequest retryCallbackRequest = RetryTaskConverter.INSTANCE.toRetryCallbackDTO(executorDTO);
|
||||
|
||||
Retry retry = retryMapper.selectOne(new LambdaQueryWrapper<Retry>()
|
||||
.select(Retry::getRetryStatus, Retry::getId)
|
||||
.eq(Retry::getId, executorDTO.getParentId()));
|
||||
if (Objects.isNull(retry)) {
|
||||
JobLogMetaDTO jobLogMetaDTO = RetryTaskConverter.INSTANCE.toJobLogDTO(executorDTO);
|
||||
jobLogMetaDTO.setTimestamp(nowMilli);
|
||||
SnailJobLog.REMOTE.error("retryTaskId:[{}] 任务调度失败. 失败原因: 重试任务不存在 <|>{}<|>", executorDTO.getRetryTaskId(),
|
||||
jobLogMetaDTO);
|
||||
return;
|
||||
}
|
||||
retryCallbackRequest.setRetryStatus(retry.getRetryStatus());
|
||||
try {
|
||||
|
||||
// 构建请求客户端对象
|
||||
@ -130,7 +144,7 @@ public class RequestCallbackClientActor extends AbstractActor {
|
||||
// 更新最新负载节点
|
||||
String hostId = (String) properties.get("HOST_ID");
|
||||
String hostIp = (String) properties.get("HOST_IP");
|
||||
String hostPort = (String) properties.get("HOST_PORT");
|
||||
Integer hostPort = (Integer) properties.get("HOST_PORT");
|
||||
|
||||
RetryTask retryTask = new RetryTask();
|
||||
retryTask.setId(executorDTO.getRetryTaskId());
|
||||
|
@ -59,15 +59,13 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
|
||||
accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(
|
||||
context.getGroupName(), context.getSceneName(), context.getNamespaceId());
|
||||
|
||||
Retry retry = retryMapper.selectOne(new LambdaQueryWrapper<Retry>()
|
||||
.select(Retry::getId, Retry::getRetryCount)
|
||||
.eq(Retry::getId, context.getRetryId()));
|
||||
Retry retry = retryMapper.selectById(context.getRetryId());
|
||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||
@Override
|
||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||
|
||||
Integer maxRetryCount;
|
||||
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(context.getTaskType())) {
|
||||
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) {
|
||||
maxRetryCount = retrySceneConfig.getCbMaxCount();
|
||||
} else {
|
||||
maxRetryCount = retrySceneConfig.getMaxRetryCount();
|
||||
@ -75,7 +73,9 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
|
||||
|
||||
if (maxRetryCount <= retry.getRetryCount() + 1) {
|
||||
retry.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus());
|
||||
retry.setRetryCount(retry.getRetryCount() + 1);
|
||||
retry.setUpdateDt(LocalDateTime.now());
|
||||
retry.setDeleted(retry.getId());
|
||||
Assert.isTrue(1 == retryMapper.updateById(retry),
|
||||
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
|
||||
// 创建一个回调任务
|
||||
@ -83,12 +83,12 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
|
||||
} else if (context.isIncrementRetryCount()) {
|
||||
retry.setRetryCount(retry.getRetryCount() + 1);
|
||||
retry.setUpdateDt(LocalDateTime.now());
|
||||
retry.setDeleted(retry.getId());
|
||||
Assert.isTrue(1 == retryMapper.updateById(retry),
|
||||
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
|
||||
|
||||
}
|
||||
|
||||
|
||||
RetryTask retryTask = new RetryTask();
|
||||
retryTask.setId(context.getRetryTaskId());
|
||||
retryTask.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
|
||||
|
@ -59,6 +59,8 @@ public class RetryStopHandler extends AbstractRetryResultHandler {
|
||||
|
||||
retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus());
|
||||
retry.setUpdateDt(LocalDateTime.now());
|
||||
retry.setDeleted(retry.getId());
|
||||
retry.setRetryCount(retry.getRetryCount() + 1);
|
||||
Assert.isTrue(1 == retryMapper.updateById(retry),
|
||||
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]",
|
||||
retry.getGroupName()));
|
||||
|
@ -56,6 +56,8 @@ public class RetrySuccessHandler extends AbstractRetryResultHandler {
|
||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||
retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus());
|
||||
retry.setUpdateDt(LocalDateTime.now());
|
||||
retry.setRetryCount(retry.getRetryCount() + 1);
|
||||
retry.setDeleted(retry.getId());
|
||||
Assert.isTrue(1 == retryMapper.updateById(retry),
|
||||
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]",
|
||||
retry.getGroupName()));
|
||||
|
@ -43,6 +43,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
@ -50,6 +51,7 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@ -103,10 +105,12 @@ public class RetryTaskServiceImpl implements RetryTaskService {
|
||||
pageDTO = accessTemplate.getRetryAccess().listPage(pageDTO, queryWrapper);
|
||||
|
||||
Set<Long> ids = StreamUtils.toSet(pageDTO.getRecords(), Retry::getId);
|
||||
List<Retry> callbackTaskList = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper<Retry>()
|
||||
.in(Retry::getParentId, ids));
|
||||
|
||||
Map<Long, Retry> callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId);
|
||||
Map<Long, Retry> callbackMap = Maps.newHashMap();
|
||||
if (CollUtil.isNotEmpty(ids)) {
|
||||
List<Retry> callbackTaskList = accessTemplate.getRetryAccess()
|
||||
.list(new LambdaQueryWrapper<Retry>().in(Retry::getParentId, ids));
|
||||
callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId);
|
||||
}
|
||||
|
||||
List<RetryResponseVO> retryResponseList = RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords());
|
||||
for (RetryResponseVO retryResponseVO : retryResponseList) {
|
||||
|
Loading…
Reference in New Issue
Block a user