feat(1.4.0-beta1): 页面调试

This commit is contained in:
opensnail 2025-02-17 23:01:12 +08:00
parent 0216e02f31
commit 9e7c785ed5
40 changed files with 575 additions and 1038 deletions

View File

@ -19,16 +19,8 @@ public class StopRetryRequest {
private String groupName;
@NotBlank(message = "scene 不能为空")
private String scene;
@NotBlank(message = "参数 不能为空")
private String argsStr;
@NotBlank(message = "idempotentId 不能为空")
private String idempotentId;
@NotBlank(message = "executorName 不能为空")
private String executorName;
@NotNull(message = "retryTaskId 不能为空")
private Long retryTaskId;
@NotNull(message = "retryId 不能为空")
private Long retryId;
@NotNull(message = "retryCount 不能为空")
private Integer retryCount;
}

View File

@ -35,8 +35,9 @@ public class ActorGenerator {
public static final String RETRY_EXECUTOR_ACTOR = "RetryExecutorActor";
public static final String RETRY_TASK_PREPARE_ACTOR = "RetryTaskPrepareActor";
public static final String LOG_ACTOR = "RetryLogActor";
public static final String REAL_RETRY_EXECUTOR_ACTOR = "RealRetryExecutorActor";
public static final String RETRY_EXECUTOR_RESULT_ACTOR = "RetryExecutorResultActor";
public static final String REAL_RETRY_EXECUTOR_ACTOR = "RealRetryExecutorActor";
public static final String REAL_CALLBACK_EXECUTOR_ACTOR = "RealCallbackExecutorActor";
public static final String RETRY_REAL_STOP_TASK_INSTANCE_ACTOR = "RetryRealStopTaskInstanceActor";
@ -72,64 +73,6 @@ public class ActorGenerator {
private ActorGenerator() {
}
/**
* 生成重试完成的actor
*
* @return actor 引用
*/
@Deprecated
public static ActorRef finishActor() {
return getRetryActorSystem().actorOf(getSpringExtension().props(FINISH_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER));
}
/**
* 生成重试失败的actor
*
* @return actor 引用
*/
@Deprecated
public static ActorRef failureActor() {
return getRetryActorSystem().actorOf(getSpringExtension().props(FAILURE_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER));
}
/**
* 回调处理
*
* @return actor 引用
*/
@Deprecated
public static ActorRef execCallbackUnitActor() {
return getRetryActorSystem().actorOf(getSpringExtension()
.props(EXEC_CALLBACK_UNIT_ACTOR)
.withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER));
}
/**
* 生成重试执行的actor
*
* @return actor 引用
*/
@Deprecated
public static ActorRef execUnitActor() {
return getRetryActorSystem().actorOf(getSpringExtension()
.props(EXEC_UNIT_ACTOR)
.withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER));
}
/**
* 生成重试执行的actor
*
* @return actor 引用
*/
public static ActorRef stopRetryTaskActor() {
return getRetryActorSystem().actorOf(getSpringExtension()
.props(RETRY_REAL_STOP_TASK_INSTANCE_ACTOR)
.withDispatcher(RETRY_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER));
}
/**
* Retry任务执行结果actor
*
@ -164,7 +107,6 @@ public class ActorGenerator {
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
}
/**
* actor
*
@ -176,9 +118,21 @@ public class ActorGenerator {
.withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER));
}
/**
* 尝试停止执行中的任务
*
* @return ActorRef
*/
public static ActorRef stopRetryTaskActor() {
return getRetryActorSystem().actorOf(getSpringExtension()
.props(RETRY_REAL_STOP_TASK_INSTANCE_ACTOR)
.withDispatcher(RETRY_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER));
}
/**
* 调用客户端执行重试
* @return
*
* @return ActorRef
*/
public static ActorRef retryRealTaskExecutorActor() {
return getRetryActorSystem().actorOf(getSpringExtension()
@ -186,6 +140,18 @@ public class ActorGenerator {
.withDispatcher(RETRY_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER));
}
/**
* 调用客户端执行回调
*
* @return ActorRef
*/
public static ActorRef callbackRealTaskExecutorActor() {
return getRetryActorSystem().actorOf(getSpringExtension()
.props(REAL_CALLBACK_EXECUTOR_ACTOR)
.withDispatcher(RETRY_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER));
}
/**

View File

@ -119,6 +119,15 @@ public class GrpcClientInvokeHandler implements InvocationHandler {
for (int count = 1; count <= size; count++) {
log.debug("Start request client. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId,
hostIp, hostPort, NetUtil.getLocalIpStr());
if (retryListener instanceof SnailJobRetryListener) {
// 传递修改之后的客户端节点信息
SnailJobRetryListener listener = (SnailJobRetryListener) retryListener;
Map<String, Object> properties = listener.properties();
properties.put("HOST_ID", hostId);
properties.put("HOST_IP", hostIp);
properties.put("HOST_PORT", hostPort);
}
Result result = requestRemote(method, args, annotation, count);
if (Objects.nonNull(result)) {
return result;

View File

@ -0,0 +1,20 @@
package com.aizuda.snailjob.server.common.rpc.client;
import com.github.rholder.retry.RetryListener;
import java.util.Map;
/**
* author: zhangshuguang
* date: 2025-02-17
*/
public interface SnailJobRetryListener extends RetryListener {
/**
* 传递属性信息
*
* @return Map<String, Object>
*/
Map<String, Object> properties();
}

View File

@ -30,7 +30,7 @@ public interface RetryRpcClient {
Result<Boolean> stop(@Body StopRetryRequest stopRetryRequest);
@Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST)
Result callback(@Body RetryCallbackDTO retryCallbackDTO);
Result<Boolean> callback(@Body RetryCallbackDTO retryCallbackDTO);
@Mapping(path = RETRY_GENERATE_IDEM_ID, method = RequestMethod.POST)
Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO);

View File

@ -0,0 +1,32 @@
package com.aizuda.snailjob.server.retry.task.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* <p>
*
* </p>
*
* @author opensnail
* @date 2025-01-26
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class RequestCallbackExecutorDTO extends BaseDTO {
private String clientId;
private Integer routeKey;
private Integer executorTimeout;
private Long deadlineRequest;
private String argsStr;
private String executorName;
private Integer retryCount;
}

View File

