feat(1.4.0-beta1): 1.优化重试结果处理器 2. 支持任务停止

This commit is contained in:
opensnail 2025-02-22 21:35:31 +08:00
parent 7def7cecb2
commit d252e46d76
39 changed files with 1015 additions and 892 deletions

View File

@ -4,9 +4,11 @@ import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.client.core.context.CallbackContext;
import com.aizuda.snailjob.client.core.client.RetryClient;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.client.model.request.DispatchCallbackResultRequest;
import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest;
import com.aizuda.snailjob.client.model.request.RetryCallbackResultRequest;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -44,9 +46,9 @@ public class CallbackTaskExecutorFutureCallback implements FutureCallback<Boolea
@Override
public void onSuccess(Boolean result) {
try {
DispatchRetryResultRequest request = buildDispatchRetryResultRequest();
request.setStatusCode(RetryResultStatusEnum.SUCCESS.getStatus());
CLIENT.dispatchResult(request);
DispatchCallbackResultRequest request = buildDispatchRetryResultRequest();
request.setTaskStatus(RetryTaskStatusEnum.SUCCESS.getStatus());
CLIENT.callbackResult(request);
} catch (Exception e) {
SnailJobLog.REMOTE.error("回调执行结果上报异常.[{}]", context.getRetryTaskId(), e);
@ -61,17 +63,17 @@ public class CallbackTaskExecutorFutureCallback implements FutureCallback<Boolea
return;
}
try {
DispatchRetryResultRequest request = buildDispatchRetryResultRequest();
request.setStatusCode(RetryResultStatusEnum.FAILURE.getStatus());
DispatchCallbackResultRequest request = buildDispatchRetryResultRequest();
request.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
request.setExceptionMsg(t.getMessage());
CLIENT.dispatchResult(request);
CLIENT.callbackResult(request);
} catch (Exception e) {
SnailJobLog.REMOTE.error("回调执行结果上报异常.[{}]", context.getRetryTaskId(), e);
}
}
private DispatchRetryResultRequest buildDispatchRetryResultRequest() {
DispatchRetryResultRequest request = new DispatchRetryResultRequest();
private DispatchCallbackResultRequest buildDispatchRetryResultRequest() {
DispatchCallbackResultRequest request = new DispatchCallbackResultRequest();
request.setRetryTaskId(context.getRetryTaskId());
request.setNamespaceId(context.getNamespaceId());
request.setGroupName(context.getGroupName());

View File

@ -6,6 +6,7 @@ import com.aizuda.snailjob.client.core.client.RetryClient;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -44,7 +45,15 @@ public class RetryTaskExecutorFutureCallback implements FutureCallback<DispatchR
public void onSuccess(DispatchRetryResultDTO result) {
try {
CLIENT.dispatchResult(buildDispatchRetryResultRequest(result));
DispatchRetryResultRequest request = buildDispatchRetryResultRequest(result);
if (RetryResultStatusEnum.SUCCESS.getStatus().equals(result.getStatusCode())) {
request.setTaskStatus(RetryTaskStatusEnum.SUCCESS.getStatus());
} else if (RetryResultStatusEnum.STOP.getStatus().equals(result.getStatusCode())) {
request.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
} else {
request.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
}
CLIENT.dispatchResult(request);
} catch (Exception e) {
SnailJobLog.REMOTE.error("执行结果上报异常.[{}]", retryContext.getRetryTaskId(), e);
}
@ -62,7 +71,7 @@ public class RetryTaskExecutorFutureCallback implements FutureCallback<DispatchR
try {
DispatchRetryResultRequest request = buildDispatchRetryResultRequest(null);
request.setExceptionMsg(t.getMessage());
request.setStatusCode(RetryResultStatusEnum.FAILURE.getStatus());
request.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
CLIENT.dispatchResult(request);
} catch (Exception e) {
SnailJobLog.REMOTE.error("执行结果上报异常.[{}]", retryContext.getRetryTaskId(), e);
@ -82,7 +91,6 @@ public class RetryTaskExecutorFutureCallback implements FutureCallback<DispatchR
if (Objects.nonNull(result)) {
request.setResult(result.getResultJson());
request.setExceptionMsg(result.getExceptionMsg());
request.setStatusCode(result.getStatusCode());
}
return request;
}

View File

@ -2,10 +2,7 @@ package com.aizuda.snailjob.client.core.client;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest;
import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.client.model.request.RetryCallbackResultRequest;
import com.aizuda.snailjob.client.model.request.*;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.model.Result;
@ -21,4 +18,6 @@ public interface RetryClient {
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.REPORT_RETRY_DISPATCH_RESULT)
Result dispatchResult(DispatchRetryResultRequest request);
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.REPORT_CALLBACK_RESULT)
Result callbackResult(DispatchCallbackResultRequest request);
}

View File

@ -111,7 +111,7 @@ public class SnailRetryEndPoint implements Lifecycle {
// 将任务添加到时间轮中到期停止任务
TimerManager.add(new StopTaskTimerTask(request.getRetryTaskId()), request.getExecutorTimeout(), TimeUnit.SECONDS);
SnailJobLog.REMOTE.info("重试任务:[{}] 任务调度成功. ", request.getRetryTaskId());
SnailJobLog.REMOTE.info("重试任务:[{}] 调度成功. ", request.getRetryTaskId());
return new Result<>(Boolean.TRUE);
}
@ -157,6 +157,7 @@ public class SnailRetryEndPoint implements Lifecycle {
// 将任务添加到时间轮中到期停止任务
TimerManager.add(new StopTaskTimerTask(callbackDTO.getRetryTaskId()), callbackDTO.getExecutorTimeout(), TimeUnit.SECONDS);
SnailJobLog.REMOTE.info("回调任务:[{}] 调度成功. ", callbackDTO.getRetryTaskId());
return new Result<>(Boolean.TRUE);
}

View File

@ -0,0 +1,20 @@
package com.aizuda.snailjob.client.model.request;
import lombok.Data;
/**
* @author: opensnail
* @date : 2023-09-26 15:10
*/
@Data
public class DispatchCallbackResultRequest {
private String namespaceId;
private String groupName;
private String sceneName;
private Long retryId;
private Long retryTaskId;
private Integer taskStatus;
private String exceptionMsg;
}

View File

@ -14,8 +14,8 @@ public class DispatchRetryResultRequest {
private String sceneName;
private Long retryId;
private Long retryTaskId;
private Integer taskStatus;
private String result;
private Integer statusCode;
private String exceptionMsg;
}

View File

@ -80,6 +80,11 @@ public interface SystemConstants {
*/
String REPORT_RETRY_DISPATCH_RESULT = "/report/retry/dispatch/result";
/**
* 上报客户端回调执行结果
*/
String REPORT_CALLBACK_RESULT = "/report/retry/callback/result";
/**
* 批量日志上报
*/

View File

@ -4,9 +4,6 @@ import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
/**
* 标识某个操作的具体原因
*
@ -21,9 +18,9 @@ public enum RetryOperationReasonEnum {
NONE(0, StrUtil.EMPTY),
TASK_EXECUTION_TIMEOUT(1, "任务执行超时"),
NOT_CLIENT(2, "无客户端节点"),
JOB_CLOSED(3, "JOB已关闭"),
JOB_DISCARD(4, "任务丢弃"),
JOB_OVERLAY(5, "任务被覆盖"),
RETRY_SUSPEND(3, "重试已暂停"),
RETRY_TASK_DISCARD(4, "任务丢弃"),
RETRY_TASK_OVERLAY(5, "任务被覆盖"),
TASK_EXECUTION_ERROR(6, "任务执行期间发生非预期异常"),
MANNER_STOP(7, "手动停止"),
NOT_RUNNING_RETRY(8, "当前重试非运行中"),
@ -35,9 +32,13 @@ public enum RetryOperationReasonEnum {
private final int reason;
private final String desc;
public static RetryOperationReasonEnum getWorkflowNotifyScene(Integer notifyScene) {
public static RetryOperationReasonEnum of(Integer operationReason) {
if (operationReason == null) {
return NONE;
}
for (RetryOperationReasonEnum sceneEnum : RetryOperationReasonEnum.values()) {
if (sceneEnum.getReason() == notifyScene) {
if (sceneEnum.getReason() == operationReason) {
return sceneEnum;
}
}

View File

@ -59,6 +59,8 @@ public enum RetryTaskStatusEnum {
public static final Set<Integer> TERMINAL_STATUS_SET = Sets.newHashSet(SUCCESS.getStatus(), FAIL.getStatus(),
STOP.getStatus(), CANCEL.getStatus());
public static final Set<Integer> NOT_SUCCESS = Sets.newHashSet(FAIL.getStatus(), STOP.getStatus(), CANCEL.getStatus());
public static RetryTaskStatusEnum getByStatus(@NonNull Integer status) {
for (RetryTaskStatusEnum value : RetryTaskStatusEnum.values()) {
if (Objects.equals(value.status, status)) {

View File

@ -0,0 +1,24 @@
package com.aizuda.snailjob.server.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* <p>
*
* </p>
*
* @author opensnail
* @date 2025-02-22
*/
@AllArgsConstructor
@Getter
public enum RetryTaskExecutorSceneEnum {
AUTO_RETRY(1, SyetemTaskTypeEnum.RETRY),
MANUAL_RETRY(2, SyetemTaskTypeEnum.RETRY),
AUTO_CALLBACK(3, SyetemTaskTypeEnum.CALLBACK),
MANUAL_CALLBACK(4, SyetemTaskTypeEnum.CALLBACK);
private final int scene;
private final SyetemTaskTypeEnum taskType;
}

View File

@ -1,7 +1,5 @@
package com.aizuda.snailjob.server.retry.task.dto;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -17,12 +15,10 @@ import lombok.EqualsAndHashCode;
@Data
public class RetryExecutorResultDTO extends BaseDTO {
private Integer resultStatus;
private Integer operationReason;
private boolean incrementRetryCount;
private String resultJson;
private Integer statusCode;
private String idempotentId;
private String exceptionMsg;
private Integer taskStatus;
}

View File

@ -34,4 +34,6 @@ public class RetryTaskPrepareDTO {
private boolean onlyTimeoutCheck;
private Integer executorTimeout;
private Integer retryTaskExecutorScene;
}

View File

@ -20,6 +20,9 @@ public class TaskStopJobDTO extends BaseDTO {
*/
private Integer operationReason;
/**
* 若是失败补充失败信息
*/
private String message;
/**

View File

@ -2,9 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.client.model.request.RetryCallbackRequest;
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
import com.aizuda.snailjob.client.model.request.StopRetryRequest;
import com.aizuda.snailjob.client.model.request.*;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
@ -125,7 +123,9 @@ public interface RetryTaskConverter {
RequestRetryExecutorDTO toRealRetryExecutorDTO(TaskStopJobDTO stopJobDTO);
RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchRetryResultDTO resultDTO);
RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchRetryResultRequest resultDTO);
RetryExecutorResultDTO toRetryExecutorResultDTO(DispatchCallbackResultRequest resultDTO);
RetryExecutorResultDTO toRetryExecutorResultDTO(TaskStopJobDTO resultDTO);

View File

@ -22,7 +22,7 @@ public class DiscardRetryBlockStrategy extends AbstracJobBlockStrategy {
// 重新生成任务
RetryTaskGeneratorDTO generatorDTO = RetryTaskConverter.INSTANCE.toRetryTaskGeneratorDTO(context);
generatorDTO.setTaskStatus(RetryTaskStatusEnum.CANCEL.getStatus());
generatorDTO.setOperationReason(RetryOperationReasonEnum.JOB_DISCARD.getReason());
generatorDTO.setOperationReason(RetryOperationReasonEnum.RETRY_TASK_DISCARD.getReason());
retryTaskGeneratorHandler.generateRetryTask(generatorDTO);
}

View File

@ -31,12 +31,12 @@ public class OverlayRetryBlockStrategy extends AbstracJobBlockStrategy {
// 重新生成任务
RetryTaskGeneratorDTO generatorDTO = RetryTaskConverter.INSTANCE.toRetryTaskGeneratorDTO(context);
generatorDTO.setTaskStatus(RetryTaskStatusEnum.CANCEL.getStatus());
generatorDTO.setOperationReason(RetryOperationReasonEnum.JOB_DISCARD.getReason());
generatorDTO.setOperationReason(RetryOperationReasonEnum.RETRY_TASK_DISCARD.getReason());
retryTaskGeneratorHandler.generateRetryTask(generatorDTO);
TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(context);
if (Objects.isNull(context.getOperationReason()) || context.getOperationReason() == JobOperationReasonEnum.NONE.getReason()) {
stopJobDTO.setOperationReason(RetryOperationReasonEnum.JOB_OVERLAY.getReason());
stopJobDTO.setOperationReason(RetryOperationReasonEnum.RETRY_TASK_OVERLAY.getReason());
}
stopJobDTO.setNeedUpdateTaskStatus(true);

View File

@ -3,7 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.snailjob.client.model.request.RetryCallbackRequest;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -188,7 +188,7 @@ public class RequestCallbackClientActor extends AbstractActor {
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
executorResultDTO.setExceptionMsg(message);
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus());
executorResultDTO.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
actorRef.tell(executorResultDTO, actorRef);
}
}

View File

@ -3,7 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.snailjob.client.model.request.DispatchRetryRequest;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.model.SnailJobHeaders;
@ -182,7 +182,7 @@ public class RequestRetryClientActor extends AbstractActor {
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(executorDTO);
executorResultDTO.setExceptionMsg(message);
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus());
executorResultDTO.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
actorRef.tell(executorResultDTO, actorRef);
}
}

View File

@ -14,6 +14,7 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.RetryTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.DateUtils;
@ -166,6 +167,7 @@ public class ScanRetryActor extends AbstractActor {
RetryTaskPrepareDTO retryTaskPrepareDTO = RetryTaskConverter.INSTANCE.toRetryTaskPrepareDTO(partitionTask);
retryTaskPrepareDTO.setBlockStrategy(retrySceneConfig.getBackOff());
retryTaskPrepareDTO.setExecutorTimeout(retrySceneConfig.getExecutorTimeout());
retryTaskPrepareDTO.setRetryTaskExecutorScene(RetryTaskExecutorSceneEnum.AUTO_RETRY.getScene());
waitExecRetries.add(retryTaskPrepareDTO);
}

View File

@ -1,18 +1,12 @@
package com.aizuda.snailjob.server.retry.task.support.handler;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.retry.task.dto.RequestRetryExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@ -27,8 +21,6 @@ import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class RetryTaskStopHandler {
private final RetryTaskMapper retryTaskMapper;
/**
* 执行停止任务
*
@ -36,12 +28,6 @@ public class RetryTaskStopHandler {
*/
public void stop(TaskStopJobDTO stopJobDTO) {
// RetryTask retryTask = new RetryTask();
// retryTask.setId(stopJobDTO.getRetryTaskId());
// retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
// retryTask.setOperationReason(stopJobDTO.getOperationReason());
// Assert.isTrue(1 == retryTaskMapper.updateById(retryTask), () -> new SnailJobServerException("update retry task failed"));
RequestRetryExecutorDTO executorDTO = RetryTaskConverter.INSTANCE.toRealRetryExecutorDTO(stopJobDTO);
ActorRef actorRef = ActorGenerator.stopRetryTaskActor();
actorRef.tell(executorDTO, actorRef);
@ -56,7 +42,7 @@ public class RetryTaskStopHandler {
}
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(stopJobDTO);
executorResultDTO.setExceptionMsg(stopJobDTO.getMessage());
executorResultDTO.setResultStatus(RetryResultStatusEnum.FAILURE.getStatus());
executorResultDTO.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
executorResultDTO.setOperationReason(stopJobDTO.getOperationReason());
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
actorRef.tell(executorResultDTO, actorRef);

View File

@ -0,0 +1,71 @@
package com.aizuda.snailjob.server.retry.task.support.request;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.client.model.request.DispatchCallbackResultRequest;
import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.REPORT_CALLBACK_RESULT;
/**
* 上报回调执行的处理结果
*
* @author: opensnail
* @date : 2022-03-07 16:39
* @since 1.0.0
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class ReportCallbackResultHttpRequestHandler extends PostHttpRequestHandler {
@Override
public boolean supports(String path) {
return REPORT_CALLBACK_RESULT.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
@Transactional
public SnailJobRpcResult doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
try {
DispatchCallbackResultRequest request = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), DispatchCallbackResultRequest.class);
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(request);
RetryTaskStatusEnum statusEnum = RetryTaskStatusEnum.getByStatus(request.getTaskStatus());
Assert.notNull(statusEnum, () -> new SnailJobServerException("task status code is invalid"));
executorResultDTO.setIncrementRetryCount(true);
ActorRef actorRef = ActorGenerator.retryTaskExecutorResultActor();
actorRef.tell(executorResultDTO, actorRef);
return new SnailJobRpcResult(StatusEnum.YES.getStatus(), "Report dispatch result processed successfully", Boolean.TRUE, retryRequest.getReqId());
} catch (Exception e) {
return new SnailJobRpcResult(StatusEnum.YES.getStatus(), e.getMessage(), Boolean.FALSE, retryRequest.getReqId());
}
}
}

View File

@ -3,9 +3,9 @@ package com.aizuda.snailjob.server.retry.task.support.request;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
@ -15,7 +15,6 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.retry.task.dto.RetryExecutorResultDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.RetryResultHandler;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
@ -23,8 +22,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.REPORT_RETRY_DISPATCH_RESULT;
/**
@ -56,15 +53,14 @@ public class ReportDispatchResultHttpRequestHandler extends PostHttpRequestHandl
Object[] args = retryRequest.getArgs();
try {
DispatchRetryResultDTO resultDTO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), DispatchRetryResultDTO.class);
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(resultDTO);
RetryResultStatusEnum statusEnum = RetryResultStatusEnum.getRetryResultStatusEnum(resultDTO.getStatusCode());
Assert.notNull(statusEnum, () -> new SnailJobServerException("status code is invalid"));
executorResultDTO.setResultStatus(statusEnum.getStatus());
DispatchRetryResultRequest request = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), DispatchRetryResultRequest.class);
RetryExecutorResultDTO executorResultDTO = RetryTaskConverter.INSTANCE.toRetryExecutorResultDTO(request);
RetryTaskStatusEnum statusEnum = RetryTaskStatusEnum.getByStatus(request.getTaskStatus());
Assert.notNull(statusEnum, () -> new SnailJobServerException("task status code is invalid"));
executorResultDTO.setIncrementRetryCount(true);
if (RetryResultStatusEnum.FAILURE.getStatus().equals(statusEnum.getStatus())) {
if (RetryTaskStatusEnum.FAIL.getStatus().equals(statusEnum.getStatus())) {
executorResultDTO.setOperationReason(RetryOperationReasonEnum.RETRY_FAIL.getReason());
} else if (RetryResultStatusEnum.STOP.getStatus().equals(statusEnum.getStatus())) {
} else if (RetryTaskStatusEnum.STOP.getStatus().equals(statusEnum.getStatus())) {
executorResultDTO.setOperationReason(RetryOperationReasonEnum.CLIENT_TRIGGER_RETRY_STOP.getReason());
}

View File

@ -2,13 +2,12 @@ package com.aizuda.snailjob.server.retry.task.support.result;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryResultHandler;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent;
import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler;
@ -18,22 +17,20 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMappe
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Optional;
import static com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum.RETRY_TASK_FAIL_ERROR;
import static com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum.NOT_SUCCESS;
/**
* <p>
*
* 客户端执行重试失败服务端调度时失败等场景导致的任务执行失败
* </p>
*
* @author opensnail
@ -51,7 +48,9 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
@Override
public boolean supports(RetryResultContext context) {
return Objects.equals(RetryResultStatusEnum.FAILURE.getStatus(), context.getResultStatus());
RetryOperationReasonEnum reasonEnum = RetryOperationReasonEnum.of(context.getOperationReason());
return NOT_SUCCESS.contains(context.getTaskStatus())
&& RetryOperationReasonEnum.CLIENT_TRIGGER_RETRY_STOP.getReason() != reasonEnum.getReason();
}
@Override
@ -61,47 +60,46 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
context.getGroupName(), context.getSceneName(), context.getNamespaceId());
Retry retry = retryMapper.selectById(context.getRetryId());
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
transactionTemplate.execute(status -> {
Integer maxRetryCount;
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) {
maxRetryCount = retrySceneConfig.getCbMaxCount();
} else {
maxRetryCount = retrySceneConfig.getMaxRetryCount();
}
Integer maxRetryCount;
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retry.getTaskType())) {
maxRetryCount = retrySceneConfig.getCbMaxCount();
} else {
maxRetryCount = retrySceneConfig.getMaxRetryCount();
}
if (maxRetryCount <= retry.getRetryCount() + 1) {
retry.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus());
retry.setRetryCount(retry.getRetryCount() + 1);
retry.setUpdateDt(LocalDateTime.now());
retry.setDeleted(retry.getId());
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
// 创建一个回调任务
callbackRetryTaskHandler.create(retry, retrySceneConfig);
} else if (context.isIncrementRetryCount()) {
retry.setRetryCount(retry.getRetryCount() + 1);
retry.setUpdateDt(LocalDateTime.now());
retry.setDeleted(retry.getId());
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
}
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
retryTask.setTaskStatus(RetryTaskStatusEnum.FAIL.getStatus());
retryTask.setOperationReason(context.getOperationReason());
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
if (maxRetryCount <= retry.getRetryCount() + 1) {
retry.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus());
retry.setRetryCount(retry.getRetryCount() + 1);
retry.setUpdateDt(LocalDateTime.now());
retry.setDeleted(retry.getId());
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
// 创建一个回调任务
callbackRetryTaskHandler.create(retry, retrySceneConfig);
} else if (context.isIncrementRetryCount()) {
retry.setRetryCount(retry.getRetryCount() + 1);
retry.setUpdateDt(LocalDateTime.now());
retry.setDeleted(retry.getId());
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO =
RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO(
retry, context.getExceptionMsg(), RETRY_TASK_FAIL_ERROR.getNotifyScene());
SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(retryTaskFailAlarmEventDTO));
}
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
retryTask.setTaskStatus(Optional.ofNullable(context.getTaskStatus()).orElse(RetryTaskStatusEnum.FAIL.getStatus()));
retryTask.setOperationReason(Optional.ofNullable(context.getOperationReason()).orElse(RetryOperationReasonEnum.NONE.getReason()));
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO =
RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO(
retry, context.getExceptionMsg(), RETRY_TASK_FAIL_ERROR.getNotifyScene());
SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(retryTaskFailAlarmEventDTO));
return null;
});
}
}

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.retry.task.support.result;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.server.retry.task.dto.BaseDTO;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -17,8 +17,19 @@ import lombok.EqualsAndHashCode;
@Data
public class RetryResultContext extends BaseDTO {
private Integer resultStatus;
// /**
// * 客户端返回的结果
// * @see RetryResultStatusEnum
// */
// private Integer resultStatus;
/**
* 重试任务状态
* @see RetryTaskStatusEnum
*/
private Integer taskStatus;
private Integer operationReason;
private boolean incrementRetryCount;
private String resultJson;
private String idempotentId;

View File

@ -1,31 +1,25 @@
package com.aizuda.snailjob.server.retry.task.support.result;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.retry.task.support.RetryResultHandler;
import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.util.Objects;
import static com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum.STOP;
/**
* <p>
*
* 客户端触发停止重试指令, 重试挂起
* </p>
*
* @author opensnail
@ -35,47 +29,38 @@ import java.util.Objects;
@RequiredArgsConstructor
@Slf4j
public class RetryStopHandler extends AbstractRetryResultHandler {
private final AccessTemplate accessTemplate;
private final CallbackRetryTaskHandler callbackRetryTaskHandler;
private final TransactionTemplate transactionTemplate;
private final RetryTaskMapper retryTaskMapper;
private final RetryMapper retryMapper;
@Override
public boolean supports(RetryResultContext context) {
return Objects.equals(RetryResultStatusEnum.STOP.getStatus(), context.getResultStatus());
RetryOperationReasonEnum reasonEnum = RetryOperationReasonEnum.of(context.getOperationReason());
return STOP.getStatus().equals(context.getTaskStatus())
&& RetryOperationReasonEnum.CLIENT_TRIGGER_RETRY_STOP.getReason() == reasonEnum.getReason();
}
@Override
public void doHandler(RetryResultContext context) {
RetrySceneConfig retrySceneConfig =
accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(
context.getGroupName(), context.getSceneName(), context.getNamespaceId());
Retry retry = retryMapper.selectById(context.getRetryId());
transactionTemplate.execute((status) -> {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
Retry retry = new Retry();
retry.setId(context.getRetryId());
retry.setRetryStatus(RetryStatusEnum.SUSPEND.getStatus());
retry.setUpdateDt(LocalDateTime.now());
retry.setRetryCount(retry.getRetryCount() + 1);
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]",
retry.getGroupName()));
retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus());
retry.setUpdateDt(LocalDateTime.now());
retry.setDeleted(retry.getId());
retry.setRetryCount(retry.getRetryCount() + 1);
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]",
retry.getGroupName()));
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
retryTask.setOperationReason(context.getOperationReason());
retryTask.setTaskStatus(STOP.getStatus());
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
retryTask.setOperationReason(context.getOperationReason());
retryTask.setTaskStatus(RetryTaskStatusEnum.STOP.getStatus());
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
// 创建一个回调任务
callbackRetryTaskHandler.create(retry, retrySceneConfig);
}
return null;
});
}
}

View File

@ -1,11 +1,9 @@
package com.aizuda.snailjob.server.retry.task.support.result;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.retry.task.support.RetryResultHandler;
import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
@ -15,8 +13,6 @@ import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
@ -24,7 +20,7 @@ import java.util.Objects;
/**
* <p>
*
* 任务执行成功
* </p>
*
* @author opensnail
@ -41,7 +37,7 @@ public class RetrySuccessHandler extends AbstractRetryResultHandler {
@Override
public boolean supports(RetryResultContext context) {
return Objects.equals(RetryResultStatusEnum.SUCCESS.getStatus(), context.getResultStatus());
return Objects.equals(RetryTaskStatusEnum.SUCCESS.getStatus(), context.getTaskStatus());
}
@Override
@ -52,28 +48,26 @@ public class RetrySuccessHandler extends AbstractRetryResultHandler {
context.getGroupName(), context.getSceneName(), context.getNamespaceId());
Retry retry = retryMapper.selectById(context.getRetryId());
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus());
retry.setUpdateDt(LocalDateTime.now());
retry.setRetryCount(retry.getRetryCount() + 1);
retry.setDeleted(retry.getId());
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]",
retry.getGroupName()));
transactionTemplate.execute((status -> {
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
retryTask.setTaskStatus(RetryTaskStatusEnum.SUCCESS.getStatus());
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
retry.setRetryStatus(RetryStatusEnum.FINISH.getStatus());
retry.setUpdateDt(LocalDateTime.now());
retry.setRetryCount(retry.getRetryCount() + 1);
retry.setDeleted(retry.getId());
Assert.isTrue(1 == retryMapper.updateById(retry),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]",
retry.getGroupName()));
// 创建一个回调任务
callbackRetryTaskHandler.create(retry, retrySceneConfig);
RetryTask retryTask = new RetryTask();
retryTask.setId(context.getRetryTaskId());
retryTask.setTaskStatus(RetryTaskStatusEnum.SUCCESS.getStatus());
Assert.isTrue(1 == retryTaskMapper.updateById(retryTask),
() -> new SnailJobServerException("更新重试任务失败. groupName:[{}]", retry.getGroupName()));
// 创建一个回调任务
callbackRetryTaskHandler.create(retry, retrySceneConfig);
}
});
return null;
}));
}
}

View File

@ -5,7 +5,7 @@ import com.aizuda.snailjob.server.web.annotation.LoginRequired;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.*;
import com.aizuda.snailjob.server.web.model.response.RetryResponseVO;
import com.aizuda.snailjob.server.web.service.RetryTaskService;
import com.aizuda.snailjob.server.web.service.RetryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
@ -23,66 +23,61 @@ import java.util.List;
public class RetryController {
@Autowired
private RetryTaskService retryTaskService;
private RetryService retryService;
@LoginRequired
@GetMapping("list")
public PageResult<List<RetryResponseVO>> getRetryTaskPage(RetryQueryVO queryVO) {
return retryTaskService.getRetryPage(queryVO);
return retryService.getRetryPage(queryVO);
}
@LoginRequired
@GetMapping("{id}")
public RetryResponseVO getRetryTaskById(@RequestParam("groupName") String groupName,
@PathVariable("id") Long id) {
return retryTaskService.getRetryById(groupName, id);
return retryService.getRetryById(groupName, id);
}
@LoginRequired
@PutMapping("status")
public int updateRetryTaskStatus(@RequestBody RetryUpdateStatusRequestVO retryUpdateStatusRequestVO) {
return retryTaskService.updateRetryTaskStatus(retryUpdateStatusRequestVO);
return retryService.updateRetryTaskStatus(retryUpdateStatusRequestVO);
}
@LoginRequired
@PostMapping
public int saveRetryTask(@RequestBody @Validated RetrySaveRequestVO retryTaskRequestVO) {
return retryTaskService.saveRetryTask(retryTaskRequestVO);
return retryService.saveRetryTask(retryTaskRequestVO);
}
@LoginRequired
@PostMapping("/generate/idempotent-id")
public Result<String> idempotentIdGenerate(@RequestBody @Validated GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO) {
return new Result<>(retryTaskService.idempotentIdGenerate(generateRetryIdempotentIdVO));
return new Result<>(retryService.idempotentIdGenerate(generateRetryIdempotentIdVO));
}
@LoginRequired
@PutMapping("/batch")
public Integer updateRetryTaskExecutorName(@RequestBody @Validated RetryUpdateExecutorNameRequestVO requestVO) {
return retryTaskService.updateRetryExecutorName(requestVO);
return retryService.updateRetryExecutorName(requestVO);
}
@LoginRequired
@DeleteMapping("/batch")
public boolean batchDeleteRetryTask(@RequestBody @Validated BatchDeleteRetryTaskVO requestVO) {
return retryTaskService.batchDeleteRetry(requestVO);
public boolean batchDeleteRetry(@RequestBody @Validated BatchDeleteRetryTaskVO requestVO) {
return retryService.batchDeleteRetry(requestVO);
}
@LoginRequired
@PostMapping("/batch")
public Integer parseLogs(@RequestBody @Validated ParseLogsVO parseLogsVO) {
return retryTaskService.parseLogs(parseLogsVO);
return retryService.parseLogs(parseLogsVO);
}
@LoginRequired
@PostMapping("/manual/trigger/retry/task")
public boolean manualTriggerRetryTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) {
return retryTaskService.manualTriggerRetry(requestVO);
return retryService.manualTriggerRetryTask(requestVO);
}
@LoginRequired
@PostMapping("/manual/trigger/callback/task")
public boolean manualTriggerCallbackTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) {
return retryTaskService.manualTriggerCallback(requestVO);
}
}

View File

@ -5,8 +5,8 @@ import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO;
import com.aizuda.snailjob.server.web.service.RetryTaskLogService;
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
import com.aizuda.snailjob.server.web.service.RetryTaskService;
import jakarta.validation.constraints.NotEmpty;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@ -25,35 +25,43 @@ import java.util.Set;
public class RetryTaskController {
@Autowired
private RetryTaskLogService retryTaskLogService;
private RetryTaskService retryTaskService;
@LoginRequired
@GetMapping("list")
public PageResult<List<RetryTaskLogResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO) {
return retryTaskLogService.getRetryTaskLogPage(queryVO);
public PageResult<List<RetryTaskResponseVO>> getRetryTaskPage(RetryTaskQueryVO queryVO) {
return retryTaskService.getRetryTaskLogPage(queryVO);
}
@LoginRequired
@GetMapping("/message/list")
public RetryTaskLogMessageResponseVO getRetryTaskLogPage(RetryTaskLogMessageQueryVO queryVO) {
return retryTaskLogService.getRetryTaskLogMessagePage(queryVO);
public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO) {
return retryTaskService.getRetryTaskLogMessagePage(queryVO);
}
@LoginRequired
@GetMapping("{id}")
public RetryTaskLogResponseVO getRetryTaskLogById(@PathVariable("id") Long id) {
return retryTaskLogService.getRetryTaskLogById(id);
public RetryTaskResponseVO getRetryTaskById(@PathVariable("id") Long id) {
return retryTaskService.getRetryTaskById(id);
}
@LoginRequired
@PostMapping("/stop/{id}")
public Boolean stopById(@PathVariable("id") Long id) {
return retryTaskService.stopById(id);
}
@LoginRequired
@DeleteMapping("{id}")
public Boolean deleteById(@PathVariable("id") Long id) {
return retryTaskLogService.deleteById(id);
return retryTaskService.deleteById(id);
}
@LoginRequired
@DeleteMapping("ids")
public Boolean batchDelete(@RequestBody @NotEmpty(message = "ids不能为空") Set<Long> ids) {
return retryTaskLogService.batchDelete(ids);
return retryTaskService.batchDelete(ids);
}
}

View File

@ -19,7 +19,7 @@ public class ManualTriggerTaskRequestVO {
@Pattern(regexp = "^[A-Za-z0-9_-]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母、下划线和短横线")
private String groupName;
@NotEmpty(message = "uniqueIds 不能为空")
private List<String> uniqueIds;
@NotEmpty(message = "retryIds 不能为空")
private List<Long> retryIds;
}

View File

@ -9,7 +9,7 @@ import java.time.LocalDateTime;
* @date : 2022-02-28 09:09
*/
@Data
public class RetryTaskLogResponseVO {
public class RetryTaskResponseVO {
private Long id;
@ -21,7 +21,6 @@ public class RetryTaskLogResponseVO {
private Long retryId;
private Integer taskType;
private LocalDateTime createDt;

View File

@ -0,0 +1,83 @@
package com.aizuda.snailjob.server.web.service;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.*;
import com.aizuda.snailjob.server.web.model.response.RetryResponseVO;
import java.util.List;
/**
* @author opensnail
* @date 2022-02-27
* @since 2.0
*/
public interface RetryService {
PageResult<List<RetryResponseVO>> getRetryPage(RetryQueryVO queryVO);
/**
* 通过重试任务表id获取重试任务信息
*
* @param groupName 组名称
* @param id 重试任务表id
* @return 重试任务
*/
RetryResponseVO getRetryById(String groupName, Long id);
/**
* 更新重试任务状态
*
* @param retryUpdateStatusRequestVO 更新重试任务状态请求模型
* @return
*/
int updateRetryTaskStatus(RetryUpdateStatusRequestVO retryUpdateStatusRequestVO);
/**
* 手动新增重试任务
*
* @param retryTaskRequestVO {@link RetrySaveRequestVO} 重试数据模型
* @return
*/
int saveRetryTask(RetrySaveRequestVO retryTaskRequestVO);
/**
* 委托客户端生成idempotentId
*
* @param generateRetryIdempotentIdVO 生成idempotentId请求模型
* @return
*/
String idempotentIdGenerate(GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO);
/**
* 若客户端在变更了执行器,从而会导致执行重试任务时找不到执行器类因此使用者可以在后端进行执行变更
*
* @param requestVO 更新执行器变更模型
* @return 更新条数
*/
int updateRetryExecutorName(RetryUpdateExecutorNameRequestVO requestVO);
/**
* 批量删除重试数据
*
* @param requestVO 批量删除重试数据
* @return
*/
boolean batchDeleteRetry(BatchDeleteRetryTaskVO requestVO);
/**
* 解析日志
*
* @param parseLogsVO {@link ParseLogsVO} 解析参数模型
* @return
*/
Integer parseLogs(ParseLogsVO parseLogsVO);
/**
* 手动支持重试任务
*
* @param requestVO
* @return
*/
boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO);
}

View File

@ -1,28 +0,0 @@
package com.aizuda.snailjob.server.web.service;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO;
import java.util.List;
import java.util.Set;
/**
* @author opensnail
* @date 2022-02-27
* @since 2.0
*/
public interface RetryTaskLogService {
PageResult<List<RetryTaskLogResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO);
RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO);
RetryTaskLogResponseVO getRetryTaskLogById(Long id);
boolean deleteById(Long id);
boolean batchDelete(Set<Long> ids);
}

View File

@ -1,10 +1,13 @@
package com.aizuda.snailjob.server.web.service;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.*;
import com.aizuda.snailjob.server.web.model.response.RetryResponseVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
import java.util.List;
import java.util.Set;
/**
* @author opensnail
@ -13,78 +16,15 @@ import java.util.List;
*/
public interface RetryTaskService {
PageResult<List<RetryResponseVO>> getRetryPage(RetryQueryVO queryVO);
PageResult<List<RetryTaskResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO);
/**
* 通过重试任务表id获取重试任务信息
*
* @param groupName 组名称
* @param id 重试任务表id
* @return 重试任务
*/
RetryResponseVO getRetryById(String groupName, Long id);
RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO);
/**
* 更新重试任务状态
*
* @param retryUpdateStatusRequestVO 更新重试任务状态请求模型
* @return
*/
int updateRetryTaskStatus(RetryUpdateStatusRequestVO retryUpdateStatusRequestVO);
RetryTaskResponseVO getRetryTaskById(Long id);
/**
* 手动新增重试任务
*
* @param retryTaskRequestVO {@link RetrySaveRequestVO} 重试数据模型
* @return
*/
int saveRetryTask(RetrySaveRequestVO retryTaskRequestVO);
boolean deleteById(Long id);
/**
* 委托客户端生成idempotentId
*
* @param generateRetryIdempotentIdVO 生成idempotentId请求模型
* @return
*/
String idempotentIdGenerate(GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO);
boolean batchDelete(Set<Long> ids);
/**
* 若客户端在变更了执行器,从而会导致执行重试任务时找不到执行器类因此使用者可以在后端进行执行变更
*
* @param requestVO 更新执行器变更模型
* @return 更新条数
*/
int updateRetryExecutorName(RetryUpdateExecutorNameRequestVO requestVO);
/**
* 批量删除重试数据
*
* @param requestVO 批量删除重试数据
* @return
*/
boolean batchDeleteRetry(BatchDeleteRetryTaskVO requestVO);
/**
* 解析日志
*
* @param parseLogsVO {@link ParseLogsVO} 解析参数模型
* @return
*/
Integer parseLogs(ParseLogsVO parseLogsVO);
/**
* 手动支持重试任务
*
* @param requestVO
* @return
*/
boolean manualTriggerRetry(ManualTriggerTaskRequestVO requestVO);
/**
* 手动执行回调任务
*
* @param requestVO
* @return
*/
boolean manualTriggerCallback(ManualTriggerTaskRequestVO requestVO);
Boolean stopById(Long id);
}

View File

@ -0,0 +1,22 @@
package com.aizuda.snailjob.server.web.service.convert;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import org.mapstruct.factory.Mappers;
/**
* <p>
*
* </p>
*
* @author opensnail
* @date 2025-02-22
*/
public interface RetryConverter {
RetryConverter INSTANCE = Mappers.getMapper(RetryConverter.class);
RetryTaskPrepareDTO toRetryTaskPrepareDTO(Retry retry);
TaskStopJobDTO toTaskStopJobDTO(Retry retry);
}

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.service.convert;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
@ -16,8 +16,8 @@ public interface RetryTaskLogResponseVOConverter {
RetryTaskLogResponseVOConverter INSTANCE = Mappers.getMapper(RetryTaskLogResponseVOConverter.class);
RetryTaskLogResponseVO convert(RetryTask retryTask);
RetryTaskResponseVO convert(RetryTask retryTask);
List<RetryTaskLogResponseVO> convertList(List<RetryTask> retryTasks);
List<RetryTaskResponseVO> convertList(List<RetryTask> retryTasks);
}

View File

@ -0,0 +1,404 @@
package com.aizuda.snailjob.server.web.service.impl;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.enums.RetryTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.enums.TaskGeneratorSceneEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.model.dto.RetryTaskDTO;
import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskContext;
import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskGenerator;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.*;
import com.aizuda.snailjob.server.web.model.response.RetryResponseVO;
import com.aizuda.snailjob.server.web.service.RetryService;
import com.aizuda.snailjob.server.web.service.convert.RetryConverter;
import com.aizuda.snailjob.server.web.service.convert.RetryTaskResponseVOConverter;
import com.aizuda.snailjob.server.web.service.convert.TaskContextConverter;
import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.aizuda.snailjob.common.core.enums.RetryStatusEnum.ALLOW_DELETE_STATUS;
/**
* @author opensnail
* @date 2022-02-27
* @since 2.0
*/
@Service
public class RetryServiceImpl implements RetryService {
@Autowired
private ClientNodeAllocateHandler clientNodeAllocateHandler;
@Autowired
private RetryTaskMapper retryTaskMapper;
@Autowired
private AccessTemplate accessTemplate;
@Autowired
@Lazy
private List<TaskGenerator> taskGenerators;
@Autowired
private RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Override
public PageResult<List<RetryResponseVO>> getRetryPage(RetryQueryVO queryVO) {
PageDTO<Retry> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
if (StrUtil.isBlank(queryVO.getGroupName())) {
return new PageResult<>(pageDTO, new ArrayList<>());
}
List<String> groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName());
LambdaQueryWrapper<Retry> queryWrapper = new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.in(CollUtil.isNotEmpty(groupNames), Retry::getGroupName, groupNames)
.eq(StrUtil.isNotBlank(queryVO.getSceneName()), Retry::getSceneName, queryVO.getSceneName())
.eq(StrUtil.isNotBlank(queryVO.getBizNo()), Retry::getBizNo, queryVO.getBizNo())
.eq(StrUtil.isNotBlank(queryVO.getIdempotentId()), Retry::getIdempotentId, queryVO.getIdempotentId())
.eq(Objects.nonNull(queryVO.getRetryId()), Retry::getId, queryVO.getRetryId())
.eq(Objects.nonNull(queryVO.getRetryStatus()), Retry::getRetryStatus, queryVO.getRetryStatus())
.eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType())
.select(Retry::getId, Retry::getBizNo, Retry::getIdempotentId,
Retry::getGroupName, Retry::getNextTriggerAt, Retry::getRetryCount,
Retry::getRetryStatus, Retry::getUpdateDt, Retry::getCreateDt, Retry::getSceneName,
Retry::getTaskType, Retry::getParentId)
.orderByDesc(Retry::getCreateDt);
pageDTO = accessTemplate.getRetryAccess().listPage(pageDTO, queryWrapper);
Set<Long> ids = StreamUtils.toSet(pageDTO.getRecords(), Retry::getId);
Map<Long, Retry> callbackMap = Maps.newHashMap();
if (CollUtil.isNotEmpty(ids)) {
List<Retry> callbackTaskList = accessTemplate.getRetryAccess()
.list(new LambdaQueryWrapper<Retry>().in(Retry::getParentId, ids));
callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId);
}
List<RetryResponseVO> retryResponseList = RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords());
for (RetryResponseVO retryResponseVO : retryResponseList) {
RetryResponseVO responseVO = RetryTaskResponseVOConverter.INSTANCE.convert(callbackMap.get(retryResponseVO.getId()));
if (Objects.isNull(responseVO)) {
retryResponseVO.setChildren(Lists.newArrayList());
} else {
retryResponseVO.setChildren(Lists.newArrayList(responseVO));
}
}
return new PageResult<>(pageDTO, retryResponseList);
}
@Override
public RetryResponseVO getRetryById(String groupName, Long id) {
TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
Retry retry = retryTaskAccess.one(new LambdaQueryWrapper<Retry>().eq(Retry::getId, id));
return RetryTaskResponseVOConverter.INSTANCE.convert(retry);
}
@Override
@Transactional
public int updateRetryTaskStatus(RetryUpdateStatusRequestVO requestVO) {
RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(requestVO.getRetryStatus());
if (Objects.isNull(retryStatusEnum)) {
throw new SnailJobServerException("重试状态错误. [{}]", requestVO.getRetryStatus());
}
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
Retry retry = retryTaskAccess.one(new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.eq(Retry::getId, requestVO.getId()));
if (Objects.isNull(retry)) {
throw new SnailJobServerException("未查询到重试任务");
}
retry.setRetryStatus(requestVO.getRetryStatus());
retry.setGroupName(requestVO.getGroupName());
// 若恢复重试则需要重新计算下次触发时间
if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) {
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneName(retry.getGroupName(), retry.getSceneName(), namespaceId);
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(DateUtils.toNowMilli());
waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(retry.getRetryCount() + 1);
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff());
retry.setNextTriggerAt(waitStrategy.computeTriggerTime(waitStrategyContext));
}
if (RetryStatusEnum.FINISH.getStatus().equals(retryStatusEnum.getStatus())) {
RetryLogMetaDTO retryLogMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retry);
retryLogMetaDTO.setTimestamp(DateUtils.toNowMilli());
SnailJobLog.REMOTE.info("=============手动操作完成============. <|>{}<|>", retryLogMetaDTO);
}
//
// RetryTask retryTask = new RetryTask();
// retryTask.setTaskStatus(requestVO.getRetryStatus());
// retryTaskMapper.update(retryTask, new LambdaUpdateWrapper<RetryTask>()
// .eq(RetryTask::getNamespaceId, namespaceId)
// .eq(RetryTask::getUniqueId, retry.getUniqueId())
// .eq(RetryTask::getGroupName, retry.getGroupName()));
retry.setUpdateDt(LocalDateTime.now());
return retryTaskAccess.updateById(retry);
}
@Override
public int saveRetryTask(final RetrySaveRequestVO retryTaskRequestVO) {
RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(retryTaskRequestVO.getRetryStatus());
if (Objects.isNull(retryStatusEnum)) {
throw new SnailJobServerException("重试状态错误");
}
TaskGenerator taskGenerator = taskGenerators.stream()
.filter(t -> t.supports(TaskGeneratorSceneEnum.MANA_SINGLE.getScene()))
.findFirst().orElseThrow(() -> new SnailJobServerException("没有匹配的任务生成器"));
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
TaskContext taskContext = new TaskContext();
taskContext.setSceneName(retryTaskRequestVO.getSceneName());
taskContext.setGroupName(retryTaskRequestVO.getGroupName());
taskContext.setInitStatus(retryTaskRequestVO.getRetryStatus());
taskContext.setNamespaceId(namespaceId);
taskContext.setTaskInfos(
Collections.singletonList(TaskContextConverter.INSTANCE.convert(retryTaskRequestVO)));
// 生成任务
taskGenerator.taskGenerator(taskContext);
return 1;
}
@Override
public String idempotentIdGenerate(final GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(
generateRetryIdempotentIdVO.getGroupName(),
namespaceId);
Assert.notEmpty(serverNodes,
() -> new SnailJobServerException("生成idempotentId失败: 不存在活跃的客户端节点"));
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneName(generateRetryIdempotentIdVO.getGroupName(),
generateRetryIdempotentIdVO.getSceneName(), namespaceId);
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(retrySceneConfig.getSceneName(),
retrySceneConfig.getGroupName(), retrySceneConfig.getNamespaceId(), retrySceneConfig.getRouteKey());
// 委托客户端生成idempotentId
GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO = new GenerateRetryIdempotentIdDTO();
generateRetryIdempotentIdDTO.setGroup(generateRetryIdempotentIdVO.getGroupName());
generateRetryIdempotentIdDTO.setScene(generateRetryIdempotentIdVO.getSceneName());
generateRetryIdempotentIdDTO.setArgsStr(generateRetryIdempotentIdVO.getArgsStr());
generateRetryIdempotentIdDTO.setExecutorName(generateRetryIdempotentIdVO.getExecutorName());
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
.nodeInfo(serverNode)
.client(RetryRpcClient.class)
.build();
Result result = rpcClient.generateIdempotentId(generateRetryIdempotentIdDTO);
Assert.notNull(result, () -> new SnailJobServerException("idempotentId生成失败"));
Assert.isTrue(1 == result.getStatus(),
() -> new SnailJobServerException("idempotentId生成失败:请确保参数与执行器名称正确"));
return (String) result.getData();
}
@Override
public int updateRetryExecutorName(final RetryUpdateExecutorNameRequestVO requestVO) {
Retry retry = new Retry();
retry.setExecutorName(requestVO.getExecutorName());
retry.setRetryStatus(requestVO.getRetryStatus());
retry.setUpdateDt(LocalDateTime.now());
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
// 根据重试数据id更新执行器名称
TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
return retryTaskAccess.update(retry, new LambdaUpdateWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.eq(Retry::getGroupName, requestVO.getGroupName())
.in(Retry::getId, requestVO.getIds()));
}
@Override
@Transactional
public boolean batchDeleteRetry(final BatchDeleteRetryTaskVO requestVO) {
TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
List<Retry> retries = retryTaskAccess.list(new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.eq(Retry::getGroupName, requestVO.getGroupName())
.in(Retry::getRetryStatus, ALLOW_DELETE_STATUS)
.in(Retry::getId, requestVO.getIds())
);
Assert.notEmpty(retries,
() -> new SnailJobServerException("没有可删除的数据, 只有非【处理中】的数据可以删除"));
Set<Long> retryIds = StreamUtils.toSet(retries, Retry::getId);
retryTaskMapper.delete(new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getGroupName, requestVO.getGroupName())
.eq(RetryTask::getNamespaceId, namespaceId)
.in(RetryTask::getRetryId, retryIds));
retryTaskLogMessageMapper.delete(
new LambdaQueryWrapper<RetryTaskLogMessage>()
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getGroupName, requestVO.getGroupName())
.in(RetryTaskLogMessage::getRetryId, retryIds));
Assert.isTrue(requestVO.getIds().size() == retryTaskAccess.delete(new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.eq(Retry::getGroupName, requestVO.getGroupName())
.in(Retry::getRetryStatus, ALLOW_DELETE_STATUS)
.in(Retry::getId, requestVO.getIds()))
, () -> new SnailJobServerException("删除重试任务失败, 请检查任务状态是否为已完成或者最大次数"));
return Boolean.TRUE;
}
@Override
public Integer parseLogs(ParseLogsVO parseLogsVO) {
RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(parseLogsVO.getRetryStatus());
if (Objects.isNull(retryStatusEnum)) {
throw new SnailJobServerException("重试状态错误");
}
String logStr = parseLogsVO.getLogStr();
String patternString = "<\\|>(.*?)<\\|>";
Pattern pattern = Pattern.compile(patternString);
Matcher matcher = pattern.matcher(logStr);
List<RetryTaskDTO> waitInsertList = new ArrayList<>();
// 查找匹配的内容并输出
while (matcher.find()) {
String extractedData = matcher.group(1);
if (StrUtil.isBlank(extractedData)) {
continue;
}
List<RetryTaskDTO> retryTaskList = JsonUtil.parseList(extractedData, RetryTaskDTO.class);
if (CollUtil.isNotEmpty(retryTaskList)) {
waitInsertList.addAll(retryTaskList);
}
}
Assert.isFalse(waitInsertList.isEmpty(), () -> new SnailJobServerException("未找到匹配的数据"));
Assert.isTrue(waitInsertList.size() <= 500, () -> new SnailJobServerException("最多只能处理500条数据"));
TaskGenerator taskGenerator = taskGenerators.stream()
.filter(t -> t.supports(TaskGeneratorSceneEnum.MANA_BATCH.getScene()))
.findFirst().orElseThrow(() -> new SnailJobServerException("没有匹配的任务生成器"));
boolean allMatch = waitInsertList.stream()
.allMatch(retryTaskDTO -> retryTaskDTO.getGroupName().equals(parseLogsVO.getGroupName()));
Assert.isTrue(allMatch, () -> new SnailJobServerException("存在数据groupName不匹配请检查您的数据"));
Map<String, List<RetryTaskDTO>> map = StreamUtils.groupByKey(waitInsertList, RetryTaskDTO::getSceneName);
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
map.forEach(((sceneName, retryTaskDTOS) -> {
TaskContext taskContext = new TaskContext();
taskContext.setSceneName(sceneName);
taskContext.setGroupName(parseLogsVO.getGroupName());
taskContext.setNamespaceId(namespaceId);
taskContext.setInitStatus(parseLogsVO.getRetryStatus());
taskContext.setTaskInfos(TaskContextConverter.INSTANCE.convert(retryTaskDTOS));
// 生成任务
taskGenerator.taskGenerator(taskContext);
}));
return waitInsertList.size();
}
@Override
public boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, requestVO.getGroupName())
.eq(GroupConfig::getNamespaceId, namespaceId)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
);
Assert.isTrue(count > 0, () -> new SnailJobServerException("组:[{}]已经关闭,不支持手动执行.", requestVO.getGroupName()));
List<Long> retryIds = requestVO.getRetryIds();
List<Retry> list = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType())
.in(Retry::getId, retryIds)
);
Assert.notEmpty(list, () -> new SnailJobServerException("没有可执行的任务"));
for (Retry retry : list) {
RetryTaskPrepareDTO retryTaskPrepareDTO = RetryConverter.INSTANCE.toRetryTaskPrepareDTO(retry);
// 设置now表示立即执行
retryTaskPrepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
retryTaskPrepareDTO.setRetryTaskExecutorScene(RetryTaskExecutorSceneEnum.MANUAL_RETRY.getScene());
// 准备阶段执行
ActorRef actorRef = ActorGenerator.retryTaskPrepareActor();
actorRef.tell(retryTaskPrepareDTO, actorRef);
}
return true;
}
}

