feat(1.4.0-beta1): 1.场景新增阻塞策略

This commit is contained in:
opensnail 2025-02-22 23:28:39 +08:00
parent 896593d4a0
commit d9155866af
15 changed files with 97 additions and 51 deletions

View File

@ -196,6 +196,7 @@ CREATE TABLE `sj_retry_scene_config`
`deadline_request` bigint(20) unsigned NOT NULL DEFAULT 60000 COMMENT 'Deadline Request 调用链超时 单位毫秒',
`executor_timeout` int(11) unsigned NOT NULL DEFAULT 5 COMMENT '任务执行超时时间,单位秒',
`route_key` tinyint(4) NOT NULL DEFAULT 4 COMMENT '路由策略',
`block_strategy` tinyint(4) NOT NULL DEFAULT 1 COMMENT '阻塞策略 1、丢弃 2、覆盖 3、并行',
`cb_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '回调状态 0、不开启 1、开启',
`cb_trigger_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、默认等级 2、固定间隔时间 3、CRON 表达式',
`cb_max_count` int(11) NOT NULL DEFAULT 16 COMMENT '回调的最大执行次数',
@ -306,7 +307,7 @@ CREATE TABLE `sj_job`
`executor_info` varchar(255) DEFAULT NULL COMMENT '执行器名称',
`trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间',
`trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
`block_strategy` tinyint(4) NOT NULL DEFAULT 1 COMMENT '阻塞策略 1、丢弃 2、覆盖 3、并行',
`block_strategy` tinyint(4) NOT NULL DEFAULT 1 COMMENT '阻塞策略 1、丢弃 2、覆盖 3、并行 4、恢复',
`executor_timeout` int(11) NOT NULL DEFAULT 0 COMMENT '任务执行超时时间,单位秒',
`max_retry_times` int(11) NOT NULL DEFAULT 0 COMMENT '最大重试次数',
`parallel_num` int(11) NOT NULL DEFAULT 1 COMMENT '并行数',

View File

