From 9b403daea5bb2d9c23331a430eb2846925392a21 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Tue, 16 Apr 2024 18:32:33 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.0.0):=20=E4=BC=98=E5=8C=96=E5=91=8A?= =?UTF-8?q?=E8=AD=A6=E9=80=9A=E7=9F=A5=E5=92=8C=E6=96=B0=E5=A2=9E=E8=AE=A4?= =?UTF-8?q?=E8=AF=81=E5=BC=82=E5=B8=B8=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 通知配置表合并 2. 新增认证异常码5001 3. 告警模块优化 --- doc/sql/snail_job_mysql.sql | 53 ++++-------- .../SnailJobAuthenticationException.java | 33 ++++++++ .../core/handler/RestExceptionHandler.java | 18 +++- .../datasource/access/AccessTemplate.java | 14 +--- .../datasource/access/ConfigAccess.java | 17 ++-- .../access/config/AbstractConfigAccess.java | 58 ++++++------- .../access/config/SceneConfigAccess.java | 27 +++--- .../mapper/RetrySummaryMapper.java | 10 +-- .../persistence/mapper/SceneConfigMapper.java | 4 +- .../persistence/po/NotifyConfig.java | 10 ++- ...SceneConfig.java => RetrySceneConfig.java} | 4 +- .../template/mapper/SceneConfigMapper.xml | 2 +- .../mysql/mapper/RetrySummaryMapper.xml | 2 +- .../server/common/alarm/AbstractAlarm.java | 68 ++++++++++++--- .../common/alarm/AbstractFlowControl.java | 28 ------- .../server/common/alarm/AbstractJobAlarm.java | 54 +----------- .../common/alarm/AbstractRetryAlarm.java | 45 +--------- .../server/common/dto/NotifyConfigInfo.java | 8 +- .../listener/JobTaskFailAlarmListener.java | 17 ++-- .../generator/task/AbstractGenerator.java | 47 +++++------ .../retry/task/support/RetryContext.java | 4 +- .../support/context/CallbackRetryContext.java | 8 +- .../MaxAttemptsPersistenceRetryContext.java | 8 +- .../actor/exec/ExecCallbackUnitActor.java | 14 ++-- .../dispatch/actor/exec/ExecUnitActor.java | 12 +-- .../dispatch/actor/result/FailureActor.java | 6 +- .../actor/scan/AbstractScanGroup.java | 34 ++++---- .../actor/scan/ScanCallbackTaskActor.java | 4 +- .../actor/scan/ScanRetryTaskActor.java | 8 +- .../dispatch/task/AbstractTaskExecutor.java | 12 +-- .../dispatch/task/CallbackTaskExecutor.java | 12 +-- .../task/ManualCallbackTaskExecutor.java | 12 +-- .../task/ManualRetryTaskExecutor.java | 16 ++-- .../dispatch/task/RetryTaskExecutor.java | 18 ++-- .../RetryTaskFailDeadLetterAlarmListener.java | 6 ++ ...tryTaskFailMoreThresholdAlarmListener.java | 6 ++ .../AuthenticationInterceptor.java | 26 +++--- .../model/request/NotifyConfigRequestVO.java | 12 ++- .../service/convert/SceneConfigConverter.java | 4 +- .../SceneConfigResponseVOConverter.java | 6 +- .../service/impl/DashBoardServiceImpl.java | 10 +-- .../service/impl/NotifyConfigServiceImpl.java | 10 +-- .../impl/RetryDeadLetterServiceImpl.java | 22 +++-- .../service/impl/RetryTaskServiceImpl.java | 23 ++--- .../service/impl/SceneConfigServiceImpl.java | 84 +++++++++---------- 45 files changed, 418 insertions(+), 478 deletions(-) create mode 100644 snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/exception/SnailJobAuthenticationException.java rename snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/{SceneConfig.java => RetrySceneConfig.java} (91%) delete mode 100644 snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractFlowControl.java diff --git a/doc/sql/snail_job_mysql.sql b/doc/sql/snail_job_mysql.sql index b9fc34cd..88653c97 100644 --- a/doc/sql/snail_job_mysql.sql +++ b/doc/sql/snail_job_mysql.sql @@ -48,21 +48,22 @@ CREATE TABLE `sj_group_config` CREATE TABLE `sj_notify_config` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', - `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', - `group_name` varchar(64) NOT NULL COMMENT '组名称', - `scene_name` varchar(64) NOT NULL COMMENT '场景名称', - `notify_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知状态 0、未启用 1、启用', - `notify_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知类型 1、钉钉 2、邮件 3、企业微信', - `notify_attribute` varchar(512) NOT NULL COMMENT '配置属性', - `notify_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '通知阈值', - `notify_scene` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知场景', - `rate_limiter_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '限流状态 0、未启用 1、启用', - `rate_limiter_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '每秒限流阈值', - `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述', - `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', - `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', + `group_name` varchar(64) NOT NULL COMMENT '组名称', + `business_id` varchar(64) NOT NULL COMMENT '业务id (job_id或workflow_id或scene_name)', + `system_task_type` tinyint(4) NOT NULL DEFAULT 3 COMMENT '任务类型 1. 重试任务 2. 重试回调 3、JOB任务 4、WORKFLOW任务', + `notify_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知状态 0、未启用 1、启用', + `notify_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知类型 1、钉钉 2、邮件 3、企业微信', + `notify_attribute` varchar(512) NOT NULL COMMENT '配置属性', + `notify_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '通知阈值', + `notify_scene` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知场景', + `rate_limiter_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '限流状态 0、未启用 1、启用', + `rate_limiter_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '每秒限流阈值', + `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述', + `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), - KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`) + KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `business_id`) ) ENGINE = InnoDB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT ='通知配置' @@ -170,7 +171,7 @@ CREATE TABLE `sj_retry_task_log_message` DEFAULT CHARSET = utf8mb4 COMMENT ='任务调度日志信息记录表' ; -CREATE TABLE `sj_scene_config` +CREATE TABLE `sj_retry_scene_config` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', @@ -384,28 +385,6 @@ CREATE TABLE `sj_job_task_batch` AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT ='任务批次'; -CREATE TABLE `sj_job_notify_config` -( - `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', - `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', - `group_name` varchar(64) NOT NULL COMMENT '组名称', - `job_id` bigint(20) NOT NULL COMMENT '任务id', - `notify_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知状态 0、未启用 1、启用', - `notify_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知类型 1、钉钉 2、邮件 3、企业微信', - `notify_attribute` varchar(512) NOT NULL COMMENT '配置属性', - `notify_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '通知阈值', - `notify_scene` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知场景', - `rate_limiter_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '限流状态 0、未启用 1、启用', - `rate_limiter_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '每秒限流阈值', - `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述', - `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', - `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', - PRIMARY KEY (`id`), - KEY `idx_namespace_id_group_name_job_id` (`namespace_id`, `group_name`, job_id) -) ENGINE = InnoDB - AUTO_INCREMENT = 4 - DEFAULT CHARSET = utf8mb4 COMMENT ='job通知配置'; - CREATE TABLE `sj_job_summary` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/exception/SnailJobAuthenticationException.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/exception/SnailJobAuthenticationException.java new file mode 100644 index 00000000..8ddf2797 --- /dev/null +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/exception/SnailJobAuthenticationException.java @@ -0,0 +1,33 @@ +package com.aizuda.snailjob.common.core.exception; + +import lombok.Getter; + +/** + * @author: opensnail + * @date : 2024-04-16 + * @since : 1.0.0 + */ +@Getter +public class SnailJobAuthenticationException extends BaseSnailJobException { + private final Integer errorCode = 5001; + + public SnailJobAuthenticationException(final String message) { + super(message); + } + + public SnailJobAuthenticationException(String message, Object... arguments) { + super(message, arguments); + } + + public SnailJobAuthenticationException(String message, Object[] arguments, Throwable cause) { + super(message, arguments, cause); + } + + public SnailJobAuthenticationException(String message, Object argument, Throwable cause) { + super(message, argument, cause); + } + + public SnailJobAuthenticationException(String message, Object argument) { + super(message, argument); + } +} diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/handler/RestExceptionHandler.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/handler/RestExceptionHandler.java index ef0b4a78..82a201eb 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/handler/RestExceptionHandler.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/handler/RestExceptionHandler.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.common.core.handler; import com.aizuda.snailjob.common.core.exception.AbstractError; import com.aizuda.snailjob.common.core.exception.BaseSnailJobException; +import com.aizuda.snailjob.common.core.exception.SnailJobAuthenticationException; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.core.model.Result; import jakarta.validation.ConstraintViolation; @@ -56,12 +57,27 @@ public class RestExceptionHandler { * @param ex * @return */ - @ExceptionHandler({BaseSnailJobException.class}) + @ExceptionHandler({BaseSnailJobException.class, SnailJobAuthenticationException.class}) public Result onBusinessException(BaseSnailJobException ex) { log.error("异常类 businessException", ex); + if (ex instanceof final SnailJobAuthenticationException authenticationException) { + return new Result(authenticationException.getErrorCode(), ex.getMessage()); + } + return new Result(0, ex.getMessage()); } + /** + * 业务异常 + * + * @param ex + * @return + */ + @ExceptionHandler({SnailJobAuthenticationException.class}) + public Result onBusinessException(SnailJobAuthenticationException ex) { + return new Result(ex.getErrorCode(), ex.getMessage()); + } + /** * 400错误 * diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java index 814a1571..8507d84e 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java @@ -11,15 +11,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; -import com.aizuda.snailjob.template.datasource.access.config.GroupConfigAccess; -import com.aizuda.snailjob.template.datasource.access.config.NotifyConfigAccess; -import com.aizuda.snailjob.template.datasource.access.config.SceneConfigAccess; -import com.aizuda.snailjob.template.datasource.access.task.RetryDeadLetterTaskAccess; -import com.aizuda.snailjob.template.datasource.access.task.RetryTaskAccess; -import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum; -import com.aizuda.snailjob.template.datasource.exception.SnailJobDatasourceException; -import com.aizuda.snailjob.template.datasource.persistence.po.*; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import org.springframework.stereotype.Component; import java.util.HashMap; @@ -76,8 +68,8 @@ public class AccessTemplate { * * @return {@link SceneConfigAccess} 获取场景配置操作类 */ - public ConfigAccess getSceneConfigAccess() { - return (ConfigAccess) Optional.ofNullable(REGISTER_ACCESS.get(OperationTypeEnum.SCENE.name())) + public ConfigAccess getSceneConfigAccess() { + return (ConfigAccess) Optional.ofNullable(REGISTER_ACCESS.get(OperationTypeEnum.SCENE.name())) .orElseThrow(() -> new SnailJobDatasourceException("not supports operation type")); } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/ConfigAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/ConfigAccess.java index f8f5b26b..6ae60234 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/ConfigAccess.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/ConfigAccess.java @@ -4,12 +4,7 @@ import com.aizuda.snailjob.common.core.enums.NotifySceneEnum; import com.aizuda.snailjob.server.model.dto.ConfigDTO; import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; -import com.aizuda.snailjob.common.core.enums.NotifySceneEnum; -import com.aizuda.snailjob.server.model.dto.ConfigDTO; -import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; -import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; @@ -45,9 +40,9 @@ public interface ConfigAccess extends Access { * @param groupName 组名称 * @param sceneName 场景名称 * @param namespaceId 命名空间 - * @return {@link SceneConfig} 场景配置 + * @return {@link RetrySceneConfig} 场景配置 */ - SceneConfig getSceneConfigByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId); + RetrySceneConfig getSceneConfigByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId); /** * 获取通知配置 @@ -83,9 +78,9 @@ public interface ConfigAccess extends Access { * 获取场景配置 * * @param groupName 组名称 - * @return {@link SceneConfig} 场景配置 + * @return {@link RetrySceneConfig} 场景配置 */ - List getSceneConfigByGroupName(String groupName); + List getSceneConfigByGroupName(String groupName); /** * 获取已开启的组配置信息 @@ -113,7 +108,7 @@ public interface ConfigAccess extends Access { * * @return 场景配置列表 */ - List getAllConfigSceneList(); + List getAllConfigSceneList(); /** * 获取配置版本号 diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/AbstractConfigAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/AbstractConfigAccess.java index ecb18e88..38353dda 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/AbstractConfigAccess.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/AbstractConfigAccess.java @@ -11,18 +11,8 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyConfigMa import com.aizuda.snailjob.template.datasource.persistence.mapper.SceneConfigMapper; import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.utils.DbUtils; -import com.aizuda.snailjob.common.core.enums.NodeTypeEnum; -import com.aizuda.snailjob.common.core.enums.NotifySceneEnum; -import com.aizuda.snailjob.common.core.enums.StatusEnum; -import com.aizuda.snailjob.server.model.dto.ConfigDTO; -import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyConfigMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.SceneConfigMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; -import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.CollectionUtils; @@ -68,20 +58,20 @@ public abstract class AbstractConfigAccess implements ConfigAccess { Integer notifyScene) { return notifyConfigMapper.selectList( new LambdaQueryWrapper().eq(NotifyConfig::getGroupName, groupName) - .eq(NotifyConfig::getSceneName, sceneName) + .eq(NotifyConfig::getBusinessId, sceneName) .eq(NotifyConfig::getNotifyScene, notifyScene)); } - protected SceneConfig getByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId) { - return sceneConfigMapper.selectOne(new LambdaQueryWrapper() - .eq(SceneConfig::getNamespaceId, namespaceId) - .eq(SceneConfig::getGroupName, groupName) - .eq(SceneConfig::getSceneName, sceneName)); + protected RetrySceneConfig getByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId) { + return sceneConfigMapper.selectOne(new LambdaQueryWrapper() + .eq(RetrySceneConfig::getNamespaceId, namespaceId) + .eq(RetrySceneConfig::getGroupName, groupName) + .eq(RetrySceneConfig::getSceneName, sceneName)); } - protected List getSceneConfigs(String groupName) { - return sceneConfigMapper.selectList(new LambdaQueryWrapper() - .eq(SceneConfig::getGroupName, groupName)); + protected List getSceneConfigs(String groupName) { + return sceneConfigMapper.selectList(new LambdaQueryWrapper() + .eq(RetrySceneConfig::getGroupName, groupName)); } protected GroupConfig getByGroupName(String groupName, final String namespaceId) { @@ -109,7 +99,7 @@ public abstract class AbstractConfigAccess implements ConfigAccess { } @Override - public SceneConfig getSceneConfigByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId) { + public RetrySceneConfig getSceneConfigByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId) { return getByGroupNameAndSceneName(groupName, sceneName, namespaceId); } @@ -130,7 +120,7 @@ public abstract class AbstractConfigAccess implements ConfigAccess { } @Override - public List getSceneConfigByGroupName(String groupName) { + public List getSceneConfigByGroupName(String groupName) { return getSceneConfigs(groupName); } @@ -148,20 +138,20 @@ public abstract class AbstractConfigAccess implements ConfigAccess { return Collections.EMPTY_SET; } - LambdaQueryWrapper sceneConfigLambdaQueryWrapper = new LambdaQueryWrapper() - .select(SceneConfig::getSceneName) - .eq(SceneConfig::getGroupName, groupName); + LambdaQueryWrapper sceneConfigLambdaQueryWrapper = new LambdaQueryWrapper() + .select(RetrySceneConfig::getSceneName) + .eq(RetrySceneConfig::getGroupName, groupName); if (StatusEnum.YES.getStatus().equals(groupConfig.getGroupStatus())) { - sceneConfigLambdaQueryWrapper.eq(SceneConfig::getSceneStatus, StatusEnum.NO.getStatus()); + sceneConfigLambdaQueryWrapper.eq(RetrySceneConfig::getSceneStatus, StatusEnum.NO.getStatus()); } - List sceneConfigs = sceneConfigMapper.selectList(sceneConfigLambdaQueryWrapper); - if (CollectionUtils.isEmpty(sceneConfigs)) { + List retrySceneConfigs = sceneConfigMapper.selectList(sceneConfigLambdaQueryWrapper); + if (CollectionUtils.isEmpty(retrySceneConfigs)) { return Collections.EMPTY_SET; } - return sceneConfigs.stream().map(SceneConfig::getSceneName).collect(Collectors.toSet()); + return retrySceneConfigs.stream().map(RetrySceneConfig::getSceneName).collect(Collectors.toSet()); } @Override @@ -178,9 +168,9 @@ public abstract class AbstractConfigAccess implements ConfigAccess { } @Override - public List getAllConfigSceneList() { - List allSystemConfigSceneList = sceneConfigMapper.selectList( - new LambdaQueryWrapper().orderByAsc(SceneConfig::getId)); + public List getAllConfigSceneList() { + List allSystemConfigSceneList = sceneConfigMapper.selectList( + new LambdaQueryWrapper().orderByAsc(RetrySceneConfig::getId)); if (CollectionUtils.isEmpty(allSystemConfigSceneList)) { return Collections.EMPTY_LIST; } @@ -226,10 +216,10 @@ public abstract class AbstractConfigAccess implements ConfigAccess { configDTO.setNotifyList(notifies); - List sceneConfig = getSceneConfigByGroupName(groupName); + List retrySceneConfig = getSceneConfigByGroupName(groupName); List sceneList = new ArrayList<>(); - for (SceneConfig config : sceneConfig) { + for (RetrySceneConfig config : retrySceneConfig) { ConfigDTO.Scene scene = new ConfigDTO.Scene(); scene.setSceneName(config.getSceneName()); scene.setDdl(config.getDeadlineRequest()); diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/SceneConfigAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/SceneConfigAccess.java index a69ef0bb..b5b90490 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/SceneConfigAccess.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/SceneConfigAccess.java @@ -2,8 +2,7 @@ package com.aizuda.snailjob.template.datasource.access.config; import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum; import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; @@ -17,7 +16,7 @@ import java.util.List; * @since 2.2.0 */ @Component -public class SceneConfigAccess extends AbstractConfigAccess { +public class SceneConfigAccess extends AbstractConfigAccess { @Override public boolean supports(String operationType) { @@ -27,42 +26,42 @@ public class SceneConfigAccess extends AbstractConfigAccess { } @Override - public List list(LambdaQueryWrapper query) { + public List list(LambdaQueryWrapper query) { return sceneConfigMapper.selectList(query); } @Override - public int update(SceneConfig sceneConfig, LambdaUpdateWrapper query) { - return sceneConfigMapper.update(sceneConfig, query); + public int update(RetrySceneConfig retrySceneConfig, LambdaUpdateWrapper query) { + return sceneConfigMapper.update(retrySceneConfig, query); } @Override - public int updateById(SceneConfig sceneConfig) { - return sceneConfigMapper.updateById(sceneConfig); + public int updateById(RetrySceneConfig retrySceneConfig) { + return sceneConfigMapper.updateById(retrySceneConfig); } @Override - public int delete(LambdaQueryWrapper query) { + public int delete(LambdaQueryWrapper query) { return sceneConfigMapper.delete(query); } @Override - public int insert(SceneConfig sceneConfig) { - return sceneConfigMapper.insert(sceneConfig); + public int insert(RetrySceneConfig retrySceneConfig) { + return sceneConfigMapper.insert(retrySceneConfig); } @Override - public SceneConfig one(LambdaQueryWrapper query) { + public RetrySceneConfig one(LambdaQueryWrapper query) { return sceneConfigMapper.selectOne(query); } @Override - public PageDTO listPage(PageDTO iPage, LambdaQueryWrapper query) { + public PageDTO listPage(PageDTO iPage, LambdaQueryWrapper query) { return sceneConfigMapper.selectPage(iPage, query); } @Override - public long count(LambdaQueryWrapper query) { + public long count(LambdaQueryWrapper query) { return sceneConfigMapper.selectCount(query); } } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/RetrySummaryMapper.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/RetrySummaryMapper.java index bbd43ab6..b66485cc 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/RetrySummaryMapper.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/RetrySummaryMapper.java @@ -3,11 +3,7 @@ package com.aizuda.snailjob.template.datasource.persistence.mapper; import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardCardResponseDO; import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardLineResponseDO; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySummary; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; -import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardRetryLineResponseDO.Rank; -import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardRetryLineResponseDO.Task; -import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardCardResponseDO; -import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardLineResponseDO; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardRetryLineResponseDO; import com.baomidou.mybatisplus.core.conditions.Wrapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; @@ -34,11 +30,11 @@ public interface RetrySummaryMapper extends BaseMapper { List retryTaskBarList(@Param("ew") Wrapper wrapper); - IPage retryTaskList(@Param("ew") Wrapper wrapper, Page page); + IPage retryTaskList(@Param("ew") Wrapper wrapper, Page page); List retryLineList(@Param("dateFormat") String dateFormat, @Param("ew") Wrapper wrapper); List dashboardRank(@Param("ew") Wrapper wrapper); - long countRetryTask(@Param("ew") Wrapper wrapper); + long countRetryTask(@Param("ew") Wrapper wrapper); } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/SceneConfigMapper.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/SceneConfigMapper.java index 99286fc3..1e3c91c9 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/SceneConfigMapper.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/SceneConfigMapper.java @@ -1,8 +1,8 @@ package com.aizuda.snailjob.template.datasource.persistence.mapper; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.baomidou.mybatisplus.core.mapper.BaseMapper; -public interface SceneConfigMapper extends BaseMapper { +public interface SceneConfigMapper extends BaseMapper { } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java index 0ccb16a1..0a2638a8 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java @@ -21,7 +21,15 @@ public class NotifyConfig implements Serializable { private String groupName; - private String sceneName; + /** + * 业务id (scene_name或job_id或workflow_id) + */ + private String businessId; + + /** + * 任务类型 1、重试任务 2、回调任务、 3、JOB任务 4、WORKFLOW任务 + */ + private Integer systemTaskType; private Integer notifyStatus; diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/SceneConfig.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/RetrySceneConfig.java similarity index 91% rename from snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/SceneConfig.java rename to snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/RetrySceneConfig.java index 49c521fa..fd908d07 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/SceneConfig.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/RetrySceneConfig.java @@ -16,8 +16,8 @@ import java.time.LocalDateTime; * @since 2.5.0 */ @Data -@TableName("sj_scene_config") -public class SceneConfig implements Serializable { +@TableName("sj_retry_scene_config") +public class RetrySceneConfig implements Serializable { private static final long serialVersionUID = 1L; diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/SceneConfigMapper.xml b/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/SceneConfigMapper.xml index 70f3c353..2382e7fa 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/SceneConfigMapper.xml +++ b/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/SceneConfigMapper.xml @@ -1,7 +1,7 @@ - + diff --git a/snail-job-datasource/snail-job-mysql-datasource/src/main/resources/mysql/mapper/RetrySummaryMapper.xml b/snail-job-datasource/snail-job-mysql-datasource/src/main/resources/mysql/mapper/RetrySummaryMapper.xml index f0c8ca00..9f41bb5e 100644 --- a/snail-job-datasource/snail-job-mysql-datasource/src/main/resources/mysql/mapper/RetrySummaryMapper.xml +++ b/snail-job-datasource/snail-job-mysql-datasource/src/main/resources/mysql/mapper/RetrySummaryMapper.xml @@ -111,7 +111,7 @@ SELECT group_name AS groupName, SUM(CASE WHEN (scene_status = 1) THEN 1 ELSE 0 END) AS run, COUNT(*) AS total - FROM sj_scene_config + FROM sj_retry_scene_config ${ew.customSqlSegment} GROUP BY namespace_id, group_name diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java index 9e1d1900..f89fc9cc 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java @@ -5,13 +5,20 @@ import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.AlarmInfoConverter; import com.aizuda.snailjob.server.common.Lifecycle; +import com.aizuda.snailjob.server.common.cache.CacheNotifyRateLimiter; import com.aizuda.snailjob.server.common.dto.AlarmInfo; import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; +import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.Triple; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.server.common.triple.Triple; +import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.RateLimiter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -19,8 +26,10 @@ import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; import org.springframework.util.CollectionUtils; +import java.text.MessageFormat; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * @author xiaowoniu @@ -28,7 +37,7 @@ import java.util.concurrent.TimeUnit; * @since 2.5.0 */ @Slf4j -public abstract class AbstractAlarm extends AbstractFlowControl implements ApplicationListener, Runnable, +public abstract class AbstractAlarm implements ApplicationListener, Runnable, Lifecycle { @Autowired @@ -53,14 +62,14 @@ public abstract class AbstractAlarm groupNames = new HashSet<>(); // 获取所有的场景名称 - Set sceneNames = new HashSet<>(); + Set sceneNames = new HashSet<>(); // 转换AlarmDTO 为了下面循环发送使用 - Map, List> waitSendAlarmInfos = convertAlarmDTO( + Map, List> waitSendAlarmInfos = convertAlarmDTO( alarmInfos, namespaceIds, groupNames, sceneNames); // 批量获取通知配置 - Map, List> notifyConfig = obtainNotifyConfig(namespaceIds, groupNames, sceneNames); + Map, List> notifyConfig = obtainNotifyConfig(namespaceIds, groupNames, sceneNames); // 循环发送消息 waitSendAlarmInfos.forEach((key, list) -> { @@ -78,12 +87,39 @@ public abstract class AbstractAlarm, List> convertAlarmDTO(List alarmData, Set namespaceIds, Set groupNames, Set sceneNames); + protected Map, List> obtainNotifyConfig(Set namespaceIds, Set groupNames, Set businessIds) { + + // 批量获取所需的通知配置 + List notifyConfigs = accessTemplate.getNotifyConfigAccess().list( + new LambdaQueryWrapper() + .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) + .in(NotifyConfig::getSystemTaskType, getSystemTaskType()) + .eq(NotifyConfig::getNotifyScene, getNotifyScene()) + .in(NotifyConfig::getNamespaceId, namespaceIds) + .in(NotifyConfig::getGroupName, groupNames) + .in(NotifyConfig::getBusinessId, businessIds) + ); + if (CollectionUtils.isEmpty(notifyConfigs)) { + return Maps.newHashMap(); + } + + List notifyConfigInfos = AlarmInfoConverter.INSTANCE.retryToNotifyConfigInfos(notifyConfigs); + + return notifyConfigInfos.stream() + .collect(Collectors.groupingBy(i -> { + + String namespaceId = i.getNamespaceId(); + String groupName = i.getGroupName(); + + return ImmutableTriple.of(namespaceId, groupName, i.getBusinessId()); + })); + + } + + protected abstract List getSystemTaskType(); + + protected abstract Map, List> convertAlarmDTO(List alarmData, Set namespaceIds, Set groupNames, Set sceneNames); - protected abstract Map, - List> obtainNotifyConfig(Set namespaceIds, - Set groupNames, - Set sceneNames); protected abstract List poll() throws InterruptedException; @@ -111,11 +147,10 @@ public abstract class AbstractAlarm implements ApplicationListener { - - protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) { - RateLimiter rateLimiter = CacheNotifyRateLimiter.getRateLimiterByKey(key); - if (Objects.isNull(rateLimiter) || rateLimiter.getRate() != rateLimiterThreshold) { - CacheNotifyRateLimiter.put(key, RateLimiter.create(rateLimiterThreshold)); - } - - return rateLimiter; - } -} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java index 9a7c30c3..3b335e15 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java @@ -1,21 +1,11 @@ package com.aizuda.snailjob.server.common.alarm; -import com.aizuda.snailjob.common.core.enums.StatusEnum; -import com.aizuda.snailjob.server.common.AlarmInfoConverter; import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; import com.aizuda.snailjob.server.common.enums.SystemModeEnum; import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.Triple; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobNotifyConfigMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.JobNotifyConfig; -import com.aizuda.snailjob.server.common.triple.ImmutableTriple; -import com.aizuda.snailjob.server.common.triple.Triple; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.google.common.collect.Maps; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEvent; -import org.springframework.util.CollectionUtils; import java.text.MessageFormat; import java.util.List; @@ -28,57 +18,19 @@ import java.util.stream.Collectors; * @date 2023-12-03 10:19:19 * @since 2.5.0 */ -public abstract class AbstractJobAlarm extends AbstractAlarm { - - @Autowired - private JobNotifyConfigMapper jobNotifyConfigMapper; +public abstract class AbstractJobAlarm extends AbstractAlarm { @Override - protected Map, List> convertAlarmDTO(List alarmInfos, Set namespaceIds, Set groupNames, Set jobIds) { + protected Map, List> convertAlarmDTO(List alarmInfos, Set namespaceIds, Set groupNames, Set jobIds) { return alarmInfos.stream().collect(Collectors.groupingBy(i -> { String namespaceId = i.getNamespaceId(); String groupName = i.getGroupName(); - Long jobId = i.getJobId(); + String jobId = String.valueOf(i.getJobId()); namespaceIds.add(namespaceId); groupNames.add(groupName); jobIds.add(jobId); return ImmutableTriple.of(namespaceId, groupName, jobId); })); } - - @Override - protected Map, List> obtainNotifyConfig(Set namespaceIds, Set groupNames, Set jobIds) { - - // 批量获取所需的通知配置 - List jobNotifyConfigs = jobNotifyConfigMapper.selectList( - new LambdaQueryWrapper() - .eq(JobNotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) - .eq(JobNotifyConfig::getNotifyScene, getNotifyScene()) - .in(JobNotifyConfig::getNamespaceId, namespaceIds) - .in(JobNotifyConfig::getGroupName, groupNames) - .in(JobNotifyConfig::getJobId, jobIds) - ); - if (CollectionUtils.isEmpty(jobNotifyConfigs)) { - return Maps.newHashMap(); - } - - List notifyConfigInfos = AlarmInfoConverter.INSTANCE.jobToNotifyConfigInfos(jobNotifyConfigs); - - return notifyConfigInfos.stream() - .collect(Collectors.groupingBy(i -> { - - String namespaceId = i.getNamespaceId(); - String groupName = i.getGroupName(); - Long jobId = i.getJobId(); - - return ImmutableTriple.of(namespaceId, groupName, jobId); - })); - - } - - @Override - protected String rateLimiterKey(NotifyConfigInfo notifyConfig) { - return MessageFormat.format("{}_{}", SystemModeEnum.JOB.name(), notifyConfig.getId()); - } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java index b358f58c..fac087b2 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java @@ -1,19 +1,11 @@ package com.aizuda.snailjob.server.common.alarm; -import com.aizuda.snailjob.common.core.enums.StatusEnum; -import com.aizuda.snailjob.server.common.AlarmInfoConverter; import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; import com.aizuda.snailjob.server.common.enums.SystemModeEnum; import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.Triple; -import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; -import com.aizuda.snailjob.server.common.triple.ImmutableTriple; -import com.aizuda.snailjob.server.common.triple.Triple; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.google.common.collect.Maps; import org.springframework.context.ApplicationEvent; -import org.springframework.util.CollectionUtils; import java.text.MessageFormat; import java.util.List; @@ -26,7 +18,7 @@ import java.util.stream.Collectors; * @date 2023-12-03 10:19:19 * @since 2.5.0 */ -public abstract class AbstractRetryAlarm extends AbstractAlarm { +public abstract class AbstractRetryAlarm extends AbstractAlarm { @Override protected Map, List> convertAlarmDTO( List alarmDataList, @@ -49,39 +41,4 @@ public abstract class AbstractRetryAlarm extends Abs })); } - - @Override - protected Map, List> obtainNotifyConfig(Set namespaceIds, Set groupNames, Set sceneNames) { - - // 批量获取所需的通知配置 - List notifyConfigs = accessTemplate.getNotifyConfigAccess().list( - new LambdaQueryWrapper() - .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) - .eq(NotifyConfig::getNotifyScene, getNotifyScene()) - .in(NotifyConfig::getNamespaceId, namespaceIds) - .in(NotifyConfig::getGroupName, groupNames) - .in(NotifyConfig::getSceneName, sceneNames) - ); - - if (CollectionUtils.isEmpty(notifyConfigs)) { - return Maps.newHashMap(); - } - - List notifyConfigInfos = AlarmInfoConverter.INSTANCE.retryToNotifyConfigInfos(notifyConfigs); - return notifyConfigInfos.stream() - .collect(Collectors.groupingBy(config -> { - - String namespaceId = config.getNamespaceId(); - String groupName = config.getGroupName(); - String sceneName = config.getSceneName(); - - return ImmutableTriple.of(namespaceId, groupName, sceneName); - })); - } - - @Override - protected String rateLimiterKey(NotifyConfigInfo notifyConfig) { - return MessageFormat.format("{}_{}", SystemModeEnum.RETRY.name(), notifyConfig.getId()); - } - } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/NotifyConfigInfo.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/NotifyConfigInfo.java index 7382e3b6..d611e5cd 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/NotifyConfigInfo.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/NotifyConfigInfo.java @@ -16,11 +16,11 @@ public class NotifyConfigInfo { private String groupName; - // job告警时使用 - private Long jobId; + // 业务id (scene_name或job_id或workflow_id) + private String businessId; - // retry告警时使用 - private String sceneName; + // 任务类型 1、重试任务 2、回调任务、 3、JOB任务 4、WORKFLOW任务 + private Integer systemTaskType; private Integer notifyStatus; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/listener/JobTaskFailAlarmListener.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/listener/JobTaskFailAlarmListener.java index 4381e6cd..49e5f16f 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/listener/JobTaskFailAlarmListener.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/listener/JobTaskFailAlarmListener.java @@ -8,6 +8,8 @@ import com.aizuda.snailjob.server.common.AlarmInfoConverter; import com.aizuda.snailjob.server.common.alarm.AbstractJobAlarm; import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; +import com.aizuda.snailjob.server.common.triple.Triple; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.support.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO; @@ -15,11 +17,14 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMa import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; @@ -31,16 +36,14 @@ import java.util.concurrent.LinkedBlockingQueue; * @since 2.5.0 */ @Component -@Slf4j +@RequiredArgsConstructor public class JobTaskFailAlarmListener extends AbstractJobAlarm { - - @Autowired - private JobTaskBatchMapper jobTaskBatchMapper; + private final JobTaskBatchMapper jobTaskBatchMapper; /** * job任务失败数据 */ - private LinkedBlockingQueue queue = new LinkedBlockingQueue<>(1000); + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(1000); private static String jobTaskFailTextMessagesFormatter = "{}环境 Job任务执行失败 \n" + @@ -90,6 +93,10 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm getSystemTaskType() { + return Lists.newArrayList(SyetemTaskTypeEnum.JOB); + } @Override public void onApplicationEvent(JobTaskFailAlarmEvent event) { diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/generator/task/AbstractGenerator.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/generator/task/AbstractGenerator.java index 27e2bf8d..30bf4d13 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/generator/task/AbstractGenerator.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/generator/task/AbstractGenerator.java @@ -12,7 +12,6 @@ import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.generator.id.IdGenerator; import com.aizuda.snailjob.server.common.util.DateUtils; -import com.aizuda.snailjob.server.retry.task.generator.task.TaskContext.TaskInfo; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter; import com.aizuda.snailjob.server.common.WaitStrategy; @@ -23,9 +22,9 @@ import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.access.TaskAccess; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -56,7 +55,7 @@ public abstract class AbstractGenerator implements TaskGenerator { public void taskGenerator(TaskContext taskContext) { SnailJobLog.LOCAL.debug("received report data. {}", JsonUtil.toJsonString(taskContext)); - SceneConfig sceneConfig = checkAndInitScene(taskContext); + RetrySceneConfig retrySceneConfig = checkAndInitScene(taskContext); //客户端上报任务根据幂等id去重 List taskInfos = taskContext.getTaskInfos().stream().collect(Collectors.collectingAndThen( @@ -86,7 +85,7 @@ public abstract class AbstractGenerator implements TaskGenerator { LocalDateTime now = LocalDateTime.now(); for (TaskContext.TaskInfo taskInfo : taskInfos) { Pair, List> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo, - sceneConfig); + retrySceneConfig); waitInsertTasks.addAll(pair.getKey()); waitInsertTaskLogs.addAll(pair.getValue()); } @@ -107,11 +106,11 @@ public abstract class AbstractGenerator implements TaskGenerator { * @param retryTaskMap * @param now * @param taskInfo - * @param sceneConfig + * @param retrySceneConfig */ private Pair, List> doConvertTask(Map> retryTaskMap, TaskContext taskContext, LocalDateTime now, - TaskContext.TaskInfo taskInfo, SceneConfig sceneConfig) { + TaskContext.TaskInfo taskInfo, RetrySceneConfig retrySceneConfig) { List waitInsertTasks = new ArrayList<>(); List waitInsertTaskLogs = new ArrayList<>(); @@ -144,9 +143,9 @@ public abstract class AbstractGenerator implements TaskGenerator { WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); waitStrategyContext.setNextTriggerAt(now); - waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval()); + waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval()); waitStrategyContext.setDelayLevel(1); - WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff()); + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff()); retryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext))); waitInsertTasks.add(retryTask); @@ -161,11 +160,11 @@ public abstract class AbstractGenerator implements TaskGenerator { protected abstract Integer initStatus(TaskContext taskContext); - private SceneConfig checkAndInitScene(TaskContext taskContext) { - SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess() + private RetrySceneConfig checkAndInitScene(TaskContext taskContext) { + RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess() .getSceneConfigByGroupNameAndSceneName(taskContext.getGroupName(), taskContext.getSceneName(), taskContext.getNamespaceId()); - if (Objects.isNull(sceneConfig)) { + if (Objects.isNull(retrySceneConfig)) { GroupConfig groupConfig = accessTemplate.getGroupConfigAccess() .getGroupConfigByGroupName(taskContext.getGroupName(), taskContext.getNamespaceId()); @@ -180,11 +179,11 @@ public abstract class AbstractGenerator implements TaskGenerator { taskContext.getGroupName(), taskContext.getSceneName()); } else { // 若配置了默认初始化场景配置,则发现上报数据的时候未配置场景,默认生成一个场景 - sceneConfig = initScene(taskContext.getGroupName(), taskContext.getSceneName(), taskContext.getNamespaceId()); + retrySceneConfig = initScene(taskContext.getGroupName(), taskContext.getSceneName(), taskContext.getNamespaceId()); } } - return sceneConfig; + return retrySceneConfig; } @@ -195,18 +194,18 @@ public abstract class AbstractGenerator implements TaskGenerator { * @param groupName 组名称 * @param sceneName 场景名称 */ - private SceneConfig initScene(String groupName, String sceneName, String namespaceId) { - SceneConfig sceneConfig = new SceneConfig(); - sceneConfig.setNamespaceId(namespaceId); - sceneConfig.setGroupName(groupName); - sceneConfig.setSceneName(sceneName); - sceneConfig.setSceneStatus(StatusEnum.YES.getStatus()); - sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType()); - sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel()); - sceneConfig.setDescription("自动初始化场景"); - Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().insert(sceneConfig), + private RetrySceneConfig initScene(String groupName, String sceneName, String namespaceId) { + RetrySceneConfig retrySceneConfig = new RetrySceneConfig(); + retrySceneConfig.setNamespaceId(namespaceId); + retrySceneConfig.setGroupName(groupName); + retrySceneConfig.setSceneName(sceneName); + retrySceneConfig.setSceneStatus(StatusEnum.YES.getStatus()); + retrySceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType()); + retrySceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel()); + retrySceneConfig.setDescription("自动初始化场景"); + Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().insert(retrySceneConfig), () -> new SnailJobServerException("init scene error")); - return sceneConfig; + return retrySceneConfig; } /** diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryContext.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryContext.java index bdbd0942..4fe4f4a8 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryContext.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryContext.java @@ -3,7 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support; import com.aizuda.snailjob.server.common.WaitStrategy; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import java.util.Set; @@ -81,5 +81,5 @@ public interface RetryContext { * * @return 路由策略 */ - SceneConfig sceneConfig(); + RetrySceneConfig sceneConfig(); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/context/CallbackRetryContext.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/context/CallbackRetryContext.java index 14f3bd23..a0990a12 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/context/CallbackRetryContext.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/context/CallbackRetryContext.java @@ -4,7 +4,7 @@ import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.retry.task.support.RetryContext; import com.aizuda.snailjob.server.common.WaitStrategy; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import lombok.Data; import java.util.Objects; @@ -51,7 +51,7 @@ public class CallbackRetryContext implements RetryContext { /** * 场景配置 */ - private SceneConfig sceneConfig; + private RetrySceneConfig retrySceneConfig; @Override public boolean hasException() { @@ -59,8 +59,8 @@ public class CallbackRetryContext implements RetryContext { } @Override - public SceneConfig sceneConfig() { - return sceneConfig; + public RetrySceneConfig sceneConfig() { + return retrySceneConfig; } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/context/MaxAttemptsPersistenceRetryContext.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/context/MaxAttemptsPersistenceRetryContext.java index 5013cbf2..f4ab0c4c 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/context/MaxAttemptsPersistenceRetryContext.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/context/MaxAttemptsPersistenceRetryContext.java @@ -4,7 +4,7 @@ import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.retry.task.support.RetryContext; import com.aizuda.snailjob.server.common.WaitStrategy; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import lombok.Data; import lombok.Getter; @@ -54,7 +54,7 @@ public class MaxAttemptsPersistenceRetryContext implements RetryContext { /** * 场景配置 */ - private SceneConfig sceneConfig; + private RetrySceneConfig retrySceneConfig; @Override public void setCallResult(V v) { @@ -72,8 +72,8 @@ public class MaxAttemptsPersistenceRetryContext implements RetryContext { } @Override - public SceneConfig sceneConfig() { - return sceneConfig; + public RetrySceneConfig sceneConfig() { + return retrySceneConfig; } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java index 753f3405..e7713e96 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java @@ -19,7 +19,7 @@ import com.aizuda.snailjob.server.retry.task.support.retry.RetryExecutor; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.access.TaskAccess; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -54,7 +54,7 @@ public class ExecCallbackUnitActor extends AbstractActor { CallbackRetryContext context = (CallbackRetryContext) retryExecutor.getRetryContext(); RetryTask retryTask = context.getRetryTask(); RegisterNodeInfo serverNode = context.getServerNode(); - SceneConfig sceneConfig = context.getSceneConfig(); + RetrySceneConfig retrySceneConfig = context.getRetrySceneConfig(); // RetryTaskLogDTO retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(retryTask); // retryTaskLog.setTriggerTime(LocalDateTime.now()); @@ -64,7 +64,7 @@ public class ExecCallbackUnitActor extends AbstractActor { if (Objects.nonNull(serverNode)) { retryExecutor.call((Callable>) () -> { - Result result = callClient(retryTask, serverNode, sceneConfig); + Result result = callClient(retryTask, serverNode, retrySceneConfig); return result; }); } @@ -87,7 +87,7 @@ public class ExecCallbackUnitActor extends AbstractActor { * @param callbackTask {@link RetryTask} 回调任务 * @return 重试结果返回值 */ - private Result callClient(RetryTask callbackTask, RegisterNodeInfo serverNode, SceneConfig sceneConfig) { + private Result callClient(RetryTask callbackTask, RegisterNodeInfo serverNode, RetrySceneConfig retrySceneConfig) { String retryTaskUniqueId = callbackRetryTaskHandler.getRetryTaskUniqueId(callbackTask.getUniqueId()); @@ -116,9 +116,9 @@ public class ExecCallbackUnitActor extends AbstractActor { RetryRpcClient rpcClient = RequestBuilder.newBuilder() .nodeInfo(serverNode) .failover(Boolean.TRUE) - .routeKey(sceneConfig.getRouteKey()) - .allocKey(sceneConfig.getSceneName()) - .executorTimeout(sceneConfig.getExecutorTimeout()) + .routeKey(retrySceneConfig.getRouteKey()) + .allocKey(retrySceneConfig.getSceneName()) + .executorTimeout(retrySceneConfig.getExecutorTimeout()) .client(RetryRpcClient.class) .build(); return rpcClient.callback(retryCallbackDTO); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java index 96de3c73..cf55d12c 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java @@ -18,7 +18,7 @@ import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext; import com.aizuda.snailjob.server.retry.task.support.retry.RetryExecutor; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; @@ -46,7 +46,7 @@ public class ExecUnitActor extends AbstractActor { MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext(); RetryTask retryTask = context.getRetryTask(); RegisterNodeInfo serverNode = context.getServerNode(); - SceneConfig sceneConfig = context.getSceneConfig(); + RetrySceneConfig retrySceneConfig = context.getRetrySceneConfig(); try { @@ -54,7 +54,7 @@ public class ExecUnitActor extends AbstractActor { retryExecutor.call((Callable>) () -> { - Result result = callClient(retryTask, serverNode, sceneConfig); + Result result = callClient(retryTask, serverNode, retrySceneConfig); // 回调接口请求成功,处理返回值 if (StatusEnum.YES.getStatus() != result.getStatus()) { } else { @@ -85,7 +85,7 @@ public class ExecUnitActor extends AbstractActor { * @param retryTask {@link RetryTask} 需要重试的数据 * @return 重试结果返回值 */ - private Result callClient(RetryTask retryTask, RegisterNodeInfo serverNode, SceneConfig sceneConfig) { + private Result callClient(RetryTask retryTask, RegisterNodeInfo serverNode, RetrySceneConfig retrySceneConfig) { DispatchRetryDTO dispatchRetryDTO = new DispatchRetryDTO(); dispatchRetryDTO.setIdempotentId(retryTask.getIdempotentId()); @@ -106,8 +106,8 @@ public class ExecUnitActor extends AbstractActor { .nodeInfo(serverNode) .failover(Boolean.TRUE) .allocKey(retryTask.getSceneName()) - .routeKey(sceneConfig.getRouteKey()) - .executorTimeout(sceneConfig.getExecutorTimeout()) + .routeKey(retrySceneConfig.getRouteKey()) + .executorTimeout(retrySceneConfig.getExecutorTimeout()) .client(RetryRpcClient.class) .build(); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FailureActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FailureActor.java index 01207e98..686afa4e 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FailureActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/result/FailureActor.java @@ -16,7 +16,7 @@ import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -64,7 +64,7 @@ public class FailureActor extends AbstractActor { try { // 超过最大等级 - SceneConfig sceneConfig = + RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(), retryTask.getNamespaceId()); @@ -76,7 +76,7 @@ public class FailureActor extends AbstractActor { if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) { maxRetryCount = systemProperties.getCallback().getMaxCount(); } else { - maxRetryCount = sceneConfig.getMaxRetryCount(); + maxRetryCount = retrySceneConfig.getMaxRetryCount(); } if (maxRetryCount <= retryTask.getRetryCount()) { diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index ca39d3c4..8c94b3f7 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -18,8 +18,8 @@ import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutorS import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerWheel; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import io.netty.util.TimerTask; @@ -116,17 +116,17 @@ public abstract class AbstractScanGroup extends AbstractActor { private void processRetryPartitionTasks(List partitionTasks, final ScanTask scanTask) { // 批次查询场景 - Map sceneConfigMap = getSceneConfigMap(partitionTasks, scanTask); + Map sceneConfigMap = getSceneConfigMap(partitionTasks, scanTask); List waitUpdateRetryTasks = new ArrayList<>(); for (PartitionTask task : partitionTasks) { RetryPartitionTask retryPartitionTask = (RetryPartitionTask) task; - SceneConfig sceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneName()); - if (Objects.isNull(sceneConfig)) { + RetrySceneConfig retrySceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneName()); + if (Objects.isNull(retrySceneConfig)) { continue; } - RetryTask retryTask = processRetryTask(retryPartitionTask, sceneConfig); + RetryTask retryTask = processRetryTask(retryPartitionTask, retrySceneConfig); waitUpdateRetryTasks.add(retryTask); } @@ -147,22 +147,22 @@ public abstract class AbstractScanGroup extends AbstractActor { } - private Map getSceneConfigMap(final List partitionTasks, ScanTask scanTask) { + private Map getSceneConfigMap(final List partitionTasks, ScanTask scanTask) { Set sceneNameSet = partitionTasks.stream() .map(partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName()).collect(Collectors.toSet()); - List sceneConfigs = accessTemplate.getSceneConfigAccess() - .list(new LambdaQueryWrapper() - .select(SceneConfig::getBackOff, SceneConfig::getTriggerInterval, SceneConfig::getSceneName) - .eq(SceneConfig::getNamespaceId, scanTask.getNamespaceId()) - .eq(SceneConfig::getGroupName, scanTask.getGroupName()) - .in(SceneConfig::getSceneName, sceneNameSet)); - return sceneConfigs.stream() - .collect(Collectors.toMap(SceneConfig::getSceneName, i -> i)); + List retrySceneConfigs = accessTemplate.getSceneConfigAccess() + .list(new LambdaQueryWrapper() + .select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getSceneName) + .eq(RetrySceneConfig::getNamespaceId, scanTask.getNamespaceId()) + .eq(RetrySceneConfig::getGroupName, scanTask.getGroupName()) + .in(RetrySceneConfig::getSceneName, sceneNameSet)); + return retrySceneConfigs.stream() + .collect(Collectors.toMap(RetrySceneConfig::getSceneName, i -> i)); } - private RetryTask processRetryTask(RetryPartitionTask partitionTask, SceneConfig sceneConfig) { + private RetryTask processRetryTask(RetryPartitionTask partitionTask, RetrySceneConfig retrySceneConfig) { RetryTask retryTask = new RetryTask(); - retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask, sceneConfig)); + retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask, retrySceneConfig)); retryTask.setId(partitionTask.getId()); return retryTask; } @@ -174,7 +174,7 @@ public abstract class AbstractScanGroup extends AbstractActor { protected abstract void putLastId(String groupName, Long lastId); protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask, - final SceneConfig sceneConfig); + final RetrySceneConfig retrySceneConfig); protected abstract TimerTask timerTask(RetryPartitionTask partitionTask); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java index daa19691..5ede015d 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java @@ -10,7 +10,7 @@ import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyCon import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.snailjob.server.retry.task.support.timer.CallbackTimerTask; import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -59,7 +59,7 @@ public class ScanCallbackTaskActor extends AbstractScanGroup { @Override protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask, - final SceneConfig sceneConfig) { + final RetrySceneConfig retrySceneConfig) { long triggerInterval = systemProperties.getCallback().getTriggerInterval(); WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getType()); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java index d4fc80c5..7c6a750c 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java @@ -11,7 +11,7 @@ import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyCon import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext; import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -59,7 +59,7 @@ public class ScanRetryTaskActor extends AbstractScanGroup { } @Override - protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask, final SceneConfig sceneConfig) { + protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask, final RetrySceneConfig retrySceneConfig) { // 更新下次触发时间 WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); @@ -71,10 +71,10 @@ public class ScanRetryTaskActor extends AbstractScanGroup { } waitStrategyContext.setNextTriggerAt(DateUtils.toEpochMilli(nextTriggerAt)); - waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval()); + waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval()); waitStrategyContext.setDelayLevel(partitionTask.getRetryCount() + 1); // 更新触发时间, 任务进入时间轮 - WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff()); + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff()); return DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext)); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java index 5edf7226..68060a61 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java @@ -12,8 +12,8 @@ import com.aizuda.snailjob.server.retry.task.support.RetryContext; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.server.retry.task.support.retry.RetryExecutor; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; @@ -44,11 +44,11 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing // 重试次数累加 retryCountIncrement(retryTask); - SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(), + RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(), retryTask.getNamespaceId()); - RetryContext retryContext = builderRetryContext(retryTask.getGroupName(), retryTask, sceneConfig); - RetryExecutor executor = builderResultRetryExecutor(retryContext, sceneConfig); + RetryContext retryContext = builderRetryContext(retryTask.getGroupName(), retryTask, retrySceneConfig); + RetryExecutor executor = builderResultRetryExecutor(retryContext, retrySceneConfig); if (!preCheck(retryContext, executor)) { return; @@ -92,10 +92,10 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing } protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask, - final SceneConfig sceneConfig); + final RetrySceneConfig retrySceneConfig); protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext, - final SceneConfig sceneConfig); + final RetrySceneConfig retrySceneConfig); protected abstract ActorRef getActorRef(); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java index 2f616a09..7c3750a2 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java @@ -12,7 +12,7 @@ import com.aizuda.snailjob.server.retry.task.support.strategy.FilterStrategies; import com.aizuda.snailjob.server.retry.task.support.strategy.StopStrategies; import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import org.springframework.stereotype.Component; /** @@ -27,22 +27,22 @@ public class CallbackTaskExecutor extends AbstractTaskExecutor { @Override protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask, - final SceneConfig sceneConfig) { + final RetrySceneConfig retrySceneConfig) { CallbackRetryContext retryContext = new CallbackRetryContext<>(); retryContext.setRetryTask(retryTask); - retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, sceneConfig.getNamespaceId())); + retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, retrySceneConfig.getNamespaceId())); retryContext.setServerNode( clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), retryTask.getGroupName(), retryTask.getNamespaceId(), - sceneConfig.getRouteKey())); - retryContext.setSceneConfig(sceneConfig); + retrySceneConfig.getRouteKey())); + retryContext.setRetrySceneConfig(retrySceneConfig); return retryContext; } @Override - protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final SceneConfig sceneConfig) { + protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final RetrySceneConfig retrySceneConfig) { return RetryBuilder.newBuilder() .withStopStrategy(StopStrategies.stopException()) .withStopStrategy(StopStrategies.stopResultStatus()) diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java index a2b9534a..488b4905 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java @@ -15,7 +15,7 @@ import com.aizuda.snailjob.server.retry.task.support.strategy.FilterStrategies; import com.aizuda.snailjob.server.retry.task.support.strategy.StopStrategies; import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import org.springframework.stereotype.Component; /** @@ -30,22 +30,22 @@ public class ManualCallbackTaskExecutor extends AbstractTaskExecutor { @Override protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask, - final SceneConfig sceneConfig) { + final RetrySceneConfig retrySceneConfig) { CallbackRetryContext retryContext = new CallbackRetryContext<>(); retryContext.setRetryTask(retryTask); - retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, sceneConfig.getNamespaceId())); + retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, retrySceneConfig.getNamespaceId())); retryContext.setServerNode( clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), retryTask.getGroupName(), retryTask.getNamespaceId(), - sceneConfig.getRouteKey())); - retryContext.setSceneConfig(sceneConfig); + retrySceneConfig.getRouteKey())); + retryContext.setRetrySceneConfig(retrySceneConfig); return retryContext; } @Override - protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final SceneConfig sceneConfig) { + protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final RetrySceneConfig retrySceneConfig) { return RetryBuilder.newBuilder() .withStopStrategy(StopStrategies.stopException()) .withStopStrategy(StopStrategies.stopResultStatus()) diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java index 24358cd1..d2edd492 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java @@ -16,7 +16,7 @@ import com.aizuda.snailjob.server.retry.task.support.strategy.FilterStrategies; import com.aizuda.snailjob.server.retry.task.support.strategy.StopStrategies; import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import org.springframework.stereotype.Component; /** @@ -32,21 +32,21 @@ public class ManualRetryTaskExecutor extends AbstractTaskExecutor { @Override protected RetryContext> builderRetryContext(final String groupName, final RetryTask retryTask, - final SceneConfig sceneConfig) { + final RetrySceneConfig retrySceneConfig) { MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); retryContext.setRetryTask(retryTask); retryContext.setSceneBlacklist( - accessTemplate.getSceneConfigAccess().getBlacklist(groupName, sceneConfig.getNamespaceId())); + accessTemplate.getSceneConfigAccess().getBlacklist(groupName, retrySceneConfig.getNamespaceId())); retryContext.setServerNode( clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), - retryTask.getGroupName(), retryTask.getNamespaceId(), sceneConfig.getRouteKey())); - retryContext.setSceneConfig(sceneConfig); + retryTask.getGroupName(), retryTask.getNamespaceId(), retrySceneConfig.getRouteKey())); + retryContext.setRetrySceneConfig(retrySceneConfig); return retryContext; } @Override protected RetryExecutor> builderResultRetryExecutor(RetryContext retryContext, - final SceneConfig sceneConfig) { + final RetrySceneConfig retrySceneConfig) { RetryTask retryTask = retryContext.getRetryTask(); return RetryBuilder.newBuilder() @@ -69,9 +69,9 @@ public class ManualRetryTaskExecutor extends AbstractTaskExecutor { private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName, String namespaceId) { - SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess() + RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess() .getSceneConfigByGroupNameAndSceneName(groupName, sceneName, namespaceId); - Integer backOff = sceneConfig.getBackOff(); + Integer backOff = retrySceneConfig.getBackOff(); return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/RetryTaskExecutor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/RetryTaskExecutor.java index afec1064..99670b6d 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/RetryTaskExecutor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/task/RetryTaskExecutor.java @@ -13,7 +13,7 @@ import com.aizuda.snailjob.server.retry.task.support.strategy.FilterStrategies; import com.aizuda.snailjob.server.retry.task.support.strategy.StopStrategies; import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import org.springframework.stereotype.Component; /** @@ -29,27 +29,27 @@ public class RetryTaskExecutor extends AbstractTaskExecutor { @Override protected RetryContext> builderRetryContext(final String groupName, final RetryTask retryTask, - final SceneConfig sceneConfig) { + final RetrySceneConfig retrySceneConfig) { MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); retryContext.setRetryTask(retryTask); - retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, sceneConfig.getNamespaceId())); + retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, retrySceneConfig.getNamespaceId())); retryContext.setServerNode( clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), retryTask.getGroupName(), retryTask.getNamespaceId(), - sceneConfig.getRouteKey())); - retryContext.setSceneConfig(sceneConfig); + retrySceneConfig.getRouteKey())); + retryContext.setRetrySceneConfig(retrySceneConfig); return retryContext; } @Override protected RetryExecutor> builderResultRetryExecutor(RetryContext retryContext, - final SceneConfig sceneConfig) { + final RetrySceneConfig retrySceneConfig) { return RetryBuilder.>newBuilder() .withStopStrategy(StopStrategies.stopException()) .withStopStrategy(StopStrategies.stopResultStatusCode()) - .withWaitStrategy(getWaitWaitStrategy(sceneConfig)) + .withWaitStrategy(getWaitWaitStrategy(retrySceneConfig)) .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) .withFilterStrategy(FilterStrategies.sceneBlackFilter()) .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) @@ -64,8 +64,8 @@ public class RetryTaskExecutor extends AbstractTaskExecutor { return TaskExecutorSceneEnum.AUTO_RETRY; } - private WaitStrategy getWaitWaitStrategy(SceneConfig sceneConfig) { - Integer backOff = sceneConfig.getBackOff(); + private WaitStrategy getWaitWaitStrategy(RetrySceneConfig retrySceneConfig) { + Integer backOff = retrySceneConfig.getBackOff(); return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java index b4c7ccf4..fe1eeff4 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java @@ -9,6 +9,7 @@ import com.aizuda.snailjob.server.common.Lifecycle; import com.aizuda.snailjob.server.common.alarm.AbstractRetryAlarm; import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; @@ -46,6 +47,11 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractRetryAlarm 业务数据:{} \n" + "> 时间:{} \n"; + @Override + protected List getSystemTaskType() { + return Lists.newArrayList(SyetemTaskTypeEnum.RETRY); + } + @Override protected List poll() throws InterruptedException { diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java index 95222b64..7208dba8 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java @@ -9,6 +9,7 @@ import com.aizuda.snailjob.server.common.Lifecycle; import com.aizuda.snailjob.server.common.alarm.AbstractRetryAlarm; import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; +import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailMoreThresholdAlarmEvent; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; @@ -88,6 +89,11 @@ public class RetryTaskFailMoreThresholdAlarmListener extends SnailJobLog.LOCAL.info("RetryTaskFailMoreThresholdAlarmListener started"); } + @Override + protected List getSystemTaskType() { + return Lists.newArrayList(SyetemTaskTypeEnum.RETRY); + } + @Override protected int getNotifyScene() { return NotifySceneEnum.RETRY_TASK_REACH_THRESHOLD.getNotifyScene(); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/interceptor/AuthenticationInterceptor.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/interceptor/AuthenticationInterceptor.java index 995bafa1..dc3aa59b 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/interceptor/AuthenticationInterceptor.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/interceptor/AuthenticationInterceptor.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.web.interceptor; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.exception.SnailJobAuthenticationException; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.web.model.request.UserSessionVO; import com.aizuda.snailjob.server.web.annotation.LoginRequired; @@ -23,6 +24,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.aizuda.snailjob.common.core.util.JsonUtil; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; +import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.web.method.HandlerMethod; @@ -41,17 +43,13 @@ import java.util.stream.Collectors; * @date:2023-04-26 12:52 */ @Configuration +@RequiredArgsConstructor public class AuthenticationInterceptor implements HandlerInterceptor { - public static final String AUTHENTICATION = "SNAIL-JOB-AUTH"; public static final String NAMESPACE_ID = "SNAIL-JOB-NAMESPACE-ID"; - - @Autowired - private SystemUserMapper systemUserMapper; - @Autowired - private NamespaceMapper namespaceMapper; - @Autowired - private SystemUserPermissionMapper systemUserPermissionMapper; + private final SystemUserMapper systemUserMapper; + private final NamespaceMapper namespaceMapper; + private final SystemUserPermissionMapper systemUserPermissionMapper; @Override public boolean preHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object object) throws Exception { @@ -75,11 +73,11 @@ public class AuthenticationInterceptor implements HandlerInterceptor { if (loginRequired.required()) { // 执行认证 if (token == null) { - throw new SnailJobServerException("登陆过期,请重新登陆"); + throw new SnailJobAuthenticationException("登陆过期,请重新登陆"); } if (StrUtil.isBlank(namespaceId)) { - throw new SnailJobServerException("{} 命名空间不存在", namespaceId); + throw new SnailJobAuthenticationException("{} 命名空间不存在", namespaceId); } // 获取 token 中的 user id @@ -87,12 +85,12 @@ public class AuthenticationInterceptor implements HandlerInterceptor { try { systemUser = JsonUtil.parseObject(JWT.decode(token).getAudience().get(0), SystemUser.class); } catch (JWTDecodeException j) { - throw new SnailJobServerException("登陆过期,请重新登陆"); + throw new SnailJobAuthenticationException("登陆过期,请重新登陆"); } systemUser = systemUserMapper.selectById(systemUser.getId()); if (Objects.isNull(systemUser)) { - throw new SnailJobServerException("用户不存在"); + throw new SnailJobAuthenticationException("用户不存在"); } Long count = namespaceMapper.selectCount( @@ -122,7 +120,7 @@ public class AuthenticationInterceptor implements HandlerInterceptor { try { jwtVerifier.verify(token); } catch (JWTVerificationException e) { - throw new SnailJobServerException("登陆过期,请重新登陆"); + throw new SnailJobAuthenticationException("登陆过期,请重新登陆"); } RoleEnum role = loginRequired.role(); @@ -132,7 +130,7 @@ public class AuthenticationInterceptor implements HandlerInterceptor { if (role == RoleEnum.ADMIN) { if (role != RoleEnum.getEnumTypeMap().get(systemUser.getRole())) { - throw new SnailJobServerException("不具备访问权限"); + throw new SnailJobAuthenticationException("不具备访问权限"); } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigRequestVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigRequestVO.java index 861fc8db..fe5474de 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigRequestVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigRequestVO.java @@ -20,7 +20,17 @@ public class NotifyConfigRequestVO { @Pattern(regexp = "^[A-Za-z0-9_]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母和下划线") private String groupName; - private String sceneName; + /** + * 业务id (scene_name或job_id或workflow_id) + */ + @NotBlank(message = "业务id不能为空") + private String businessId; + + /** + * 任务类型 1、重试任务 2、回调任务、 3、JOB任务 4、WORKFLOW任务 + */ + @NotNull(message = "任务类型不能为空") + private Integer systemTaskType; @NotNull(message = "通知状态不能为空") private Integer notifyStatus; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/SceneConfigConverter.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/SceneConfigConverter.java index 84885cd8..fe048177 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/SceneConfigConverter.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/SceneConfigConverter.java @@ -1,7 +1,7 @@ package com.aizuda.snailjob.server.web.service.convert; import com.aizuda.snailjob.server.web.model.request.SceneConfigRequestVO; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; @@ -14,6 +14,6 @@ public interface SceneConfigConverter { SceneConfigConverter INSTANCE = Mappers.getMapper(SceneConfigConverter.class); - SceneConfig toSceneConfigRequestVO(SceneConfigRequestVO requestVO); + RetrySceneConfig toSceneConfigRequestVO(SceneConfigRequestVO requestVO); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/SceneConfigResponseVOConverter.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/SceneConfigResponseVOConverter.java index 225cd811..dd213714 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/SceneConfigResponseVOConverter.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/SceneConfigResponseVOConverter.java @@ -1,7 +1,7 @@ package com.aizuda.snailjob.server.web.service.convert; import com.aizuda.snailjob.server.web.model.response.SceneConfigResponseVO; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; @@ -16,7 +16,7 @@ public interface SceneConfigResponseVOConverter { SceneConfigResponseVOConverter INSTANCE = Mappers.getMapper(SceneConfigResponseVOConverter.class); - SceneConfigResponseVO convert(SceneConfig sceneConfig); + SceneConfigResponseVO convert(RetrySceneConfig retrySceneConfig); - List batchConvert(List sceneConfigs); + List batchConvert(List retrySceneConfigs); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/DashBoardServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/DashBoardServiceImpl.java index a2d448c4..c5ee99d0 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/DashBoardServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/DashBoardServiceImpl.java @@ -40,12 +40,10 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.RetrySummaryMa import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobSummary; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetrySummary; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; import com.aizuda.snailjob.template.datasource.utils.DbUtils; -import com.aizuda.snailjob.server.web.model.response.DashboardLineResponseVO; -import com.aizuda.snailjob.server.web.service.convert.*; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.StringUtils; @@ -160,9 +158,9 @@ public class DashBoardServiceImpl implements DashBoardService { DashboardRetryLineResponseVO dashboardRetryLineResponseVO = new DashboardRetryLineResponseVO(); // 重试任务列表 Page pager = new Page<>(baseQueryVO.getPage(), baseQueryVO.getSize()); - LambdaQueryWrapper wrapper = new LambdaQueryWrapper() - .eq(SceneConfig::getNamespaceId, namespaceId) - .in(CollUtil.isNotEmpty(groupNames), SceneConfig::getGroupName, groupNames); + LambdaQueryWrapper wrapper = new LambdaQueryWrapper() + .eq(RetrySceneConfig::getNamespaceId, namespaceId) + .in(CollUtil.isNotEmpty(groupNames), RetrySceneConfig::getGroupName, groupNames); // 针对SQL Server的分页COUNT, 自定义statement ID if (DbTypeEnum.SQLSERVER == DbUtils.getDbType()) { pager.setSearchCount(false); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/NotifyConfigServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/NotifyConfigServiceImpl.java index f2cb0f6d..f1f421a6 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/NotifyConfigServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/NotifyConfigServiceImpl.java @@ -16,10 +16,9 @@ import com.aizuda.snailjob.server.web.util.UserSessionUtils; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.access.ConfigAccess; import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; -import com.aizuda.snailjob.server.web.service.convert.NotifyConfigResponseVOConverter; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import java.time.LocalDateTime; @@ -30,10 +29,9 @@ import java.util.List; * @date : 2022-03-03 11:17 */ @Service +@RequiredArgsConstructor public class NotifyConfigServiceImpl implements NotifyConfigService { - - @Autowired - private AccessTemplate accessTemplate; + private final AccessTemplate accessTemplate; @Override public PageResult> getNotifyConfigList(NotifyConfigQueryVO queryVO) { @@ -51,7 +49,7 @@ public class NotifyConfigServiceImpl implements NotifyConfigService { queryWrapper.eq(NotifyConfig::getGroupName, queryVO.getGroupName()); } if (StrUtil.isNotBlank(queryVO.getSceneName())) { - queryWrapper.eq(NotifyConfig::getSceneName, queryVO.getSceneName()); + queryWrapper.eq(NotifyConfig::getBusinessId, queryVO.getSceneName()); } queryWrapper.orderByDesc(NotifyConfig::getId); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java index 1c56ece7..ec252955 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryDeadLetterServiceImpl.java @@ -24,11 +24,9 @@ import com.aizuda.snailjob.template.datasource.access.ConfigAccess; import com.aizuda.snailjob.template.datasource.access.TaskAccess; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; -import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; -import com.aizuda.snailjob.server.web.service.convert.RetryDeadLetterResponseVOConverter; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; @@ -117,21 +115,21 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService { Assert.notEmpty(retryDeadLetterList, () -> new SnailJobServerException("数据不存在")); - ConfigAccess sceneConfigAccess = accessTemplate.getSceneConfigAccess(); + ConfigAccess sceneConfigAccess = accessTemplate.getSceneConfigAccess(); Set sceneNameSet = retryDeadLetterList.stream().map(RetryDeadLetter::getSceneName) .collect(Collectors.toSet()); - List sceneConfigs = sceneConfigAccess.list(new LambdaQueryWrapper() - .eq(SceneConfig::getNamespaceId, namespaceId) - .in(SceneConfig::getSceneName, sceneNameSet)); + List retrySceneConfigs = sceneConfigAccess.list(new LambdaQueryWrapper() + .eq(RetrySceneConfig::getNamespaceId, namespaceId) + .in(RetrySceneConfig::getSceneName, sceneNameSet)); - Map sceneConfigMap = sceneConfigs.stream().collect(Collectors.toMap((sceneConfig) -> + Map sceneConfigMap = retrySceneConfigs.stream().collect(Collectors.toMap((sceneConfig) -> sceneConfig.getGroupName() + sceneConfig.getSceneName(), Function.identity())); List waitRollbackList = new ArrayList<>(); for (RetryDeadLetter retryDeadLetter : retryDeadLetterList) { - SceneConfig sceneConfig = sceneConfigMap.get( + RetrySceneConfig retrySceneConfig = sceneConfigMap.get( retryDeadLetter.getGroupName() + retryDeadLetter.getSceneName()); - Assert.notNull(sceneConfig, + Assert.notNull(retrySceneConfig, () -> new SnailJobServerException("未查询到场景. [{}]", retryDeadLetter.getSceneName())); RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryDeadLetter); @@ -140,9 +138,9 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService { WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); waitStrategyContext.setNextTriggerAt(LocalDateTime.now()); - waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval()); + waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval()); waitStrategyContext.setDelayLevel(1); - WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff()); + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff()); retryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext))); retryTask.setCreateDt(LocalDateTime.now()); waitRollbackList.add(retryTask); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java index 15d7c396..708db4ef 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java @@ -45,18 +45,9 @@ import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.access.TaskAccess; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; -import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; -import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient; -import com.aizuda.snailjob.server.retry.task.generator.task.TaskContext; -import com.aizuda.snailjob.server.retry.task.generator.task.TaskGenerator; -import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; -import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutor; -import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum; -import com.aizuda.snailjob.server.web.service.convert.RetryTaskResponseVOConverter; -import com.aizuda.snailjob.server.web.service.convert.TaskContextConverter; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; @@ -175,13 +166,13 @@ public class RetryTaskServiceImpl implements RetryTaskService { // 若恢复重试则需要重新计算下次触发时间 if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) { - SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess() + RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess() .getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(), namespaceId); WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); waitStrategyContext.setNextTriggerAt(DateUtils.toNowMilli()); - waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval()); + waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval()); waitStrategyContext.setDelayLevel(retryTask.getRetryCount() + 1); - WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff()); + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff()); retryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext))); } @@ -238,12 +229,12 @@ public class RetryTaskServiceImpl implements RetryTaskService { Assert.notEmpty(serverNodes, () -> new SnailJobServerException("生成idempotentId失败: 不存在活跃的客户端节点")); - SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess() + RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess() .getSceneConfigByGroupNameAndSceneName(generateRetryIdempotentIdVO.getGroupName(), generateRetryIdempotentIdVO.getSceneName(), namespaceId); - RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(sceneConfig.getSceneName(), - sceneConfig.getGroupName(), sceneConfig.getNamespaceId(), sceneConfig.getRouteKey()); + RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(retrySceneConfig.getSceneName(), + retrySceneConfig.getGroupName(), retrySceneConfig.getNamespaceId(), retrySceneConfig.getRouteKey()); // 委托客户端生成idempotentId GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO = new GenerateRetryIdempotentIdDTO(); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/SceneConfigServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/SceneConfigServiceImpl.java index 48bb030e..885da640 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/SceneConfigServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/SceneConfigServiceImpl.java @@ -17,9 +17,7 @@ import com.aizuda.snailjob.server.web.service.convert.SceneConfigResponseVOConve import com.aizuda.snailjob.server.web.util.UserSessionUtils; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.access.ConfigAccess; -import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig; -import com.aizuda.snailjob.server.web.service.convert.SceneConfigConverter; -import com.aizuda.snailjob.server.web.service.convert.SceneConfigResponseVOConverter; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; @@ -43,27 +41,27 @@ public class SceneConfigServiceImpl implements SceneConfigService { @Override public PageResult> getSceneConfigPageList(SceneConfigQueryVO queryVO) { - PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); + PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); UserSessionVO userSessionVO = UserSessionUtils.currentUserSession(); String namespaceId = userSessionVO.getNamespaceId(); - LambdaQueryWrapper sceneConfigLambdaQueryWrapper = new LambdaQueryWrapper<>(); - sceneConfigLambdaQueryWrapper.eq(SceneConfig::getNamespaceId, namespaceId); + LambdaQueryWrapper sceneConfigLambdaQueryWrapper = new LambdaQueryWrapper<>(); + sceneConfigLambdaQueryWrapper.eq(RetrySceneConfig::getNamespaceId, namespaceId); if (userSessionVO.isUser()) { - sceneConfigLambdaQueryWrapper.in(SceneConfig::getGroupName, userSessionVO.getGroupNames()); + sceneConfigLambdaQueryWrapper.in(RetrySceneConfig::getGroupName, userSessionVO.getGroupNames()); } if (StrUtil.isNotBlank(queryVO.getGroupName())) { - sceneConfigLambdaQueryWrapper.eq(SceneConfig::getGroupName, queryVO.getGroupName().trim()); + sceneConfigLambdaQueryWrapper.eq(RetrySceneConfig::getGroupName, queryVO.getGroupName().trim()); } if (StrUtil.isNotBlank(queryVO.getSceneName())) { - sceneConfigLambdaQueryWrapper.eq(SceneConfig::getSceneName, queryVO.getSceneName().trim()); + sceneConfigLambdaQueryWrapper.eq(RetrySceneConfig::getSceneName, queryVO.getSceneName().trim()); } pageDTO = accessTemplate.getSceneConfigAccess() - .listPage(pageDTO, sceneConfigLambdaQueryWrapper.orderByDesc(SceneConfig::getCreateDt)); + .listPage(pageDTO, sceneConfigLambdaQueryWrapper.orderByDesc(RetrySceneConfig::getCreateDt)); return new PageResult<>(pageDTO, SceneConfigResponseVOConverter.INSTANCE.batchConvert(pageDTO.getRecords())); @@ -74,14 +72,14 @@ public class SceneConfigServiceImpl implements SceneConfigService { String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - List sceneConfigs = accessTemplate.getSceneConfigAccess() - .list(new LambdaQueryWrapper() - .select(SceneConfig::getSceneName, SceneConfig::getDescription, SceneConfig::getMaxRetryCount) - .eq(SceneConfig::getNamespaceId, namespaceId) - .eq(SceneConfig::getGroupName, groupName) - .orderByDesc(SceneConfig::getCreateDt)); + List retrySceneConfigs = accessTemplate.getSceneConfigAccess() + .list(new LambdaQueryWrapper() + .select(RetrySceneConfig::getSceneName, RetrySceneConfig::getDescription, RetrySceneConfig::getMaxRetryCount) + .eq(RetrySceneConfig::getNamespaceId, namespaceId) + .eq(RetrySceneConfig::getGroupName, groupName) + .orderByDesc(RetrySceneConfig::getCreateDt)); - return SceneConfigResponseVOConverter.INSTANCE.batchConvert(sceneConfigs); + return SceneConfigResponseVOConverter.INSTANCE.batchConvert(retrySceneConfigs); } @Override @@ -89,21 +87,21 @@ public class SceneConfigServiceImpl implements SceneConfigService { checkExecuteInterval(requestVO); String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - ConfigAccess sceneConfigAccess = accessTemplate.getSceneConfigAccess(); + ConfigAccess sceneConfigAccess = accessTemplate.getSceneConfigAccess(); Assert.isTrue(0 == sceneConfigAccess.count( - new LambdaQueryWrapper() - .eq(SceneConfig::getNamespaceId, namespaceId) - .eq(SceneConfig::getGroupName, requestVO.getGroupName()) - .eq(SceneConfig::getSceneName, requestVO.getSceneName()) + new LambdaQueryWrapper() + .eq(RetrySceneConfig::getNamespaceId, namespaceId) + .eq(RetrySceneConfig::getGroupName, requestVO.getGroupName()) + .eq(RetrySceneConfig::getSceneName, requestVO.getSceneName()) ), () -> new SnailJobServerException("场景名称重复. {}", requestVO.getSceneName())); - SceneConfig sceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO); - sceneConfig.setCreateDt(LocalDateTime.now()); - sceneConfig.setNamespaceId(namespaceId); - Assert.isTrue(1 == sceneConfigAccess.insert(sceneConfig), - () -> new SnailJobServerException("failed to insert scene. sceneConfig:[{}]", - JsonUtil.toJsonString(sceneConfig))); + RetrySceneConfig retrySceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO); + retrySceneConfig.setCreateDt(LocalDateTime.now()); + retrySceneConfig.setNamespaceId(namespaceId); + Assert.isTrue(1 == sceneConfigAccess.insert(retrySceneConfig), + () -> new SnailJobServerException("failed to insert scene. retrySceneConfig:[{}]", + JsonUtil.toJsonString(retrySceneConfig))); return Boolean.TRUE; } @@ -125,29 +123,29 @@ public class SceneConfigServiceImpl implements SceneConfigService { @Override public Boolean updateSceneConfig(SceneConfigRequestVO requestVO) { checkExecuteInterval(requestVO); - SceneConfig sceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO); + RetrySceneConfig retrySceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO); // 防止更新 - sceneConfig.setSceneName(null); - sceneConfig.setGroupName(null); - sceneConfig.setNamespaceId(null); + retrySceneConfig.setSceneName(null); + retrySceneConfig.setGroupName(null); + retrySceneConfig.setNamespaceId(null); String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - sceneConfig.setTriggerInterval(Optional.ofNullable(sceneConfig.getTriggerInterval()).orElse(StrUtil.EMPTY)); - Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().update(sceneConfig, - new LambdaUpdateWrapper() - .eq(SceneConfig::getNamespaceId, namespaceId) - .eq(SceneConfig::getGroupName, requestVO.getGroupName()) - .eq(SceneConfig::getSceneName, requestVO.getSceneName())), - () -> new SnailJobServerException("failed to update scene. sceneConfig:[{}]", - JsonUtil.toJsonString(sceneConfig))); + retrySceneConfig.setTriggerInterval(Optional.ofNullable(retrySceneConfig.getTriggerInterval()).orElse(StrUtil.EMPTY)); + Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().update(retrySceneConfig, + new LambdaUpdateWrapper() + .eq(RetrySceneConfig::getNamespaceId, namespaceId) + .eq(RetrySceneConfig::getGroupName, requestVO.getGroupName()) + .eq(RetrySceneConfig::getSceneName, requestVO.getSceneName())), + () -> new SnailJobServerException("failed to update scene. retrySceneConfig:[{}]", + JsonUtil.toJsonString(retrySceneConfig))); return Boolean.TRUE; } @Override public SceneConfigResponseVO getSceneConfigDetail(Long id) { - SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().one(new LambdaQueryWrapper() - .eq(SceneConfig::getId, id)); - return SceneConfigResponseVOConverter.INSTANCE.convert(sceneConfig); + RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess().one(new LambdaQueryWrapper() + .eq(RetrySceneConfig::getId, id)); + return SceneConfigResponseVOConverter.INSTANCE.convert(retrySceneConfig); } }