View File

@ -1,207 +0,0 @@
package com.aizuda.snailjob.server.web.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.snailjob.server.web.model.request.UserSessionVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogResponseVO;
import com.aizuda.snailjob.server.web.service.RetryTaskLogService;
import com.aizuda.snailjob.server.web.service.convert.RetryTaskLogResponseVOConverter;
import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author: opensnail
* @date : 2022-02-28 09:10
*/
@Service
@RequiredArgsConstructor
public class RetryTaskLogServiceImpl implements RetryTaskLogService {
private final RetryTaskMapper retryTaskMapper;
private final RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Override
public PageResult<List<RetryTaskLogResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO) {
PageDTO<RetryTask> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
UserSessionVO userSessionVO = UserSessionUtils.currentUserSession();
String namespaceId = userSessionVO.getNamespaceId();
List<String> groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName());
LambdaQueryWrapper<RetryTask> wrapper = new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.in(CollUtil.isNotEmpty(groupNames), RetryTask::getGroupName, groupNames)
.eq(StrUtil.isNotBlank(queryVO.getSceneName()), RetryTask::getSceneName, queryVO.getSceneName())
.eq(queryVO.getRetryStatus() != null, RetryTask::getTaskStatus, queryVO.getRetryStatus())
.eq(Objects.nonNull(queryVO.getRetryId()), RetryTask::getRetryId, queryVO.getRetryId())
.between(ObjUtil.isNotNull(queryVO.getDatetimeRange()),
RetryTask::getCreateDt, queryVO.getStartDt(), queryVO.getEndDt())
.select(RetryTask::getGroupName, RetryTask::getId, RetryTask::getSceneName, RetryTask::getTaskStatus,
RetryTask::getCreateDt, RetryTask::getTaskType, RetryTask::getOperationReason, RetryTask::getRetryId)
.orderByDesc(RetryTask::getCreateDt);
PageDTO<RetryTask> retryTaskPageDTO = retryTaskMapper.selectPage(pageDTO, wrapper);
return new PageResult<>(retryTaskPageDTO,
RetryTaskLogResponseVOConverter.INSTANCE.convertList(retryTaskPageDTO.getRecords()));
}
@Override
public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(
RetryTaskLogMessageQueryVO queryVO) {
if (queryVO.getRetryTaskId() == null || StrUtil.isBlank(queryVO.getGroupName())) {
RetryTaskLogMessageResponseVO jobLogResponseVO = new RetryTaskLogMessageResponseVO();
jobLogResponseVO.setNextStartId(0L);
jobLogResponseVO.setFromIndex(0);
return jobLogResponseVO;
}
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
PageDTO<RetryTaskLogMessage> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
PageDTO<RetryTaskLogMessage> selectPage = retryTaskLogMessageMapper.selectPage(pageDTO,
new LambdaQueryWrapper<RetryTaskLogMessage>()
.select(RetryTaskLogMessage::getId, RetryTaskLogMessage::getLogNum)
.ge(RetryTaskLogMessage::getId, queryVO.getStartId())
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getRetryTaskId, queryVO.getRetryTaskId())
.eq(RetryTaskLogMessage::getGroupName, queryVO.getGroupName())
.orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime)
.orderByDesc(RetryTaskLogMessage::getCreateDt));
List<RetryTaskLogMessage> records = selectPage.getRecords();
if (CollUtil.isEmpty(records)) {
return new RetryTaskLogMessageResponseVO()
.setFinished(Boolean.TRUE)
.setNextStartId(queryVO.getStartId())
.setFromIndex(0);
}
Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0);
RetryTaskLogMessage firstRecord = records.get(0);
List<Long> ids = Lists.newArrayList(firstRecord.getId());
int total = firstRecord.getLogNum() - fromIndex;
for (int i = 1; i < records.size(); i++) {
RetryTaskLogMessage record = records.get(i);
if (total + record.getLogNum() > queryVO.getSize()) {
break;
}
total += record.getLogNum();
ids.add(record.getId());
}
long nextStartId = 0;
List<Map<String, String>> messages = Lists.newArrayList();
List<RetryTaskLogMessage> jobLogMessages = retryTaskLogMessageMapper.selectList(
new LambdaQueryWrapper<RetryTaskLogMessage>()
.in(RetryTaskLogMessage::getId, ids)
.orderByAsc(RetryTaskLogMessage::getId)
.orderByAsc(RetryTaskLogMessage::getRealTime)
);
for (final RetryTaskLogMessage retryTaskLogMessage : jobLogMessages) {
List<Map<String, String>> originalList = JsonUtil.parseObject(retryTaskLogMessage.getMessage(), List.class);
int size = originalList.size() - fromIndex;
List<Map<String, String>> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize())
.collect(Collectors.toList());
if (messages.size() + size >= queryVO.getSize()) {
messages.addAll(pageList);
nextStartId = retryTaskLogMessage.getId();
fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1;
break;
}
messages.addAll(pageList);
nextStartId = retryTaskLogMessage.getId() + 1;
fromIndex = 0;
}
messages = messages.stream()
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
.collect(Collectors.toList());
RetryTaskLogMessageResponseVO responseVO = new RetryTaskLogMessageResponseVO();
responseVO.setMessage(messages);
responseVO.setNextStartId(nextStartId);
responseVO.setFromIndex(fromIndex);
return responseVO;
}
@Override
public RetryTaskLogResponseVO getRetryTaskLogById(Long id) {
RetryTask retryTask = retryTaskMapper.selectById(id);
return RetryTaskLogResponseVOConverter.INSTANCE.convert(retryTask);
}
@Override
@Transactional
public boolean deleteById(final Long id) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
RetryTask retryTask = retryTaskMapper.selectOne(
new LambdaQueryWrapper<RetryTask>()
.in(RetryTask::getTaskStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus()))
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getId, id));
Assert.notNull(retryTask, () -> new SnailJobServerException("数据删除失败"));
retryTaskLogMessageMapper.delete(new LambdaQueryWrapper<RetryTaskLogMessage>()
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getGroupName, retryTask.getGroupName())
.eq(RetryTaskLogMessage::getRetryTaskId, id)
);
return 1 == retryTaskMapper.deleteById(id);
}
@Override
@Transactional
public boolean batchDelete(final Set<Long> ids) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
List<RetryTask> retryTasks = retryTaskMapper.selectList(
new LambdaQueryWrapper<RetryTask>()
.in(RetryTask::getTaskStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus()))
.eq(RetryTask::getNamespaceId, namespaceId)
.in(RetryTask::getId, ids));
Assert.notEmpty(retryTasks, () -> new SnailJobServerException("数据不存在"));
Assert.isTrue(retryTasks.size() == ids.size(), () -> new SnailJobServerException("数据不存在"));
for (final RetryTask retryTask : retryTasks) {
retryTaskLogMessageMapper.delete(
new LambdaQueryWrapper<RetryTaskLogMessage>()
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getGroupName, retryTask.getGroupName())
.eq(RetryTaskLogMessage::getRetryTaskId, retryTask.getId()));
}
return 1 == retryTaskMapper.deleteByIds(ids);
}
}