@ -13,7 +13,7 @@ import lombok.EqualsAndHashCode;
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class RealRetryExecutorDTO extends BaseDTO {
public class RequestRetryExecutorDTO extends BaseDTO {
private String clientId;

View File

@ -0,0 +1,32 @@
package com.aizuda.snailjob.server.retry.task.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* <p>
*
* </p>
*
* @author opensnail
* @date 2025-01-26
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class RequestStopRetryTaskExecutorDTO extends BaseDTO {
private String clientId;
private Integer routeKey;
private Integer executorTimeout;
private Long deadlineRequest;
private String argsStr;
private String executorName;
private Integer retryCount;
}

View File

@ -110,7 +110,7 @@ public interface RetryTaskConverter {
RetryTask toRetryTask(RetryTaskGeneratorDTO context);
DispatchRetryRequest toDispatchRetryRequest(RealRetryExecutorDTO executorDTO);
DispatchRetryRequest toDispatchRetryRequest(RequestRetryExecutorDTO executorDTO);
@Mappings({
@Mapping(target = "namespaceId", source = "retry.namespaceId"),
@ -119,12 +119,16 @@ public interface RetryTaskConverter {
@Mapping(target = "retryId", source = "retry.id"),
@Mapping(target = "taskType", source = "retry.taskType"),
})
RealRetryExecutorDTO toRealRetryExecutorDTO(RetrySceneConfig execute, Retry retry);
RequestRetryExecutorDTO toRealRetryExecutorDTO(RetrySceneConfig execute, Retry retry);
RealRetryExecutorDTO toRealRetryExecutorDTO(TaskStopJobDTO stopJobDTO);
RequestRetryExecutorDTO toRealRetryExecutorDTO(TaskStopJobDTO stopJobDTO);
RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchRetryResultDTO resultDTO);
RetryExecutorResultDTO toRetryExecutorResultDTO(RequestRetryExecutorDTO resultDTO);
RetryExecutorResultDTO toRetryExecutorResultDTO(RequestCallbackExecutorDTO resultDTO);
RetryTaskGeneratorDTO toRetryTaskGeneratorDTO(RetryTaskPrepareDTO jobPrepareDTO);
RetryTaskGeneratorDTO toRetryTaskGeneratorDTO(BlockStrategyContext context);
@ -133,7 +137,9 @@ public interface RetryTaskConverter {
TaskStopJobDTO toTaskStopJobDTO(BlockStrategyContext context);
StopRetryRequest toStopRetryRequest(RealRetryExecutorDTO executorDTO);
StopRetryRequest toStopRetryRequest(RequestCallbackExecutorDTO executorDTO);
StopRetryRequest toStopRetryRequest(RequestStopRetryTaskExecutorDTO executorDTO);
@Mappings({
@Mapping(source = "retry.id", target = "retryId"),
@ -147,7 +153,18 @@ public interface RetryTaskConverter {
RetryTaskExecuteDTO toRetryTaskExecuteDTO(RetryTimerContext context);
JobLogMetaDTO toJobLogDTO(RealRetryExecutorDTO executorDTO);
JobLogMetaDTO toJobLogDTO(RequestRetryExecutorDTO executorDTO);
JobLogMetaDTO toJobLogDTO(RequestCallbackExecutorDTO executorDTO);
RetryResultContext toRetryResultContext(RetryExecutorResultDTO resultDTO);
@Mappings({
@Mapping(target = "namespaceId", source = "retry.namespaceId"),
@Mapping(target = "groupName", source = "retry.groupName"),
@Mapping(target = "sceneName", source = "retry.sceneName"),
@Mapping(target = "retryId", source = "retry.id"),
@Mapping(target = "taskType", source = "retry.taskType"),
})
RequestCallbackExecutorDTO toRequestCallbackExecutorDTO(RetrySceneConfig retrySceneConfig, Retry retry);
}

View File

@ -1,6 +1,8 @@
package com.aizuda.snailjob.server.retry.task.support;
import com.aizuda.snailjob.server.retry.task.dto.RealRetryExecutorDTO;
import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO;
import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryMergePartitionTaskDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskLogDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
@ -29,9 +31,13 @@ public interface RetryTaskLogConverter {
RetryTaskLogDTO toRetryTaskLogDTO(Retry retry);
RetryTaskLogDTO toRetryTaskLogDTO(RealRetryExecutorDTO retry);
RetryTaskLogDTO toRetryTaskLogDTO(RequestRetryExecutorDTO retry);
List<RetryMergePartitionTaskDTO> toRetryMergePartitionTaskDTOs(List<RetryTask> retryTaskList);
RetryTaskLogMessage toRetryTaskLogMessage(RetryTaskLogMessage message);
RetryLogMetaDTO toRetryLogMetaDTO(RequestRetryExecutorDTO executorDTO);
RetryLogMetaDTO toRetryLogMetaDTO(RequestCallbackExecutorDTO executorDTO);
}

View File

@ -0,0 +1,181 @@
package com.aizuda.snailjob.server.retry.task.support.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.rpc.client.SnailJobRetryListener;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient;
import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskLogDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Map;
import java.util.Objects;
/**
* @author opensnail
* @date 2023-10-06 16:42:08
* @since 2.4.0
*/
@Component(ActorGenerator.REAL_CALLBACK_EXECUTOR_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
@RequiredArgsConstructor
public class RequestCallbackClientActor extends AbstractActor {
private final RetryTaskMapper retryTaskMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(RequestCallbackExecutorDTO.class, executorDTO -> {
try {
doCallback(executorDTO);
} catch (Exception e) {
log.error("请求客户端发生异常", e);
}
}).build();
}
private void doCallback(RequestCallbackExecutorDTO executorDTO) {
long nowMilli = DateUtils.toNowMilli();
// 检查客户端是否存在
RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode(
executorDTO.getGroupName(),
executorDTO.getNamespaceId(),
executorDTO.getClientId()
);
if (Objects.isNull(registerNodeInfo)) {
taskExecuteFailure(executorDTO, "客户端不存在");
JobLogMetaDTO jobLogMetaDTO = RetryTaskConverter.INSTANCE.toJobLogDTO(executorDTO);
jobLogMetaDTO.setTimestamp(nowMilli);
SnailJobLog.REMOTE.error("retryTaskId:[{}] 任务调度失败. 失败原因: 无可执行的客户端 <|>{}<|>", executorDTO.getRetryTaskId(),
jobLogMetaDTO);
return;
}
StopRetryRequest stopRetryRequest = RetryTaskConverter.INSTANCE.toStopRetryRequest(executorDTO);
try {
// 构建请求客户端对象
RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo, executorDTO);
Result<Boolean> dispatch = rpcClient.stop(stopRetryRequest);
if (dispatch.getStatus() == StatusEnum.YES.getStatus()) {
SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务调度成功.", executorDTO.getRetryTaskId());
} else {
// 客户端返回失败则认为任务执行失败
SnailJobLog.LOCAL.error("retryTaskId:[{}] 任务调度失败. msg:[{}]", executorDTO.getRetryTaskId(), dispatch.getMessage());
taskExecuteFailure(executorDTO, dispatch.getMessage());
}
} catch (Exception e) {
Throwable throwable;
if (e.getClass().isAssignableFrom(RetryException.class)) {
RetryException re = (RetryException) e;
throwable = re.getLastFailedAttempt().getExceptionCause();
} else if (e.getClass().isAssignableFrom(UndeclaredThrowableException.class)) {
UndeclaredThrowableException re = (UndeclaredThrowableException) e;
throwable = re.getUndeclaredThrowable();
} else {
throwable = e;
}
RetryLogMetaDTO retryTaskLogDTO = RetryTaskLogConverter.INSTANCE.toRetryLogMetaDTO(executorDTO);
retryTaskLogDTO.setTimestamp(nowMilli);
SnailJobLog.REMOTE.error("retryTaskId:[{}] 任务调度失败. <|>{}<|>", retryTaskLogDTO.getRetryTaskId(),
retryTaskLogDTO, throwable);
taskExecuteFailure(executorDTO, throwable.getMessage());
}
}
public class RetryExecutorRetryListener implements SnailJobRetryListener {
private final Map<String, Object> properties;
private final RequestCallbackExecutorDTO executorDTO;
public RetryExecutorRetryListener(final RequestCallbackExecutorDTO executorDTO) {
this.executorDTO = executorDTO;
this.properties = Maps.newHashMap();
}
@Override
public <V> void onRetry(final Attempt<V> attempt) {
if (attempt.getAttemptNumber() > 0) {
// 更新最新负载节点
String hostId = (String) properties.get("HOST_ID");
String hostIp = (String) properties.get("HOST_IP");
String hostPort = (String) properties.get("HOST_PORT");
RetryTask retryTask = new RetryTask();
retryTask.setId(executorDTO.getRetryTaskId());
RegisterNodeInfo realNodeInfo = new RegisterNodeInfo();
realNodeInfo.setHostIp(hostIp);
realNodeInfo.setHostPort(Integer.valueOf(hostPort));
realNodeInfo.setHostId(hostId);
retryTask.setClientInfo(ClientInfoUtils.generate(realNodeInfo));
retryTaskMapper.updateById(retryTask);
}
}
@Override
public Map<String, Object> properties() {
return properties;
}
}
private RetryRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RequestCallbackExecutorDTO executorDTO) {
return RequestBuilder.<RetryRpcClient, Result>newBuilder()
.nodeInfo(registerNodeInfo)
.failRetry(true)
.failover(true)
.retryTimes(3)
.retryInterval(1)
.routeKey(executorDTO.getRouteKey())
.allocKey(String.valueOf(executorDTO.getRetryTaskId()))
.retryListener(new RetryExecutorRetryListener(executorDTO))
.client(RetryRpcClient.class)
.build();
}
/**
* 更新是执行状态
*
* @param executorDTO RequestRetryExecutorDTO
* @param message 失败原因
*/
private static void taskExecuteFailure(RequestCallbackExecutorDTO executorDTO, String message) {
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
executorResultDTO.setExceptionMsg(message);
actorRef.tell(executorResultDTO, actorRef);
}
}

View File

@ -1,7 +1,7 @@
package com.aizuda.snailjob.server.retry.task.support.dispatch;
import akka.actor.AbstractActor;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import akka.actor.ActorRef;
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
@ -11,22 +11,29 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.rpc.client.SnailJobRetryListener;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient;
import com.aizuda.snailjob.server.retry.task.dto.RealRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskLogDTO;
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Map;
import java.util.Objects;
/**
@ -37,11 +44,13 @@ import java.util.Objects;
@Component(ActorGenerator.REAL_RETRY_EXECUTOR_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
@RequiredArgsConstructor
public class RequestRetryClientActor extends AbstractActor {
private final RetryTaskMapper retryTaskMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(RealRetryExecutorDTO.class, realRetryExecutorDTO -> {
return receiveBuilder().match(RequestRetryExecutorDTO.class, realRetryExecutorDTO -> {
try {
doExecute(realRetryExecutorDTO);
} catch (Exception e) {
@ -50,7 +59,7 @@ public class RequestRetryClientActor extends AbstractActor {
}).build();
}
private void doExecute(RealRetryExecutorDTO executorDTO) {
private void doExecute(RequestRetryExecutorDTO executorDTO) {
long nowMilli = DateUtils.toNowMilli();
// 检查客户端是否存在
RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode(
@ -82,12 +91,12 @@ public class RequestRetryClientActor extends AbstractActor {
RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo, executorDTO);
Result<Boolean> dispatch = rpcClient.dispatch(dispatchJobRequest, snailJobHeaders);
Boolean data = dispatch.getData();
// todo 是否需要根据DispatchRetryResultDTO
if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.nonNull(data) && data) {
SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务调度成功.", executorDTO.getRetryTaskId());
} else {
SnailJobLog.LOCAL.error("retryTaskId:[{}] 任务调度失败. msg:[{}]", executorDTO.getRetryTaskId(), dispatch.getMessage());
// 客户端返回失败则认为任务执行失败
taskExecuteFailure(executorDTO, dispatch.getMessage());
}
} catch (Exception e) {
@ -102,66 +111,76 @@ public class RequestRetryClientActor extends AbstractActor {
throwable = e;
}
RetryTaskLogDTO jobLogMetaDTO = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(executorDTO);
// jobLogMetaDTO.setTimestamp(nowMilli);
// if (realJobExecutorDTO.getRetryStatus()) {
// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败执行重试 重试次数:[{}]. <|>{}<|>", jobLogMetaDTO.getTaskId(),
// realJobExecutorDTO.getRetryCount(), jobLogMetaDTO, throwable);
// } else {
// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败. <|>{}<|>",
// jobLogMetaDTO.getTaskId(),
// jobLogMetaDTO, throwable);
// }
RetryLogMetaDTO retryTaskLogDTO = RetryTaskLogConverter.INSTANCE.toRetryLogMetaDTO(executorDTO);
retryTaskLogDTO.setTimestamp(nowMilli);
SnailJobLog.REMOTE.error("retryTaskId:[{}] 任务调度失败. <|>{}<|>", retryTaskLogDTO.getRetryTaskId(),
retryTaskLogDTO, throwable);
// taskExecuteFailure(realJobExecutorDTO, throwable.getMessage());
// SnailSpringContext.getContext().publishEvent(
// new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
// .jobTaskBatchId(dispatchJobRequest.getTaskBatchId())
// .reason(throwable.getMessage())
// .notifyScene(JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene())
// .build())
// );
taskExecuteFailure(executorDTO, throwable.getMessage());
}
}
public static class RetryExecutorRetryListener implements RetryListener {
public class RetryExecutorRetryListener implements SnailJobRetryListener {
private final RealRetryExecutorDTO realRetryExecutorDTO;
private final Map<String, Object> properties;
private final RequestRetryExecutorDTO executorDTO;
public RetryExecutorRetryListener(final RealRetryExecutorDTO realJobExecutorDTO) {
this.realRetryExecutorDTO = realJobExecutorDTO;
public RetryExecutorRetryListener(final RequestRetryExecutorDTO realJobExecutorDTO) {
this.executorDTO = realJobExecutorDTO;
this.properties = Maps.newHashMap();
}
@Override
public <V> void onRetry(final Attempt<V> attempt) {
// 负载节点
// todo 重新更新任务的客户端信息
if (!attempt.hasException()) {
//
if (attempt.getAttemptNumber() > 1) {
// 更新最新负载节点
String hostId = (String) properties.get("HOST_ID");
String hostIp = (String) properties.get("HOST_IP");
Integer hostPort = (Integer) properties.get("HOST_PORT");
RetryTask retryTask = new RetryTask();
retryTask.setId(executorDTO.getRetryTaskId());
RegisterNodeInfo realNodeInfo = new RegisterNodeInfo();
realNodeInfo.setHostIp(hostIp);
realNodeInfo.setHostPort(Integer.valueOf(hostPort));
realNodeInfo.setHostId(hostId);
retryTask.setClientInfo(ClientInfoUtils.generate(realNodeInfo));
retryTaskMapper.updateById(retryTask);
}
}
@Override
public Map<String, Object> properties() {
return properties;
}
}
private RetryRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RealRetryExecutorDTO realRetryExecutorDTO) {
private RetryRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RequestRetryExecutorDTO executorDTO) {
return RequestBuilder.<RetryRpcClient, Result>newBuilder()
.nodeInfo(registerNodeInfo)
.failRetry(true)
.failover(true)
.retryTimes(6)
.retryTimes(3)
.retryInterval(1)
.routeKey(realRetryExecutorDTO.getRouteKey())
.allocKey(String.valueOf(realRetryExecutorDTO.getRetryTaskId()))
.retryListener(new RetryExecutorRetryListener(realRetryExecutorDTO))
.routeKey(executorDTO.getRouteKey())
.allocKey(String.valueOf(executorDTO.getRetryTaskId()))
.retryListener(new RetryExecutorRetryListener(executorDTO))
.client(RetryRpcClient.class)
.build();
}
private static void taskExecuteFailure(RealRetryExecutorDTO realRetryExecutorDTO, String message) {
// ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
// JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(realRetryExecutorDTO);
// jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
// jobExecutorResultDTO.setMessage(message);
// actorRef.tell(jobExecutorResultDTO, actorRef);
/**
* 更新是执行状态
*
* @param executorDTO RequestRetryExecutorDTO
* @param message 失败原因
*/
private static void taskExecuteFailure(RequestRetryExecutorDTO executorDTO, String message) {
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
executorResultDTO.setExceptionMsg(message);
actorRef.tell(executorResultDTO, actorRef);
}
}

View File

@ -9,21 +9,15 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient;
import com.aizuda.snailjob.server.retry.task.dto.RealRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RequestStopRetryTaskExecutorDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskLogDTO;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Objects;
/**
* @author opensnail
@ -37,135 +31,52 @@ public class RequestStopClientActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(RealRetryExecutorDTO.class, realRetryExecutorDTO -> {
return receiveBuilder().match(RequestStopRetryTaskExecutorDTO.class, taskExecutorDTO -> {
try {
doExecute(realRetryExecutorDTO);
doStop(taskExecutorDTO);
} catch (Exception e) {
log.error("请求客户端发生异常", e);
}
}).build();
}
private void doExecute(RealRetryExecutorDTO executorDTO) {
long nowMilli = DateUtils.toNowMilli();
private void doStop(RequestStopRetryTaskExecutorDTO executorDTO) {
// 检查客户端是否存在
RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode(
executorDTO.getGroupName(),
executorDTO.getNamespaceId(),
executorDTO.getClientId()
);
// if (Objects.isNull(registerNodeInfo)) {
// taskExecuteFailure(executorDTO, "客户端不存在");
// JobLogMetaDTO jobLogMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(executorDTO);
// jobLogMetaDTO.setTimestamp(nowMilli);
// if (executorDTO.getRetryStatus()) {
// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败执行重试. 失败原因: 无可执行的客户端. 重试次数:[{}]. <|>{}<|>",
// executorDTO.getTaskId(), executorDTO.getRetryCount(), jobLogMetaDTO);
// } else {
// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败. 失败原因: 无可执行的客户端 <|>{}<|>", realJobExecutorDTO.getTaskId(),
// jobLogMetaDTO);
// }
// return;
// }
executorDTO.getClientId());
if (Objects.isNull(registerNodeInfo)) {
return;
}
// 不用关心停止的结果若服务端尝试终止失败,客户端会兜底进行关闭
StopRetryRequest stopRetryRequest = RetryTaskConverter.INSTANCE.toStopRetryRequest(executorDTO);
try {
// 构建请求客户端对象
RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo, executorDTO);
RetryRpcClient rpcClient = buildRpcClient(registerNodeInfo);
Result<Boolean> dispatch = rpcClient.stop(stopRetryRequest);
// todo 是否需要根据DispatchRetryResultDTO
if (dispatch.getStatus() == StatusEnum.YES.getStatus()) {
SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务调度成功.", executorDTO.getRetryTaskId());
SnailJobLog.LOCAL.info("retryTaskId:[{}] 任务停止成功.", executorDTO.getRetryTaskId());
} else {
// 客户端返回失败则认为任务执行失败
// ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType());
// ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(realJobExecutorDTO);
// context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
// context.setExecuteResult(ExecuteResult.failure(null, dispatch.getMessage()));
// clientCallback.callback(context);
SnailJobLog.LOCAL.warn("retryTaskId:[{}] 任务停止失败.", executorDTO.getRetryTaskId());
}
} catch (Exception e) {
Throwable throwable;
if (e.getClass().isAssignableFrom(RetryException.class)) {
RetryException re = (RetryException) e;
throwable = re.getLastFailedAttempt().getExceptionCause();
} else if (e.getClass().isAssignableFrom(UndeclaredThrowableException.class)) {
UndeclaredThrowableException re = (UndeclaredThrowableException) e;
throwable = re.getUndeclaredThrowable();
} else {
throwable = e;
}
RetryTaskLogDTO jobLogMetaDTO = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(executorDTO);
// jobLogMetaDTO.setTimestamp(nowMilli);
// if (realJobExecutorDTO.getRetryStatus()) {
// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败执行重试 重试次数:[{}]. <|>{}<|>", jobLogMetaDTO.getTaskId(),
// realJobExecutorDTO.getRetryCount(), jobLogMetaDTO, throwable);
// } else {
// SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败. <|>{}<|>",
// jobLogMetaDTO.getTaskId(),
// jobLogMetaDTO, throwable);
// }
// taskExecuteFailure(realJobExecutorDTO, throwable.getMessage());
// SnailSpringContext.getContext().publishEvent(
// new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
// .jobTaskBatchId(dispatchJobRequest.getTaskBatchId())
// .reason(throwable.getMessage())
// .notifyScene(JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene())
// .build())
// );
SnailJobLog.LOCAL.error("retryTaskId:[{}] 任务停止失败.", executorDTO.getRetryTaskId(), e);
}
}
public static class RetryExecutorRetryListener implements RetryListener {
private final RealRetryExecutorDTO realRetryExecutorDTO;
public RetryExecutorRetryListener(final RealRetryExecutorDTO realJobExecutorDTO) {
this.realRetryExecutorDTO = realJobExecutorDTO;
}
@Override
public <V> void onRetry(final Attempt<V> attempt) {
// 负载节点
if (attempt.hasException()) {
// SnailJobLog.LOCAL.error("任务调度失败. taskInstanceId:[{}] count:[{}]",
// realRetryExecutorDTO.getTaskBatchId(), attempt.getAttemptNumber(), attempt.getExceptionCause());
// ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType());
// ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(realJobExecutorDTO);
// context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
// context.setExecuteResult(ExecuteResult.failure(null, "网络请求失败"));
// clientCallback.callback(context);
}
}
}
private RetryRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RealRetryExecutorDTO realRetryExecutorDTO) {
// int maxRetryTimes = realRetryExecutorDTO.getMaxRetryTimes();
// boolean retry = realJobExecutorDTO.getRetryStatus();
private RetryRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) {
return RequestBuilder.<RetryRpcClient, Result>newBuilder()
.nodeInfo(registerNodeInfo)
.failRetry(true)
.failover(true)
.retryTimes(6)
.retryTimes(3)
.retryInterval(1)
.retryListener(new RetryExecutorRetryListener(realRetryExecutorDTO))
.client(RetryRpcClient.class)
.build();
}
private static void taskExecuteFailure(RealRetryExecutorDTO realRetryExecutorDTO, String message) {
// ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
// JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(realRetryExecutorDTO);
// jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
// jobExecutorResultDTO.setMessage(message);
// actorRef.tell(jobExecutorResultDTO, actorRef);
}
}

View File

@ -12,11 +12,13 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.retry.task.dto.RealRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RequestCallbackExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecuteDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
@ -101,13 +103,26 @@ public class RetryExecutor extends AbstractActor {
updateRetryTaskStatus(execute.getRetryTaskId(), RetryTaskStatusEnum.RUNNING.getStatus(),
ClientInfoUtils.generate(serverNode));
// 请求客户端
RealRetryExecutorDTO realJobExecutor = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(retrySceneConfig, retry);
realJobExecutor.setClientId(serverNode.getHostId());
realJobExecutor.setRetryTaskId(execute.getRetryTaskId());
ActorRef actorRef = ActorGenerator.retryRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
Object executorDTO;
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) {
// 请求客户端
RequestCallbackExecutorDTO callbackExecutorDTO = RetryTaskConverter.INSTANCE.toRequestCallbackExecutorDTO(retrySceneConfig, retry);
callbackExecutorDTO.setClientId(serverNode.getHostId());
callbackExecutorDTO.setRetryTaskId(execute.getRetryTaskId());
executorDTO = callbackExecutorDTO;
} else {
// 请求客户端
RequestRetryExecutorDTO retryExecutorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(retrySceneConfig, retry);
retryExecutorDTO.setClientId(serverNode.getHostId());
retryExecutorDTO.setRetryTaskId(execute.getRetryTaskId());
executorDTO = retryExecutorDTO;
}
ActorRef actorRef = ActorGenerator.retryRealTaskExecutorActor();
actorRef.tell(executorDTO, actorRef);
}
private void updateRetryTaskStatus(Long retryTaskId, Integer taskStatus, String clientInfo) {

View File

@ -12,6 +12,7 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
@ -147,11 +148,16 @@ public class ScanRetryActor extends AbstractActor {
}
waitStrategyContext.setNextTriggerAt(DateUtils.toEpochMilli(nextTriggerAt));
// todo 这里区分一下是重试还是回调任务即可
waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(partitionTask.getRetryCount() + 1);
// 更新触发时间, 任务进入时间轮
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff());
WaitStrategy waitStrategy;
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(partitionTask.getTaskType())) {
waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getCbTriggerType());
} else {
waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff());
}
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
@ -162,8 +168,6 @@ public class ScanRetryActor extends AbstractActor {
.select(Retry::getId, Retry::getNextTriggerAt, Retry::getGroupName, Retry::getRetryCount,
Retry::getSceneName, Retry::getNamespaceId, Retry::getTaskType)
.eq(Retry::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
// todo
// .eq(Retry::getTaskType, taskActuatorScene().getTaskType().getType())
.in(Retry::getBucketIndex, buckets)
.le(Retry::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD))
.gt(Retry::getId, startId)

View File

@ -5,7 +5,7 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.retry.task.dto.RealRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
@ -39,7 +39,7 @@ public class RetryTaskStopHandler {
retryTask.setOperationReason(stopJobDTO.getOperationReason());
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), () -> new SnailJobServerException("update retry task failed"));
RealRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO);
RequestRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO);
ActorRef actorRef = ActorGenerator.stopRetryTaskActor();
actorRef.tell(executorDTO, actorRef);
}

View File

@ -10,16 +10,11 @@ public class IdempotentHolder {
private IdempotentHolder() {
}
public static RetryIdempotentStrategyHandler getRetryIdempotent() {
return SingletonHolder.RETRY_IDEMPOTENT_INSTANCE;
}
public static TimerIdempotent getTimerIdempotent() {
return SingletonHolder.TIMER_IDEMPOTENT;
}
private static class SingletonHolder {
private static final RetryIdempotentStrategyHandler RETRY_IDEMPOTENT_INSTANCE = new RetryIdempotentStrategyHandler();
private static final TimerIdempotent TIMER_IDEMPOTENT = new TimerIdempotent();
}
}

View File

@ -1,46 +0,0 @@
package com.aizuda.snailjob.server.retry.task.support.idempotent;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 重试任务幂等校验器
*
* @author: opensnail
* @date : 2021-11-23 09:26
*/
@Component
public class RetryIdempotentStrategyHandler implements IdempotentStrategy<String> {
private static final Cache<String, String> cache;
static {
cache = CacheBuilder.newBuilder()
.concurrencyLevel(16) // 并发级别
.expireAfterWrite(60, TimeUnit.SECONDS)
.build();
}
@Override
public boolean set(String key) {
cache.put(key, key);
return Boolean.TRUE;
}
@Override
public boolean isExist(String key) {
return cache.asMap().containsKey(key);
}
@Override
public boolean clear(String key) {
cache.invalidate(key);
return Boolean.TRUE;
}
}

View File

@ -1,84 +0,0 @@
package com.aizuda.snailjob.server.retry.task.support.retry;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.retry.task.support.FilterStrategy;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.StopStrategy;
import java.util.*;
/**
* 重试构建
*
* @author: opensnail
* @date : 2021-11-29 18:42
*/
public class RetryBuilder<V> {
private List<StopStrategy> stopStrategies;
private WaitStrategy waitStrategy;
private List<FilterStrategy> filterStrategies;
private RetryContext<V> retryContext;
public static <V> RetryBuilder<V> newBuilder() {
return new RetryBuilder<>();
}
public RetryBuilder<V> withWaitStrategy(WaitStrategy waitStrategy) {
this.waitStrategy = waitStrategy;
return this;
}
public RetryBuilder<V> withFilterStrategy(FilterStrategy filterStrategy) {
if (CollUtil.isEmpty(filterStrategies)) {
filterStrategies = new ArrayList<>();
}
filterStrategies.add(filterStrategy);
return this;
}
public RetryBuilder<V> withStopStrategy(StopStrategy stopStrategy) {
if (CollUtil.isEmpty(stopStrategies)) {
stopStrategies = new ArrayList<>();
}
stopStrategies.add(stopStrategy);
return this;
}
public RetryBuilder<V> withRetryContext(RetryContext<V> retryContext) {
this.retryContext = retryContext;
return this;
}
public RetryExecutor<V> build() {
if (Objects.isNull(waitStrategy)) {
throw new SnailJobServerException("waitStrategy 不能为null");
}
if (Objects.isNull(retryContext)) {
throw new SnailJobServerException("retryContext 不能为null");
}
if (CollUtil.isEmpty(stopStrategies)) {
stopStrategies = Collections.EMPTY_LIST;
} else {
stopStrategies.sort(Comparator.comparingInt(StopStrategy::order));
}
if (CollUtil.isEmpty(filterStrategies)) {
filterStrategies = Collections.EMPTY_LIST;
} else {
filterStrategies.sort(Comparator.comparingInt(FilterStrategy::order));
}
retryContext.setWaitStrategy(waitStrategy);
return new RetryExecutor<V>(stopStrategies, waitStrategy, filterStrategies, retryContext);
}
}

View File

@ -1,133 +0,0 @@
package com.aizuda.snailjob.server.retry.task.support.retry;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecutorDTO;
import com.aizuda.snailjob.server.retry.task.support.FilterStrategy;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.StopStrategy;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
/**
* 重试执行器
*
* @author: opensnail
* @date : 2021-11-29 18:57
*/
@Slf4j
public class RetryExecutor<V> {
private final List<StopStrategy> stopStrategies;
private final WaitStrategy waitStrategy;
private final List<FilterStrategy> filterStrategies;
private final RetryContext<V> retryContext;
public RetryExecutor(List<StopStrategy> stopStrategies,
WaitStrategy waitStrategy,
List<FilterStrategy> filterStrategies,
RetryContext<V> retryContext) {
this.stopStrategies = stopStrategies;
this.waitStrategy = waitStrategy;
this.filterStrategies = filterStrategies;
this.retryContext = retryContext;
}
public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter() {
for (FilterStrategy filterStrategy : filterStrategies) {
Pair<Boolean, StringBuilder> pair = filterStrategy.filter(retryContext);
if (!pair.getKey()) {
return pair;
}
}
return Pair.of(Boolean.TRUE, new StringBuilder());
}
/**
* 重试执行
*
* @param callable 重试执行回调
* @return 重试结果
* @throws Exception
*/
public V call(Callable<V> callable) throws Exception {
// 这里调用客户端可能会出现网络异常
V call = null;
try {
call = callable.call();
retryContext.setCallResult(call);
} catch (Exception e) {
RetryLogMetaDTO retryLogMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retryContext.getRetryTask());
SnailJobLog.REMOTE.error("请求客户端执行失败. uniqueId:[{}] <|>{}<|>", e);
retryContext.setException(e);
}
boolean isStop = Boolean.TRUE;
// 触发停止策略判断
for (StopStrategy stopStrategy : stopStrategies) {
if (stopStrategy.supports(retryContext)) {
// 必须责任链中的所有停止策略都判断为停止此时才判定为重试完成
if (!stopStrategy.shouldStop(retryContext)) {
isStop = Boolean.FALSE;
}
}
}
ActorRef actorRef;
if (isStop) {
// 状态变为完成 FinishActor
actorRef = ActorGenerator.finishActor();
} else {
// 失败 FailureActor
actorRef = ActorGenerator.failureActor();
}
// 获取失败原因
String reason = StrUtil.EMPTY;
if (retryContext.hasException()) {
Exception exception = retryContext.getException();
if (Objects.nonNull(exception)) {
reason = exception.getCause().getMessage();
}
} else {
if (Objects.nonNull(call)) {
Result<DispatchRetryResultDTO> result = (Result<DispatchRetryResultDTO>) call;
DispatchRetryResultDTO data = result.getData();
if (StrUtil.isBlank(result.getMessage()) && Objects.nonNull(data)) {
reason = data.getExceptionMsg();
} else {
reason = result.getMessage();
}
}
}
RetryTaskExecutorDTO retryTaskExecutorDTO =
RetryTaskConverter.INSTANCE.toRetryTaskExecutorDTO(
retryContext.getRetryTask(), reason,
RetryNotifySceneEnum.RETRY_TASK_FAIL_ERROR.getNotifyScene());
actorRef.tell(retryTaskExecutorDTO, actorRef);
return call;
}
public RetryContext<V> getRetryContext() {
return retryContext;
}
}

View File

@ -1,277 +0,0 @@
package com.aizuda.snailjob.server.retry.task.support.strategy;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.support.FilterStrategy;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.cache.CacheGroupRateLimiter;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 生成 {@link FilterStrategy} 实例.
*
* @author: opensnail
* @date : 2021-11-30 10:03
*/
@Slf4j
@Deprecated
public class FilterStrategies {
private FilterStrategies() {
}
/**
* 触发时间过滤策略
*
* @return {@link TriggerAtFilterStrategies} 触发时间过滤策略
*/
@Deprecated
public static FilterStrategy triggerAtFilter() {
return new TriggerAtFilterStrategies();
}
/**
* 使用BitSet幂等的过滤策略
*
* @return {@link BitSetIdempotentFilterStrategies} BitSet幂等的过滤策略
*/
public static FilterStrategy bitSetIdempotentFilter(IdempotentStrategy<String> idempotentStrategy) {
return new BitSetIdempotentFilterStrategies(idempotentStrategy);
}
/**
* 场景黑名单策略
*
* @return {@link SceneBlackFilterStrategies} 场景黑名单策略
*/
public static FilterStrategy sceneBlackFilter() {
return new SceneBlackFilterStrategies();
}
/**
* 检查是否存在存活的客户端POD
*
* @return {@link CheckAliveClientPodFilterStrategies} 客户端存活POD检查策略
*/
public static FilterStrategy checkAliveClientPodFilter() {
return new CheckAliveClientPodFilterStrategies();
}
/**
* 检查分配的客户端是否达到限流阈值
*
* @return {@link RateLimiterFilterStrategies} 检查分配的客户端是否达到限流阈值
*/
public static FilterStrategy rateLimiterFilter() {
return new RateLimiterFilterStrategies();
}
/**
* 正在rebalance时不允许下发重试流量
*
* @return {@link ReBalanceFilterStrategies} 正在rebalance时不允许下发重试流量
*/
public static FilterStrategy rebalanceFilterStrategies() {
return new ReBalanceFilterStrategies();
}
/**
* 触发时间过滤策略
* <p>
* 根据延迟等级的时间计算下次触发时间是否小于当前时间满足则返回true 否则返回false
*/
private static final class TriggerAtFilterStrategies implements FilterStrategy {
@Override
public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
Retry retry = retryContext.getRetryTask();
Long nextTriggerAt = retry.getNextTriggerAt();
// boolean result = nextTriggerAt.isBefore(LocalDateTime.now());
// StringBuilder description = new StringBuilder();
// if (!result) {
// description.append(MessageFormat.format("未到触发时间. uniqueId:[{0}]", retry.getId()));
// }
return null;
}
@Override
public int order() {
return 0;
}
}
/**
* 使用BitSet幂等的过滤策略
* <p>
* 判断BitSet中是否存在若存在则放回false 否则返回true
*/
private static final class BitSetIdempotentFilterStrategies implements FilterStrategy {
private final IdempotentStrategy<String> idempotentStrategy;
public BitSetIdempotentFilterStrategies(IdempotentStrategy<String> idempotentStrategy) {
this.idempotentStrategy = idempotentStrategy;
}
@Override
public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
Retry retry = retryContext.getRetryTask();
boolean result = !idempotentStrategy.isExist(ImmutableTriple.of(retry.getGroupName(), retry.getNamespaceId(), retry.getId()).toString());
StringBuilder description = new StringBuilder();
if (!result) {
description.append(MessageFormat.format("存在执行中的任务.uniqueId:[{0}]", retry.getId()));
}
return Pair.of(result, description);
}
@Override
public int order() {
return 1;
}
}
/**
* 场景黑名单策略
* <p>
* 如果重试的数据在黑名单中的则返回false 否则为true
*/
private static final class SceneBlackFilterStrategies implements FilterStrategy {
@Override
public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
Retry retry = retryContext.getRetryTask();
boolean result = !retryContext.getSceneBlacklist().contains(retry.getSceneName());
StringBuilder description = new StringBuilder();
if (!result) {
description.append(MessageFormat.format("场景:[{0}]在黑名单中, 不允许执行.", retry.getSceneName()));
}
return Pair.of(result, description);
}
@Override
public int order() {
return 2;
}
}
/**
* 检查是否存在存活的客户端POD
*/
private static final class CheckAliveClientPodFilterStrategies implements FilterStrategy {
@Override
public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
Retry retry = retryContext.getRetryTask();
RegisterNodeInfo serverNode = retryContext.getServerNode();
boolean result;
StringBuilder description = new StringBuilder();
if (Objects.isNull(serverNode)) {
result = false;
description.append(MessageFormat.format("没有可执行的客户端节点. uniqueId:[{0}]", retry.getId()));
} else {
ServerNodeMapper serverNodeMapper = SnailSpringContext.getBeanByType(ServerNodeMapper.class);
result = 1 == serverNodeMapper.selectCount(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getHostId, serverNode.getHostId()));
if (!result) {
// 删除缓存中的失效节点
CacheRegisterTable.remove(retry.getGroupName(), retry.getNamespaceId(), serverNode.getHostId());
description.append(MessageFormat.format("DB中未查询到客户端节点. hostId:[{0}] uniqueId:[{1}]", serverNode.getHostId(), retry.getId()));
}
}
if (result == false) {
RetryTaskFailAlarmEventDTO toRetryTaskFailAlarmEventDTO =
RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO(
retry,
description.toString(),
RetryNotifySceneEnum.RETRY_NO_CLIENT_NODES_ERROR.getNotifyScene());
SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(toRetryTaskFailAlarmEventDTO));
}
return Pair.of(result, description);
}
@Override
public int order() {
return 3;
}
}
/**
* 检查是否存在存活的客户端POD
*/
private static final class RateLimiterFilterStrategies implements FilterStrategy {
@Override
public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
RegisterNodeInfo serverNode = retryContext.getServerNode();
Retry retry = retryContext.getRetryTask();
StringBuilder description = new StringBuilder();
Boolean result = Boolean.TRUE;
RateLimiter rateLimiter = CacheGroupRateLimiter.getRateLimiterByKey(serverNode.getHostId());
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
SnailJobLog.LOCAL.error("该POD:[{}]已到达最大限流阈值,本次重试不执行", serverNode.getHostId());
description.append(MessageFormat.format("该POD:[{0}]已到达最大限流阈值,本次重试不执行.uniqueId:[{1}]", serverNode.getHostId(), retry.getId()));
result = Boolean.FALSE;
}
return Pair.of(result, description);
}
@Override
public int order() {
return 4;
}
}
/**
* rebalance中数据不进行重试
*/
private static final class ReBalanceFilterStrategies implements FilterStrategy {
@Override
public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
Retry retry = retryContext.getRetryTask();
boolean result = !DistributeInstance.RE_BALANCE_ING.get();
StringBuilder description = new StringBuilder();
if (!result) {
description.append(MessageFormat.format("系统Rebalancing中数据无法重试.uniqueId:[{0}]", retry.getId()));
}
return Pair.of(result, description);
}
@Override
public int order() {
return 1;
}
}
}

