feat(1.4.0-beta1): 添加超时机制
This commit is contained in:
parent
5d33be270a
commit
88311328d4
@ -120,24 +120,23 @@ CREATE TABLE `sj_retry`
|
|||||||
`executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称',
|
`executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称',
|
||||||
`args_str` text NOT NULL COMMENT '执行方法参数',
|
`args_str` text NOT NULL COMMENT '执行方法参数',
|
||||||
`ext_attrs` text NOT NULL COMMENT '扩展字段',
|
`ext_attrs` text NOT NULL COMMENT '扩展字段',
|
||||||
`next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间',
|
`next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间',
|
||||||
`retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '重试次数',
|
`retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '重试次数',
|
||||||
`retry_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '重试状态 0、重试中 1、成功 2、最大重试次数',
|
`retry_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '重试状态 0、重试中 1、成功 2、最大重试次数',
|
||||||
`task_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '任务类型 1、重试数据 2、回调数据',
|
`task_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '任务类型 1、重试数据 2、回调数据',
|
||||||
`bucket_index` int(11) NOT NULL DEFAULT 0 COMMENT 'bucket',
|
`bucket_index` int(11) NOT NULL DEFAULT 0 COMMENT 'bucket',
|
||||||
`parent_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '父节点id',
|
`parent_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '父节点id',
|
||||||
`deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除',
|
`deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除',
|
||||||
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
PRIMARY KEY (`id`),
|
PRIMARY KEY (`id`),
|
||||||
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`),
|
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`),
|
||||||
KEY `idx_namespace_id_group_name_task_type` (`namespace_id`, `group_name`, `task_type`),
|
|
||||||
KEY `idx_namespace_id_group_name_retry_status` (`namespace_id`, `group_name`, `retry_status`),
|
KEY `idx_namespace_id_group_name_retry_status` (`namespace_id`, `group_name`, `retry_status`),
|
||||||
KEY `idx_idempotent_id` (`idempotent_id`),
|
KEY `idx_idempotent_id` (`idempotent_id`),
|
||||||
KEY `idx_biz_no` (`biz_no`),
|
KEY `idx_biz_no` (`biz_no`),
|
||||||
KEY `idx_parent_id` (`parent_id`),
|
KEY `idx_parent_id` (`parent_id`),
|
||||||
KEY `idx_create_dt` (`create_dt`),
|
KEY `idx_create_dt` (`create_dt`),
|
||||||
UNIQUE KEY `uk_name_unique_id_deleted` (`namespace_id`, `group_name`, `idempotent_id`, `deleted`)
|
UNIQUE KEY `uk_name_task_type_idempotent_id_deleted` (`namespace_id`, `group_name`, `task_type`, `idempotent_id`, `deleted`)
|
||||||
) ENGINE = InnoDB
|
) ENGINE = InnoDB
|
||||||
AUTO_INCREMENT = 0
|
AUTO_INCREMENT = 0
|
||||||
DEFAULT CHARSET = utf8mb4 COMMENT ='重试信息表'
|
DEFAULT CHARSET = utf8mb4 COMMENT ='重试信息表'
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
package com.aizuda.snailjob.client.core.cache;
|
package com.aizuda.snailjob.client.core.cache;
|
||||||
|
|
||||||
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
|
|
||||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@ -20,10 +18,10 @@ public class FutureCache {
|
|||||||
futureCache.put(retryTaskId, future);
|
futureCache.put(retryTaskId, future);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void remove(Long taskBatchId) {
|
public static void remove(Long retryTaskId) {
|
||||||
Optional.ofNullable(futureCache.get(taskBatchId)).ifPresent(future -> {
|
Optional.ofNullable(futureCache.get(retryTaskId)).ifPresent(future -> {
|
||||||
future.cancel(true);
|
future.cancel(true);
|
||||||
futureCache.remove(taskBatchId);
|
futureCache.remove(retryTaskId);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,9 +20,11 @@ import com.aizuda.snailjob.client.core.executor.RemoteRetryExecutor;
|
|||||||
import com.aizuda.snailjob.client.core.loader.SnailRetrySpiLoader;
|
import com.aizuda.snailjob.client.core.loader.SnailRetrySpiLoader;
|
||||||
import com.aizuda.snailjob.client.core.retryer.RetryerInfo;
|
import com.aizuda.snailjob.client.core.retryer.RetryerInfo;
|
||||||
import com.aizuda.snailjob.client.core.serializer.JacksonSerializer;
|
import com.aizuda.snailjob.client.core.serializer.JacksonSerializer;
|
||||||
|
import com.aizuda.snailjob.client.core.timer.StopTaskTimerTask;
|
||||||
|
import com.aizuda.snailjob.client.core.timer.TimerManager;
|
||||||
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
|
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
|
||||||
import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO;
|
import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO;
|
||||||
import com.aizuda.snailjob.client.model.RetryCallbackDTO;
|
import com.aizuda.snailjob.client.model.request.RetryCallbackRequest;
|
||||||
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
|
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
|
||||||
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
|
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
|
||||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||||
@ -106,17 +108,20 @@ public class SnailRetryEndPoint implements Lifecycle {
|
|||||||
FutureCache.addFuture(request.getRetryTaskId(), submit);
|
FutureCache.addFuture(request.getRetryTaskId(), submit);
|
||||||
Futures.addCallback(submit, new RetryTaskExecutorFutureCallback(retryContext), decorator);
|
Futures.addCallback(submit, new RetryTaskExecutorFutureCallback(retryContext), decorator);
|
||||||
|
|
||||||
|
// 将任务添加到时间轮中,到期停止任务
|
||||||
|
TimerManager.add(new StopTaskTimerTask(request.getRetryTaskId()), request.getExecutorTimeout(), TimeUnit.SECONDS);
|
||||||
|
|
||||||
return new Result<>(Boolean.TRUE);
|
return new Result<>(Boolean.TRUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST)
|
@Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST)
|
||||||
public Result<Boolean> callback(@Valid RetryCallbackDTO callbackDTO) {
|
public Result<Boolean> callback(@Valid RetryCallbackRequest callbackDTO) {
|
||||||
CallbackContext callbackContext = new CallbackContext();
|
CallbackContext callbackContext = new CallbackContext();
|
||||||
try {
|
try {
|
||||||
RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getScene(), callbackDTO.getExecutorName());
|
RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getSceneName(), callbackDTO.getExecutorName());
|
||||||
if (Objects.isNull(retryerInfo)) {
|
if (Objects.isNull(retryerInfo)) {
|
||||||
SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", callbackDTO.getScene());
|
SnailJobLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", callbackDTO.getSceneName());
|
||||||
return new Result<>(0, "回调失败", Boolean.FALSE);
|
return new Result<>(0, "回调失败", Boolean.FALSE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -143,6 +148,9 @@ public class SnailRetryEndPoint implements Lifecycle {
|
|||||||
FutureCache.addFuture(callbackDTO.getRetryTaskId(), submit);
|
FutureCache.addFuture(callbackDTO.getRetryTaskId(), submit);
|
||||||
Futures.addCallback(submit, new CallbackTaskExecutorFutureCallback(callbackContext), decorator);
|
Futures.addCallback(submit, new CallbackTaskExecutorFutureCallback(callbackContext), decorator);
|
||||||
|
|
||||||
|
// 将任务添加到时间轮中,到期停止任务
|
||||||
|
TimerManager.add(new StopTaskTimerTask(callbackDTO.getRetryTaskId()), callbackDTO.getExecutorTimeout(), TimeUnit.SECONDS);
|
||||||
|
|
||||||
return new Result<>(Boolean.TRUE);
|
return new Result<>(Boolean.TRUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,24 @@
|
|||||||
|
package com.aizuda.snailjob.client.core.timer;
|
||||||
|
|
||||||
|
import com.aizuda.snailjob.client.core.cache.FutureCache;
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author opensnail
|
||||||
|
* @date 2025-02-18
|
||||||
|
* @since 1.4.0
|
||||||
|
*/
|
||||||
|
public class StopTaskTimerTask implements TimerTask {
|
||||||
|
|
||||||
|
private Long retryTaskId;
|
||||||
|
|
||||||
|
public StopTaskTimerTask(Long retryTaskId) {
|
||||||
|
this.retryTaskId = retryTaskId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(Timeout timeout) throws Exception {
|
||||||
|
FutureCache.remove(retryTaskId);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,31 @@
|
|||||||
|
package com.aizuda.snailjob.client.core.timer;
|
||||||
|
|
||||||
|
import io.netty.util.HashedWheelTimer;
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
|
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author opensnail
|
||||||
|
* @date 2023-10-08 22:23:57
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
public class TimerManager {
|
||||||
|
|
||||||
|
private static final HashedWheelTimer wheelTimer;
|
||||||
|
|
||||||
|
static {
|
||||||
|
wheelTimer = new HashedWheelTimer(
|
||||||
|
new CustomizableThreadFactory("retry-task-timer-wheel-"), 1,
|
||||||
|
TimeUnit.SECONDS, 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimerManager() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Timeout add(TimerTask task, long delay, TimeUnit unit) {
|
||||||
|
return wheelTimer.newTimeout(task, delay, unit);
|
||||||
|
}
|
||||||
|
}
|
@ -29,4 +29,6 @@ public class DispatchRetryRequest {
|
|||||||
private Long retryTaskId;
|
private Long retryTaskId;
|
||||||
@NotNull(message = "retryId 不能为空")
|
@NotNull(message = "retryId 不能为空")
|
||||||
private Long retryId;
|
private Long retryId;
|
||||||
|
@NotNull(message = "executorTimeout 不能为空")
|
||||||
|
private Integer executorTimeout;
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package com.aizuda.snailjob.client.model;
|
package com.aizuda.snailjob.client.model.request;
|
||||||
|
|
||||||
import jakarta.validation.constraints.NotBlank;
|
import jakarta.validation.constraints.NotBlank;
|
||||||
import jakarta.validation.constraints.NotNull;
|
import jakarta.validation.constraints.NotNull;
|
||||||
@ -11,11 +11,13 @@ import lombok.Data;
|
|||||||
* @date 2022/03/25 10:06
|
* @date 2022/03/25 10:06
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class RetryCallbackDTO {
|
public class RetryCallbackRequest {
|
||||||
@NotBlank(message = "group 不能为空")
|
@NotBlank(message = "namespaceId 不能为空")
|
||||||
private String group;
|
private String namespaceId;
|
||||||
@NotBlank(message = "scene 不能为空")
|
@NotBlank(message = "groupName 不能为空")
|
||||||
private String scene;
|
private String groupName;
|
||||||
|
@NotBlank(message = "sceneName 不能为空")
|
||||||
|
private String sceneName;
|
||||||
@NotBlank(message = "参数 不能为空")
|
@NotBlank(message = "参数 不能为空")
|
||||||
private String argsStr;
|
private String argsStr;
|
||||||
@NotBlank(message = "idempotentId 不能为空")
|
@NotBlank(message = "idempotentId 不能为空")
|
||||||
@ -28,6 +30,6 @@ public class RetryCallbackDTO {
|
|||||||
private Long retryTaskId;
|
private Long retryTaskId;
|
||||||
@NotNull(message = "retryId 不能为空")
|
@NotNull(message = "retryId 不能为空")
|
||||||
private Long retryId;
|
private Long retryId;
|
||||||
@NotBlank(message = "namespaceId 不能为空")
|
@NotNull(message = "executorTimeout 不能为空")
|
||||||
private String namespaceId;
|
private Integer executorTimeout;
|
||||||
}
|
}
|
@ -1,8 +1,7 @@
|
|||||||
package com.aizuda.snailjob.server.retry.task.client;
|
package com.aizuda.snailjob.server.retry.task.client;
|
||||||
|
|
||||||
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
|
|
||||||
import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO;
|
import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO;
|
||||||
import com.aizuda.snailjob.client.model.RetryCallbackDTO;
|
import com.aizuda.snailjob.client.model.request.RetryCallbackRequest;
|
||||||
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
|
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
|
||||||
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
|
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
|
||||||
import com.aizuda.snailjob.common.core.model.Result;
|
import com.aizuda.snailjob.common.core.model.Result;
|
||||||
@ -30,7 +29,7 @@ public interface RetryRpcClient {
|
|||||||
Result<Boolean> stop(@Body StopRetryRequest stopRetryRequest);
|
Result<Boolean> stop(@Body StopRetryRequest stopRetryRequest);
|
||||||
|
|
||||||
@Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST)
|
@Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST)
|
||||||
Result<Boolean> callback(@Body RetryCallbackDTO retryCallbackDTO);
|
Result<Boolean> callback(@Body RetryCallbackRequest retryCallbackRequest);
|
||||||
|
|
||||||
@Mapping(path = RETRY_GENERATE_IDEM_ID, method = RequestMethod.POST)
|
@Mapping(path = RETRY_GENERATE_IDEM_ID, method = RequestMethod.POST)
|
||||||
Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO);
|
Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO);
|
||||||
|
@ -32,4 +32,6 @@ public class RetryTaskPrepareDTO {
|
|||||||
private Integer blockStrategy;
|
private Integer blockStrategy;
|
||||||
|
|
||||||
private boolean onlyTimeoutCheck;
|
private boolean onlyTimeoutCheck;
|
||||||
|
|
||||||
|
private Integer executorTimeout;
|
||||||
}
|
}
|
||||||
|
@ -19,4 +19,11 @@ public class TaskStopJobDTO extends BaseDTO {
|
|||||||
* 操作原因
|
* 操作原因
|
||||||
*/
|
*/
|
||||||
private Integer operationReason;
|
private Integer operationReason;
|
||||||
|
|
||||||
|
private String message;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否需要变更任务状态
|
||||||
|
*/
|
||||||
|
private boolean needUpdateTaskStatus;
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support;
|
|||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
|
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
|
||||||
|
import com.aizuda.snailjob.client.model.request.RetryCallbackRequest;
|
||||||
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
|
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
|
||||||
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
|
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
|
||||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||||
@ -125,6 +126,8 @@ public interface RetryTaskConverter {
|
|||||||
|
|
||||||
RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchRetryResultDTO resultDTO);
|
RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchRetryResultDTO resultDTO);
|
||||||
|
|
||||||
|
RetryExecutorResultDTO toRetryExecutorResultDTO(TaskStopJobDTO resultDTO);
|
||||||
|
|
||||||
RetryExecutorResultDTO toRetryExecutorResultDTO(RequestRetryExecutorDTO resultDTO);
|
RetryExecutorResultDTO toRetryExecutorResultDTO(RequestRetryExecutorDTO resultDTO);
|
||||||
|
|
||||||
RetryExecutorResultDTO toRetryExecutorResultDTO(RequestCallbackExecutorDTO resultDTO);
|
RetryExecutorResultDTO toRetryExecutorResultDTO(RequestCallbackExecutorDTO resultDTO);
|
||||||
@ -137,7 +140,9 @@ public interface RetryTaskConverter {
|
|||||||
|
|
||||||
TaskStopJobDTO toTaskStopJobDTO(BlockStrategyContext context);
|
TaskStopJobDTO toTaskStopJobDTO(BlockStrategyContext context);
|
||||||
|
|
||||||
StopRetryRequest toStopRetryRequest(RequestCallbackExecutorDTO executorDTO);
|
TaskStopJobDTO toTaskStopJobDTO(Retry retry);
|
||||||
|
|
||||||
|
TaskStopJobDTO toTaskStopJobDTO(RetryTaskPrepareDTO context);
|
||||||
|
|
||||||
StopRetryRequest toStopRetryRequest(RequestStopRetryTaskExecutorDTO executorDTO);
|
StopRetryRequest toStopRetryRequest(RequestStopRetryTaskExecutorDTO executorDTO);
|
||||||
|
|
||||||
@ -167,4 +172,6 @@ public interface RetryTaskConverter {
|
|||||||
@Mapping(target = "taskType", source = "retry.taskType"),
|
@Mapping(target = "taskType", source = "retry.taskType"),
|
||||||
})
|
})
|
||||||
RequestCallbackExecutorDTO toRequestCallbackExecutorDTO(RetrySceneConfig retrySceneConfig, Retry retry);
|
RequestCallbackExecutorDTO toRequestCallbackExecutorDTO(RetrySceneConfig retrySceneConfig, Retry retry);
|
||||||
|
|
||||||
|
RetryCallbackRequest toRetryCallbackDTO(RequestCallbackExecutorDTO executorDTO);
|
||||||
}
|
}
|
||||||
|
@ -36,9 +36,10 @@ public class OverlayRetryBlockStrategy extends AbstracJobBlockStrategy {
|
|||||||
|
|
||||||
TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(context);
|
TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(context);
|
||||||
if (Objects.isNull(context.getOperationReason()) || context.getOperationReason() == JobOperationReasonEnum.NONE.getReason()) {
|
if (Objects.isNull(context.getOperationReason()) || context.getOperationReason() == JobOperationReasonEnum.NONE.getReason()) {
|
||||||
stopJobDTO.setOperationReason(RetryOperationReasonEnum.JOB_DISCARD.getReason());
|
stopJobDTO.setOperationReason(RetryOperationReasonEnum.JOB_OVERLAY.getReason());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stopJobDTO.setNeedUpdateTaskStatus(true);
|
||||||
retryTaskStopHandler.stop(stopJobDTO);
|
retryTaskStopHandler.stop(stopJobDTO);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,8 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch;
|
|||||||
|
|
||||||
import akka.actor.AbstractActor;
|
import akka.actor.AbstractActor;
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
|
import com.aizuda.snailjob.client.model.request.RetryCallbackRequest;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.model.Result;
|
import com.aizuda.snailjob.common.core.model.Result;
|
||||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
@ -17,16 +18,13 @@ import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
|
|||||||
import com.aizuda.snailjob.server.common.util.DateUtils;
|
import com.aizuda.snailjob.server.common.util.DateUtils;
|
||||||
import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient;
|
import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient;
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO;
|
import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO;
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
|
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
|
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskLogDTO;
|
|
||||||
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
||||||
import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter;
|
import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
|
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
|
||||||
import com.github.rholder.retry.Attempt;
|
import com.github.rholder.retry.Attempt;
|
||||||
import com.github.rholder.retry.RetryException;
|
import com.github.rholder.retry.RetryException;
|
||||||
import com.github.rholder.retry.RetryListener;
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -78,13 +76,13 @@ public class RequestCallbackClientActor extends AbstractActor {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
StopRetryRequest stopRetryRequest = RetryTaskConverter.INSTANCE.toStopRetryRequest(executorDTO);
|
RetryCallbackRequest retryCallbackRequest = RetryTaskConverter.INSTANCE.toRetryCallbackDTO(executorDTO);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
// 构建请求客户端对象
|
// 构建请求客户端对象
|
||||||
RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo, executorDTO);
|
RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo, executorDTO);
|
||||||
Result<Boolean> dispatch = rpcClient.stop(stopRetryRequest);
|
Result<Boolean> dispatch = rpcClient.callback(retryCallbackRequest);
|
||||||
if (dispatch.getStatus() == StatusEnum.YES.getStatus()) {
|
if (dispatch.getStatus() == StatusEnum.YES.getStatus()) {
|
||||||
SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务调度成功.", executorDTO.getRetryTaskId());
|
SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务调度成功.", executorDTO.getRetryTaskId());
|
||||||
} else {
|
} else {
|
||||||
@ -128,7 +126,7 @@ public class RequestCallbackClientActor extends AbstractActor {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> void onRetry(final Attempt<V> attempt) {
|
public <V> void onRetry(final Attempt<V> attempt) {
|
||||||
if (attempt.getAttemptNumber() > 0) {
|
if (attempt.getAttemptNumber() > 1) {
|
||||||
// 更新最新负载节点
|
// 更新最新负载节点
|
||||||
String hostId = (String) properties.get("HOST_ID");
|
String hostId = (String) properties.get("HOST_ID");
|
||||||
String hostIp = (String) properties.get("HOST_IP");
|
String hostIp = (String) properties.get("HOST_IP");
|
||||||
@ -176,6 +174,7 @@ public class RequestCallbackClientActor extends AbstractActor {
|
|||||||
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
||||||
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
|
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
|
||||||
executorResultDTO.setExceptionMsg(message);
|
executorResultDTO.setExceptionMsg(message);
|
||||||
|
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
|
||||||
actorRef.tell(executorResultDTO, actorRef);
|
actorRef.tell(executorResultDTO, actorRef);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch;
|
|||||||
import akka.actor.AbstractActor;
|
import akka.actor.AbstractActor;
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
|
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.model.Result;
|
import com.aizuda.snailjob.common.core.model.Result;
|
||||||
import com.aizuda.snailjob.common.core.model.SnailJobHeaders;
|
import com.aizuda.snailjob.common.core.model.SnailJobHeaders;
|
||||||
@ -181,6 +182,7 @@ public class RequestRetryClientActor extends AbstractActor {
|
|||||||
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
||||||
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
|
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
|
||||||
executorResultDTO.setExceptionMsg(message);
|
executorResultDTO.setExceptionMsg(message);
|
||||||
|
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
|
||||||
actorRef.tell(executorResultDTO, actorRef);
|
actorRef.tell(executorResultDTO, actorRef);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,12 +15,15 @@ import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
|
|||||||
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
|
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
|
||||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||||
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
|
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
|
||||||
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
|
|
||||||
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
|
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
|
||||||
|
import com.aizuda.snailjob.server.common.util.DateUtils;
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO;
|
import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO;
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
|
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecuteDTO;
|
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecuteDTO;
|
||||||
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
||||||
|
import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler;
|
||||||
|
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimeoutCheckTask;
|
||||||
|
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerWheel;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.SceneConfigMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.SceneConfigMapper;
|
||||||
@ -34,6 +37,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
|||||||
import org.springframework.context.annotation.Scope;
|
import org.springframework.context.annotation.Scope;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -53,6 +57,7 @@ public class RetryExecutor extends AbstractActor {
|
|||||||
private final RetryTaskMapper retryTaskMapper;
|
private final RetryTaskMapper retryTaskMapper;
|
||||||
private final SceneConfigMapper sceneConfigMapper;
|
private final SceneConfigMapper sceneConfigMapper;
|
||||||
private final ClientNodeAllocateHandler clientNodeAllocateHandler;
|
private final ClientNodeAllocateHandler clientNodeAllocateHandler;
|
||||||
|
private final RetryTaskStopHandler retryTaskStopHandler;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
@ -103,14 +108,14 @@ public class RetryExecutor extends AbstractActor {
|
|||||||
updateRetryTaskStatus(execute.getRetryTaskId(), RetryTaskStatusEnum.RUNNING.getStatus(),
|
updateRetryTaskStatus(execute.getRetryTaskId(), RetryTaskStatusEnum.RUNNING.getStatus(),
|
||||||
ClientInfoUtils.generate(serverNode));
|
ClientInfoUtils.generate(serverNode));
|
||||||
|
|
||||||
Object executorDTO;
|
|
||||||
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) {
|
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) {
|
||||||
// 请求客户端
|
// 请求客户端
|
||||||
RequestCallbackExecutorDTO callbackExecutorDTO = RetryTaskConverter.INSTANCE.toRequestCallbackExecutorDTO(retrySceneConfig, retry);
|
RequestCallbackExecutorDTO callbackExecutorDTO = RetryTaskConverter.INSTANCE.toRequestCallbackExecutorDTO(retrySceneConfig, retry);
|
||||||
callbackExecutorDTO.setClientId(serverNode.getHostId());
|
callbackExecutorDTO.setClientId(serverNode.getHostId());
|
||||||
callbackExecutorDTO.setRetryTaskId(execute.getRetryTaskId());
|
callbackExecutorDTO.setRetryTaskId(execute.getRetryTaskId());
|
||||||
|
|
||||||
executorDTO = callbackExecutorDTO;
|
ActorRef actorRef = ActorGenerator.callbackRealTaskExecutorActor();
|
||||||
|
actorRef.tell(callbackExecutorDTO, actorRef);
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
// 请求客户端
|
// 请求客户端
|
||||||
@ -118,21 +123,21 @@ public class RetryExecutor extends AbstractActor {
|
|||||||
retryExecutorDTO.setClientId(serverNode.getHostId());
|
retryExecutorDTO.setClientId(serverNode.getHostId());
|
||||||
retryExecutorDTO.setRetryTaskId(execute.getRetryTaskId());
|
retryExecutorDTO.setRetryTaskId(execute.getRetryTaskId());
|
||||||
|
|
||||||
executorDTO = retryExecutorDTO;
|
ActorRef actorRef = ActorGenerator.retryRealTaskExecutorActor();
|
||||||
|
actorRef.tell(retryExecutorDTO, actorRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
ActorRef actorRef = ActorGenerator.retryRealTaskExecutorActor();
|
// 运行中的任务,需要进行超时检查
|
||||||
actorRef.tell(executorDTO, actorRef);
|
RetryTimerWheel.registerWithRetry(() -> new RetryTimeoutCheckTask(
|
||||||
|
execute.getRetryTaskId(), execute.getRetryId(), retryTaskStopHandler, retryMapper, retryTaskMapper),
|
||||||
|
// 加500ms是为了让尽量保证客户端自己先超时中断,防止客户端上报成功但是服务端已触发超时中断
|
||||||
|
Duration.ofMillis(DateUtils.toEpochMilli(retrySceneConfig.getExecutorTimeout()) + 500));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus, String clientInfo) {
|
private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus, String clientInfo) {
|
||||||
updateRetryTaskStatus(retryTaskId, taskStatus, RetryOperationReasonEnum.NONE, clientInfo);
|
updateRetryTaskStatus(retryTaskId, taskStatus, RetryOperationReasonEnum.NONE, clientInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus) {
|
|
||||||
updateRetryTaskStatus(retryTaskId, taskStatus, RetryOperationReasonEnum.NONE, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus, RetryOperationReasonEnum reasonEnum) {
|
private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus, RetryOperationReasonEnum reasonEnum) {
|
||||||
updateRetryTaskStatus(retryTaskId, taskStatus, reasonEnum, null);
|
updateRetryTaskStatus(retryTaskId, taskStatus, reasonEnum, null);
|
||||||
}
|
}
|
||||||
|
@ -119,7 +119,8 @@ public class ScanRetryActor extends AbstractActor {
|
|||||||
partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName());
|
partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName());
|
||||||
List<RetrySceneConfig> retrySceneConfigs = accessTemplate.getSceneConfigAccess()
|
List<RetrySceneConfig> retrySceneConfigs = accessTemplate.getSceneConfigAccess()
|
||||||
.list(new LambdaQueryWrapper<RetrySceneConfig>()
|
.list(new LambdaQueryWrapper<RetrySceneConfig>()
|
||||||
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getSceneName)
|
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getSceneName,
|
||||||
|
RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval)
|
||||||
.in(RetrySceneConfig::getSceneName, sceneNameSet));
|
.in(RetrySceneConfig::getSceneName, sceneNameSet));
|
||||||
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName);
|
return StreamUtils.toIdentityMap(retrySceneConfigs, RetrySceneConfig::getSceneName);
|
||||||
}
|
}
|
||||||
@ -132,6 +133,7 @@ public class ScanRetryActor extends AbstractActor {
|
|||||||
|
|
||||||
RetryTaskPrepareDTO retryTaskPrepareDTO = RetryTaskConverter.INSTANCE.toRetryTaskPrepareDTO(partitionTask);
|
RetryTaskPrepareDTO retryTaskPrepareDTO = RetryTaskConverter.INSTANCE.toRetryTaskPrepareDTO(partitionTask);
|
||||||
retryTaskPrepareDTO.setBlockStrategy(retrySceneConfig.getBackOff());
|
retryTaskPrepareDTO.setBlockStrategy(retrySceneConfig.getBackOff());
|
||||||
|
retryTaskPrepareDTO.setExecutorTimeout(retrySceneConfig.getExecutorTimeout());
|
||||||
waitExecRetries.add(retryTaskPrepareDTO);
|
waitExecRetries.add(retryTaskPrepareDTO);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,13 +150,15 @@ public class ScanRetryActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
waitStrategyContext.setNextTriggerAt(nextTriggerAt);
|
waitStrategyContext.setNextTriggerAt(nextTriggerAt);
|
||||||
waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
|
|
||||||
waitStrategyContext.setDelayLevel(partitionTask.getRetryCount() + 1);
|
waitStrategyContext.setDelayLevel(partitionTask.getRetryCount() + 1);
|
||||||
|
|
||||||
// 更新触发时间, 任务进入时间轮
|
// 更新触发时间, 任务进入时间轮
|
||||||
WaitStrategy waitStrategy;
|
WaitStrategy waitStrategy;
|
||||||
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(partitionTask.getTaskType())) {
|
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(partitionTask.getTaskType())) {
|
||||||
|
waitStrategyContext.setTriggerInterval(retrySceneConfig.getCbTriggerInterval());
|
||||||
waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getCbTriggerType());
|
waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getCbTriggerType());
|
||||||
} else {
|
} else {
|
||||||
|
waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
|
||||||
waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff());
|
waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support.generator.retry;
|
|||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
import cn.hutool.core.lang.Pair;
|
import cn.hutool.core.lang.Pair;
|
||||||
|
import cn.hutool.core.util.HashUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||||
@ -10,6 +11,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
|
|||||||
import com.aizuda.snailjob.common.core.util.StreamUtils;
|
import com.aizuda.snailjob.common.core.util.StreamUtils;
|
||||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
import com.aizuda.snailjob.server.common.WaitStrategy;
|
import com.aizuda.snailjob.server.common.WaitStrategy;
|
||||||
|
import com.aizuda.snailjob.server.common.config.SystemProperties;
|
||||||
import com.aizuda.snailjob.server.common.enums.DelayLevelEnum;
|
import com.aizuda.snailjob.server.common.enums.DelayLevelEnum;
|
||||||
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
|
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
|
||||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||||
@ -49,7 +51,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private List<IdGenerator> idGeneratorList;
|
private List<IdGenerator> idGeneratorList;
|
||||||
@Autowired
|
@Autowired
|
||||||
private RetryTaskMapper retryTaskMapper;
|
private SystemProperties systemProperties;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional
|
@Transactional
|
||||||
@ -79,13 +81,11 @@ public abstract class AbstractGenerator implements TaskGenerator {
|
|||||||
Map<String/*幂等ID*/, List<Retry>> retryTaskMap = StreamUtils.groupByKey(retries, Retry::getIdempotentId);
|
Map<String/*幂等ID*/, List<Retry>> retryTaskMap = StreamUtils.groupByKey(retries, Retry::getIdempotentId);
|
||||||
|
|
||||||
List<Retry> waitInsertTasks = new ArrayList<>();
|
List<Retry> waitInsertTasks = new ArrayList<>();
|
||||||
List<RetryTask> waitInsertTaskLogs = new ArrayList<>();
|
|
||||||
LocalDateTime now = LocalDateTime.now();
|
LocalDateTime now = LocalDateTime.now();
|
||||||
for (TaskContext.TaskInfo taskInfo : taskInfos) {
|
for (TaskContext.TaskInfo taskInfo : taskInfos) {
|
||||||
Pair<List<Retry>, List<RetryTask>> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo,
|
Pair<List<Retry>, List<RetryTask>> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo,
|
||||||
retrySceneConfig);
|
retrySceneConfig);
|
||||||
waitInsertTasks.addAll(pair.getKey());
|
waitInsertTasks.addAll(pair.getKey());
|
||||||
waitInsertTaskLogs.addAll(pair.getValue());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (CollUtil.isEmpty(waitInsertTasks)) {
|
if (CollUtil.isEmpty(waitInsertTasks)) {
|
||||||
@ -95,8 +95,6 @@ public abstract class AbstractGenerator implements TaskGenerator {
|
|||||||
Assert.isTrue(
|
Assert.isTrue(
|
||||||
waitInsertTasks.size() == retryTaskAccess.insertBatch(waitInsertTasks),
|
waitInsertTasks.size() == retryTaskAccess.insertBatch(waitInsertTasks),
|
||||||
() -> new SnailJobServerException("failed to report data"));
|
() -> new SnailJobServerException("failed to report data"));
|
||||||
/*Assert.isTrue(waitInsertTaskLogs.size() == retryTaskMapper.insertBatch(waitInsertTaskLogs),
|
|
||||||
() -> new SnailJobServerException("新增重试日志失败"));*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -130,6 +128,12 @@ public abstract class AbstractGenerator implements TaskGenerator {
|
|||||||
retry.setSceneName(taskContext.getSceneName());
|
retry.setSceneName(taskContext.getSceneName());
|
||||||
retry.setRetryStatus(initStatus(taskContext));
|
retry.setRetryStatus(initStatus(taskContext));
|
||||||
retry.setBizNo(Optional.ofNullable(retry.getBizNo()).orElse(StrUtil.EMPTY));
|
retry.setBizNo(Optional.ofNullable(retry.getBizNo()).orElse(StrUtil.EMPTY));
|
||||||
|
// 计算分桶逻辑
|
||||||
|
retry.setBucketIndex(
|
||||||
|
HashUtil.bkdrHash(taskContext.getGroupName() + taskContext.getSceneName() + taskInfo.getIdempotentId())
|
||||||
|
% systemProperties.getBucketTotal()
|
||||||
|
);
|
||||||
|
|
||||||
retry.setCreateDt(now);
|
retry.setCreateDt(now);
|
||||||
retry.setUpdateDt(now);
|
retry.setUpdateDt(now);
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ import lombok.RequiredArgsConstructor;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
@ -38,10 +39,20 @@ public class RetryTaskGeneratorHandler {
|
|||||||
public void generateRetryTask(RetryTaskGeneratorDTO generator) {
|
public void generateRetryTask(RetryTaskGeneratorDTO generator) {
|
||||||
|
|
||||||
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(generator);
|
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(generator);
|
||||||
retryTask.setTaskStatus(RetryTaskStatusEnum.WAITING.getStatus());
|
Integer taskStatus = generator.getTaskStatus();
|
||||||
|
if (Objects.isNull(taskStatus)) {
|
||||||
|
taskStatus = RetryTaskStatusEnum.WAITING.getStatus();
|
||||||
|
}
|
||||||
|
retryTask.setTaskStatus(taskStatus);
|
||||||
|
retryTask.setOperationReason(generator.getOperationReason());
|
||||||
|
|
||||||
retryTask.setExtAttrs(StrUtil.EMPTY);
|
retryTask.setExtAttrs(StrUtil.EMPTY);
|
||||||
Assert.isTrue(1 == retryTaskMapper.insert(retryTask), () -> new SnailJobServerException("插入重试任务失败"));
|
Assert.isTrue(1 == retryTaskMapper.insert(retryTask), () -> new SnailJobServerException("插入重试任务失败"));
|
||||||
|
|
||||||
|
if (!RetryTaskStatusEnum.WAITING.getStatus().equals(taskStatus)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 放到到时间轮
|
// 放到到时间轮
|
||||||
long delay = generator.getNextTriggerAt() - DateUtils.toNowMilli();
|
long delay = generator.getNextTriggerAt() - DateUtils.toNowMilli();
|
||||||
RetryTimerContext timerContext = RetryTaskConverter.INSTANCE.toRetryTimerContext(generator);
|
RetryTimerContext timerContext = RetryTaskConverter.INSTANCE.toRetryTimerContext(generator);
|
||||||
|
@ -2,10 +2,12 @@ package com.aizuda.snailjob.server.retry.task.support.handler;
|
|||||||
|
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
|
||||||
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
||||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
|
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
|
||||||
|
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
|
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
|
||||||
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
|
||||||
@ -42,6 +44,20 @@ public class RetryTaskStopHandler {
|
|||||||
RequestRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO);
|
RequestRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO);
|
||||||
ActorRef actorRef = ActorGenerator.stopRetryTaskActor();
|
ActorRef actorRef = ActorGenerator.stopRetryTaskActor();
|
||||||
actorRef.tell(executorDTO, actorRef);
|
actorRef.tell(executorDTO, actorRef);
|
||||||
|
|
||||||
|
// 更新结果为失败
|
||||||
|
doHandleResult(stopJobDTO);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void doHandleResult(TaskStopJobDTO stopJobDTO) {
|
||||||
|
if (!stopJobDTO.isNeedUpdateTaskStatus()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(stopJobDTO);
|
||||||
|
executorResultDTO.setExceptionMsg(stopJobDTO.getMessage());
|
||||||
|
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE);
|
||||||
|
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
|
||||||
|
actorRef.tell(executorResultDTO, actorRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,13 +1,19 @@
|
|||||||
package com.aizuda.snailjob.server.retry.task.support.prepare;
|
package com.aizuda.snailjob.server.retry.task.support.prepare;
|
||||||
|
|
||||||
|
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
|
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
|
||||||
|
import com.aizuda.snailjob.server.common.util.DateUtils;
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
|
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
|
||||||
|
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
|
||||||
import com.aizuda.snailjob.server.retry.task.support.BlockStrategy;
|
import com.aizuda.snailjob.server.retry.task.support.BlockStrategy;
|
||||||
import com.aizuda.snailjob.server.retry.task.support.RetryPrePareHandler;
|
import com.aizuda.snailjob.server.retry.task.support.RetryPrePareHandler;
|
||||||
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
||||||
import com.aizuda.snailjob.server.retry.task.support.block.BlockStrategyContext;
|
import com.aizuda.snailjob.server.retry.task.support.block.BlockStrategyContext;
|
||||||
import com.aizuda.snailjob.server.retry.task.support.block.RetryBlockStrategyFactory;
|
import com.aizuda.snailjob.server.retry.task.support.block.RetryBlockStrategyFactory;
|
||||||
|
import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
@ -23,7 +29,9 @@ import java.util.Objects;
|
|||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
public class RunningRetryPrepareHandler implements RetryPrePareHandler {
|
public class RunningRetryPrepareHandler implements RetryPrePareHandler {
|
||||||
|
private final RetryTaskStopHandler retryTaskStopHandler;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean matches(Integer status) {
|
public boolean matches(Integer status) {
|
||||||
@ -35,13 +43,19 @@ public class RunningRetryPrepareHandler implements RetryPrePareHandler {
|
|||||||
// 若存在所有的任务都是完成,但是批次上的状态为运行中,则是并发导致的未把批次状态变成为终态,此处做一次兜底处理
|
// 若存在所有的任务都是完成,但是批次上的状态为运行中,则是并发导致的未把批次状态变成为终态,此处做一次兜底处理
|
||||||
int blockStrategy = prepare.getBlockStrategy();
|
int blockStrategy = prepare.getBlockStrategy();
|
||||||
JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE;
|
JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE;
|
||||||
// CompleteJobBatchDTO completeJobBatchDTO = RetryTaskConverter.INSTANCE.completeJobBatchDTO(prepare);
|
|
||||||
// completeJobBatchDTO.setJobOperationReason(jobOperationReasonEnum.getReason());
|
// 计算超时时间
|
||||||
// if (jobTaskBatchHandler.handleResult(completeJobBatchDTO)) {
|
long delay = DateUtils.toNowMilli() - prepare.getNextTriggerAt();
|
||||||
// blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
|
|
||||||
// } else {
|
// 计算超时时间,到达超时时间中断任务
|
||||||
//
|
if (delay > DateUtils.toEpochMilli(prepare.getExecutorTimeout())) {
|
||||||
// }
|
log.info("任务执行超时.retryTaskId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getRetryTaskId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout()));
|
||||||
|
// 超时停止任务
|
||||||
|
TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(prepare);
|
||||||
|
stopJobDTO.setOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
|
||||||
|
stopJobDTO.setNeedUpdateTaskStatus(true);
|
||||||
|
retryTaskStopHandler.stop(stopJobDTO);
|
||||||
|
}
|
||||||
|
|
||||||
// 仅是超时检测的,不执行阻塞策略
|
// 仅是超时检测的,不执行阻塞策略
|
||||||
if (prepare.isOnlyTimeoutCheck()) {
|
if (prepare.isOnlyTimeoutCheck()) {
|
||||||
|
@ -17,15 +17,17 @@ public abstract class AbstractTimerTask implements TimerTask<String> {
|
|||||||
protected String groupName;
|
protected String groupName;
|
||||||
protected String namespaceId;
|
protected String namespaceId;
|
||||||
protected Long retryId;
|
protected Long retryId;
|
||||||
|
protected Long retryTaskId;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(Timeout timeout) throws Exception {
|
public void run(Timeout timeout) throws Exception {
|
||||||
log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] namespaceId:[{}]", LocalDateTime.now(), groupName,
|
log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]",
|
||||||
retryId, namespaceId);
|
LocalDateTime.now(), groupName, retryId, retryTaskId, namespaceId);
|
||||||
try {
|
try {
|
||||||
doRun(timeout);
|
doRun(timeout);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("重试任务执行失败 groupName:[{}] retryId:[{}] namespaceId:[{}]", groupName, retryId, namespaceId, e);
|
log.error("重试任务执行失败 groupName:[{}] retryId:[{}] retryTaskId:[{}] namespaceId:[{}]",
|
||||||
|
groupName, retryId, retryTaskId, namespaceId, e);
|
||||||
} finally {
|
} finally {
|
||||||
// 先清除时间轮的缓存
|
// 先清除时间轮的缓存
|
||||||
RetryTimerWheel.clearCache(idempotentKey());
|
RetryTimerWheel.clearCache(idempotentKey());
|
||||||
|
@ -0,0 +1,74 @@
|
|||||||
|
package com.aizuda.snailjob.server.retry.task.support.timer;
|
||||||
|
|
||||||
|
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
|
||||||
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
|
import com.aizuda.snailjob.server.common.TimerTask;
|
||||||
|
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
|
||||||
|
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
|
||||||
|
import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
|
||||||
|
import java.text.MessageFormat;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 任务超时检查
|
||||||
|
*
|
||||||
|
* @author opensnail
|
||||||
|
* @date 2024-05-20 21:16:09
|
||||||
|
* @since sj_1.0.0
|
||||||
|
*/
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class RetryTimeoutCheckTask implements TimerTask<String> {
|
||||||
|
private static final String IDEMPOTENT_KEY_PREFIX = "retry_timeout_check_{0}";
|
||||||
|
|
||||||
|
private final Long retryTaskId;
|
||||||
|
private final Long retryId;
|
||||||
|
private final RetryTaskStopHandler retryTaskStopHandler;
|
||||||
|
private final RetryMapper retryMapper;
|
||||||
|
private final RetryTaskMapper retryTaskMapper;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(Timeout timeout) throws Exception {
|
||||||
|
RetryTimerWheel.clearCache(idempotentKey());
|
||||||
|
RetryTask retryTask = retryTaskMapper.selectById(retryTaskId);
|
||||||
|
if (Objects.isNull(retryTask)) {
|
||||||
|
SnailJobLog.LOCAL.error("retryTaskId:[{}] 不存在", retryTaskId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 已经完成了,无需重复停止任务
|
||||||
|
if (RetryTaskStatusEnum.TERMINAL_STATUS_SET.contains(retryTask.getTaskStatus())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Retry retry = retryMapper.selectById(retryId);
|
||||||
|
if (Objects.isNull(retry)) {
|
||||||
|
SnailJobLog.LOCAL.error("retryId:[{}]不存在", retryId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 超时停止任务
|
||||||
|
String reason = "超时中断.retryTaskId:[" + retryTaskId + "]";
|
||||||
|
|
||||||
|
TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(retry);
|
||||||
|
stopJobDTO.setRetryTaskId(retryTaskId);
|
||||||
|
stopJobDTO.setRetryId(retryId);
|
||||||
|
stopJobDTO.setOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
|
||||||
|
stopJobDTO.setNeedUpdateTaskStatus(true);
|
||||||
|
retryTaskStopHandler.stop(stopJobDTO);
|
||||||
|
|
||||||
|
SnailJobLog.LOCAL.info(reason);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String idempotentKey() {
|
||||||
|
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, retryId);
|
||||||
|
}
|
||||||
|
}
|
@ -38,6 +38,8 @@ public class RetryTimerTask extends AbstractTimerTask {
|
|||||||
this.context = context;
|
this.context = context;
|
||||||
super.groupName = context.getGroupName();
|
super.groupName = context.getGroupName();
|
||||||
super.namespaceId = context.getNamespaceId();
|
super.namespaceId = context.getNamespaceId();
|
||||||
|
super.retryId = context.getRetryId();
|
||||||
|
super.retryTaskId = context.getRetryTaskId();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user