View File

@ -2,427 +2,228 @@ package com.aizuda.snailjob.server.web.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.enums.TaskGeneratorSceneEnum;
import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.model.dto.RetryTaskDTO;
import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient;
import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskContext;
import com.aizuda.snailjob.server.retry.task.support.generator.retry.TaskGenerator;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.*;
import com.aizuda.snailjob.server.web.model.response.RetryResponseVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.snailjob.server.web.model.request.UserSessionVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
import com.aizuda.snailjob.server.web.service.RetryTaskService;
import com.aizuda.snailjob.server.web.service.convert.RetryTaskResponseVOConverter;
import com.aizuda.snailjob.server.web.service.convert.TaskContextConverter;
import com.aizuda.snailjob.server.web.service.convert.RetryConverter;
import com.aizuda.snailjob.server.web.service.convert.RetryTaskLogResponseVOConverter;
import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.aizuda.snailjob.common.core.enums.RetryStatusEnum.ALLOW_DELETE_STATUS;
import java.util.stream.Collectors;
/**
* @author opensnail
* @date 2022-02-27
* @since 2.0
* @author: opensnail
* @date : 2022-02-28 09:10
*/
@Service
@RequiredArgsConstructor
public class RetryTaskServiceImpl implements RetryTaskService {
@Autowired
private ClientNodeAllocateHandler clientNodeAllocateHandler;
@Autowired
private RetryTaskMapper retryTaskMapper;
@Autowired
private AccessTemplate accessTemplate;
@Autowired
@Lazy
private List<TaskGenerator> taskGenerators;
@Autowired
private RetryTaskLogMessageMapper retryTaskLogMessageMapper;
private final RetryTaskMapper retryTaskMapper;
private final RetryMapper retryMapper;
private final RetryTaskLogMessageMapper retryTaskLogMessageMapper;
private final RetryTaskStopHandler retryTaskStopHandler;
@Override
public PageResult<List<RetryResponseVO>> getRetryPage(RetryQueryVO queryVO) {
public PageResult<List<RetryTaskResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO) {
PageDTO<RetryTask> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
PageDTO<Retry> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
if (StrUtil.isBlank(queryVO.getGroupName())) {
return new PageResult<>(pageDTO, new ArrayList<>());
}
UserSessionVO userSessionVO = UserSessionUtils.currentUserSession();
String namespaceId = userSessionVO.getNamespaceId();
List<String> groupNames = UserSessionUtils.getGroupNames(queryVO.getGroupName());
LambdaQueryWrapper<Retry> queryWrapper = new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.in(CollUtil.isNotEmpty(groupNames), Retry::getGroupName, groupNames)
.eq(StrUtil.isNotBlank(queryVO.getSceneName()), Retry::getSceneName, queryVO.getSceneName())
.eq(StrUtil.isNotBlank(queryVO.getBizNo()), Retry::getBizNo, queryVO.getBizNo())
.eq(StrUtil.isNotBlank(queryVO.getIdempotentId()), Retry::getIdempotentId, queryVO.getIdempotentId())
.eq(Objects.nonNull(queryVO.getRetryId()), Retry::getId, queryVO.getRetryId())
.eq(Objects.nonNull(queryVO.getRetryStatus()), Retry::getRetryStatus, queryVO.getRetryStatus())
.eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType())
.select(Retry::getId, Retry::getBizNo, Retry::getIdempotentId,
Retry::getGroupName, Retry::getNextTriggerAt, Retry::getRetryCount,
Retry::getRetryStatus, Retry::getUpdateDt, Retry::getCreateDt, Retry::getSceneName,
Retry::getTaskType, Retry::getParentId)
.orderByDesc(Retry::getCreateDt);
pageDTO = accessTemplate.getRetryAccess().listPage(pageDTO, queryWrapper);
Set<Long> ids = StreamUtils.toSet(pageDTO.getRecords(), Retry::getId);
Map<Long, Retry> callbackMap = Maps.newHashMap();
if (CollUtil.isNotEmpty(ids)) {
List<Retry> callbackTaskList = accessTemplate.getRetryAccess()
.list(new LambdaQueryWrapper<Retry>().in(Retry::getParentId, ids));
callbackMap = StreamUtils.toIdentityMap(callbackTaskList, Retry::getParentId);
}
List<RetryResponseVO> retryResponseList = RetryTaskResponseVOConverter.INSTANCE.convertList(pageDTO.getRecords());
for (RetryResponseVO retryResponseVO : retryResponseList) {
RetryResponseVO responseVO = RetryTaskResponseVOConverter.INSTANCE.convert(callbackMap.get(retryResponseVO.getId()));
if (Objects.isNull(responseVO)) {
retryResponseVO.setChildren(Lists.newArrayList());
} else {
retryResponseVO.setChildren(Lists.newArrayList(responseVO));
}
}
return new PageResult<>(pageDTO, retryResponseList);
}
@Override
public RetryResponseVO getRetryById(String groupName, Long id) {
TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
Retry retry = retryTaskAccess.one(new LambdaQueryWrapper<Retry>().eq(Retry::getId, id));
return RetryTaskResponseVOConverter.INSTANCE.convert(retry);
}
@Override
@Transactional
public int updateRetryTaskStatus(RetryUpdateStatusRequestVO requestVO) {
RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(requestVO.getRetryStatus());
if (Objects.isNull(retryStatusEnum)) {
throw new SnailJobServerException("重试状态错误. [{}]", requestVO.getRetryStatus());
}
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
Retry retry = retryTaskAccess.one(new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.eq(Retry::getId, requestVO.getId()));
if (Objects.isNull(retry)) {
throw new SnailJobServerException("未查询到重试任务");
}
retry.setRetryStatus(requestVO.getRetryStatus());
retry.setGroupName(requestVO.getGroupName());
// 若恢复重试则需要重新计算下次触发时间
if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) {
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneName(retry.getGroupName(), retry.getSceneName(), namespaceId);
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(DateUtils.toNowMilli());
waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(retry.getRetryCount() + 1);
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff());
retry.setNextTriggerAt(waitStrategy.computeTriggerTime(waitStrategyContext));
}
if (RetryStatusEnum.FINISH.getStatus().equals(retryStatusEnum.getStatus())) {
RetryLogMetaDTO retryLogMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retry);
retryLogMetaDTO.setTimestamp(DateUtils.toNowMilli());
SnailJobLog.REMOTE.info("=============手动操作完成============. <|>{}<|>", retryLogMetaDTO);
}
//
// RetryTask retryTask = new RetryTask();
// retryTask.setTaskStatus(requestVO.getRetryStatus());
// retryTaskMapper.update(retryTask, new LambdaUpdateWrapper<RetryTask>()
// .eq(RetryTask::getNamespaceId, namespaceId)
// .eq(RetryTask::getUniqueId, retry.getUniqueId())
// .eq(RetryTask::getGroupName, retry.getGroupName()));
retry.setUpdateDt(LocalDateTime.now());
return retryTaskAccess.updateById(retry);
}
@Override
public int saveRetryTask(final RetrySaveRequestVO retryTaskRequestVO) {
RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(retryTaskRequestVO.getRetryStatus());
if (Objects.isNull(retryStatusEnum)) {
throw new SnailJobServerException("重试状态错误");
}
TaskGenerator taskGenerator = taskGenerators.stream()
.filter(t -> t.supports(TaskGeneratorSceneEnum.MANA_SINGLE.getScene()))
.findFirst().orElseThrow(() -> new SnailJobServerException("没有匹配的任务生成器"));
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
TaskContext taskContext = new TaskContext();
taskContext.setSceneName(retryTaskRequestVO.getSceneName());
taskContext.setGroupName(retryTaskRequestVO.getGroupName());
taskContext.setInitStatus(retryTaskRequestVO.getRetryStatus());
taskContext.setNamespaceId(namespaceId);
taskContext.setTaskInfos(
Collections.singletonList(TaskContextConverter.INSTANCE.convert(retryTaskRequestVO)));
// 生成任务
taskGenerator.taskGenerator(taskContext);
return 1;
}
@Override
public String idempotentIdGenerate(final GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(
generateRetryIdempotentIdVO.getGroupName(),
namespaceId);
Assert.notEmpty(serverNodes,
() -> new SnailJobServerException("生成idempotentId失败: 不存在活跃的客户端节点"));
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneName(generateRetryIdempotentIdVO.getGroupName(),
generateRetryIdempotentIdVO.getSceneName(), namespaceId);
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(retrySceneConfig.getSceneName(),
retrySceneConfig.getGroupName(), retrySceneConfig.getNamespaceId(), retrySceneConfig.getRouteKey());
// 委托客户端生成idempotentId
GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO = new GenerateRetryIdempotentIdDTO();
generateRetryIdempotentIdDTO.setGroup(generateRetryIdempotentIdVO.getGroupName());
generateRetryIdempotentIdDTO.setScene(generateRetryIdempotentIdVO.getSceneName());
generateRetryIdempotentIdDTO.setArgsStr(generateRetryIdempotentIdVO.getArgsStr());
generateRetryIdempotentIdDTO.setExecutorName(generateRetryIdempotentIdVO.getExecutorName());
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
.nodeInfo(serverNode)
.client(RetryRpcClient.class)
.build();
Result result = rpcClient.generateIdempotentId(generateRetryIdempotentIdDTO);
Assert.notNull(result, () -> new SnailJobServerException("idempotentId生成失败"));
Assert.isTrue(1 == result.getStatus(),
() -> new SnailJobServerException("idempotentId生成失败:请确保参数与执行器名称正确"));
return (String) result.getData();
}
@Override
public int updateRetryExecutorName(final RetryUpdateExecutorNameRequestVO requestVO) {
Retry retry = new Retry();
retry.setExecutorName(requestVO.getExecutorName());
retry.setRetryStatus(requestVO.getRetryStatus());
retry.setUpdateDt(LocalDateTime.now());
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
// 根据重试数据id更新执行器名称
TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
return retryTaskAccess.update(retry, new LambdaUpdateWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.eq(Retry::getGroupName, requestVO.getGroupName())
.in(Retry::getId, requestVO.getIds()));
}
@Override
@Transactional
public boolean batchDeleteRetry(final BatchDeleteRetryTaskVO requestVO) {
TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
List<Retry> tasks = retryTaskAccess.list(new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.eq(Retry::getGroupName, requestVO.getGroupName())
.in(Retry::getRetryStatus, ALLOW_DELETE_STATUS)
.in(Retry::getId, requestVO.getIds())
);
Assert.notEmpty(tasks,
() -> new SnailJobServerException("没有可删除的数据, 只有非【处理中】的数据可以删除"));
Assert.isTrue(requestVO.getIds().size() == retryTaskAccess.delete(new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.eq(Retry::getGroupName, requestVO.getGroupName())
.in(Retry::getRetryStatus, ALLOW_DELETE_STATUS)
.in(Retry::getId, requestVO.getIds()))
, () -> new SnailJobServerException("删除重试任务失败, 请检查任务状态是否为已完成或者最大次数"));
Set<Long> uniqueIds = StreamUtils.toSet(tasks, Retry::getId);
retryTaskMapper.delete(new LambdaQueryWrapper<RetryTask>()
.in(RetryTask::getTaskStatus, ALLOW_DELETE_STATUS)
.eq(RetryTask::getGroupName, requestVO.getGroupName())
LambdaQueryWrapper<RetryTask> wrapper = new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.in(RetryTask::getRetryId, uniqueIds));
.in(CollUtil.isNotEmpty(groupNames), RetryTask::getGroupName, groupNames)
.eq(StrUtil.isNotBlank(queryVO.getSceneName()), RetryTask::getSceneName, queryVO.getSceneName())
.eq(queryVO.getRetryStatus() != null, RetryTask::getTaskStatus, queryVO.getRetryStatus())
.eq(Objects.nonNull(queryVO.getRetryId()), RetryTask::getRetryId, queryVO.getRetryId())
.between(ObjUtil.isNotNull(queryVO.getDatetimeRange()),
RetryTask::getCreateDt, queryVO.getStartDt(), queryVO.getEndDt())
.select(RetryTask::getGroupName, RetryTask::getId, RetryTask::getSceneName, RetryTask::getTaskStatus,
RetryTask::getCreateDt, RetryTask::getTaskType, RetryTask::getOperationReason, RetryTask::getRetryId)
.orderByDesc(RetryTask::getCreateDt);
retryTaskLogMessageMapper.delete(
PageDTO<RetryTask> retryTaskPageDTO = retryTaskMapper.selectPage(pageDTO, wrapper);
return new PageResult<>(retryTaskPageDTO,
RetryTaskLogResponseVOConverter.INSTANCE.convertList(retryTaskPageDTO.getRecords()));
}
@Override
public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(
RetryTaskLogMessageQueryVO queryVO) {
if (queryVO.getRetryTaskId() == null || StrUtil.isBlank(queryVO.getGroupName())) {
RetryTaskLogMessageResponseVO jobLogResponseVO = new RetryTaskLogMessageResponseVO();
jobLogResponseVO.setNextStartId(0L);
jobLogResponseVO.setFromIndex(0);
return jobLogResponseVO;
}
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
PageDTO<RetryTaskLogMessage> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
PageDTO<RetryTaskLogMessage> selectPage = retryTaskLogMessageMapper.selectPage(pageDTO,
new LambdaQueryWrapper<RetryTaskLogMessage>()
.select(RetryTaskLogMessage::getId, RetryTaskLogMessage::getLogNum)
.ge(RetryTaskLogMessage::getId, queryVO.getStartId())
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getGroupName, requestVO.getGroupName())
.in(RetryTaskLogMessage::getRetryId, uniqueIds));
return Boolean.TRUE;
.eq(RetryTaskLogMessage::getRetryTaskId, queryVO.getRetryTaskId())
.eq(RetryTaskLogMessage::getGroupName, queryVO.getGroupName())
.orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime)
.orderByDesc(RetryTaskLogMessage::getCreateDt));
List<RetryTaskLogMessage> records = selectPage.getRecords();
if (CollUtil.isEmpty(records)) {
return new RetryTaskLogMessageResponseVO()
.setFinished(Boolean.TRUE)
.setNextStartId(queryVO.getStartId())
.setFromIndex(0);
}
Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0);
RetryTaskLogMessage firstRecord = records.get(0);
List<Long> ids = Lists.newArrayList(firstRecord.getId());
int total = firstRecord.getLogNum() - fromIndex;
for (int i = 1; i < records.size(); i++) {
RetryTaskLogMessage record = records.get(i);
if (total + record.getLogNum() > queryVO.getSize()) {
break;
}
total += record.getLogNum();
ids.add(record.getId());
}
long nextStartId = 0;
List<Map<String, String>> messages = Lists.newArrayList();
List<RetryTaskLogMessage> jobLogMessages = retryTaskLogMessageMapper.selectList(
new LambdaQueryWrapper<RetryTaskLogMessage>()
.in(RetryTaskLogMessage::getId, ids)
.orderByAsc(RetryTaskLogMessage::getId)
.orderByAsc(RetryTaskLogMessage::getRealTime)
);
for (final RetryTaskLogMessage retryTaskLogMessage : jobLogMessages) {
List<Map<String, String>> originalList = JsonUtil.parseObject(retryTaskLogMessage.getMessage(), List.class);
int size = originalList.size() - fromIndex;
List<Map<String, String>> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize())
.collect(Collectors.toList());
if (messages.size() + size >= queryVO.getSize()) {
messages.addAll(pageList);
nextStartId = retryTaskLogMessage.getId();
fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1;
break;
}
messages.addAll(pageList);
nextStartId = retryTaskLogMessage.getId() + 1;
fromIndex = 0;
}
messages = messages.stream()
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
.collect(Collectors.toList());
RetryTaskLogMessageResponseVO responseVO = new RetryTaskLogMessageResponseVO();
responseVO.setMessage(messages);
responseVO.setNextStartId(nextStartId);
responseVO.setFromIndex(fromIndex);
return responseVO;
}
@Override
public Integer parseLogs(ParseLogsVO parseLogsVO) {
RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(parseLogsVO.getRetryStatus());
if (Objects.isNull(retryStatusEnum)) {
throw new SnailJobServerException("重试状态错误");
}
String logStr = parseLogsVO.getLogStr();
String patternString = "<\\|>(.*?)<\\|>";
Pattern pattern = Pattern.compile(patternString);
Matcher matcher = pattern.matcher(logStr);
List<RetryTaskDTO> waitInsertList = new ArrayList<>();
// 查找匹配的内容并输出
while (matcher.find()) {
String extractedData = matcher.group(1);
if (StrUtil.isBlank(extractedData)) {
continue;
}
List<RetryTaskDTO> retryTaskList = JsonUtil.parseList(extractedData, RetryTaskDTO.class);
if (CollUtil.isNotEmpty(retryTaskList)) {
waitInsertList.addAll(retryTaskList);
}
}
Assert.isFalse(waitInsertList.isEmpty(), () -> new SnailJobServerException("未找到匹配的数据"));
Assert.isTrue(waitInsertList.size() <= 500, () -> new SnailJobServerException("最多只能处理500条数据"));
TaskGenerator taskGenerator = taskGenerators.stream()
.filter(t -> t.supports(TaskGeneratorSceneEnum.MANA_BATCH.getScene()))
.findFirst().orElseThrow(() -> new SnailJobServerException("没有匹配的任务生成器"));
boolean allMatch = waitInsertList.stream()
.allMatch(retryTaskDTO -> retryTaskDTO.getGroupName().equals(parseLogsVO.getGroupName()));
Assert.isTrue(allMatch, () -> new SnailJobServerException("存在数据groupName不匹配请检查您的数据"));
Map<String, List<RetryTaskDTO>> map = StreamUtils.groupByKey(waitInsertList, RetryTaskDTO::getSceneName);
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
map.forEach(((sceneName, retryTaskDTOS) -> {
TaskContext taskContext = new TaskContext();
taskContext.setSceneName(sceneName);
taskContext.setGroupName(parseLogsVO.getGroupName());
taskContext.setNamespaceId(namespaceId);
taskContext.setInitStatus(parseLogsVO.getRetryStatus());
taskContext.setTaskInfos(TaskContextConverter.INSTANCE.convert(retryTaskDTOS));
// 生成任务
taskGenerator.taskGenerator(taskContext);
}));
return waitInsertList.size();
public RetryTaskResponseVO getRetryTaskById(Long id) {
RetryTask retryTask = retryTaskMapper.selectById(id);
return RetryTaskLogResponseVOConverter.INSTANCE.convert(retryTask);
}
@Override
public boolean manualTriggerRetry(ManualTriggerTaskRequestVO requestVO) {
@Transactional
public boolean deleteById(final Long id) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, requestVO.getGroupName())
.eq(GroupConfig::getNamespaceId, namespaceId)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
RetryTask retryTask = retryTaskMapper.selectOne(
new LambdaQueryWrapper<RetryTask>()
.in(RetryTask::getTaskStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus()))
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getId, id));
Assert.notNull(retryTask, () -> new SnailJobServerException("数据删除失败"));
retryTaskLogMessageMapper.delete(new LambdaQueryWrapper<RetryTaskLogMessage>()
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getGroupName, retryTask.getGroupName())
.eq(RetryTaskLogMessage::getRetryTaskId, id)
);
Assert.isTrue(count > 0, () -> new SnailJobServerException("组:[{}]已经关闭,不支持手动执行.", requestVO.getGroupName()));
return 1 == retryTaskMapper.deleteById(id);
}
List<String> uniqueIds = requestVO.getUniqueIds();
@Override
@Transactional
public boolean batchDelete(final Set<Long> ids) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
List<RetryTask> retryTasks = retryTaskMapper.selectList(
new LambdaQueryWrapper<RetryTask>()
.in(RetryTask::getTaskStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus()))
.eq(RetryTask::getNamespaceId, namespaceId)
.in(RetryTask::getId, ids));
Assert.notEmpty(retryTasks, () -> new SnailJobServerException("数据不存在"));
Assert.isTrue(retryTasks.size() == ids.size(), () -> new SnailJobServerException("数据不存在"));
List<Retry> list = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.eq(Retry::getTaskType, SyetemTaskTypeEnum.RETRY.getType())
// .in(Retry::getUniqueId, uniqueIds)
);
Assert.notEmpty(list, () -> new SnailJobServerException("没有可执行的任务"));
for (Retry retry : list) {
// for (TaskExecutor taskExecutor : taskExecutors) {
// if (taskExecutor.getTaskType().getScene() == TaskExecutorSceneEnum.MANUAL_RETRY.getScene()) {
// taskExecutor.actuator(retry);
// }
// }
for (final RetryTask retryTask : retryTasks) {
retryTaskLogMessageMapper.delete(
new LambdaQueryWrapper<RetryTaskLogMessage>()
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getGroupName, retryTask.getGroupName())
.eq(RetryTaskLogMessage::getRetryTaskId, retryTask.getId()));
}
return 1 == retryTaskMapper.deleteByIds(ids);
}
@Override
public Boolean stopById(Long id) {
Retry retry = retryMapper.selectById(id);
Assert.notNull(retry, () -> new SnailJobServerException("没有可执行的任务"));
TaskStopJobDTO taskStopJobDTO = RetryConverter.INSTANCE.toTaskStopJobDTO(retry);
taskStopJobDTO.setOperationReason(RetryOperationReasonEnum.MANNER_STOP.getReason());
taskStopJobDTO.setNeedUpdateTaskStatus(true);
taskStopJobDTO.setMessage("用户手动触发停止");
retryTaskStopHandler.stop(taskStopJobDTO);
return true;
}
@Override
public boolean manualTriggerCallback(ManualTriggerTaskRequestVO requestVO) {
List<String> uniqueIds = requestVO.getUniqueIds();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, requestVO.getGroupName())
.eq(GroupConfig::getNamespaceId, namespaceId)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
);
Assert.isTrue(count > 0, () -> new SnailJobServerException("组:[{}]已经关闭,不支持手动执行.", requestVO.getGroupName()));
List<Retry> list = accessTemplate.getRetryAccess().list(new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, namespaceId)
.eq(Retry::getTaskType, SyetemTaskTypeEnum.CALLBACK.getType())
// .in(Retry::getUniqueId, uniqueIds)
);
Assert.notEmpty(list, () -> new SnailJobServerException("没有可执行的任务"));
for (Retry retry : list) {
// for (TaskExecutor taskExecutor : taskExecutors) {
// if (taskExecutor.getTaskType().getScene() == TaskExecutorSceneEnum.MANUAL_CALLBACK.getScene()) {
// taskExecutor.actuator(retry);
// }
// }
}
return true;
}
}