View File

@ -1,140 +0,0 @@
package com.aizuda.snailjob.server.retry.task.support.strategy;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.StopStrategy;
import com.aizuda.snailjob.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
import java.util.Objects;
/**
* 生成 {@link StopStrategy} 实例.
*
* @author: opensnail
* @date : 2021-11-29 19:22
*/
public class StopStrategies {
private StopStrategies() {
}
/**
* 调用客户端发生异常触发停止策略
*
* @return {@link ExceptionStopStrategy} 调用客户端发生异常触发停止策略
*/
public static StopStrategy stopException() {
return new ExceptionStopStrategy();
}
/**
* 根据客户端返回状态判断是否终止重试
*
* @return {@link ResultStatusCodeStopStrategy} 重试结果停止策略
*/
public static StopStrategy stopResultStatus() {
return new ResultStatusStopStrategy();
}
/**
* 根据客户端返回结果对象的状态码判断是否终止重试
*
* @return {@link ResultStatusCodeStopStrategy} 重试结果停止策略
*/
public static StopStrategy stopResultStatusCode() {
return new ResultStatusCodeStopStrategy();
}
/**
* 调用客户端发生异常触发停止策略
*/
private static final class ExceptionStopStrategy implements StopStrategy {
@Override
public boolean shouldStop(RetryContext retryContext) {
return !retryContext.hasException();
}
@Override
public boolean supports(RetryContext retryContext) {
return true;
}
@Override
public int order() {
return 1;
}
}
/**
* 根据客户端返回的状态码判断是否应该停止
* <p>
* 1{@link Result#getStatus()} 不为1 则继续重试
*/
private static final class ResultStatusStopStrategy implements StopStrategy {
@Override
public boolean shouldStop(RetryContext retryContext) {
Result response = (Result) retryContext.getCallResult();
if (Objects.isNull(response) || StatusEnum.YES.getStatus() != response.getStatus()) {
return Boolean.FALSE;
}
return Boolean.TRUE;
}
@Override
public boolean supports(RetryContext retryContext) {
return true;
}
@Override
public int order() {
return 2;
}
}
/**
* 根据客户端返回结果集判断是否应该停止
* <p>
* 1{@link Result#getStatus()} 不为1 则继续重试
* 2根据{@link Result#getData()}中的statusCode判断是否停止
*/
private static final class ResultStatusCodeStopStrategy implements StopStrategy {
@Override
public boolean shouldStop(RetryContext retryContext) {
Result<DispatchRetryResultDTO> response = (Result<DispatchRetryResultDTO>) retryContext.getCallResult();
if (Objects.isNull(response) || StatusEnum.YES.getStatus() != response.getStatus()) {
return Boolean.FALSE;
}
DispatchRetryResultDTO data = response.getData();
if (Objects.isNull(data)) {
return Boolean.FALSE;
}
Integer statusCode = data.getStatusCode();
Integer status = RetryResultStatusEnum.getRetryResultStatusEnum(statusCode).getStatus();
return RetryResultStatusEnum.SUCCESS.getStatus().equals(status) || RetryResultStatusEnum.STOP.getStatus().equals(status);
}
@Override
public boolean supports(RetryContext retryContext) {
return retryContext instanceof MaxAttemptsPersistenceRetryContext;
}
@Override
public int order() {
return 3;
}
}
}

