feat(sj_1.0.0): 优化告警通知和新增认证异常码

1. 通知配置表合并
2. 新增认证异常码5001
3. 告警模块优化
This commit is contained in:
opensnail 2024-04-16 18:32:33 +08:00
parent 850d2cab36
commit 9b403daea5
45 changed files with 418 additions and 478 deletions

View File

@ -48,21 +48,22 @@ CREATE TABLE `sj_group_config`
CREATE TABLE `sj_notify_config`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`scene_name` varchar(64) NOT NULL COMMENT '场景名称',
`notify_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知状态 0、未启用 1、启用',
`notify_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知类型 1、钉钉 2、邮件 3、企业微信',
`notify_attribute` varchar(512) NOT NULL COMMENT '配置属性',
`notify_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '通知阈值',
`notify_scene` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知场景',
`rate_limiter_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '限流状态 0、未启用 1、启用',
`rate_limiter_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '每秒限流阈值',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`business_id` varchar(64) NOT NULL COMMENT '业务id (job_id或workflow_id或scene_name)',
`system_task_type` tinyint(4) NOT NULL DEFAULT 3 COMMENT '任务类型 1. 重试任务 2. 重试回调 3、JOB任务 4、WORKFLOW任务',
`notify_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知状态 0、未启用 1、启用',
`notify_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知类型 1、钉钉 2、邮件 3、企业微信',
`notify_attribute` varchar(512) NOT NULL COMMENT '配置属性',
`notify_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '通知阈值',
`notify_scene` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知场景',
`rate_limiter_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '限流状态 0、未启用 1、启用',
`rate_limiter_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '每秒限流阈值',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`)
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `business_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='通知配置'
@ -170,7 +171,7 @@ CREATE TABLE `sj_retry_task_log_message`
DEFAULT CHARSET = utf8mb4 COMMENT ='任务调度日志信息记录表'
;
CREATE TABLE `sj_scene_config`
CREATE TABLE `sj_retry_scene_config`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
@ -384,28 +385,6 @@ CREATE TABLE `sj_job_task_batch`
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='任务批次';
CREATE TABLE `sj_job_notify_config`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务id',
`notify_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知状态 0、未启用 1、启用',
`notify_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知类型 1、钉钉 2、邮件 3、企业微信',
`notify_attribute` varchar(512) NOT NULL COMMENT '配置属性',
`notify_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '通知阈值',
`notify_scene` tinyint(4) NOT NULL DEFAULT 0 COMMENT '通知场景',
`rate_limiter_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '限流状态 0、未启用 1、启用',
`rate_limiter_threshold` int(11) NOT NULL DEFAULT 0 COMMENT '每秒限流阈值',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name_job_id` (`namespace_id`, `group_name`, job_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 4
DEFAULT CHARSET = utf8mb4 COMMENT ='job通知配置';
CREATE TABLE `sj_job_summary`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',

View File

@ -0,0 +1,33 @@
package com.aizuda.snailjob.common.core.exception;
import lombok.Getter;
/**
* @author: opensnail
* @date : 2024-04-16
* @since : 1.0.0
*/
@Getter
public class SnailJobAuthenticationException extends BaseSnailJobException {
private final Integer errorCode = 5001;
public SnailJobAuthenticationException(final String message) {
super(message);
}
public SnailJobAuthenticationException(String message, Object... arguments) {
super(message, arguments);
}
public SnailJobAuthenticationException(String message, Object[] arguments, Throwable cause) {
super(message, arguments, cause);
}
public SnailJobAuthenticationException(String message, Object argument, Throwable cause) {
super(message, argument, cause);
}
public SnailJobAuthenticationException(String message, Object argument) {
super(message, argument);
}
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.common.core.handler;
import com.aizuda.snailjob.common.core.exception.AbstractError;
import com.aizuda.snailjob.common.core.exception.BaseSnailJobException;
import com.aizuda.snailjob.common.core.exception.SnailJobAuthenticationException;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.model.Result;
import jakarta.validation.ConstraintViolation;
@ -56,12 +57,27 @@ public class RestExceptionHandler {
* @param ex
* @return
*/
@ExceptionHandler({BaseSnailJobException.class})
@ExceptionHandler({BaseSnailJobException.class, SnailJobAuthenticationException.class})
public Result onBusinessException(BaseSnailJobException ex) {
log.error("异常类 businessException", ex);
if (ex instanceof final SnailJobAuthenticationException authenticationException) {
return new Result<String>(authenticationException.getErrorCode(), ex.getMessage());
}
return new Result<String>(0, ex.getMessage());
}
/**
* 业务异常
*
* @param ex
* @return
*/
@ExceptionHandler({SnailJobAuthenticationException.class})
public Result onBusinessException(SnailJobAuthenticationException ex) {
return new Result<String>(ex.getErrorCode(), ex.getMessage());
}
/**
* 400错误
*

View File

@ -11,15 +11,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.access.config.GroupConfigAccess;
import com.aizuda.snailjob.template.datasource.access.config.NotifyConfigAccess;
import com.aizuda.snailjob.template.datasource.access.config.SceneConfigAccess;
import com.aizuda.snailjob.template.datasource.access.task.RetryDeadLetterTaskAccess;
import com.aizuda.snailjob.template.datasource.access.task.RetryTaskAccess;
import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum;
import com.aizuda.snailjob.template.datasource.exception.SnailJobDatasourceException;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import org.springframework.stereotype.Component;
import java.util.HashMap;
@ -76,8 +68,8 @@ public class AccessTemplate {
*
* @return {@link SceneConfigAccess} 获取场景配置操作类
*/
public ConfigAccess<SceneConfig> getSceneConfigAccess() {
return (ConfigAccess<SceneConfig>) Optional.ofNullable(REGISTER_ACCESS.get(OperationTypeEnum.SCENE.name()))
public ConfigAccess<RetrySceneConfig> getSceneConfigAccess() {
return (ConfigAccess<RetrySceneConfig>) Optional.ofNullable(REGISTER_ACCESS.get(OperationTypeEnum.SCENE.name()))
.orElseThrow(() -> new SnailJobDatasourceException("not supports operation type"));
}

View File

@ -4,12 +4,7 @@ import com.aizuda.snailjob.common.core.enums.NotifySceneEnum;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.common.core.enums.NotifySceneEnum;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -45,9 +40,9 @@ public interface ConfigAccess<T> extends Access<T> {
* @param groupName 组名称
* @param sceneName 场景名称
* @param namespaceId 命名空间
* @return {@link SceneConfig} 场景配置
* @return {@link RetrySceneConfig} 场景配置
*/
SceneConfig getSceneConfigByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId);
RetrySceneConfig getSceneConfigByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId);
/**
* 获取通知配置
@ -83,9 +78,9 @@ public interface ConfigAccess<T> extends Access<T> {
* 获取场景配置
*
* @param groupName 组名称
* @return {@link SceneConfig} 场景配置
* @return {@link RetrySceneConfig} 场景配置
*/
List<SceneConfig> getSceneConfigByGroupName(String groupName);
List<RetrySceneConfig> getSceneConfigByGroupName(String groupName);
/**
* 获取已开启的组配置信息
@ -113,7 +108,7 @@ public interface ConfigAccess<T> extends Access<T> {
*
* @return 场景配置列表
*/
List<SceneConfig> getAllConfigSceneList();
List<RetrySceneConfig> getAllConfigSceneList();
/**
* 获取配置版本号

View File

@ -11,18 +11,8 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyConfigMa
import com.aizuda.snailjob.template.datasource.persistence.mapper.SceneConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.utils.DbUtils;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.core.enums.NotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.SceneConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
@ -68,20 +58,20 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
Integer notifyScene) {
return notifyConfigMapper.selectList(
new LambdaQueryWrapper<NotifyConfig>().eq(NotifyConfig::getGroupName, groupName)
.eq(NotifyConfig::getSceneName, sceneName)
.eq(NotifyConfig::getBusinessId, sceneName)
.eq(NotifyConfig::getNotifyScene, notifyScene));
}
protected SceneConfig getByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId) {
return sceneConfigMapper.selectOne(new LambdaQueryWrapper<SceneConfig>()
.eq(SceneConfig::getNamespaceId, namespaceId)
.eq(SceneConfig::getGroupName, groupName)
.eq(SceneConfig::getSceneName, sceneName));
protected RetrySceneConfig getByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId) {
return sceneConfigMapper.selectOne(new LambdaQueryWrapper<RetrySceneConfig>()
.eq(RetrySceneConfig::getNamespaceId, namespaceId)
.eq(RetrySceneConfig::getGroupName, groupName)
.eq(RetrySceneConfig::getSceneName, sceneName));
}
protected List<SceneConfig> getSceneConfigs(String groupName) {
return sceneConfigMapper.selectList(new LambdaQueryWrapper<SceneConfig>()
.eq(SceneConfig::getGroupName, groupName));
protected List<RetrySceneConfig> getSceneConfigs(String groupName) {
return sceneConfigMapper.selectList(new LambdaQueryWrapper<RetrySceneConfig>()
.eq(RetrySceneConfig::getGroupName, groupName));
}
protected GroupConfig getByGroupName(String groupName, final String namespaceId) {
@ -109,7 +99,7 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
}
@Override
public SceneConfig getSceneConfigByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId) {
public RetrySceneConfig getSceneConfigByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId) {
return getByGroupNameAndSceneName(groupName, sceneName, namespaceId);
}
@ -130,7 +120,7 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
}
@Override
public List<SceneConfig> getSceneConfigByGroupName(String groupName) {
public List<RetrySceneConfig> getSceneConfigByGroupName(String groupName) {
return getSceneConfigs(groupName);
}
@ -148,20 +138,20 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
return Collections.EMPTY_SET;
}
LambdaQueryWrapper<SceneConfig> sceneConfigLambdaQueryWrapper = new LambdaQueryWrapper<SceneConfig>()
.select(SceneConfig::getSceneName)
.eq(SceneConfig::getGroupName, groupName);
LambdaQueryWrapper<RetrySceneConfig> sceneConfigLambdaQueryWrapper = new LambdaQueryWrapper<RetrySceneConfig>()
.select(RetrySceneConfig::getSceneName)
.eq(RetrySceneConfig::getGroupName, groupName);
if (StatusEnum.YES.getStatus().equals(groupConfig.getGroupStatus())) {
sceneConfigLambdaQueryWrapper.eq(SceneConfig::getSceneStatus, StatusEnum.NO.getStatus());
sceneConfigLambdaQueryWrapper.eq(RetrySceneConfig::getSceneStatus, StatusEnum.NO.getStatus());
}
List<SceneConfig> sceneConfigs = sceneConfigMapper.selectList(sceneConfigLambdaQueryWrapper);
if (CollectionUtils.isEmpty(sceneConfigs)) {
List<RetrySceneConfig> retrySceneConfigs = sceneConfigMapper.selectList(sceneConfigLambdaQueryWrapper);
if (CollectionUtils.isEmpty(retrySceneConfigs)) {
return Collections.EMPTY_SET;
}
return sceneConfigs.stream().map(SceneConfig::getSceneName).collect(Collectors.toSet());
return retrySceneConfigs.stream().map(RetrySceneConfig::getSceneName).collect(Collectors.toSet());
}
@Override
@ -178,9 +168,9 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
}
@Override
public List<SceneConfig> getAllConfigSceneList() {
List<SceneConfig> allSystemConfigSceneList = sceneConfigMapper.selectList(
new LambdaQueryWrapper<SceneConfig>().orderByAsc(SceneConfig::getId));
public List<RetrySceneConfig> getAllConfigSceneList() {
List<RetrySceneConfig> allSystemConfigSceneList = sceneConfigMapper.selectList(
new LambdaQueryWrapper<RetrySceneConfig>().orderByAsc(RetrySceneConfig::getId));
if (CollectionUtils.isEmpty(allSystemConfigSceneList)) {
return Collections.EMPTY_LIST;
}
@ -226,10 +216,10 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
configDTO.setNotifyList(notifies);
List<SceneConfig> sceneConfig = getSceneConfigByGroupName(groupName);
List<RetrySceneConfig> retrySceneConfig = getSceneConfigByGroupName(groupName);
List<ConfigDTO.Scene> sceneList = new ArrayList<>();
for (SceneConfig config : sceneConfig) {
for (RetrySceneConfig config : retrySceneConfig) {
ConfigDTO.Scene scene = new ConfigDTO.Scene();
scene.setSceneName(config.getSceneName());
scene.setDdl(config.getDeadlineRequest());

View File

@ -2,8 +2,7 @@ package com.aizuda.snailjob.template.datasource.access.config;
import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -17,7 +16,7 @@ import java.util.List;
* @since 2.2.0
*/
@Component
public class SceneConfigAccess extends AbstractConfigAccess<SceneConfig> {
public class SceneConfigAccess extends AbstractConfigAccess<RetrySceneConfig> {
@Override
public boolean supports(String operationType) {
@ -27,42 +26,42 @@ public class SceneConfigAccess extends AbstractConfigAccess<SceneConfig> {
}
@Override
public List<SceneConfig> list(LambdaQueryWrapper<SceneConfig> query) {
public List<RetrySceneConfig> list(LambdaQueryWrapper<RetrySceneConfig> query) {
return sceneConfigMapper.selectList(query);
}
@Override
public int update(SceneConfig sceneConfig, LambdaUpdateWrapper<SceneConfig> query) {
return sceneConfigMapper.update(sceneConfig, query);
public int update(RetrySceneConfig retrySceneConfig, LambdaUpdateWrapper<RetrySceneConfig> query) {
return sceneConfigMapper.update(retrySceneConfig, query);
}
@Override
public int updateById(SceneConfig sceneConfig) {
return sceneConfigMapper.updateById(sceneConfig);
public int updateById(RetrySceneConfig retrySceneConfig) {
return sceneConfigMapper.updateById(retrySceneConfig);
}
@Override
public int delete(LambdaQueryWrapper<SceneConfig> query) {
public int delete(LambdaQueryWrapper<RetrySceneConfig> query) {
return sceneConfigMapper.delete(query);
}
@Override
public int insert(SceneConfig sceneConfig) {
return sceneConfigMapper.insert(sceneConfig);
public int insert(RetrySceneConfig retrySceneConfig) {
return sceneConfigMapper.insert(retrySceneConfig);
}
@Override
public SceneConfig one(LambdaQueryWrapper<SceneConfig> query) {
public RetrySceneConfig one(LambdaQueryWrapper<RetrySceneConfig> query) {
return sceneConfigMapper.selectOne(query);
}
@Override
public PageDTO<SceneConfig> listPage(PageDTO<SceneConfig> iPage, LambdaQueryWrapper<SceneConfig> query) {
public PageDTO<RetrySceneConfig> listPage(PageDTO<RetrySceneConfig> iPage, LambdaQueryWrapper<RetrySceneConfig> query) {
return sceneConfigMapper.selectPage(iPage, query);
}
@Override
public long count(LambdaQueryWrapper<SceneConfig> query) {
public long count(LambdaQueryWrapper<RetrySceneConfig> query) {
return sceneConfigMapper.selectCount(query);
}
}

View File

@ -3,11 +3,7 @@ package com.aizuda.snailjob.template.datasource.persistence.mapper;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardCardResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardLineResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySummary;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardRetryLineResponseDO.Rank;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardRetryLineResponseDO.Task;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardCardResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardLineResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.DashboardRetryLineResponseDO;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@ -34,11 +30,11 @@ public interface RetrySummaryMapper extends BaseMapper<RetrySummary> {
List<DashboardCardResponseDO.RetryTask> retryTaskBarList(@Param("ew") Wrapper<RetrySummary> wrapper);
IPage<DashboardRetryLineResponseDO.Task> retryTaskList(@Param("ew") Wrapper<SceneConfig> wrapper, Page<Object> page);
IPage<DashboardRetryLineResponseDO.Task> retryTaskList(@Param("ew") Wrapper<RetrySceneConfig> wrapper, Page<Object> page);
List<DashboardLineResponseDO> retryLineList(@Param("dateFormat") String dateFormat, @Param("ew") Wrapper<RetrySummary> wrapper);
List<DashboardRetryLineResponseDO.Rank> dashboardRank(@Param("ew") Wrapper<RetrySummary> wrapper);
long countRetryTask(@Param("ew") Wrapper<SceneConfig> wrapper);
long countRetryTask(@Param("ew") Wrapper<RetrySceneConfig> wrapper);
}

View File

@ -1,8 +1,8 @@
package com.aizuda.snailjob.template.datasource.persistence.mapper;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface SceneConfigMapper extends BaseMapper<SceneConfig> {
public interface SceneConfigMapper extends BaseMapper<RetrySceneConfig> {
}

View File

@ -21,7 +21,15 @@ public class NotifyConfig implements Serializable {
private String groupName;
private String sceneName;
/**
* 业务id (scene_name或job_id或workflow_id)
*/
private String businessId;
/**
* 任务类型 1重试任务 2回调任务 3JOB任务 4WORKFLOW任务
*/
private Integer systemTaskType;
private Integer notifyStatus;

View File

@ -16,8 +16,8 @@ import java.time.LocalDateTime;
* @since 2.5.0
*/
@Data
@TableName("sj_scene_config")
public class SceneConfig implements Serializable {
@TableName("sj_retry_scene_config")
public class RetrySceneConfig implements Serializable {
private static final long serialVersionUID = 1L;

View File

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.SceneConfigMapper">
<resultMap id="BaseResultMap" type="com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig">
<resultMap id="BaseResultMap" type="com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig">
<id column="id" jdbcType="BIGINT" property="id" />
<result column="scene_name" jdbcType="VARCHAR" property="sceneName" />
<result column="group_name" jdbcType="VARCHAR" property="groupName" />

View File

@ -111,7 +111,7 @@
SELECT group_name AS groupName,
SUM(CASE WHEN (scene_status = 1) THEN 1 ELSE 0 END) AS run,
COUNT(*) AS total
FROM sj_scene_config
FROM sj_retry_scene_config
${ew.customSqlSegment}
GROUP BY namespace_id, group_name
</select>

View File

@ -5,13 +5,20 @@ import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.cache.CacheNotifyRateLimiter;
import com.aizuda.snailjob.server.common.dto.AlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.triple.Triple;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.server.common.triple.Triple;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -19,8 +26,10 @@ import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.util.CollectionUtils;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author xiaowoniu
@ -28,7 +37,7 @@ import java.util.concurrent.TimeUnit;
* @since 2.5.0
*/
@Slf4j
public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmInfo, T> extends AbstractFlowControl<E> implements ApplicationListener<E>, Runnable,
public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmInfo> implements ApplicationListener<E>, Runnable,
Lifecycle {
@Autowired
@ -53,14 +62,14 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
// 获取所有的组名称
Set<String> groupNames = new HashSet<>();
// 获取所有的场景名称
Set<T> sceneNames = new HashSet<>();
Set<String> sceneNames = new HashSet<>();
// 转换AlarmDTO 为了下面循环发送使用
Map<Triple<String, String, T>, List<A>> waitSendAlarmInfos = convertAlarmDTO(
Map<Triple<String, String, String>, List<A>> waitSendAlarmInfos = convertAlarmDTO(
alarmInfos, namespaceIds, groupNames, sceneNames);
// 批量获取通知配置
Map<Triple<String, String, T>, List<NotifyConfigInfo>> notifyConfig = obtainNotifyConfig(namespaceIds, groupNames, sceneNames);
Map<Triple<String, String, String>, List<NotifyConfigInfo>> notifyConfig = obtainNotifyConfig(namespaceIds, groupNames, sceneNames);
// 循环发送消息
waitSendAlarmInfos.forEach((key, list) -> {
@ -78,12 +87,39 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
}
}
protected abstract Map<Triple<String, String, T>, List<A>> convertAlarmDTO(List<A> alarmData, Set<String> namespaceIds, Set<String> groupNames, Set<T> sceneNames);
protected Map<Triple<String, String, String>, List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds, Set<String> groupNames, Set<String> businessIds) {
// 批量获取所需的通知配置
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.in(NotifyConfig::getSystemTaskType, getSystemTaskType())
.eq(NotifyConfig::getNotifyScene, getNotifyScene())
.in(NotifyConfig::getNamespaceId, namespaceIds)
.in(NotifyConfig::getGroupName, groupNames)
.in(NotifyConfig::getBusinessId, businessIds)
);
if (CollectionUtils.isEmpty(notifyConfigs)) {
return Maps.newHashMap();
}
List<NotifyConfigInfo> notifyConfigInfos = AlarmInfoConverter.INSTANCE.retryToNotifyConfigInfos(notifyConfigs);
return notifyConfigInfos.stream()
.collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
return ImmutableTriple.of(namespaceId, groupName, i.getBusinessId());
}));
}
protected abstract List<SyetemTaskTypeEnum> getSystemTaskType();
protected abstract Map<Triple<String, String, String>, List<A>> convertAlarmDTO(List<A> alarmData, Set<String> namespaceIds, Set<String> groupNames, Set<String> sceneNames);
protected abstract Map<Triple<String/*命名空间*/, String/*组名称*/, T/*场景名称ORJobId*/>,
List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds,
Set<String> groupNames,
Set<T> sceneNames);
protected abstract List<A> poll() throws InterruptedException;
@ -111,11 +147,10 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
for (final NotifyConfigInfo notifyConfig : notifyConfigsList) {
if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) {
// 限流
RateLimiter rateLimiter = getRateLimiter(rateLimiterKey(notifyConfig),
RateLimiter rateLimiter = getRateLimiter(String.valueOf(notifyConfig.getId()),
notifyConfig.getRateLimiterThreshold());
// 每秒发送rateLimiterThreshold个告警
if (Objects.nonNull(rateLimiter) && !RateLimiter.create(notifyConfig.getRateLimiterThreshold())
.tryAcquire(1, TimeUnit.SECONDS)) {
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
continue;
}
}
@ -133,7 +168,14 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
}
}
protected abstract String rateLimiterKey(NotifyConfigInfo notifyConfig);
protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) {
RateLimiter rateLimiter = CacheNotifyRateLimiter.getRateLimiterByKey(key);
if (Objects.isNull(rateLimiter) || rateLimiter.getRate() != rateLimiterThreshold) {
CacheNotifyRateLimiter.put(key, RateLimiter.create(rateLimiterThreshold));
}
return rateLimiter;
}
protected abstract int getNotifyScene();
}

View File

@ -1,28 +0,0 @@
package com.aizuda.snailjob.server.common.alarm;
import com.aizuda.snailjob.server.common.cache.CacheNotifyRateLimiter;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import java.util.Objects;
/**
* @author: zuoJunLin
* @date : 2023-11-21 13:04
* @since 2.5.0
*/
@Slf4j
public abstract class AbstractFlowControl<E extends ApplicationEvent> implements ApplicationListener<E> {
protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) {
RateLimiter rateLimiter = CacheNotifyRateLimiter.getRateLimiterByKey(key);
if (Objects.isNull(rateLimiter) || rateLimiter.getRate() != rateLimiterThreshold) {
CacheNotifyRateLimiter.put(key, RateLimiter.create(rateLimiterThreshold));
}
return rateLimiter;
}
}

View File

@ -1,21 +1,11 @@
package com.aizuda.snailjob.server.common.alarm;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.enums.SystemModeEnum;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.triple.Triple;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobNotifyConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobNotifyConfig;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.triple.Triple;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.util.CollectionUtils;
import java.text.MessageFormat;
import java.util.List;
@ -28,57 +18,19 @@ import java.util.stream.Collectors;
* @date 2023-12-03 10:19:19
* @since 2.5.0
*/
public abstract class AbstractJobAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, JobAlarmInfo, Long> {
@Autowired
private JobNotifyConfigMapper jobNotifyConfigMapper;
public abstract class AbstractJobAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, JobAlarmInfo> {
@Override
protected Map<Triple<String, String, Long>, List<JobAlarmInfo>> convertAlarmDTO(List<JobAlarmInfo> alarmInfos, Set<String> namespaceIds, Set<String> groupNames, Set<Long> jobIds) {
protected Map<Triple<String, String, String>, List<JobAlarmInfo>> convertAlarmDTO(List<JobAlarmInfo> alarmInfos, Set<String> namespaceIds, Set<String> groupNames, Set<String> jobIds) {
return alarmInfos.stream().collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
Long jobId = i.getJobId();
String jobId = String.valueOf(i.getJobId());
namespaceIds.add(namespaceId);
groupNames.add(groupName);
jobIds.add(jobId);
return ImmutableTriple.of(namespaceId, groupName, jobId);
}));
}
@Override
protected Map<Triple<String, String, Long>, List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds, Set<String> groupNames, Set<Long> jobIds) {
// 批量获取所需的通知配置
List<JobNotifyConfig> jobNotifyConfigs = jobNotifyConfigMapper.selectList(
new LambdaQueryWrapper<JobNotifyConfig>()
.eq(JobNotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(JobNotifyConfig::getNotifyScene, getNotifyScene())
.in(JobNotifyConfig::getNamespaceId, namespaceIds)
.in(JobNotifyConfig::getGroupName, groupNames)
.in(JobNotifyConfig::getJobId, jobIds)
);
if (CollectionUtils.isEmpty(jobNotifyConfigs)) {
return Maps.newHashMap();
}
List<NotifyConfigInfo> notifyConfigInfos = AlarmInfoConverter.INSTANCE.jobToNotifyConfigInfos(jobNotifyConfigs);
return notifyConfigInfos.stream()
.collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
Long jobId = i.getJobId();
return ImmutableTriple.of(namespaceId, groupName, jobId);
}));
}
@Override
protected String rateLimiterKey(NotifyConfigInfo notifyConfig) {
return MessageFormat.format("{}_{}", SystemModeEnum.JOB.name(), notifyConfig.getId());
}
}

View File

@ -1,19 +1,11 @@
package com.aizuda.snailjob.server.common.alarm;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SystemModeEnum;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.triple.Triple;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.triple.Triple;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Maps;
import org.springframework.context.ApplicationEvent;
import org.springframework.util.CollectionUtils;
import java.text.MessageFormat;
import java.util.List;
@ -26,7 +18,7 @@ import java.util.stream.Collectors;
* @date 2023-12-03 10:19:19
* @since 2.5.0
*/
public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, RetryAlarmInfo, String> {
public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, RetryAlarmInfo> {
@Override
protected Map<Triple<String, String, String>, List<RetryAlarmInfo>> convertAlarmDTO(
List<RetryAlarmInfo> alarmDataList,
@ -49,39 +41,4 @@ public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends Abs
}));
}
@Override
protected Map<Triple<String, String, String>, List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds, Set<String> groupNames, Set<String> sceneNames) {
// 批量获取所需的通知配置
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(NotifyConfig::getNotifyScene, getNotifyScene())
.in(NotifyConfig::getNamespaceId, namespaceIds)
.in(NotifyConfig::getGroupName, groupNames)
.in(NotifyConfig::getSceneName, sceneNames)
);
if (CollectionUtils.isEmpty(notifyConfigs)) {
return Maps.newHashMap();
}
List<NotifyConfigInfo> notifyConfigInfos = AlarmInfoConverter.INSTANCE.retryToNotifyConfigInfos(notifyConfigs);
return notifyConfigInfos.stream()
.collect(Collectors.groupingBy(config -> {
String namespaceId = config.getNamespaceId();
String groupName = config.getGroupName();
String sceneName = config.getSceneName();
return ImmutableTriple.of(namespaceId, groupName, sceneName);
}));
}
@Override
protected String rateLimiterKey(NotifyConfigInfo notifyConfig) {
return MessageFormat.format("{}_{}", SystemModeEnum.RETRY.name(), notifyConfig.getId());
}
}

View File

@ -16,11 +16,11 @@ public class NotifyConfigInfo {
private String groupName;
// job告警时使用
private Long jobId;
// 业务id (scene_name或job_id或workflow_id)
private String businessId;
// retry告警时使用
private String sceneName;
// 任务类型 1重试任务 2回调任务 3JOB任务 4WORKFLOW任务
private Integer systemTaskType;
private Integer notifyStatus;

View File

@ -8,6 +8,8 @@ import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.alarm.AbstractJobAlarm;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.triple.Triple;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.support.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
@ -15,11 +17,14 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMa
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
@ -31,16 +36,14 @@ import java.util.concurrent.LinkedBlockingQueue;
* @since 2.5.0
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmEvent> {
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
/**
* job任务失败数据
*/
private LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(1000);
private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>(1000);
private static String jobTaskFailTextMessagesFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 Job任务执行失败</font> \n" +
@ -90,6 +93,10 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmE
return JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene();
}
@Override
protected List<SyetemTaskTypeEnum> getSystemTaskType() {
return Lists.newArrayList(SyetemTaskTypeEnum.JOB);
}
@Override
public void onApplicationEvent(JobTaskFailAlarmEvent event) {

View File

@ -12,7 +12,6 @@ import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.generator.id.IdGenerator;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.generator.task.TaskContext.TaskInfo;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.snailjob.server.common.WaitStrategy;
@ -23,9 +22,9 @@ import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -56,7 +55,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
public void taskGenerator(TaskContext taskContext) {
SnailJobLog.LOCAL.debug("received report data. {}", JsonUtil.toJsonString(taskContext));
SceneConfig sceneConfig = checkAndInitScene(taskContext);
RetrySceneConfig retrySceneConfig = checkAndInitScene(taskContext);
//客户端上报任务根据幂等id去重
List<TaskContext.TaskInfo> taskInfos = taskContext.getTaskInfos().stream().collect(Collectors.collectingAndThen(
@ -86,7 +85,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
LocalDateTime now = LocalDateTime.now();
for (TaskContext.TaskInfo taskInfo : taskInfos) {
Pair<List<RetryTask>, List<RetryTaskLog>> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo,
sceneConfig);
retrySceneConfig);
waitInsertTasks.addAll(pair.getKey());
waitInsertTaskLogs.addAll(pair.getValue());
}
@ -107,11 +106,11 @@ public abstract class AbstractGenerator implements TaskGenerator {
* @param retryTaskMap
* @param now
* @param taskInfo
* @param sceneConfig
* @param retrySceneConfig
*/
private Pair<List<RetryTask>, List<RetryTaskLog>> doConvertTask(Map<String/*幂等ID*/, List<RetryTask>> retryTaskMap,
TaskContext taskContext, LocalDateTime now,
TaskContext.TaskInfo taskInfo, SceneConfig sceneConfig) {
TaskContext.TaskInfo taskInfo, RetrySceneConfig retrySceneConfig) {
List<RetryTask> waitInsertTasks = new ArrayList<>();
List<RetryTaskLog> waitInsertTaskLogs = new ArrayList<>();
@ -144,9 +143,9 @@ public abstract class AbstractGenerator implements TaskGenerator {
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(now);
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(1);
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff());
retryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext)));
waitInsertTasks.add(retryTask);
@ -161,11 +160,11 @@ public abstract class AbstractGenerator implements TaskGenerator {
protected abstract Integer initStatus(TaskContext taskContext);
private SceneConfig checkAndInitScene(TaskContext taskContext) {
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess()
private RetrySceneConfig checkAndInitScene(TaskContext taskContext) {
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneName(taskContext.getGroupName(), taskContext.getSceneName(),
taskContext.getNamespaceId());
if (Objects.isNull(sceneConfig)) {
if (Objects.isNull(retrySceneConfig)) {
GroupConfig groupConfig = accessTemplate.getGroupConfigAccess()
.getGroupConfigByGroupName(taskContext.getGroupName(), taskContext.getNamespaceId());
@ -180,11 +179,11 @@ public abstract class AbstractGenerator implements TaskGenerator {
taskContext.getGroupName(), taskContext.getSceneName());
} else {
// 若配置了默认初始化场景配置则发现上报数据的时候未配置场景默认生成一个场景
sceneConfig = initScene(taskContext.getGroupName(), taskContext.getSceneName(), taskContext.getNamespaceId());
retrySceneConfig = initScene(taskContext.getGroupName(), taskContext.getSceneName(), taskContext.getNamespaceId());
}
}
return sceneConfig;
return retrySceneConfig;
}
@ -195,18 +194,18 @@ public abstract class AbstractGenerator implements TaskGenerator {
* @param groupName 组名称
* @param sceneName 场景名称
*/
private SceneConfig initScene(String groupName, String sceneName, String namespaceId) {
SceneConfig sceneConfig = new SceneConfig();
sceneConfig.setNamespaceId(namespaceId);
sceneConfig.setGroupName(groupName);
sceneConfig.setSceneName(sceneName);
sceneConfig.setSceneStatus(StatusEnum.YES.getStatus());
sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType());
sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel());
sceneConfig.setDescription("自动初始化场景");
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().insert(sceneConfig),
private RetrySceneConfig initScene(String groupName, String sceneName, String namespaceId) {
RetrySceneConfig retrySceneConfig = new RetrySceneConfig();
retrySceneConfig.setNamespaceId(namespaceId);
retrySceneConfig.setGroupName(groupName);
retrySceneConfig.setSceneName(sceneName);
retrySceneConfig.setSceneStatus(StatusEnum.YES.getStatus());
retrySceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType());
retrySceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel());
retrySceneConfig.setDescription("自动初始化场景");
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().insert(retrySceneConfig),
() -> new SnailJobServerException("init scene error"));
return sceneConfig;
return retrySceneConfig;
}
/**

View File

@ -3,7 +3,7 @@ package com.aizuda.snailjob.server.retry.task.support;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import java.util.Set;
@ -81,5 +81,5 @@ public interface RetryContext<V> {
*
* @return 路由策略
*/
SceneConfig sceneConfig();
RetrySceneConfig sceneConfig();
}

View File

@ -4,7 +4,7 @@ import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import lombok.Data;
import java.util.Objects;
@ -51,7 +51,7 @@ public class CallbackRetryContext<V> implements RetryContext<V> {
/**
* 场景配置
*/
private SceneConfig sceneConfig;
private RetrySceneConfig retrySceneConfig;
@Override
public boolean hasException() {
@ -59,8 +59,8 @@ public class CallbackRetryContext<V> implements RetryContext<V> {
}
@Override
public SceneConfig sceneConfig() {
return sceneConfig;
public RetrySceneConfig sceneConfig() {
return retrySceneConfig;
}
}

View File

@ -4,7 +4,7 @@ import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import lombok.Data;
import lombok.Getter;
@ -54,7 +54,7 @@ public class MaxAttemptsPersistenceRetryContext<V> implements RetryContext<V> {
/**
* 场景配置
*/
private SceneConfig sceneConfig;
private RetrySceneConfig retrySceneConfig;
@Override
public void setCallResult(V v) {
@ -72,8 +72,8 @@ public class MaxAttemptsPersistenceRetryContext<V> implements RetryContext<V> {
}
@Override
public SceneConfig sceneConfig() {
return sceneConfig;
public RetrySceneConfig sceneConfig() {
return retrySceneConfig;
}
}

View File

@ -19,7 +19,7 @@ import com.aizuda.snailjob.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -54,7 +54,7 @@ public class ExecCallbackUnitActor extends AbstractActor {
CallbackRetryContext context = (CallbackRetryContext) retryExecutor.getRetryContext();
RetryTask retryTask = context.getRetryTask();
RegisterNodeInfo serverNode = context.getServerNode();
SceneConfig sceneConfig = context.getSceneConfig();
RetrySceneConfig retrySceneConfig = context.getRetrySceneConfig();
// RetryTaskLogDTO retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(retryTask);
// retryTaskLog.setTriggerTime(LocalDateTime.now());
@ -64,7 +64,7 @@ public class ExecCallbackUnitActor extends AbstractActor {
if (Objects.nonNull(serverNode)) {
retryExecutor.call((Callable<Result<Void>>) () -> {
Result<Void> result = callClient(retryTask, serverNode, sceneConfig);
Result<Void> result = callClient(retryTask, serverNode, retrySceneConfig);
return result;
});
}
@ -87,7 +87,7 @@ public class ExecCallbackUnitActor extends AbstractActor {
* @param callbackTask {@link RetryTask} 回调任务
* @return 重试结果返回值
*/
private Result callClient(RetryTask callbackTask, RegisterNodeInfo serverNode, SceneConfig sceneConfig) {
private Result callClient(RetryTask callbackTask, RegisterNodeInfo serverNode, RetrySceneConfig retrySceneConfig) {
String retryTaskUniqueId = callbackRetryTaskHandler.getRetryTaskUniqueId(callbackTask.getUniqueId());
@ -116,9 +116,9 @@ public class ExecCallbackUnitActor extends AbstractActor {
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
.nodeInfo(serverNode)
.failover(Boolean.TRUE)
.routeKey(sceneConfig.getRouteKey())
.allocKey(sceneConfig.getSceneName())
.executorTimeout(sceneConfig.getExecutorTimeout())
.routeKey(retrySceneConfig.getRouteKey())
.allocKey(retrySceneConfig.getSceneName())
.executorTimeout(retrySceneConfig.getExecutorTimeout())
.client(RetryRpcClient.class)
.build();
return rpcClient.callback(retryCallbackDTO);

View File

@ -18,7 +18,7 @@ import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.snailjob.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
@ -46,7 +46,7 @@ public class ExecUnitActor extends AbstractActor {
MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext();
RetryTask retryTask = context.getRetryTask();
RegisterNodeInfo serverNode = context.getServerNode();
SceneConfig sceneConfig = context.getSceneConfig();
RetrySceneConfig retrySceneConfig = context.getRetrySceneConfig();
try {
@ -54,7 +54,7 @@ public class ExecUnitActor extends AbstractActor {
retryExecutor.call((Callable<Result<DispatchRetryResultDTO>>) () -> {
Result<DispatchRetryResultDTO> result = callClient(retryTask, serverNode, sceneConfig);
Result<DispatchRetryResultDTO> result = callClient(retryTask, serverNode, retrySceneConfig);
// 回调接口请求成功处理返回值
if (StatusEnum.YES.getStatus() != result.getStatus()) {
} else {
@ -85,7 +85,7 @@ public class ExecUnitActor extends AbstractActor {
* @param retryTask {@link RetryTask} 需要重试的数据
* @return 重试结果返回值
*/
private Result<DispatchRetryResultDTO> callClient(RetryTask retryTask, RegisterNodeInfo serverNode, SceneConfig sceneConfig) {
private Result<DispatchRetryResultDTO> callClient(RetryTask retryTask, RegisterNodeInfo serverNode, RetrySceneConfig retrySceneConfig) {
DispatchRetryDTO dispatchRetryDTO = new DispatchRetryDTO();
dispatchRetryDTO.setIdempotentId(retryTask.getIdempotentId());
@ -106,8 +106,8 @@ public class ExecUnitActor extends AbstractActor {
.nodeInfo(serverNode)
.failover(Boolean.TRUE)
.allocKey(retryTask.getSceneName())
.routeKey(sceneConfig.getRouteKey())
.executorTimeout(sceneConfig.getExecutorTimeout())
.routeKey(retrySceneConfig.getRouteKey())
.executorTimeout(retrySceneConfig.getExecutorTimeout())
.client(RetryRpcClient.class)
.build();

View File

@ -16,7 +16,7 @@ import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -64,7 +64,7 @@ public class FailureActor extends AbstractActor {
try {
// 超过最大等级
SceneConfig sceneConfig =
RetrySceneConfig retrySceneConfig =
accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(),
retryTask.getNamespaceId());
@ -76,7 +76,7 @@ public class FailureActor extends AbstractActor {
if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
maxRetryCount = systemProperties.getCallback().getMaxCount();
} else {
maxRetryCount = sceneConfig.getMaxRetryCount();
maxRetryCount = retrySceneConfig.getMaxRetryCount();
}
if (maxRetryCount <= retryTask.getRetryCount()) {

View File

@ -18,8 +18,8 @@ import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutorS
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerWheel;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import io.netty.util.TimerTask;
@ -116,17 +116,17 @@ public abstract class AbstractScanGroup extends AbstractActor {
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks, final ScanTask scanTask) {
// 批次查询场景
Map<String, SceneConfig> sceneConfigMap = getSceneConfigMap(partitionTasks, scanTask);
Map<String, RetrySceneConfig> sceneConfigMap = getSceneConfigMap(partitionTasks, scanTask);
List<RetryTask> waitUpdateRetryTasks = new ArrayList<>();
for (PartitionTask task : partitionTasks) {
RetryPartitionTask retryPartitionTask = (RetryPartitionTask) task;
SceneConfig sceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneName());
if (Objects.isNull(sceneConfig)) {
RetrySceneConfig retrySceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneName());
if (Objects.isNull(retrySceneConfig)) {
continue;
}
RetryTask retryTask = processRetryTask(retryPartitionTask, sceneConfig);
RetryTask retryTask = processRetryTask(retryPartitionTask, retrySceneConfig);
waitUpdateRetryTasks.add(retryTask);
}
@ -147,22 +147,22 @@ public abstract class AbstractScanGroup extends AbstractActor {
}
private Map<String, SceneConfig> getSceneConfigMap(final List<? extends PartitionTask> partitionTasks, ScanTask scanTask) {
private Map<String, RetrySceneConfig> getSceneConfigMap(final List<? extends PartitionTask> partitionTasks, ScanTask scanTask) {
Set<String> sceneNameSet = partitionTasks.stream()
.map(partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName()).collect(Collectors.toSet());
List<SceneConfig> sceneConfigs = accessTemplate.getSceneConfigAccess()
.list(new LambdaQueryWrapper<SceneConfig>()
.select(SceneConfig::getBackOff, SceneConfig::getTriggerInterval, SceneConfig::getSceneName)
.eq(SceneConfig::getNamespaceId, scanTask.getNamespaceId())
.eq(SceneConfig::getGroupName, scanTask.getGroupName())
.in(SceneConfig::getSceneName, sceneNameSet));
return sceneConfigs.stream()
.collect(Collectors.toMap(SceneConfig::getSceneName, i -> i));
List<RetrySceneConfig> retrySceneConfigs = accessTemplate.getSceneConfigAccess()
.list(new LambdaQueryWrapper<RetrySceneConfig>()
.select(RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getSceneName)
.eq(RetrySceneConfig::getNamespaceId, scanTask.getNamespaceId())
.eq(RetrySceneConfig::getGroupName, scanTask.getGroupName())
.in(RetrySceneConfig::getSceneName, sceneNameSet));
return retrySceneConfigs.stream()
.collect(Collectors.toMap(RetrySceneConfig::getSceneName, i -> i));
}
private RetryTask processRetryTask(RetryPartitionTask partitionTask, SceneConfig sceneConfig) {
private RetryTask processRetryTask(RetryPartitionTask partitionTask, RetrySceneConfig retrySceneConfig) {
RetryTask retryTask = new RetryTask();
retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask, sceneConfig));
retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask, retrySceneConfig));
retryTask.setId(partitionTask.getId());
return retryTask;
}
@ -174,7 +174,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
protected abstract void putLastId(String groupName, Long lastId);
protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask,
final SceneConfig sceneConfig);
final RetrySceneConfig retrySceneConfig);
protected abstract TimerTask timerTask(RetryPartitionTask partitionTask);

View File

@ -10,7 +10,7 @@ import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyCon
import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.snailjob.server.retry.task.support.timer.CallbackTimerTask;
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -59,7 +59,7 @@ public class ScanCallbackTaskActor extends AbstractScanGroup {
@Override
protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask,
final SceneConfig sceneConfig) {
final RetrySceneConfig retrySceneConfig) {
long triggerInterval = systemProperties.getCallback().getTriggerInterval();
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getType());

View File

@ -11,7 +11,7 @@ import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyCon
import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext;
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -59,7 +59,7 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
}
@Override
protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask, final SceneConfig sceneConfig) {
protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask, final RetrySceneConfig retrySceneConfig) {
// 更新下次触发时间
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
@ -71,10 +71,10 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
}
waitStrategyContext.setNextTriggerAt(DateUtils.toEpochMilli(nextTriggerAt));
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(partitionTask.getRetryCount() + 1);
// 更新触发时间, 任务进入时间轮
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff());
return DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext));
}

View File

@ -12,8 +12,8 @@ import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
@ -44,11 +44,11 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing
// 重试次数累加
retryCountIncrement(retryTask);
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(),
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(),
retryTask.getNamespaceId());
RetryContext retryContext = builderRetryContext(retryTask.getGroupName(), retryTask, sceneConfig);
RetryExecutor executor = builderResultRetryExecutor(retryContext, sceneConfig);
RetryContext retryContext = builderRetryContext(retryTask.getGroupName(), retryTask, retrySceneConfig);
RetryExecutor executor = builderResultRetryExecutor(retryContext, retrySceneConfig);
if (!preCheck(retryContext, executor)) {
return;
@ -92,10 +92,10 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing
}
protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask,
final SceneConfig sceneConfig);
final RetrySceneConfig retrySceneConfig);
protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext,
final SceneConfig sceneConfig);
final RetrySceneConfig retrySceneConfig);
protected abstract ActorRef getActorRef();

View File

@ -12,7 +12,7 @@ import com.aizuda.snailjob.server.retry.task.support.strategy.FilterStrategies;
import com.aizuda.snailjob.server.retry.task.support.strategy.StopStrategies;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import org.springframework.stereotype.Component;
/**
@ -27,22 +27,22 @@ public class CallbackTaskExecutor extends AbstractTaskExecutor {
@Override
protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask,
final SceneConfig sceneConfig) {
final RetrySceneConfig retrySceneConfig) {
CallbackRetryContext<Result> retryContext = new CallbackRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, sceneConfig.getNamespaceId()));
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, retrySceneConfig.getNamespaceId()));
retryContext.setServerNode(
clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(),
retryTask.getGroupName(),
retryTask.getNamespaceId(),
sceneConfig.getRouteKey()));
retryContext.setSceneConfig(sceneConfig);
retrySceneConfig.getRouteKey()));
retryContext.setRetrySceneConfig(retrySceneConfig);
return retryContext;
}
@Override
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final SceneConfig sceneConfig) {
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final RetrySceneConfig retrySceneConfig) {
return RetryBuilder.<Result>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatus())

View File

@ -15,7 +15,7 @@ import com.aizuda.snailjob.server.retry.task.support.strategy.FilterStrategies;
import com.aizuda.snailjob.server.retry.task.support.strategy.StopStrategies;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import org.springframework.stereotype.Component;
/**
@ -30,22 +30,22 @@ public class ManualCallbackTaskExecutor extends AbstractTaskExecutor {
@Override
protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask,
final SceneConfig sceneConfig) {
final RetrySceneConfig retrySceneConfig) {
CallbackRetryContext<Result> retryContext = new CallbackRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, sceneConfig.getNamespaceId()));
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, retrySceneConfig.getNamespaceId()));
retryContext.setServerNode(
clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(),
retryTask.getGroupName(),
retryTask.getNamespaceId(),
sceneConfig.getRouteKey()));
retryContext.setSceneConfig(sceneConfig);
retrySceneConfig.getRouteKey()));
retryContext.setRetrySceneConfig(retrySceneConfig);
return retryContext;
}
@Override
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final SceneConfig sceneConfig) {
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final RetrySceneConfig retrySceneConfig) {
return RetryBuilder.<Result>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatus())

View File

@ -16,7 +16,7 @@ import com.aizuda.snailjob.server.retry.task.support.strategy.FilterStrategies;
import com.aizuda.snailjob.server.retry.task.support.strategy.StopStrategies;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import org.springframework.stereotype.Component;
/**
@ -32,21 +32,21 @@ public class ManualRetryTaskExecutor extends AbstractTaskExecutor {
@Override
protected RetryContext<Result<DispatchRetryResultDTO>> builderRetryContext(final String groupName,
final RetryTask retryTask,
final SceneConfig sceneConfig) {
final RetrySceneConfig retrySceneConfig) {
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(
accessTemplate.getSceneConfigAccess().getBlacklist(groupName, sceneConfig.getNamespaceId()));
accessTemplate.getSceneConfigAccess().getBlacklist(groupName, retrySceneConfig.getNamespaceId()));
retryContext.setServerNode(
clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(),
retryTask.getGroupName(), retryTask.getNamespaceId(), sceneConfig.getRouteKey()));
retryContext.setSceneConfig(sceneConfig);
retryTask.getGroupName(), retryTask.getNamespaceId(), retrySceneConfig.getRouteKey()));
retryContext.setRetrySceneConfig(retrySceneConfig);
return retryContext;
}
@Override
protected RetryExecutor<Result<DispatchRetryResultDTO>> builderResultRetryExecutor(RetryContext retryContext,
final SceneConfig sceneConfig) {
final RetrySceneConfig retrySceneConfig) {
RetryTask retryTask = retryContext.getRetryTask();
return RetryBuilder.<Result>newBuilder()
@ -69,9 +69,9 @@ public class ManualRetryTaskExecutor extends AbstractTaskExecutor {
private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName, String namespaceId) {
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess()
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneName(groupName, sceneName, namespaceId);
Integer backOff = sceneConfig.getBackOff();
Integer backOff = retrySceneConfig.getBackOff();
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff);
}

View File

@ -13,7 +13,7 @@ import com.aizuda.snailjob.server.retry.task.support.strategy.FilterStrategies;
import com.aizuda.snailjob.server.retry.task.support.strategy.StopStrategies;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import org.springframework.stereotype.Component;
/**
@ -29,27 +29,27 @@ public class RetryTaskExecutor extends AbstractTaskExecutor {
@Override
protected RetryContext<Result<DispatchRetryResultDTO>> builderRetryContext(final String groupName,
final RetryTask retryTask,
final SceneConfig sceneConfig) {
final RetrySceneConfig retrySceneConfig) {
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, sceneConfig.getNamespaceId()));
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName, retrySceneConfig.getNamespaceId()));
retryContext.setServerNode(
clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(),
retryTask.getGroupName(),
retryTask.getNamespaceId(),
sceneConfig.getRouteKey()));
retryContext.setSceneConfig(sceneConfig);
retrySceneConfig.getRouteKey()));
retryContext.setRetrySceneConfig(retrySceneConfig);
return retryContext;
}
@Override
protected RetryExecutor<Result<DispatchRetryResultDTO>> builderResultRetryExecutor(RetryContext retryContext,
final SceneConfig sceneConfig) {
final RetrySceneConfig retrySceneConfig) {
return RetryBuilder.<Result<DispatchRetryResultDTO>>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatusCode())
.withWaitStrategy(getWaitWaitStrategy(sceneConfig))
.withWaitStrategy(getWaitWaitStrategy(retrySceneConfig))
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
@ -64,8 +64,8 @@ public class RetryTaskExecutor extends AbstractTaskExecutor {
return TaskExecutorSceneEnum.AUTO_RETRY;
}
private WaitStrategy getWaitWaitStrategy(SceneConfig sceneConfig) {
Integer backOff = sceneConfig.getBackOff();
private WaitStrategy getWaitWaitStrategy(RetrySceneConfig retrySceneConfig) {
Integer backOff = retrySceneConfig.getBackOff();
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff);
}

View File

@ -9,6 +9,7 @@ import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.alarm.AbstractRetryAlarm;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
@ -46,6 +47,11 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractRetryAlarm<Ret
"> 业务数据:{} \n" +
"> 时间:{} \n";
@Override
protected List<SyetemTaskTypeEnum> getSystemTaskType() {
return Lists.newArrayList(SyetemTaskTypeEnum.RETRY);
}
@Override
protected List<RetryAlarmInfo> poll() throws InterruptedException {

View File

@ -9,6 +9,7 @@ import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.alarm.AbstractRetryAlarm;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailMoreThresholdAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
@ -88,6 +89,11 @@ public class RetryTaskFailMoreThresholdAlarmListener extends
SnailJobLog.LOCAL.info("RetryTaskFailMoreThresholdAlarmListener started");
}
@Override
protected List<SyetemTaskTypeEnum> getSystemTaskType() {
return Lists.newArrayList(SyetemTaskTypeEnum.RETRY);
}
@Override
protected int getNotifyScene() {
return NotifySceneEnum.RETRY_TASK_REACH_THRESHOLD.getNotifyScene();

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.web.interceptor;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.exception.SnailJobAuthenticationException;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.web.model.request.UserSessionVO;
import com.aizuda.snailjob.server.web.annotation.LoginRequired;
@ -23,6 +24,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.method.HandlerMethod;
@ -41,17 +43,13 @@ import java.util.stream.Collectors;
* @date:2023-04-26 12:52
*/
@Configuration
@RequiredArgsConstructor
public class AuthenticationInterceptor implements HandlerInterceptor {
public static final String AUTHENTICATION = "SNAIL-JOB-AUTH";
public static final String NAMESPACE_ID = "SNAIL-JOB-NAMESPACE-ID";
@Autowired
private SystemUserMapper systemUserMapper;
@Autowired
private NamespaceMapper namespaceMapper;
@Autowired
private SystemUserPermissionMapper systemUserPermissionMapper;
private final SystemUserMapper systemUserMapper;
private final NamespaceMapper namespaceMapper;
private final SystemUserPermissionMapper systemUserPermissionMapper;
@Override
public boolean preHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object object) throws Exception {
@ -75,11 +73,11 @@ public class AuthenticationInterceptor implements HandlerInterceptor {
if (loginRequired.required()) {
// 执行认证
if (token == null) {
throw new SnailJobServerException("登陆过期,请重新登陆");
throw new SnailJobAuthenticationException("登陆过期,请重新登陆");
}
if (StrUtil.isBlank(namespaceId)) {
throw new SnailJobServerException("{} 命名空间不存在", namespaceId);
throw new SnailJobAuthenticationException("{} 命名空间不存在", namespaceId);
}
// 获取 token 中的 user id
@ -87,12 +85,12 @@ public class AuthenticationInterceptor implements HandlerInterceptor {
try {
systemUser = JsonUtil.parseObject(JWT.decode(token).getAudience().get(0), SystemUser.class);
} catch (JWTDecodeException j) {
throw new SnailJobServerException("登陆过期,请重新登陆");
throw new SnailJobAuthenticationException("登陆过期,请重新登陆");
}
systemUser = systemUserMapper.selectById(systemUser.getId());
if (Objects.isNull(systemUser)) {
throw new SnailJobServerException("用户不存在");
throw new SnailJobAuthenticationException("用户不存在");
}
Long count = namespaceMapper.selectCount(
@ -122,7 +120,7 @@ public class AuthenticationInterceptor implements HandlerInterceptor {
try {
jwtVerifier.verify(token);
} catch (JWTVerificationException e) {
throw new SnailJobServerException("登陆过期,请重新登陆");
throw new SnailJobAuthenticationException("登陆过期,请重新登陆");
}
RoleEnum role = loginRequired.role();
@ -132,7 +130,7 @@ public class AuthenticationInterceptor implements HandlerInterceptor {
if (role == RoleEnum.ADMIN) {
if (role != RoleEnum.getEnumTypeMap().get(systemUser.getRole())) {
throw new SnailJobServerException("不具备访问权限");
throw new SnailJobAuthenticationException("不具备访问权限");
}
}

View File

@ -20,7 +20,17 @@ public class NotifyConfigRequestVO {
@Pattern(regexp = "^[A-Za-z0-9_]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母和下划线")
private String groupName;
private String sceneName;
/**
* 业务id (scene_name或job_id或workflow_id)
*/
@NotBlank(message = "业务id不能为空")
private String businessId;
/**
* 任务类型 1重试任务 2回调任务 3JOB任务 4WORKFLOW任务
*/
@NotNull(message = "任务类型不能为空")
private Integer systemTaskType;
@NotNull(message = "通知状态不能为空")
private Integer notifyStatus;

View File

@ -1,7 +1,7 @@
package com.aizuda.snailjob.server.web.service.convert;
import com.aizuda.snailjob.server.web.model.request.SceneConfigRequestVO;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
@ -14,6 +14,6 @@ public interface SceneConfigConverter {
SceneConfigConverter INSTANCE = Mappers.getMapper(SceneConfigConverter.class);
SceneConfig toSceneConfigRequestVO(SceneConfigRequestVO requestVO);
RetrySceneConfig toSceneConfigRequestVO(SceneConfigRequestVO requestVO);
}

View File

@ -1,7 +1,7 @@
package com.aizuda.snailjob.server.web.service.convert;
import com.aizuda.snailjob.server.web.model.response.SceneConfigResponseVO;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
@ -16,7 +16,7 @@ public interface SceneConfigResponseVOConverter {
SceneConfigResponseVOConverter INSTANCE = Mappers.getMapper(SceneConfigResponseVOConverter.class);
SceneConfigResponseVO convert(SceneConfig sceneConfig);
SceneConfigResponseVO convert(RetrySceneConfig retrySceneConfig);
List<SceneConfigResponseVO> batchConvert(List<SceneConfig> sceneConfigs);
List<SceneConfigResponseVO> batchConvert(List<RetrySceneConfig> retrySceneConfigs);
}

View File

@ -40,12 +40,10 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.RetrySummaryMa
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobSummary;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySummary;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.aizuda.snailjob.template.datasource.utils.DbUtils;
import com.aizuda.snailjob.server.web.model.response.DashboardLineResponseVO;
import com.aizuda.snailjob.server.web.service.convert.*;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
@ -160,9 +158,9 @@ public class DashBoardServiceImpl implements DashBoardService {
DashboardRetryLineResponseVO dashboardRetryLineResponseVO = new DashboardRetryLineResponseVO();
// 重试任务列表
Page<Object> pager = new Page<>(baseQueryVO.getPage(), baseQueryVO.getSize());
LambdaQueryWrapper<SceneConfig> wrapper = new LambdaQueryWrapper<SceneConfig>()
.eq(SceneConfig::getNamespaceId, namespaceId)
.in(CollUtil.isNotEmpty(groupNames), SceneConfig::getGroupName, groupNames);
LambdaQueryWrapper<RetrySceneConfig> wrapper = new LambdaQueryWrapper<RetrySceneConfig>()
.eq(RetrySceneConfig::getNamespaceId, namespaceId)
.in(CollUtil.isNotEmpty(groupNames), RetrySceneConfig::getGroupName, groupNames);
// 针对SQL Server的分页COUNT, 自定义statement ID
if (DbTypeEnum.SQLSERVER == DbUtils.getDbType()) {
pager.setSearchCount(false);

View File

@ -16,10 +16,9 @@ import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.ConfigAccess;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.server.web.service.convert.NotifyConfigResponseVOConverter;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
@ -30,10 +29,9 @@ import java.util.List;
* @date : 2022-03-03 11:17
*/
@Service
@RequiredArgsConstructor
public class NotifyConfigServiceImpl implements NotifyConfigService {
@Autowired
private AccessTemplate accessTemplate;
private final AccessTemplate accessTemplate;
@Override
public PageResult<List<NotifyConfigResponseVO>> getNotifyConfigList(NotifyConfigQueryVO queryVO) {
@ -51,7 +49,7 @@ public class NotifyConfigServiceImpl implements NotifyConfigService {
queryWrapper.eq(NotifyConfig::getGroupName, queryVO.getGroupName());
}
if (StrUtil.isNotBlank(queryVO.getSceneName())) {
queryWrapper.eq(NotifyConfig::getSceneName, queryVO.getSceneName());
queryWrapper.eq(NotifyConfig::getBusinessId, queryVO.getSceneName());
}
queryWrapper.orderByDesc(NotifyConfig::getId);

View File

@ -24,11 +24,9 @@ import com.aizuda.snailjob.template.datasource.access.ConfigAccess;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.web.service.convert.RetryDeadLetterResponseVOConverter;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -117,21 +115,21 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
Assert.notEmpty(retryDeadLetterList, () -> new SnailJobServerException("数据不存在"));
ConfigAccess<SceneConfig> sceneConfigAccess = accessTemplate.getSceneConfigAccess();
ConfigAccess<RetrySceneConfig> sceneConfigAccess = accessTemplate.getSceneConfigAccess();
Set<String> sceneNameSet = retryDeadLetterList.stream().map(RetryDeadLetter::getSceneName)
.collect(Collectors.toSet());
List<SceneConfig> sceneConfigs = sceneConfigAccess.list(new LambdaQueryWrapper<SceneConfig>()
.eq(SceneConfig::getNamespaceId, namespaceId)
.in(SceneConfig::getSceneName, sceneNameSet));
List<RetrySceneConfig> retrySceneConfigs = sceneConfigAccess.list(new LambdaQueryWrapper<RetrySceneConfig>()
.eq(RetrySceneConfig::getNamespaceId, namespaceId)
.in(RetrySceneConfig::getSceneName, sceneNameSet));
Map<String, SceneConfig> sceneConfigMap = sceneConfigs.stream().collect(Collectors.toMap((sceneConfig) ->
Map<String, RetrySceneConfig> sceneConfigMap = retrySceneConfigs.stream().collect(Collectors.toMap((sceneConfig) ->
sceneConfig.getGroupName() + sceneConfig.getSceneName(), Function.identity()));
List<RetryTask> waitRollbackList = new ArrayList<>();
for (RetryDeadLetter retryDeadLetter : retryDeadLetterList) {
SceneConfig sceneConfig = sceneConfigMap.get(
RetrySceneConfig retrySceneConfig = sceneConfigMap.get(
retryDeadLetter.getGroupName() + retryDeadLetter.getSceneName());
Assert.notNull(sceneConfig,
Assert.notNull(retrySceneConfig,
() -> new SnailJobServerException("未查询到场景. [{}]", retryDeadLetter.getSceneName()));
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryDeadLetter);
@ -140,9 +138,9 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(LocalDateTime.now());
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(1);
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff());
retryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext)));
retryTask.setCreateDt(LocalDateTime.now());
waitRollbackList.add(retryTask);

View File

@ -45,18 +45,9 @@ import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient;
import com.aizuda.snailjob.server.retry.task.generator.task.TaskContext;
import com.aizuda.snailjob.server.retry.task.generator.task.TaskGenerator;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutor;
import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import com.aizuda.snailjob.server.web.service.convert.RetryTaskResponseVOConverter;
import com.aizuda.snailjob.server.web.service.convert.TaskContextConverter;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -175,13 +166,13 @@ public class RetryTaskServiceImpl implements RetryTaskService {
// 若恢复重试则需要重新计算下次触发时间
if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) {
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess()
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(), namespaceId);
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(DateUtils.toNowMilli());
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(retryTask.getRetryCount() + 1);
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(retrySceneConfig.getBackOff());
retryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext)));
}
@ -238,12 +229,12 @@ public class RetryTaskServiceImpl implements RetryTaskService {
Assert.notEmpty(serverNodes,
() -> new SnailJobServerException("生成idempotentId失败: 不存在活跃的客户端节点"));
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess()
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneName(generateRetryIdempotentIdVO.getGroupName(),
generateRetryIdempotentIdVO.getSceneName(), namespaceId);
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(sceneConfig.getSceneName(),
sceneConfig.getGroupName(), sceneConfig.getNamespaceId(), sceneConfig.getRouteKey());
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(retrySceneConfig.getSceneName(),
retrySceneConfig.getGroupName(), retrySceneConfig.getNamespaceId(), retrySceneConfig.getRouteKey());
// 委托客户端生成idempotentId
GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO = new GenerateRetryIdempotentIdDTO();

View File

@ -17,9 +17,7 @@ import com.aizuda.snailjob.server.web.service.convert.SceneConfigResponseVOConve
import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.ConfigAccess;
import com.aizuda.snailjob.template.datasource.persistence.po.SceneConfig;
import com.aizuda.snailjob.server.web.service.convert.SceneConfigConverter;
import com.aizuda.snailjob.server.web.service.convert.SceneConfigResponseVOConverter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -43,27 +41,27 @@ public class SceneConfigServiceImpl implements SceneConfigService {
@Override
public PageResult<List<SceneConfigResponseVO>> getSceneConfigPageList(SceneConfigQueryVO queryVO) {
PageDTO<SceneConfig> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
PageDTO<RetrySceneConfig> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
UserSessionVO userSessionVO = UserSessionUtils.currentUserSession();
String namespaceId = userSessionVO.getNamespaceId();
LambdaQueryWrapper<SceneConfig> sceneConfigLambdaQueryWrapper = new LambdaQueryWrapper<>();
sceneConfigLambdaQueryWrapper.eq(SceneConfig::getNamespaceId, namespaceId);
LambdaQueryWrapper<RetrySceneConfig> sceneConfigLambdaQueryWrapper = new LambdaQueryWrapper<>();
sceneConfigLambdaQueryWrapper.eq(RetrySceneConfig::getNamespaceId, namespaceId);
if (userSessionVO.isUser()) {
sceneConfigLambdaQueryWrapper.in(SceneConfig::getGroupName, userSessionVO.getGroupNames());
sceneConfigLambdaQueryWrapper.in(RetrySceneConfig::getGroupName, userSessionVO.getGroupNames());
}
if (StrUtil.isNotBlank(queryVO.getGroupName())) {
sceneConfigLambdaQueryWrapper.eq(SceneConfig::getGroupName, queryVO.getGroupName().trim());
sceneConfigLambdaQueryWrapper.eq(RetrySceneConfig::getGroupName, queryVO.getGroupName().trim());
}
if (StrUtil.isNotBlank(queryVO.getSceneName())) {
sceneConfigLambdaQueryWrapper.eq(SceneConfig::getSceneName, queryVO.getSceneName().trim());
sceneConfigLambdaQueryWrapper.eq(RetrySceneConfig::getSceneName, queryVO.getSceneName().trim());
}
pageDTO = accessTemplate.getSceneConfigAccess()
.listPage(pageDTO, sceneConfigLambdaQueryWrapper.orderByDesc(SceneConfig::getCreateDt));
.listPage(pageDTO, sceneConfigLambdaQueryWrapper.orderByDesc(RetrySceneConfig::getCreateDt));
return new PageResult<>(pageDTO, SceneConfigResponseVOConverter.INSTANCE.batchConvert(pageDTO.getRecords()));
@ -74,14 +72,14 @@ public class SceneConfigServiceImpl implements SceneConfigService {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
List<SceneConfig> sceneConfigs = accessTemplate.getSceneConfigAccess()
.list(new LambdaQueryWrapper<SceneConfig>()
.select(SceneConfig::getSceneName, SceneConfig::getDescription, SceneConfig::getMaxRetryCount)
.eq(SceneConfig::getNamespaceId, namespaceId)
.eq(SceneConfig::getGroupName, groupName)
.orderByDesc(SceneConfig::getCreateDt));
List<RetrySceneConfig> retrySceneConfigs = accessTemplate.getSceneConfigAccess()
.list(new LambdaQueryWrapper<RetrySceneConfig>()
.select(RetrySceneConfig::getSceneName, RetrySceneConfig::getDescription, RetrySceneConfig::getMaxRetryCount)
.eq(RetrySceneConfig::getNamespaceId, namespaceId)
.eq(RetrySceneConfig::getGroupName, groupName)
.orderByDesc(RetrySceneConfig::getCreateDt));
return SceneConfigResponseVOConverter.INSTANCE.batchConvert(sceneConfigs);
return SceneConfigResponseVOConverter.INSTANCE.batchConvert(retrySceneConfigs);
}
@Override
@ -89,21 +87,21 @@ public class SceneConfigServiceImpl implements SceneConfigService {
checkExecuteInterval(requestVO);
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
ConfigAccess<SceneConfig> sceneConfigAccess = accessTemplate.getSceneConfigAccess();
ConfigAccess<RetrySceneConfig> sceneConfigAccess = accessTemplate.getSceneConfigAccess();
Assert.isTrue(0 == sceneConfigAccess.count(
new LambdaQueryWrapper<SceneConfig>()
.eq(SceneConfig::getNamespaceId, namespaceId)
.eq(SceneConfig::getGroupName, requestVO.getGroupName())
.eq(SceneConfig::getSceneName, requestVO.getSceneName())
new LambdaQueryWrapper<RetrySceneConfig>()
.eq(RetrySceneConfig::getNamespaceId, namespaceId)
.eq(RetrySceneConfig::getGroupName, requestVO.getGroupName())
.eq(RetrySceneConfig::getSceneName, requestVO.getSceneName())
), () -> new SnailJobServerException("场景名称重复. {}", requestVO.getSceneName()));
SceneConfig sceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO);
sceneConfig.setCreateDt(LocalDateTime.now());
sceneConfig.setNamespaceId(namespaceId);
Assert.isTrue(1 == sceneConfigAccess.insert(sceneConfig),
() -> new SnailJobServerException("failed to insert scene. sceneConfig:[{}]",
JsonUtil.toJsonString(sceneConfig)));
RetrySceneConfig retrySceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO);
retrySceneConfig.setCreateDt(LocalDateTime.now());
retrySceneConfig.setNamespaceId(namespaceId);
Assert.isTrue(1 == sceneConfigAccess.insert(retrySceneConfig),
() -> new SnailJobServerException("failed to insert scene. retrySceneConfig:[{}]",
JsonUtil.toJsonString(retrySceneConfig)));
return Boolean.TRUE;
}
@ -125,29 +123,29 @@ public class SceneConfigServiceImpl implements SceneConfigService {
@Override
public Boolean updateSceneConfig(SceneConfigRequestVO requestVO) {
checkExecuteInterval(requestVO);
SceneConfig sceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO);
RetrySceneConfig retrySceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO);
// 防止更新
sceneConfig.setSceneName(null);
sceneConfig.setGroupName(null);
sceneConfig.setNamespaceId(null);
retrySceneConfig.setSceneName(null);
retrySceneConfig.setGroupName(null);
retrySceneConfig.setNamespaceId(null);
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
sceneConfig.setTriggerInterval(Optional.ofNullable(sceneConfig.getTriggerInterval()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().update(sceneConfig,
new LambdaUpdateWrapper<SceneConfig>()
.eq(SceneConfig::getNamespaceId, namespaceId)
.eq(SceneConfig::getGroupName, requestVO.getGroupName())
.eq(SceneConfig::getSceneName, requestVO.getSceneName())),
() -> new SnailJobServerException("failed to update scene. sceneConfig:[{}]",
JsonUtil.toJsonString(sceneConfig)));
retrySceneConfig.setTriggerInterval(Optional.ofNullable(retrySceneConfig.getTriggerInterval()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().update(retrySceneConfig,
new LambdaUpdateWrapper<RetrySceneConfig>()
.eq(RetrySceneConfig::getNamespaceId, namespaceId)
.eq(RetrySceneConfig::getGroupName, requestVO.getGroupName())
.eq(RetrySceneConfig::getSceneName, requestVO.getSceneName())),
() -> new SnailJobServerException("failed to update scene. retrySceneConfig:[{}]",
JsonUtil.toJsonString(retrySceneConfig)));
return Boolean.TRUE;
}
@Override
public SceneConfigResponseVO getSceneConfigDetail(Long id) {
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().one(new LambdaQueryWrapper<SceneConfig>()
.eq(SceneConfig::getId, id));
return SceneConfigResponseVOConverter.INSTANCE.convert(sceneConfig);
RetrySceneConfig retrySceneConfig = accessTemplate.getSceneConfigAccess().one(new LambdaQueryWrapper<RetrySceneConfig>()
.eq(RetrySceneConfig::getId, id));
return SceneConfigResponseVOConverter.INSTANCE.convert(retrySceneConfig);
}
}