feat(1.4.0-beta1): 1. Fix manual execution failure 2. Fix failure when deleting retry records
commit abe08bed5c
parent 40dcdb428b
RetryTaskExecuteDTO.java
@@ -16,5 +16,7 @@ import lombok.EqualsAndHashCode;
 public class RetryTaskExecuteDTO extends BaseDTO {
 
     private Integer routeKey;
 
+    private Integer retryTaskExecutorScene;
+
 }
RetryTaskGeneratorDTO.java
@@ -22,4 +22,6 @@ public class RetryTaskGeneratorDTO extends BaseDTO {
     private Integer taskType;
 
     private long nextTriggerAt;
+
+    private Integer retryTaskExecutorScene;
 }
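Both DTOs above gain a retryTaskExecutorScene field so the pipeline can tell a manually triggered execution apart from a regular scheduled one. A minimal sketch of how a producer might populate it; the setter names are assumed Lombok-style accessors derived from the fields shown above, and the helper method itself is not part of this commit:

// Illustrative only, not from the commit: build a generator DTO for a manual trigger.
RetryTaskGeneratorDTO buildManualGeneratorDTO(Integer taskType) {
    RetryTaskGeneratorDTO dto = new RetryTaskGeneratorDTO();
    dto.setTaskType(taskType);                          // existing field
    dto.setNextTriggerAt(System.currentTimeMillis());   // existing field: trigger right away
    dto.setRetryTaskExecutorScene(RetryTaskExecutorSceneEnum.MANUAL_RETRY.getScene()); // new field
    return dto;
}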
RetryExecutor.java
@@ -12,6 +12,7 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
 import com.aizuda.snailjob.server.common.akka.ActorGenerator;
 import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
 import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
+import com.aizuda.snailjob.server.common.enums.RetryTaskExecutorSceneEnum;
 import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
 import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
 import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
@@ -78,7 +79,11 @@ public class RetryExecutor extends AbstractActor {
 
     private void doExecute(RetryTaskExecuteDTO execute) {
         LambdaQueryWrapper<Retry> wrapper = new LambdaQueryWrapper<>();
-        wrapper.eq(Retry::getId, execute.getRetryId()).eq(Retry::getRetryStatus, RetryStatusEnum.RUNNING.getStatus());
+        if (RetryTaskExecutorSceneEnum.MANUAL_RETRY.getScene() != execute.getRetryTaskExecutorScene()) {
+            wrapper.eq(Retry::getId, execute.getRetryId()).eq(Retry::getRetryStatus, RetryStatusEnum.RUNNING.getStatus());
+        }
 
         Retry retry = retryMapper.selectOne(wrapper);
         if (Objects.isNull(retry)) {
             // No retry in RUNNING state, skip scheduling
@@ -86,6 +91,10 @@ public class RetryExecutor extends AbstractActor {
             return;
         }
 
+        execute.setNamespaceId(retry.getNamespaceId());
+        execute.setGroupName(retry.getGroupName());
+        execute.setTaskType(retry.getTaskType());
+
         if (CollUtil.isEmpty(CacheRegisterTable.getServerNodeSet(retry.getGroupName(), retry.getNamespaceId()))) {
             // No client available, skip scheduling
             updateRetryTaskStatus(execute.getRetryTaskId(), RetryTaskStatusEnum.CANCEL.getStatus(), RetryOperationReasonEnum.NOT_CLIENT);
@@ -104,8 +113,8 @@ public class RetryExecutor extends AbstractActor {
         }
 
         // Get the client node to execute on
-        RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(execute.getRetryId().toString(),
-                execute.getGroupName(), execute.getNamespaceId(), retrySceneConfig.getRouteKey());
+        RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(retry.getId().toString(),
+                retry.getGroupName(), retry.getNamespaceId(), retrySceneConfig.getRouteKey());
         updateRetryTaskStatus(execute.getRetryTaskId(), RetryTaskStatusEnum.RUNNING.getStatus(),
                 ClientInfoUtils.generate(serverNode));
 
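The executor now enforces the RUNNING-status filter only for non-manual scenes and resolves namespace, group and routing data from the freshly loaded Retry row, which is what lets a manual trigger run against a retry that is no longer RUNNING. RetryTaskExecutorSceneEnum itself is not part of this diff; below is a hypothetical sketch of the shape its usage implies (MANUAL_RETRY and getScene() appear above; the other constant and the numeric codes are assumptions):

// Hypothetical sketch inferred from usage; only MANUAL_RETRY and getScene() are confirmed by the diff.
public enum RetryTaskExecutorSceneEnum {
    AUTO_RETRY(1),     // assumed: the regular scheduled scene
    MANUAL_RETRY(2);   // name confirmed above; the numeric code is an assumption

    private final Integer scene;

    RetryTaskExecutorSceneEnum(Integer scene) {
        this.scene = scene;
    }

    public Integer getScene() {
        return scene;
    }
}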
CallbackTimerTask.java (file removed)
@@ -1,53 +0,0 @@
-package com.aizuda.snailjob.server.retry.task.support.timer;
-
-import com.aizuda.snailjob.common.core.context.SnailSpringContext;
-import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
-import com.aizuda.snailjob.common.log.SnailJobLog;
-import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
-import com.aizuda.snailjob.template.datasource.access.TaskAccess;
-import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import io.netty.util.Timeout;
-
-import java.text.MessageFormat;
-import java.time.LocalDateTime;
-import java.util.Objects;
-
-/**
- * @author: opensnail
- * @date : 2023-09-22 17:09
- */
-public class CallbackTimerTask extends AbstractTimerTask {
-    public static final String IDEMPOTENT_KEY_PREFIX = "callback_{0}_{1}_{2}";
-
-    private final RetryTimerContext context;
-
-    public CallbackTimerTask(RetryTimerContext context) {
-        this.context = context;
-        super.groupName = context.getGroupName();
-        // super.uniqueId = context.getUniqueId();
-        super.namespaceId = context.getNamespaceId();
-    }
-
-    @Override
-    protected void doRun(final Timeout timeout) {
-        SnailJobLog.LOCAL.debug("回调任务执行 {}", LocalDateTime.now());
-        AccessTemplate accessTemplate = SnailSpringContext.getBeanByType(AccessTemplate.class);
-        TaskAccess<Retry> retryTaskAccess = accessTemplate.getRetryAccess();
-        Retry retry = retryTaskAccess.one(new LambdaQueryWrapper<Retry>()
-                .eq(Retry::getNamespaceId, context.getNamespaceId())
-                .eq(Retry::getGroupName, context.getGroupName())
-                .eq(Retry::getId, context.getRetryId())
-                .eq(Retry::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
-        if (Objects.isNull(retry)) {
-            return;
-        }
-        // TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene());
-        // taskExecutor.actuator(retry);
-    }
-
-    @Override
-    public String idempotentKey() {
-        return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, context.getGroupName(), context.getNamespaceId(), context.getRetryTaskId());
-    }
-}
RetryTimerContext.java
@@ -10,14 +10,10 @@ import lombok.Data;
 @Data
 public class RetryTimerContext {
 
-    private String namespaceId;
-
-    private String groupName;
-
     private Long retryId;
 
     private Long retryTaskId;
 
-    // private TaskExecutorSceneEnum scene;
+    private Integer retryTaskExecutorScene;
 
 }
RetryTimerTask.java
@@ -36,8 +36,6 @@ public class RetryTimerTask extends AbstractTimerTask {
 
     public RetryTimerTask(RetryTimerContext context) {
        this.context = context;
-        super.groupName = context.getGroupName();
-        super.namespaceId = context.getNamespaceId();
        super.retryId = context.getRetryId();
        super.retryTaskId = context.getRetryTaskId();
     }
@ -3,6 +3,8 @@ package com.aizuda.snailjob.server.web.service.convert;
|
|||||||
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
|
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
|
||||||
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
|
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
|
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
|
||||||
|
import org.mapstruct.Mapper;
|
||||||
|
import org.mapstruct.Mapping;
|
||||||
import org.mapstruct.factory.Mappers;
|
import org.mapstruct.factory.Mappers;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -13,6 +15,7 @@ import org.mapstruct.factory.Mappers;
|
|||||||
* @author opensnail
|
* @author opensnail
|
||||||
* @date 2025-02-22
|
* @date 2025-02-22
|
||||||
*/
|
*/
|
||||||
|
@Mapper
|
||||||
public interface RetryConverter {
|
public interface RetryConverter {
|
||||||
RetryConverter INSTANCE = Mappers.getMapper(RetryConverter.class);
|
RetryConverter INSTANCE = Mappers.getMapper(RetryConverter.class);
|
||||||
|
|
||||||
|
RetryServiceImpl.java
@@ -159,7 +159,6 @@ public class RetryServiceImpl implements RetryService {
         }
 
         retry.setRetryStatus(requestVO.getRetryStatus());
-        retry.setGroupName(requestVO.getGroupName());
 
         // When resuming a retry, recalculate the next trigger time
         if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) {
@@ -393,7 +392,7 @@ public class RetryServiceImpl implements RetryService {
         // Setting it to now means execute immediately
         retryTaskPrepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
         retryTaskPrepareDTO.setRetryTaskExecutorScene(RetryTaskExecutorSceneEnum.MANUAL_RETRY.getScene());
+        retryTaskPrepareDTO.setRetryId(retry.getId());
         // Execute the prepare phase
         ActorRef actorRef = ActorGenerator.retryTaskPrepareActor();
         actorRef.tell(retryTaskPrepareDTO, actorRef);
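On the web side, a manual execution now also carries the retry id into the prepare phase, in addition to the immediate trigger time and the MANUAL_RETRY scene; this is the id the executor above uses to load the Retry row. Condensed from the visible lines only, with the surrounding method body omitted:

// Condensed from the hunk above; not a complete method.
retryTaskPrepareDTO.setNextTriggerAt(DateUtils.toNowMilli());   // "now" = execute immediately
retryTaskPrepareDTO.setRetryTaskExecutorScene(RetryTaskExecutorSceneEnum.MANUAL_RETRY.getScene());
retryTaskPrepareDTO.setRetryId(retry.getId());                  // added by this commit
ActorRef actorRef = ActorGenerator.retryTaskPrepareActor();
actorRef.tell(retryTaskPrepareDTO, actorRef);                   // hand off to the prepare actor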
RetryTaskServiceImpl.java
@@ -6,6 +6,7 @@ import cn.hutool.core.util.ObjUtil;
 import cn.hutool.core.util.StrUtil;
 import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
 import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
+import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
 import com.aizuda.snailjob.common.core.util.JsonUtil;
 import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
 import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
@@ -175,7 +176,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
 
         RetryTask retryTask = retryTaskMapper.selectOne(
                 new LambdaQueryWrapper<RetryTask>()
-                        .in(RetryTask::getTaskStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus()))
+                        .in(RetryTask::getTaskStatus, RetryTaskStatusEnum.TERMINAL_STATUS_SET)
                        .eq(RetryTask::getNamespaceId, namespaceId)
                        .eq(RetryTask::getId, id));
         Assert.notNull(retryTask, () -> new SnailJobServerException("数据删除失败"));
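The delete guard previously compared RetryTask.taskStatus against RetryStatusEnum values (FINISH / MAX_COUNT), i.e. statuses from the Retry model rather than the task model, which appears to be the source of the delete failure named in the commit title; the new condition checks against RetryTaskStatusEnum.TERMINAL_STATUS_SET instead. The constant's contents are not shown in this diff; below is a hypothetical sketch of the shape it implies (CANCEL and RUNNING are the only RetryTaskStatusEnum values visible in this commit, so any other members are assumptions):

import java.util.Set;

// Hypothetical illustration of TERMINAL_STATUS_SET; the real constant lives on
// RetryTaskStatusEnum and its exact members are not part of this diff.
final class TerminalStatusSetSketch {
    static final Set<Integer> TERMINAL_STATUS_SET = Set.of(
            RetryTaskStatusEnum.CANCEL.getStatus()  // only terminal-looking value visible in this commit
            // , ... plus the success / failure terminal codes, which are assumptions here
    );
}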