View File

@ -4,7 +4,7 @@ import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.server.web.annotation.LoginRequired;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.*;
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryResponseVO;
import com.aizuda.snailjob.server.web.service.RetryTaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
@ -19,34 +19,34 @@ import java.util.List;
* @date 2022-02-27
*/
@RestController
@RequestMapping("/retry-task")
public class RetryTaskController {
@RequestMapping("/retry")
public class RetryController {
@Autowired
private RetryTaskService retryTaskService;
@LoginRequired
@GetMapping("list")
public PageResult<List<RetryTaskResponseVO>> getRetryTaskPage(RetryTaskQueryVO queryVO) {
return retryTaskService.getRetryTaskPage(queryVO);
public PageResult<List<RetryResponseVO>> getRetryTaskPage(RetryQueryVO queryVO) {
return retryTaskService.getRetryPage(queryVO);
}
@LoginRequired
@GetMapping("{id}")
public RetryTaskResponseVO getRetryTaskById(@RequestParam("groupName") String groupName,
@PathVariable("id") Long id) {
return retryTaskService.getRetryTaskById(groupName, id);
public RetryResponseVO getRetryTaskById(@RequestParam("groupName") String groupName,
@PathVariable("id") Long id) {
return retryTaskService.getRetryById(groupName, id);
}
@LoginRequired
@PutMapping("status")
public int updateRetryTaskStatus(@RequestBody RetryTaskUpdateStatusRequestVO retryTaskUpdateStatusRequestVO) {
return retryTaskService.updateRetryTaskStatus(retryTaskUpdateStatusRequestVO);
public int updateRetryTaskStatus(@RequestBody RetryUpdateStatusRequestVO retryUpdateStatusRequestVO) {
return retryTaskService.updateRetryTaskStatus(retryUpdateStatusRequestVO);
}
@LoginRequired
@PostMapping
public int saveRetryTask(@RequestBody @Validated RetryTaskSaveRequestVO retryTaskRequestVO) {
public int saveRetryTask(@RequestBody @Validated RetrySaveRequestVO retryTaskRequestVO) {
return retryTaskService.saveRetryTask(retryTaskRequestVO);
}
@ -58,14 +58,14 @@ public class RetryTaskController {
@LoginRequired
@PutMapping("/batch")
public Integer updateRetryTaskExecutorName(@RequestBody @Validated RetryTaskUpdateExecutorNameRequestVO requestVO) {
return retryTaskService.updateRetryTaskExecutorName(requestVO);
public Integer updateRetryTaskExecutorName(@RequestBody @Validated RetryUpdateExecutorNameRequestVO requestVO) {
return retryTaskService.updateRetryExecutorName(requestVO);
}
@LoginRequired
@DeleteMapping("/batch")
public boolean batchDeleteRetryTask(@RequestBody @Validated BatchDeleteRetryTaskVO requestVO) {
return retryTaskService.batchDeleteRetryTask(requestVO);
return retryTaskService.batchDeleteRetry(requestVO);
}
@LoginRequired
@ -77,12 +77,12 @@ public class RetryTaskController {
@LoginRequired
@PostMapping("/manual/trigger/retry/task")
public boolean manualTriggerRetryTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) {
return retryTaskService.manualTriggerRetryTask(requestVO);
return retryTaskService.manualTriggerRetry(requestVO);
}
@LoginRequired
@PostMapping("/manual/trigger/callback/task")
public boolean manualTriggerCallbackTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) {
return retryTaskService.manualTriggerCallbackTask(requestVO);
return retryTaskService.manualTriggerCallback(requestVO);
}
}

View File

@ -3,7 +3,7 @@ package com.aizuda.snailjob.server.web.controller;
import com.aizuda.snailjob.server.web.annotation.LoginRequired;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO;
import com.aizuda.snailjob.server.web.service.RetryTaskLogService;
@ -21,7 +21,7 @@ import java.util.Set;
* @date 2022-02-27
*/
@RestController
@RequestMapping("/retry-task-log")
@RequestMapping("/retry-task")
public class RetryTaskLogController {
@Autowired
@ -29,7 +29,7 @@ public class RetryTaskLogController {
@LoginRequired
@GetMapping("list")
public PageResult<List<RetryTaskLogResponseVO>> getRetryTaskLogPage(RetryTaskLogQueryVO queryVO) {
public PageResult<List<RetryTaskLogResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO) {
return retryTaskLogService.getRetryTaskLogPage(queryVO);
}

View File

@ -4,15 +4,14 @@ import com.aizuda.snailjob.server.web.model.base.BaseQueryVO;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.LocalDateTime;
/**
* @author: opensnail
* @date : 2022-02-28 09:08
* @author opensnail
* @date 2022-02-27
* @since 2.0
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class RetryTaskLogQueryVO extends BaseQueryVO {
public class RetryQueryVO extends BaseQueryVO {
private String groupName;
@ -22,11 +21,7 @@ public class RetryTaskLogQueryVO extends BaseQueryVO {
private String idempotentId;
private String uniqueId;
private Integer retryStatus;
private LocalDateTime beginDate;
private LocalDateTime endDate;
private Long retryId;
}

View File

@ -12,7 +12,7 @@ import org.hibernate.validator.constraints.NotBlank;
* @since 2.0
*/
@Data
public class RetryTaskSaveRequestVO {
public class RetrySaveRequestVO {
/**
* 组名称

View File

@ -4,10 +4,11 @@ import com.aizuda.snailjob.server.web.model.base.BaseQueryVO;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.LocalDateTime;
/**
* @author opensnail
* @date 2022-02-27
* @since 2.0
* @author: opensnail
* @date : 2022-02-28 09:08
*/
@Data
@EqualsAndHashCode(callSuper = true)
@ -21,7 +22,11 @@ public class RetryTaskQueryVO extends BaseQueryVO {
private String idempotentId;
private Long retryId;
private Integer retryStatus;
private Long retryId;
private LocalDateTime beginDate;
private LocalDateTime endDate;
}

View File

@ -15,7 +15,7 @@ import java.util.List;
* @date 2022-09-29
*/
@Data
public class RetryTaskUpdateExecutorNameRequestVO {
public class RetryUpdateExecutorNameRequestVO {
/**
* 组名称

View File

@ -12,7 +12,7 @@ import lombok.Data;
* @date 2022-09-29
*/
@Data
public class RetryTaskUpdateStatusRequestVO {
public class RetryUpdateStatusRequestVO {
/**
* 重试状态 {@link RetryStatusEnum}

View File

@ -82,7 +82,7 @@ public class SceneConfigRequestVO {
/**
* 回调的最大执行次数
*/
private int cbMaxCount = 288;
private int cbMaxCount;
/**
* 回调间隔时间

View File

@ -10,7 +10,7 @@ import java.time.LocalDateTime;
* @since 2.0
*/
@Data
public class RetryTaskResponseVO {
public class RetryResponseVO {
private Long id;
@ -42,4 +42,6 @@ public class RetryTaskResponseVO {
private LocalDateTime updateDt;
private RetryResponseVO children;
}

View File

@ -13,28 +13,24 @@ public class RetryTaskLogResponseVO {
private Long id;
private String uniqueId;
private String groupName;
private String sceneName;
private String idempotentId;
private Integer taskStatus;
private String bizNo;
private Long retryId;
private String executorName;
private String argsStr;
private String extAttrs;
private LocalDateTime nextTriggerAt;
private Integer retryStatus;
private Integer taskType;
private LocalDateTime createDt;
private Integer operationReason;
/**
* 客户端ID
*/
private String clientInfo;
}

View File

@ -42,4 +42,24 @@ public class SceneConfigResponseVO {
* 通知告警场景配置id列表
*/
private Set<Long> notifyIds;
/**
* 回调状态 0不开启 1开启
*/
private Integer cbStatus;
/**
* 回调触发类型
*/
private Integer cbTriggerType;
/**
* 回调的最大执行次数
*/
private int cbMaxCount;
/**
* 回调间隔时间
*/
private String cbTriggerInterval;
}

View File

@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.web.service;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO;
@ -16,7 +16,7 @@ import java.util.Set;
*/
public interface RetryTaskLogService {
PageResult<List<RetryTaskLogResponseVO>> getRetryTaskLogPage(RetryTaskLogQueryVO queryVO);
PageResult<List<RetryTaskLogResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO);
RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO);

View File

@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.web.service;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.*;
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryResponseVO;
import java.util.List;
@ -13,7 +13,7 @@ import java.util.List;
*/
public interface RetryTaskService {
PageResult<List<RetryTaskResponseVO>> getRetryTaskPage(RetryTaskQueryVO queryVO);
PageResult<List<RetryResponseVO>> getRetryPage(RetryQueryVO queryVO);
/**
* 通过重试任务表id获取重试任务信息
@ -22,23 +22,23 @@ public interface RetryTaskService {
* @param id 重试任务表id
* @return 重试任务
*/
RetryTaskResponseVO getRetryTaskById(String groupName, Long id);
RetryResponseVO getRetryById(String groupName, Long id);
/**
* 更新重试任务状态
*
* @param retryTaskUpdateStatusRequestVO 更新重试任务状态请求模型
* @param retryUpdateStatusRequestVO 更新重试任务状态请求模型
* @return
*/
int updateRetryTaskStatus(RetryTaskUpdateStatusRequestVO retryTaskUpdateStatusRequestVO);
int updateRetryTaskStatus(RetryUpdateStatusRequestVO retryUpdateStatusRequestVO);
/**
* 手动新增重试任务
*
* @param retryTaskRequestVO {@link RetryTaskSaveRequestVO} 重试数据模型
* @param retryTaskRequestVO {@link RetrySaveRequestVO} 重试数据模型
* @return
*/
int saveRetryTask(RetryTaskSaveRequestVO retryTaskRequestVO);
int saveRetryTask(RetrySaveRequestVO retryTaskRequestVO);
/**
* 委托客户端生成idempotentId
@ -54,7 +54,7 @@ public interface RetryTaskService {
* @param requestVO 更新执行器变更模型
* @return 更新条数
*/
int updateRetryTaskExecutorName(RetryTaskUpdateExecutorNameRequestVO requestVO);
int updateRetryExecutorName(RetryUpdateExecutorNameRequestVO requestVO);
/**
* 批量删除重试数据
@ -62,7 +62,7 @@ public interface RetryTaskService {
* @param requestVO 批量删除重试数据
* @return
*/
boolean batchDeleteRetryTask(BatchDeleteRetryTaskVO requestVO);
boolean batchDeleteRetry(BatchDeleteRetryTaskVO requestVO);
/**
* 解析日志
@ -78,7 +78,7 @@ public interface RetryTaskService {
* @param requestVO
* @return
*/
boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO);
boolean manualTriggerRetry(ManualTriggerTaskRequestVO requestVO);
/**
* 手动执行回调任务
@ -86,5 +86,5 @@ public interface RetryTaskService {
* @param requestVO
* @return
*/
boolean manualTriggerCallbackTask(ManualTriggerTaskRequestVO requestVO);
boolean manualTriggerCallback(ManualTriggerTaskRequestVO requestVO);
}

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.service.convert;
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryResponseVO;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
@ -17,7 +17,7 @@ public interface RetryTaskResponseVOConverter {
RetryTaskResponseVOConverter INSTANCE = Mappers.getMapper(RetryTaskResponseVOConverter.class);
RetryTaskResponseVO convert(Retry retry);
RetryResponseVO convert(Retry retry);
List<RetryTaskResponseVO> convertList(List<Retry> retries);
List<RetryResponseVO> convertList(List<Retry> retries);
}

View File

@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.web.service.convert;
import com.aizuda.snailjob.server.model.dto.RetryTaskDTO;
import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskContext;
import com.aizuda.snailjob.server.web.model.request.RetryTaskSaveRequestVO;
import com.aizuda.snailjob.server.web.model.request.RetrySaveRequestVO;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
@ -17,7 +17,7 @@ import java.util.List;
public interface TaskContextConverter {
TaskContextConverter INSTANCE = Mappers.getMapper(TaskContextConverter.class);
TaskContext.TaskInfo convert(RetryTaskSaveRequestVO retryTaskSaveRequestVO);
TaskContext.TaskInfo convert(RetrySaveRequestVO retrySaveRequestVO);
List<TaskContext.TaskInfo> convert(List<RetryTaskDTO> retryTasks);
}

View File

@ -10,7 +10,7 @@ import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.snailjob.server.web.model.request.UserSessionVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO;
@ -43,37 +43,30 @@ public class RetryTaskLogServiceImpl implements RetryTaskLogService {
private final RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Override
public PageResult<List<RetryTaskLogResponseVO>> getRetryTaskLogPage(RetryTaskLogQueryVO queryVO) {
public PageResult<List<RetryTaskLogResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO) {
PageDTO<RetryTask> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
// UserSessionVO userSessionVO = UserSessionUtils.currentUserSession();
// String namespaceId = userSessionVO.getNamespaceId();
//
// List<String> groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName());
//
// LambdaQueryWrapper<RetryTask> retryTaskLogLambdaQueryWrapper = new LambdaQueryWrapper<RetryTask>()
// .eq(RetryTask::getNamespaceId, namespaceId)
// .in(CollUtil.isNotEmpty(groupNames), RetryTask::getGroupName, groupNames)
// .eq(StrUtil.isNotBlank(queryVO.getSceneName()), RetryTask::getSceneName, queryVO.getSceneName())
// .eq(StrUtil.isNotBlank(queryVO.getBizNo()), RetryTask::getBizNo, queryVO.getBizNo())
// .eq(StrUtil.isNotBlank(queryVO.getUniqueId()), RetryTask::getUniqueId, queryVO.getUniqueId())
// .eq(StrUtil.isNotBlank(queryVO.getIdempotentId()), RetryTask::getIdempotentId, queryVO.getIdempotentId())
// .eq(queryVO.getRetryStatus() != null, RetryTask::getTaskStatus, queryVO.getRetryStatus())
// .between(ObjUtil.isNotNull(queryVO.getDatetimeRange()),
// RetryTask::getCreateDt, queryVO.getStartDt(), queryVO.getEndDt())
// .select(RetryTask::getGroupName, RetryTask::getId,
// RetryTask::getSceneName,
// RetryTask::getIdempotentId, RetryTask::getBizNo, RetryTask::getTaskStatus,
// RetryTask::getCreateDt, RetryTask::getUniqueId, RetryTask::getTaskType)
// .orderByDesc(RetryTask::getCreateDt);
// PageDTO<RetryTask> retryTaskLogPageDTO = retryTaskMapper.selectPage(pageDTO,
// retryTaskLogLambdaQueryWrapper);
UserSessionVO userSessionVO = UserSessionUtils.currentUserSession();
String namespaceId = userSessionVO.getNamespaceId();
// return new PageResult<>(
// retryTaskLogPageDTO,
// RetryTaskLogResponseVOConverter.INSTANCE.convertList(retryTaskLogPageDTO.getRecords()));
List<String> groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName());
LambdaQueryWrapper<RetryTask> wrapper = new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.in(CollUtil.isNotEmpty(groupNames), RetryTask::getGroupName, groupNames)
.eq(StrUtil.isNotBlank(queryVO.getSceneName()), RetryTask::getSceneName, queryVO.getSceneName())
.eq(queryVO.getRetryStatus() != null, RetryTask::getTaskStatus, queryVO.getRetryStatus())
.eq(Objects.nonNull(queryVO.getRetryId()), RetryTask::getRetryId, queryVO.getRetryId())
.between(ObjUtil.isNotNull(queryVO.getDatetimeRange()),
RetryTask::getCreateDt, queryVO.getStartDt(), queryVO.getEndDt())
.select(RetryTask::getGroupName, RetryTask::getId, RetryTask::getSceneName, RetryTask::getTaskStatus,
RetryTask::getCreateDt, RetryTask::getTaskType, RetryTask::getOperationReason)
.orderByDesc(RetryTask::getCreateDt);
PageDTO<RetryTask> retryTaskPageDTO = retryTaskMapper.selectPage(pageDTO, wrapper);
return new PageResult<>(retryTaskPageDTO,
RetryTaskLogResponseVOConverter.INSTANCE.convertList(retryTaskPageDTO.getRecords()));
return null;
}
@Override

View File

@ -29,7 +29,7 @@ import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskGenerat
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.*;
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryResponseVO;
import com.aizuda.snailjob.server.web.service.RetryTaskService;
import com.aizuda.snailjob.server.web.service.convert.RetryTaskResponseVOConverter;
import com.aizuda.snailjob.server.web.service.convert.TaskContextConverter;
@ -71,14 +71,11 @@ public class RetryTaskServiceImpl implements RetryTaskService {
@Autowired
@Lazy
private List<TaskGenerator> taskGenerators;
// @Lazy
// @Autowired
// private List<TaskExecutor> taskExecutors;
@Autowired
private RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Override
public PageResult<List<RetryTaskResponseVO>> getRetryTaskPage(RetryTaskQueryVO queryVO) {
public PageResult<List<RetryResponseVO>> getRetryPage(RetryQueryVO queryVO) {
PageDTO<Retry> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
@ -102,12 +99,22 @@ public class RetryTaskServiceImpl implements RetryTaskService {
Retry::getTaskType)
.orderByDesc(Retry::getCreateDt);
pageDTO = accessTemplate.getRetryAccess().listPage(pageDTO, queryWrapper);
return new PageResult<>(pageDTO,
RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords()));
Set<Long> ids = StreamUtils.toSet(pageDTO.getRecords(), Retry::getId);
List<Retry> callbackTaskList = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper<Retry>().eq(Retry::getParentId, ids));
Map<Long, Retry> callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId);
List<RetryResponseVO> retryResponseList = RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords());
for (RetryResponseVO retryResponseVO : retryResponseList) {
retryResponseVO.setChildren(RetryTaskResponseVOConverter.INSTANCE.convert(callbackMap.get(retryResponseVO.getId())));
}
return new PageResult<>(pageDTO, retryResponseList);
}
@Override
public RetryTaskResponseVO getRetryTaskById(String groupName, Long id) {
public RetryResponseVO getRetryById(String groupName, Long id) {
TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
Retry retry = retryTaskAccess.one(new LambdaQueryWrapper<Retry>().eq(Retry::getId, id));
return RetryTaskResponseVOConverter.INSTANCE.convert(retry);
@ -115,7 +122,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
@Override
@Transactional
public int updateRetryTaskStatus(RetryTaskUpdateStatusRequestVO requestVO) {
public int updateRetryTaskStatus(RetryUpdateStatusRequestVO requestVO) {
RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(requestVO.getRetryStatus());
if (Objects.isNull(retryStatusEnum)) {
@ -166,7 +173,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
}
@Override
public int saveRetryTask(final RetryTaskSaveRequestVO retryTaskRequestVO) {
public int saveRetryTask(final RetrySaveRequestVO retryTaskRequestVO) {
RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(retryTaskRequestVO.getRetryStatus());
if (Objects.isNull(retryStatusEnum)) {
throw new SnailJobServerException("重试状态错误");
@ -230,7 +237,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
}
@Override
public int updateRetryTaskExecutorName(final RetryTaskUpdateExecutorNameRequestVO requestVO) {
public int updateRetryExecutorName(final RetryUpdateExecutorNameRequestVO requestVO) {
Retry retry = new Retry();
retry.setExecutorName(requestVO.getExecutorName());
@ -248,7 +255,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
@Override
@Transactional
public boolean batchDeleteRetryTask(final BatchDeleteRetryTaskVO requestVO) {
public boolean batchDeleteRetry(final BatchDeleteRetryTaskVO requestVO) {
TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
@ -342,7 +349,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
}
@Override
public boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO) {
public boolean manualTriggerRetry(ManualTriggerTaskRequestVO requestVO) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper<GroupConfig>()
@ -375,7 +382,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
}
@Override
public boolean manualTriggerCallbackTask(ManualTriggerTaskRequestVO requestVO) {
public boolean manualTriggerCallback(ManualTriggerTaskRequestVO requestVO) {
List<String> uniqueIds = requestVO.getUniqueIds();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();