feat(1.4.0-beta1): 添加超时机制

This commit is contained in:
opensnail 2025-02-18 21:29:10 +08:00
parent d52690d8bf
commit b01ba347cf
23 changed files with 274 additions and 61 deletions

View File

@ -120,24 +120,23 @@ CREATE TABLE `sj_retry`
`executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称',
`args_str` text NOT NULL COMMENT '执行方法参数',
`ext_attrs` text NOT NULL COMMENT '扩展字段',
`next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间',
`next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间',
`retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '重试次数',
`retry_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '重试状态 0、重试中 1、成功 2、最大重试次数',
`task_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '任务类型 1、重试数据 2、回调数据',
`bucket_index` int(11) NOT NULL DEFAULT 0 COMMENT 'bucket',
`parent_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '父节点id',
`deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除',
`deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`),
KEY `idx_namespace_id_group_name_task_type` (`namespace_id`, `group_name`, `task_type`),
KEY `idx_namespace_id_group_name_retry_status` (`namespace_id`, `group_name`, `retry_status`),
KEY `idx_idempotent_id` (`idempotent_id`),
KEY `idx_biz_no` (`biz_no`),
KEY `idx_parent_id` (`parent_id`),
KEY `idx_create_dt` (`create_dt`),
UNIQUE KEY `uk_name_unique_id_deleted` (`namespace_id`, `group_name`, `idempotent_id`, `deleted`)
UNIQUE KEY `uk_name_task_type_idempotent_id_deleted` (`namespace_id`, `group_name`, `task_type`, `idempotent_id`, `deleted`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='重试信息表'

View File

@ -1,7 +1,5 @@
package com.aizuda.snailjob.client.core.cache;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Optional;
@ -20,10 +18,10 @@ public class FutureCache {
futureCache.put(retryTaskId, future);
}
public static void remove(Long taskBatchId) {
Optional.ofNullable(futureCache.get(taskBatchId)).ifPresent(future -> {
public static void remove(Long retryTaskId) {
Optional.ofNullable(futureCache.get(retryTaskId)).ifPresent(future -> {
future.cancel(true);
futureCache.remove(taskBatchId);
futureCache.remove(retryTaskId);
});
}

View File

@ -20,9 +20,11 @@ import com.aizuda.snailjob.client.core.executor.RemoteRetryExecutor;
import com.aizuda.snailjob.client.core.loader.SnailRetrySpiLoader;
import com.aizuda.snailjob.client.core.retryer.RetryerInfo;
import com.aizuda.snailjob.client.core.serializer.JacksonSerializer;
import com.aizuda.snailjob.client.core.timer.StopTaskTimerTask;
import com.aizuda.snailjob.client.core.timer.TimerManager;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.snailjob.client.model.RetryCallbackDTO;
import com.aizuda.snailjob.client.model.request.RetryCallbackRequest;
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
@ -106,17 +108,20 @@ public class SnailRetryEndPoint implements Lifecycle {
FutureCache.addFuture(request.getRetryTaskId(), submit);
Futures.addCallback(submit, new RetryTaskExecutorFutureCallback(retryContext), decorator);
// 将任务添加到时间轮中到期停止任务
TimerManager.add(new StopTaskTimerTask(request.getRetryTaskId()), request.getExecutorTimeout(), TimeUnit.SECONDS);
return new Result<>(Boolean.TRUE);
}
@Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST)
public Result<Boolean> callback(@Valid RetryCallbackDTO callbackDTO) {
public Result<Boolean> callback(@Valid RetryCallbackRequest callbackDTO) {
CallbackContext callbackContext = new CallbackContext();
try {
RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getScene(), callbackDTO.getExecutorName());
RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getSceneName(), callbackDTO.getExecutorName());
if (Objects.isNull(retryerInfo)) {
SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", callbackDTO.getScene());
SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", callbackDTO.getSceneName());
return new Result<>(0, "回调失败", Boolean.FALSE);
}
@ -143,6 +148,9 @@ public class SnailRetryEndPoint implements Lifecycle {
FutureCache.addFuture(callbackDTO.getRetryTaskId(), submit);
Futures.addCallback(submit, new CallbackTaskExecutorFutureCallback(callbackContext), decorator);
// 将任务添加到时间轮中到期停止任务
TimerManager.add(new StopTaskTimerTask(callbackDTO.getRetryTaskId()), callbackDTO.getExecutorTimeout(), TimeUnit.SECONDS);
return new Result<>(Boolean.TRUE);
}

View File

@ -0,0 +1,24 @@
package com.aizuda.snailjob.client.core.timer;
import com.aizuda.snailjob.client.core.cache.FutureCache;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
/**
* @author opensnail
* @date 2025-02-18
* @since 1.4.0
*/
public class StopTaskTimerTask implements TimerTask {
private Long retryTaskId;
public StopTaskTimerTask(Long retryTaskId) {
this.retryTaskId = retryTaskId;
}
@Override
public void run(Timeout timeout) throws Exception {
FutureCache.remove(retryTaskId);
}
}

View File

@ -0,0 +1,31 @@
package com.aizuda.snailjob.client.core.timer;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* @author opensnail
* @date 2023-10-08 22:23:57
* @since 2.4.0
*/
public class TimerManager {
private static final HashedWheelTimer wheelTimer;
static {
wheelTimer = new HashedWheelTimer(
new CustomizableThreadFactory("retry-task-timer-wheel-"), 1,
TimeUnit.SECONDS, 1024);
}
private TimerManager() {
}
public static Timeout add(TimerTask task, long delay, TimeUnit unit) {
return wheelTimer.newTimeout(task, delay, unit);
}
}

View File

@ -29,4 +29,6 @@ public class DispatchRetryRequest {
private Long retryTaskId;
@NotNull(message = "retryId 不能为空")
private Long retryId;
@NotNull(message = "executorTimeout 不能为空")
private Integer executorTimeout;
}

View File

@ -1,4 +1,4 @@
package com.aizuda.snailjob.client.model;
package com.aizuda.snailjob.client.model.request;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
@ -11,11 +11,13 @@ import lombok.Data;
* @date 2022/03/25 10:06
*/
@Data
public class RetryCallbackDTO {
@NotBlank(message = "group 不能为空")
private String group;
@NotBlank(message = "scene 不能为空")
private String scene;
public class RetryCallbackRequest {
@NotBlank(message = "namespaceId 不能为空")
private String namespaceId;
@NotBlank(message = "groupName 不能为空")
private String groupName;
@NotBlank(message = "sceneName 不能为空")
private String sceneName;
@NotBlank(message = "参数 不能为空")
private String argsStr;
@NotBlank(message = "idempotentId 不能为空")
@ -28,6 +30,6 @@ public class RetryCallbackDTO {
private Long retryTaskId;
@NotNull(message = "retryId 不能为空")
private Long retryId;
@NotBlank(message = "namespaceId 不能为空")
private String namespaceId;
@NotNull(message = "executorTimeout 不能为空")
private Integer executorTimeout;
}

View File

@ -1,8 +1,7 @@
package com.aizuda.snailjob.server.retry.task.client;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.snailjob.client.model.RetryCallbackDTO;
import com.aizuda.snailjob.client.model.request.RetryCallbackRequest;
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
import com.aizuda.snailjob.common.core.model.Result;
@ -30,7 +29,7 @@ public interface RetryRpcClient {
Result<Boolean> stop(@Body StopRetryRequest stopRetryRequest);
@Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST)
Result<Boolean> callback(@Body RetryCallbackDTO retryCallbackDTO);
Result<Boolean> callback(@Body RetryCallbackRequest retryCallbackRequest);
@Mapping(path = RETRY_GENERATE_IDEM_ID, method = RequestMethod.POST)
Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO);

View File

@ -32,4 +32,6 @@ public class RetryTaskPrepareDTO {
private Integer blockStrategy;
private boolean onlyTimeoutCheck;
private Integer executorTimeout;
}

View File

@ -19,4 +19,11 @@ public class TaskStopJobDTO extends BaseDTO {
* 操作原因
*/
private Integer operationReason;
private String message;
/**
* 是否需要变更任务状态
*/
private boolean needUpdateTaskStatus;
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.client.model.request.RetryCallbackRequest;
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
@ -125,6 +126,8 @@ public interface RetryTaskConverter {
RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchRetryResultDTO resultDTO);
RetryExecutorResultDTO toRetryExecutorResultDTO(TaskStopJobDTO resultDTO);
RetryExecutorResultDTO toRetryExecutorResultDTO(RequestRetryExecutorDTO resultDTO);
RetryExecutorResultDTO toRetryExecutorResultDTO(RequestCallbackExecutorDTO resultDTO);
@ -137,7 +140,9 @@ public interface RetryTaskConverter {
TaskStopJobDTO toTaskStopJobDTO(BlockStrategyContext context);
StopRetryRequest toStopRetryRequest(RequestCallbackExecutorDTO executorDTO);
TaskStopJobDTO toTaskStopJobDTO(Retry retry);
TaskStopJobDTO toTaskStopJobDTO(RetryTaskPrepareDTO context);
StopRetryRequest toStopRetryRequest(RequestStopRetryTaskExecutorDTO executorDTO);
@ -167,4 +172,6 @@ public interface RetryTaskConverter {
@Mapping(target = "taskType", source = "retry.taskType"),
})
RequestCallbackExecutorDTO toRequestCallbackExecutorDTO(RetrySceneConfig retrySceneConfig, Retry retry);
RetryCallbackRequest toRetryCallbackDTO(RequestCallbackExecutorDTO executorDTO);
}

View File

@ -36,9 +36,10 @@ public class OverlayRetryBlockStrategy extends AbstracJobBlockStrategy {
TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(context);
if (Objects.isNull(context.getOperationReason()) || context.getOperationReason() == JobOperationReasonEnum.NONE.getReason()) {
stopJobDTO.setOperationReason(RetryOperationReasonEnum.JOB_DISCARD.getReason());
stopJobDTO.setOperationReason(RetryOperationReasonEnum.JOB_OVERLAY.getReason());
}
stopJobDTO.setNeedUpdateTaskStatus(true);
retryTaskStopHandler.stop(stopJobDTO);
}

View File

@ -2,7 +2,8 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
import com.aizuda.snailjob.client.model.request.RetryCallbackRequest;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -17,16 +18,13 @@ import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient;
import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskLogDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -78,13 +76,13 @@ public class RequestCallbackClientActor extends AbstractActor {
return;
}
StopRetryRequest stopRetryRequest = RetryTaskConverter.INSTANCE.toStopRetryRequest(executorDTO);
RetryCallbackRequest retryCallbackRequest = RetryTaskConverter.INSTANCE.toRetryCallbackDTO(executorDTO);
try {
// 构建请求客户端对象
RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo, executorDTO);
Result<Boolean> dispatch = rpcClient.stop(stopRetryRequest);
Result<Boolean> dispatch = rpcClient.callback(retryCallbackRequest);
if (dispatch.getStatus() == StatusEnum.YES.getStatus()) {
SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务调度成功.", executorDTO.getRetryTaskId());
} else {
@ -128,7 +126,7 @@ public class RequestCallbackClientActor extends AbstractActor {
@Override
public <V> void onRetry(final Attempt<V> attempt) {
if (attempt.getAttemptNumber() > 0) {
if (attempt.getAttemptNumber() > 1) {
// 更新最新负载节点
String hostId = (String) properties.get("HOST_ID");
String hostIp = (String) properties.get("HOST_IP");
@ -176,6 +174,7 @@ public class RequestCallbackClientActor extends AbstractActor {
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
executorResultDTO.setExceptionMsg(message);
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
actorRef.tell(executorResultDTO, actorRef);
}
}

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.model.SnailJobHeaders;
@ -181,6 +182,7 @@ public class RequestRetryClientActor extends AbstractActor {
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
executorResultDTO.setExceptionMsg(message);
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
actorRef.tell(executorResultDTO, actorRef);
}
}

View File

@ -15,12 +15,15 @@ import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecuteDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler;
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimeoutCheckTask;
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerWheel;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.SceneConfigMapper;
@ -34,6 +37,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Objects;
/**
@ -53,6 +57,7 @@ public class RetryExecutor extends AbstractActor {
private final RetryTaskMapper retryTaskMapper;
private final SceneConfigMapper sceneConfigMapper;
private final ClientNodeAllocateHandler clientNodeAllocateHandler;
private final RetryTaskStopHandler retryTaskStopHandler;
@Override
public Receive createReceive() {
@ -103,14 +108,14 @@ public class RetryExecutor extends AbstractActor {
updateRetryTaskStatus(execute.getRetryTaskId(), RetryTaskStatusEnum.RUNNING.getStatus(),
ClientInfoUtils.generate(serverNode));
Object executorDTO;
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) {
// 请求客户端
RequestCallbackExecutorDTO callbackExecutorDTO = RetryTaskConverter.INSTANCE.toRequestCallbackExecutorDTO(retrySceneConfig, retry);
callbackExecutorDTO.setClientId(serverNode.getHostId());
callbackExecutorDTO.setRetryTaskId(execute.getRetryTaskId());
executorDTO = callbackExecutorDTO;
ActorRef actorRef = ActorGenerator.callbackRealTaskExecutorActor();
actorRef.tell(callbackExecutorDTO, actorRef);
} else {
// 请求客户端
@ -118,21 +123,21 @@ public class RetryExecutor extends AbstractActor {
retryExecutorDTO.setClientId(serverNode.getHostId());
retryExecutorDTO.setRetryTaskId(execute.getRetryTaskId());
executorDTO = retryExecutorDTO;
ActorRef actorRef = ActorGenerator.retryRealTaskExecutorActor();
actorRef.tell(retryExecutorDTO, actorRef);
}
ActorRef actorRef = ActorGenerator.retryRealTaskExecutorActor();
actorRef.tell(executorDTO, actorRef);
// 运行中的任务需要进行超时检查
RetryTimerWheel.registerWithRetry(() -> new RetryTimeoutCheckTask(
execute.getRetryTaskId(), execute.getRetryId(), retryTaskStopHandler, retryMapper, retryTaskMapper),
// 加500ms是为了让尽量保证客户端自己先超时中断防止客户端上报成功但是服务端已触发超时中断
Duration.ofMillis(DateUtils.toEpochMilli(retrySceneConfig.getExecutorTimeout()) + 500));
}
private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus, String clientInfo) {
updateRetryTaskStatus(retryTaskId, taskStatus, RetryOperationReasonEnum.NONE, clientInfo);
}
private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus) {
updateRetryTaskStatus(retryTaskId, taskStatus, RetryOperationReasonEnum.NONE, null);
}
private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus, RetryOperationReasonEnum reasonEnum) {
updateRetryTaskStatus(retryTaskId, taskStatus, reasonEnum, null);
}

View File

@ -119,7 +119,8 @@ public class ScanRetryActor extends AbstractActor {
partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName());
List<RetrySceneConfig> retrySceneConfigs = accessTemplate.getSceneConfigAccess()
.list(new LambdaQueryWrapper<RetrySceneConfig>()
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getSceneName)
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getSceneName,
RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval)
.in(RetrySceneConfig::getSceneName, sceneNameSet));
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName);
}
@ -132,6 +133,7 @@ public class ScanRetryActor extends AbstractActor {
RetryTaskPrepareDTO retryTaskPrepareDTO = RetryTaskConverter.INSTANCE.toRetryTaskPrepareDTO(partitionTask);
retryTaskPrepareDTO.setBlockStrategy(retrySceneConfig.getBackOff());
retryTaskPrepareDTO.setExecutorTimeout(retrySceneConfig.getExecutorTimeout());
waitExecRetries.add(retryTaskPrepareDTO);
}
@ -148,13 +150,15 @@ public class ScanRetryActor extends AbstractActor {
}
waitStrategyContext.setNextTriggerAt(nextTriggerAt);
waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(partitionTask.getRetryCount() + 1);
// 更新触发时间, 任务进入时间轮
WaitStrategy waitStrategy;
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(partitionTask.getTaskType())) {
waitStrategyContext.setTriggerInterval(retrySceneConfig.getCbTriggerInterval());
waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getCbTriggerType());
} else {
waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff());
}

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support.generator.retry;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
@ -10,6 +11,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.enums.DelayLevelEnum;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
@ -49,7 +51,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
@Autowired
private List<IdGenerator> idGeneratorList;
@Autowired
private RetryTaskMapper retryTaskMapper;
private SystemProperties systemProperties;
@Override
@Transactional
@ -79,13 +81,11 @@ public abstract class AbstractGenerator implements TaskGenerator {
Map<String/*幂等ID*/, List<Retry>> retryTaskMap = StreamUtils.groupByKey(retries, Retry::getIdempotentId);
List<Retry> waitInsertTasks = new ArrayList<>();
List<RetryTask> waitInsertTaskLogs = new ArrayList<>();
LocalDateTime now = LocalDateTime.now();
for (TaskContext.TaskInfo taskInfo : taskInfos) {
Pair<List<Retry>, List<RetryTask>> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo,
retrySceneConfig);
waitInsertTasks.addAll(pair.getKey());
waitInsertTaskLogs.addAll(pair.getValue());
}
if (CollUtil.isEmpty(waitInsertTasks)) {
@ -95,8 +95,6 @@ public abstract class AbstractGenerator implements TaskGenerator {
Assert.isTrue(
waitInsertTasks.size() == retryTaskAccess.insertBatch(waitInsertTasks),
() -> new SnailJobServerException("failed to report data"));
/*Assert.isTrue(waitInsertTaskLogs.size() == retryTaskMapper.insertBatch(waitInsertTaskLogs),
() -> new SnailJobServerException("新增重试日志失败"));*/
}
/**
@ -130,6 +128,12 @@ public abstract class AbstractGenerator implements TaskGenerator {
retry.setSceneName(taskContext.getSceneName());
retry.setRetryStatus(initStatus(taskContext));
retry.setBizNo(Optional.ofNullable(retry.getBizNo()).orElse(StrUtil.EMPTY));
// 计算分桶逻辑
retry.setBucketIndex(
HashUtil.bkdrHash(taskContext.getGroupName() + taskContext.getSceneName() + taskInfo.getIdempotentId())
% systemProperties.getBucketTotal()
);
retry.setCreateDt(now);
retry.setUpdateDt(now);

View File

@ -16,6 +16,7 @@ import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Objects;
/**
* <p>
@ -38,10 +39,20 @@ public class RetryTaskGeneratorHandler {
public void generateRetryTask(RetryTaskGeneratorDTO generator) {
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(generator);
retryTask.setTaskStatus(RetryTaskStatusEnum.WAITING.getStatus());
Integer taskStatus = generator.getTaskStatus();
if (Objects.isNull(taskStatus)) {
taskStatus = RetryTaskStatusEnum.WAITING.getStatus();
}
retryTask.setTaskStatus(taskStatus);
retryTask.setOperationReason(generator.getOperationReason());
retryTask.setExtAttrs(StrUtil.EMPTY);
Assert.isTrue(1 == retryTaskMapper.insert(retryTask), () -> new SnailJobServerException("插入重试任务失败"));
if (!RetryTaskStatusEnum.WAITING.getStatus().equals(taskStatus)) {
return;
}
// 放到到时间轮
long delay = generator.getNextTriggerAt() - DateUtils.toNowMilli();
RetryTimerContext timerContext = RetryTaskConverter.INSTANCE.toRetryTimerContext(generator);

View File

@ -2,10 +2,12 @@ package com.aizuda.snailjob.server.retry.task.support.handler;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
@ -42,6 +44,20 @@ public class RetryTaskStopHandler {
RequestRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO);
ActorRef actorRef = ActorGenerator.stopRetryTaskActor();
actorRef.tell(executorDTO, actorRef);
// 更新结果为失败
doHandleResult(stopJobDTO);
}
private static void doHandleResult(TaskStopJobDTO stopJobDTO) {
if (!stopJobDTO.isNeedUpdateTaskStatus()) {
return;
}
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(stopJobDTO);
executorResultDTO.setExceptionMsg(stopJobDTO.getMessage());
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
actorRef.tell(executorResultDTO, actorRef);
}

View File

@ -1,13 +1,19 @@
package com.aizuda.snailjob.server.retry.task.support.prepare;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
import com.aizuda.snailjob.server.retry.task.support.BlockStrategy;
import com.aizuda.snailjob.server.retry.task.support.RetryPrePareHandler;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.block.BlockStrategyContext;
import com.aizuda.snailjob.server.retry.task.support.block.RetryBlockStrategyFactory;
import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -23,7 +29,9 @@ import java.util.Objects;
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class RunningRetryPrepareHandler implements RetryPrePareHandler {
private final RetryTaskStopHandler retryTaskStopHandler;
@Override
public boolean matches(Integer status) {
@ -35,13 +43,19 @@ public class RunningRetryPrepareHandler implements RetryPrePareHandler {
// 若存在所有的任务都是完成但是批次上的状态为运行中则是并发导致的未把批次状态变成为终态此处做一次兜底处理
int blockStrategy = prepare.getBlockStrategy();
JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE;
// CompleteJobBatchDTO completeJobBatchDTO = RetryTaskConverter.INSTANCE.completeJobBatchDTO(prepare);
// completeJobBatchDTO.setJobOperationReason(jobOperationReasonEnum.getReason());
// if (jobTaskBatchHandler.handleResult(completeJobBatchDTO)) {
// blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
// } else {
//
// }
// 计算超时时间
long delay = DateUtils.toNowMilli() - prepare.getNextTriggerAt();
// 计算超时时间到达超时时间中断任务
if (delay > DateUtils.toEpochMilli(prepare.getExecutorTimeout())) {
log.info("任务执行超时.retryTaskId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getRetryTaskId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout()));
// 超时停止任务
TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(prepare);
stopJobDTO.setOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
stopJobDTO.setNeedUpdateTaskStatus(true);
retryTaskStopHandler.stop(stopJobDTO);
}
// 仅是超时检测的不执行阻塞策略
if (prepare.isOnlyTimeoutCheck()) {

View File

@ -17,15 +17,17 @@ public abstract class AbstractTimerTask implements TimerTask<String> {
protected String groupName;
protected String namespaceId;
protected Long retryId;
protected Long retryTaskId;
@Override
public void run(Timeout timeout) throws Exception {
log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] namespaceId:[{}]", LocalDateTime.now(), groupName,
retryId, namespaceId);
log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]",
LocalDateTime.now(), groupName, retryId, retryTaskId, namespaceId);
try {
doRun(timeout);
} catch (Exception e) {
log.error("重试任务执行失败 groupName:[{}] retryId:[{}] namespaceId:[{}]", groupName, retryId, namespaceId, e);
log.error("重试任务执行失败 groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]",
groupName, retryId, retryTaskId, namespaceId, e);
} finally {
// 先清除时间轮的缓存
RetryTimerWheel.clearCache(idempotentKey());

View File

@ -0,0 +1,74 @@
package com.aizuda.snailjob.server.retry.task.support.timer;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import io.netty.util.Timeout;
import lombok.AllArgsConstructor;
import java.text.MessageFormat;
import java.util.Objects;
/**
* 任务超时检查
*
* @author opensnail
* @date 2024-05-20 21:16:09
* @since sj_1.0.0
*/
@AllArgsConstructor
public class RetryTimeoutCheckTask implements TimerTask<String> {
private static final String IDEMPOTENT_KEY_PREFIX = "retry_timeout_check_{0}";
private final Long retryTaskId;
private final Long retryId;
private final RetryTaskStopHandler retryTaskStopHandler;
private final RetryMapper retryMapper;
private final RetryTaskMapper retryTaskMapper;
@Override
public void run(Timeout timeout) throws Exception {
RetryTimerWheel.clearCache(idempotentKey());
RetryTask retryTask = retryTaskMapper.selectById(retryTaskId);
if (Objects.isNull(retryTask)) {
SnailJobLog.LOCAL.error("retryTaskId:[{}] 不存在", retryTaskId);
return;
}
// 已经完成了无需重复停止任务
if (RetryTaskStatusEnum.TERMINAL_STATUS_SET.contains(retryTask.getTaskStatus())) {
return;
}
Retry retry = retryMapper.selectById(retryId);
if (Objects.isNull(retry)) {
SnailJobLog.LOCAL.error("retryId:[{}]不存在", retryId);
return;
}
// 超时停止任务
String reason = "超时中断.retryTaskId:[" + retryTaskId + "]";
TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(retry);
stopJobDTO.setRetryTaskId(retryTaskId);
stopJobDTO.setRetryId(retryId);
stopJobDTO.setOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
stopJobDTO.setNeedUpdateTaskStatus(true);
retryTaskStopHandler.stop(stopJobDTO);
SnailJobLog.LOCAL.info(reason);
}
@Override
public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, retryId);
}
}

View File

@ -38,6 +38,8 @@ public class RetryTimerTask extends AbstractTimerTask {
this.context = context;
super.groupName = context.getGroupName();
super.namespaceId = context.getNamespaceId();
super.retryId = context.getRetryId();
super.retryTaskId = context.getRetryTaskId();
}
@Override