@ -3,18 +3,13 @@ package com.aizuda.snailjob.client.core.callback.future;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.client.core.context.CallbackContext;
import com.aizuda.snailjob.client.core.client.RetryClient;
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
import com.aizuda.snailjob.client.model.request.DispatchCallbackResultRequest;
import com.aizuda.snailjob.client.model.request.DispatchRetryResultRequest;
import com.aizuda.snailjob.client.model.request.RetryCallbackResultRequest;
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.google.common.util.concurrent.FutureCallback;
import java.util.Objects;
import java.util.concurrent.CancellationException;
/**

View File

@ -27,6 +27,8 @@ public class RetrySceneConfig extends CreateUpdateDt {
private String sceneName;
private Integer blockStrategy;
private Integer sceneStatus;
private Integer maxRetryCount;
@ -61,7 +63,7 @@ public class RetrySceneConfig extends CreateUpdateDt {
/**
* 回调的最大执行次数
*/
private int cbMaxCount = 288;
private int cbMaxCount;
/**
* 回调间隔时间

View File

@ -14,6 +14,7 @@ import java.time.LocalDateTime;
*/
@Data
@TableName("sj_sequence_alloc")
@Deprecated
public class SequenceAlloc implements Serializable {
private static final long serialVersionUID = 1L;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.retry.task.support.block;
import com.aizuda.snailjob.common.core.enums.JobBlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.RetryBlockStrategyEnum;
import com.aizuda.snailjob.server.retry.task.support.BlockStrategy;
import org.springframework.beans.factory.InitializingBean;
@ -18,7 +18,7 @@ public abstract class AbstracJobBlockStrategy implements BlockStrategy, Initiali
protected abstract void doBlock(final BlockStrategyContext context);
protected abstract JobBlockStrategyEnum blockStrategyEnum();
protected abstract RetryBlockStrategyEnum blockStrategyEnum();
@Override
public void afterPropertiesSet() throws Exception {

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.retry.task.support.block;
import com.aizuda.snailjob.common.core.enums.JobBlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.RetryBlockStrategyEnum;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskGeneratorDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.generator.task.RetryTaskGeneratorHandler;
@ -8,9 +8,9 @@ import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* @author: xiaowoniu
* @date : 2024-01-18
* @since : 2.6.0
* @author: opensnail
* @date : 2025-02-10
* @since : sj_1.4.0
*/
@Component
@RequiredArgsConstructor
@ -25,7 +25,7 @@ public class ConcurrencyRetryBlockStrategy extends AbstracJobBlockStrategy {
}
@Override
protected JobBlockStrategyEnum blockStrategyEnum() {
return JobBlockStrategyEnum.CONCURRENCY;
protected RetryBlockStrategyEnum blockStrategyEnum() {
return RetryBlockStrategyEnum.CONCURRENCY;
}
}

View File

@ -27,7 +27,7 @@ public class DiscardRetryBlockStrategy extends AbstracJobBlockStrategy {
}
@Override
protected JobBlockStrategyEnum blockStrategyEnum() {
return JobBlockStrategyEnum.DISCARD;
protected RetryBlockStrategyEnum blockStrategyEnum() {
return RetryBlockStrategyEnum.DISCARD;
}
}

View File

@ -1,9 +1,6 @@
package com.aizuda.snailjob.server.retry.task.support.block;
import com.aizuda.snailjob.common.core.enums.JobBlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.*;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskGeneratorDTO;
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
@ -45,7 +42,7 @@ public class OverlayRetryBlockStrategy extends AbstracJobBlockStrategy {
}
@Override
protected JobBlockStrategyEnum blockStrategyEnum() {
return JobBlockStrategyEnum.OVERLAY;
protected RetryBlockStrategyEnum blockStrategyEnum() {
return RetryBlockStrategyEnum.OVERLAY;
}
}

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.retry.task.support.block;
import com.aizuda.snailjob.common.core.enums.JobBlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.RetryBlockStrategyEnum;
import com.aizuda.snailjob.server.retry.task.support.BlockStrategy;
import java.util.concurrent.ConcurrentHashMap;
@ -11,17 +11,17 @@ import java.util.concurrent.ConcurrentHashMap;
* @since : sj_1.4.0
*/
public final class RetryBlockStrategyFactory {
private static final ConcurrentHashMap<JobBlockStrategyEnum, BlockStrategy> CACHE = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<RetryBlockStrategyEnum, BlockStrategy> CACHE = new ConcurrentHashMap<>();
private RetryBlockStrategyFactory() {
}
static void registerBlockStrategy(JobBlockStrategyEnum jobBlockStrategyEnum, BlockStrategy blockStrategy) {
static void registerBlockStrategy(RetryBlockStrategyEnum jobBlockStrategyEnum, BlockStrategy blockStrategy) {
CACHE.put(jobBlockStrategyEnum, blockStrategy);
}
public static BlockStrategy getBlockStrategy(Integer blockStrategy) {
return CACHE.get(JobBlockStrategyEnum.valueOf(blockStrategy));
return CACHE.get(RetryBlockStrategyEnum.valueOf(blockStrategy));
}
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.server.common.enums.RetryTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryPrePareHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
@ -15,6 +16,7 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import static com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum.NOT_COMPLETE;
import static com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum.SUCCESS;
@ -66,7 +68,9 @@ public class RetryTaskPrepareActor extends AbstractActor {
.orderByAsc(RetryTask::getRetryId)
);
if (CollUtil.isEmpty(retryTasks)) {
if (CollUtil.isEmpty(retryTasks)
|| Objects.isNull(prepareDTO.getRetryTaskExecutorScene())
|| RetryTaskExecutorSceneEnum.MANUAL_RETRY.getScene() == prepareDTO.getRetryTaskExecutorScene()) {
RetryTask retryTask = new RetryTask();
retryTask.setTaskStatus(SUCCESS.getStatus());
retryTasks = Lists.newArrayList(retryTask);

View File

@ -206,6 +206,8 @@ public abstract class AbstractGenerator implements TaskGenerator {
retrySceneConfig.setSceneStatus(StatusEnum.YES.getStatus());
retrySceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType());
retrySceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel());
retrySceneConfig.setCbStatus(StatusEnum.NO.getStatus());
retrySceneConfig.setCbMaxCount(DelayLevelEnum._16.getLevel());
retrySceneConfig.setDescription("自动初始化场景");
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().insert(retrySceneConfig),
() -> new SnailJobServerException("init scene error"));

View File

@ -52,7 +52,7 @@ public class RunningRetryPrepareHandler implements RetryPrePareHandler {
log.info("任务执行超时.retryTaskId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getRetryTaskId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout()));
// 超时停止任务
TaskStopJobDTO stopJobDTO = RetryTaskConverter.INSTANCE.toTaskStopJobDTO(prepare);
stopJobDTO.setOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
stopJobDTO.setOperationReason(RetryOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
stopJobDTO.setNeedUpdateTaskStatus(true);
retryTaskStopHandler.stop(stopJobDTO);
}

View File

@ -35,6 +35,12 @@ public class SceneConfigRequestVO {
@NotNull(message = "路由策略不能为空")
private Integer routeKey;
/**
* @see: RetryBlockStrategyEnum
*/
@NotNull(message = "阻塞策略不能为空")
private Integer blockStrategy;
/**
* 描述
*/
@ -72,16 +78,19 @@ public class SceneConfigRequestVO {
/**
* 回调状态 0不开启 1开启
*/
@NotNull(message = "回调状态不能为空")
private Integer cbStatus;
/**
* 回调触发类型
*/
@NotNull(message = "回调触发类型不能为空")
private Integer cbTriggerType;
/**
* 回调的最大执行次数
*/
@NotNull(message = "回调的最大执行次数不能为空")
private int cbMaxCount;
/**

View File

@ -32,6 +32,8 @@ public class SceneConfigResponseVO {
private Integer routeKey;
private Integer blockStrategy;
private Integer executorTimeout;
private LocalDateTime createDt;

View File

@ -60,19 +60,7 @@ public class SceneConfigServiceImpl implements SceneConfigService {
private final GroupHandler groupHandler;
private final RetrySummaryMapper retrySummaryMapper;
private static void checkExecuteInterval(SceneConfigRequestVO requestVO) {
if (Lists.newArrayList(WaitStrategies.WaitStrategyEnum.FIXED.getType(),
WaitStrategies.WaitStrategyEnum.RANDOM.getType())
.contains(requestVO.getBackOff())) {
if (Integer.parseInt(requestVO.getTriggerInterval()) < 10) {
throw new SnailJobServerException("间隔时间不得小于10");
}
} else if (requestVO.getBackOff() == WaitStrategies.WaitStrategyEnum.CRON.getType()) {
if (CronUtils.getExecuteInterval(requestVO.getTriggerInterval()) < 10 * 1000) {
throw new SnailJobServerException("间隔时间不得小于10");
}
}
}
@Override
public PageResult<List<SceneConfigResponseVO>> getSceneConfigPageList(SceneConfigQueryVO queryVO) {
@ -111,7 +99,7 @@ public class SceneConfigServiceImpl implements SceneConfigService {
@Override
public Boolean saveSceneConfig(SceneConfigRequestVO requestVO) {
checkExecuteInterval(requestVO);
checkExecuteInterval(requestVO.getBackOff(), requestVO.getTriggerInterval());
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
ConfigAccess<RetrySceneConfig> sceneConfigAccess = accessTemplate.getSceneConfigAccess();
Assert.isTrue(0 == sceneConfigAccess.count(
@ -130,6 +118,13 @@ public class SceneConfigServiceImpl implements SceneConfigService {
retrySceneConfig.setTriggerInterval(StrUtil.EMPTY);
}
if (Objects.equals(requestVO.getCbStatus(), StatusEnum.YES.getStatus())) {
checkExecuteInterval(requestVO.getCbTriggerType(), requestVO.getCbTriggerInterval());
if (requestVO.getCbTriggerType() == WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType()) {
retrySceneConfig.setCbTriggerInterval(StrUtil.EMPTY);
}
}
Assert.isTrue(1 == sceneConfigAccess.insert(retrySceneConfig),
() -> new SnailJobServerException("failed to insert scene. retrySceneConfig:[{}]",
JsonUtil.toJsonString(retrySceneConfig)));
@ -142,7 +137,7 @@ public class SceneConfigServiceImpl implements SceneConfigService {
@Override
public Boolean updateSceneConfig(SceneConfigRequestVO requestVO) {
checkExecuteInterval(requestVO);
checkExecuteInterval(requestVO.getBackOff(), requestVO.getTriggerInterval());
RetrySceneConfig retrySceneConfig = SceneConfigConverter.INSTANCE.toRetrySceneConfig(requestVO);
// 防止更新
retrySceneConfig.setSceneName(null);
@ -151,6 +146,13 @@ public class SceneConfigServiceImpl implements SceneConfigService {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
if (Objects.equals(requestVO.getCbStatus(), StatusEnum.YES.getStatus())) {
checkExecuteInterval(requestVO.getCbTriggerType(), requestVO.getCbTriggerInterval());
if (requestVO.getCbTriggerType() == WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType()) {
retrySceneConfig.setCbTriggerInterval(StrUtil.EMPTY);
}
}
retrySceneConfig.setTriggerInterval(
Optional.ofNullable(retrySceneConfig.getTriggerInterval()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().update(retrySceneConfig,
@ -161,6 +163,8 @@ public class SceneConfigServiceImpl implements SceneConfigService {
() -> new SnailJobServerException("failed to update scene. retrySceneConfig:[{}]",
JsonUtil.toJsonString(retrySceneConfig)));
// 同步配置到客户端
SyncConfigHandler.addSyncTask(requestVO.getGroupName(), namespaceId);
return Boolean.TRUE;
@ -244,12 +248,18 @@ public class SceneConfigServiceImpl implements SceneConfigService {
TaskAccess<RetryDeadLetter> retryTaskTaskAccess = accessTemplate.getRetryDeadLetterAccess();
for (String groupName : groupNames) {
List<Retry> retries = retryTaskAccess.listPage(new PageDTO<>(1, 1),
new LambdaQueryWrapper<Retry>().in(Retry::getSceneName, sceneNames).orderByAsc(Retry::getId)).getRecords();
new LambdaQueryWrapper<Retry>()
.eq(Retry::getGroupName, groupName)
.in(Retry::getSceneName, sceneNames)
.orderByAsc(Retry::getId)).getRecords();
Assert.isTrue(CollUtil.isEmpty(retries),
() -> new SnailJobServerException("删除重试场景失败, 存在【重试任务】请先删除【重试任务】在重试"));
List<RetryDeadLetter> retryDeadLetters = retryTaskTaskAccess.listPage(new PageDTO<>(1, 1),
new LambdaQueryWrapper<RetryDeadLetter>().in(RetryDeadLetter::getSceneName, sceneNames).orderByAsc(RetryDeadLetter::getId)).getRecords();
new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getGroupName, groupName)
.in(RetryDeadLetter::getSceneName, sceneNames)
.orderByAsc(RetryDeadLetter::getId)).getRecords();
Assert.isTrue(CollUtil.isEmpty(retryDeadLetters),
() -> new SnailJobServerException("删除重试场景失败, 存在【死信任务】请先删除【死信任务】在重试"));
}
@ -257,7 +267,6 @@ public class SceneConfigServiceImpl implements SceneConfigService {
Assert.isTrue(ids.size() == accessTemplate.getSceneConfigAccess().delete(queryWrapper),
() -> new SnailJobServerException("删除重试场景失败, 请检查场景状态是否关闭状态"));
List<RetrySummary> retrySummaries = retrySummaryMapper.selectList(
new LambdaQueryWrapper<RetrySummary>()
.select(RetrySummary::getId)
@ -265,6 +274,7 @@ public class SceneConfigServiceImpl implements SceneConfigService {
.in(RetrySummary::getGroupName, groupNames)
.in(RetrySummary::getSceneName, sceneNames)
);
if (CollUtil.isNotEmpty(retrySummaries)) {
Assert.isTrue(retrySummaries.size() == retrySummaryMapper.deleteByIds(StreamUtils.toSet(retrySummaries, RetrySummary::getId))
, () -> new SnailJobServerException("删除汇总表数据失败"));
@ -277,10 +287,13 @@ public class SceneConfigServiceImpl implements SceneConfigService {
Set<String> groupNameSet = Sets.newHashSet();
Set<String> sceneNameSet = Sets.newHashSet();
for (final SceneConfigRequestVO request : requests) {
checkExecuteInterval(request);
groupNameSet.add(request.getGroupName());
sceneNameSet.add(request.getSceneName());
for (final SceneConfigRequestVO requestVO : requests) {
checkExecuteInterval(requestVO.getBackOff(), requestVO.getTriggerInterval());
if (Objects.equals(requestVO.getCbStatus(), StatusEnum.YES.getStatus())) {
checkExecuteInterval(requestVO.getCbTriggerType(), requestVO.getCbTriggerInterval());
}
groupNameSet.add(requestVO.getGroupName());
sceneNameSet.add(requestVO.getSceneName());
}
groupHandler.validateGroupExistence(groupNameSet, namespaceId);
@ -305,6 +318,12 @@ public class SceneConfigServiceImpl implements SceneConfigService {
retrySceneConfig.setTriggerInterval(StrUtil.EMPTY);
}
if (Objects.equals(retrySceneConfig.getCbStatus(), StatusEnum.YES.getStatus())) {
if (retrySceneConfig.getCbTriggerType() == WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType()) {
retrySceneConfig.setCbTriggerInterval(StrUtil.EMPTY);
}
}
Assert.isTrue(1 == sceneConfigAccess.insert(retrySceneConfig),
() -> new SnailJobServerException("failed to insert scene. retrySceneConfig:[{}]",
JsonUtil.toJsonString(retrySceneConfig)));
@ -323,4 +342,18 @@ public class SceneConfigServiceImpl implements SceneConfigService {
setId(config.getId());
}
}
private static void checkExecuteInterval(Integer backOff, String triggerInterval) {
if (Lists.newArrayList(WaitStrategies.WaitStrategyEnum.FIXED.getType(),
WaitStrategies.WaitStrategyEnum.RANDOM.getType()).contains(backOff)) {
if (Integer.parseInt(triggerInterval) < 10) {
throw new SnailJobServerException("间隔时间不得小于10");
}
} else if (backOff == WaitStrategies.WaitStrategyEnum.CRON.getType()) {
if (CronUtils.getExecuteInterval(triggerInterval) < 10 * 1000) {
throw new SnailJobServerException("间隔时间不得小于10");
}
}
}
}