feat(sj_1.0.0): 优化通知配置

This commit is contained in:
opensnail 2024-04-28 18:22:15 +08:00
parent 87a4adf45d
commit d5c78014f9
23 changed files with 431 additions and 282 deletions

View File

@ -60,14 +60,6 @@ public class GroupVersionCache implements Lifecycle {
return SystemConstants.DEFAULT_DDL;
}
public static Set<String> getSceneBlacklist() {
if (Objects.isNull(CONFIG)) {
return new HashSet<>();
}
return CONFIG.getSceneBlacklist();
}
public static ConfigDTO.Notify getNotifyAttribute(Integer notifyScene) {
List<ConfigDTO.Notify> notifyList = CONFIG.getNotifyList();
for (ConfigDTO.Notify notify : notifyList) {

View File

@ -21,7 +21,9 @@ import com.aizuda.snailjob.common.core.model.SnailJobHeaders;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import com.aizuda.snailjob.server.model.dto.ConfigDTO.Notify.Recipient;
import com.google.common.base.Defaults;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
@ -35,7 +37,9 @@ import java.io.Serializable;
import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@ -211,22 +215,26 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se
private void sendMessage(Exception e) {
try {
ConfigDTO.Notify notifyAttribute = GroupVersionCache.getNotifyAttribute(
ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute(
RetryNotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene());
if (Objects.nonNull(notifyAttribute)) {
AlarmContext context = AlarmContext.build()
if (Objects.nonNull(notify)) {
List<Recipient> recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList());
for (final Recipient recipient : recipients) {
AlarmContext context = AlarmContext.build()
.text(retryErrorMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
NetUtil.getLocalIpStr(),
standardEnvironment.getProperty("snail-job.namespace", StrUtil.EMPTY),
SnailJobProperties.getGroup(),
LocalDateTime.now().format(formatter),
e.getMessage())
EnvironmentUtils.getActiveProfile(),
NetUtil.getLocalIpStr(),
standardEnvironment.getProperty("snail-job.namespace", StrUtil.EMPTY),
SnailJobProperties.getGroup(),
LocalDateTime.now().format(formatter),
e.getMessage())
.title("retry component handling exception:[{}]", SnailJobProperties.getGroup())
.notifyAttribute(notifyAttribute.getNotifyAttribute());
.notifyAttribute(recipient.getNotifyAttribute());
Alarm<AlarmContext> alarmType = SpringContext.getBeanByType(SnailJobAlarmFactory.class).getAlarmType(recipient.getNotifyType());
alarmType.asyncSendMessage(context);
}
Alarm<AlarmContext> alarmType = SpringContext.getBeanByType(SnailJobAlarmFactory.class).getAlarmType(notifyAttribute.getNotifyType());
alarmType.asyncSendMessage(context);
}
} catch (Exception e1) {
SnailJobLog.LOCAL.error("Client failed to send component exception alert.", e1);

View File

@ -19,8 +19,10 @@ import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.window.Listener;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import com.aizuda.snailjob.server.model.dto.ConfigDTO.Notify.Recipient;
import com.aizuda.snailjob.server.model.dto.RetryTaskDTO;
import com.github.rholder.retry.*;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
@ -28,6 +30,7 @@ import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@ -107,26 +110,30 @@ public class ReportListener implements Listener<RetryTaskDTO> {
private void sendMessage(Throwable e) {
try {
ConfigDTO.Notify notifyAttribute = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene());
if (Objects.isNull(notifyAttribute)) {
ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene());
if (Objects.isNull(notify)) {
return;
}
SnailJobProperties properties = SpringContext.getBean(SnailJobProperties.class);
AlarmContext context = AlarmContext.build()
List<Recipient> recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList());
for (final Recipient recipient : recipients) {
AlarmContext context = AlarmContext.build()
.text(reportErrorTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
NetUtil.getLocalIpStr(),
properties.getNamespace(),
SnailJobProperties.getGroup(),
LocalDateTime.now().format(formatter),
e.getMessage())
EnvironmentUtils.getActiveProfile(),
NetUtil.getLocalIpStr(),
properties.getNamespace(),
SnailJobProperties.getGroup(),
LocalDateTime.now().format(formatter),
e.getMessage())
.title("上报异常:[{}]", SnailJobProperties.getGroup())
.notifyAttribute(notifyAttribute.getNotifyAttribute());
.notifyAttribute(recipient.getNotifyAttribute());
SnailJobAlarmFactory snailJobAlarmFactory = SpringContext.getBeanByType(SnailJobAlarmFactory.class);
Alarm<AlarmContext> alarmType = snailJobAlarmFactory.getAlarmType(recipient.getNotifyType());
alarmType.asyncSendMessage(context);
}
SnailJobAlarmFactory snailJobAlarmFactory = SpringContext.getBeanByType(SnailJobAlarmFactory.class);
Alarm<AlarmContext> alarmType = snailJobAlarmFactory.getAlarmType(notifyAttribute.getNotifyType());
alarmType.asyncSendMessage(context);
} catch (Exception e1) {
SnailJobLog.LOCAL.error("客户端发送组件异常告警失败", e1);
}

View File

@ -16,7 +16,9 @@ import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import com.aizuda.snailjob.server.model.dto.ConfigDTO.Notify.Recipient;
import com.aizuda.snailjob.server.model.dto.RetryTaskDTO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -24,7 +26,9 @@ import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@ -91,25 +95,29 @@ public class SyncReport extends AbstractReport {
private void sendMessage(Throwable e) {
try {
ConfigDTO.Notify notifyAttribute = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene());
if (Objects.isNull(notifyAttribute)) {
ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene());
if (Objects.isNull(notify)) {
return;
}
AlarmContext context = AlarmContext.build()
List<Recipient> recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList());
for (final Recipient recipient : recipients) {
AlarmContext context = AlarmContext.build()
.text(reportErrorTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
NetUtil.getLocalIpStr(),
snailJobProperties.getNamespace(),
SnailJobProperties.getGroup(),
LocalDateTime.now().format(formatter),
e.getMessage())
EnvironmentUtils.getActiveProfile(),
NetUtil.getLocalIpStr(),
snailJobProperties.getNamespace(),
SnailJobProperties.getGroup(),
LocalDateTime.now().format(formatter),
e.getMessage())
.title("同步上报异常:[{}]", SnailJobProperties.getGroup())
.notifyAttribute(notifyAttribute.getNotifyAttribute());
.notifyAttribute(recipient.getNotifyAttribute());
SnailJobAlarmFactory snailJobAlarmFactory = SpringContext.getBeanByType(SnailJobAlarmFactory.class);
Alarm<AlarmContext> alarmType = snailJobAlarmFactory.getAlarmType(recipient.getNotifyType());
alarmType.asyncSendMessage(context);
}
SnailJobAlarmFactory snailJobAlarmFactory = SpringContext.getBeanByType(SnailJobAlarmFactory.class);
Alarm<AlarmContext> alarmType = snailJobAlarmFactory.getAlarmType(notifyAttribute.getNotifyType());
alarmType.asyncSendMessage(context);
} catch (Exception e1) {
SnailJobLog.LOCAL.error("客户端发送组件异常告警失败", e1);
}

View File

@ -19,9 +19,11 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import com.aizuda.snailjob.server.model.dto.ConfigDTO.Notify.Recipient;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.StopStrategy;
import com.github.rholder.retry.WaitStrategy;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -29,6 +31,7 @@ import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
@ -175,21 +178,24 @@ public abstract class AbstractRetryStrategies implements RetryStrategy {
private void sendMessage(Exception e) {
try {
ConfigDTO.Notify notifyAttribute = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene());
if (Objects.nonNull(notifyAttribute)) {
AlarmContext context = AlarmContext.build()
.text(retryErrorMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
NetUtil.getLocalIpStr(),
snailJobProperties.getNamespace(),
SnailJobProperties.getGroup(),
LocalDateTime.now().format(formatter),
e.getMessage())
.title("retry component handling exception:[{}]", SnailJobProperties.getGroup())
.notifyAttribute(notifyAttribute.getNotifyAttribute());
ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene());
if (Objects.nonNull(notify)) {
List<Recipient> recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList());
Alarm<AlarmContext> alarmType = snailJobAlarmFactory.getAlarmType(notifyAttribute.getNotifyType());
alarmType.asyncSendMessage(context);
for (final Recipient recipient : recipients) {
AlarmContext context = AlarmContext.build()
.text(retryErrorMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
NetUtil.getLocalIpStr(),
snailJobProperties.getNamespace(),
SnailJobProperties.getGroup(),
LocalDateTime.now().format(formatter),
e.getMessage())
.title("retry component handling exception:[{}]", SnailJobProperties.getGroup())
.notifyAttribute(recipient.getNotifyAttribute());
Alarm<AlarmContext> alarmType = snailJobAlarmFactory.getAlarmType(recipient.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
} catch (Exception e1) {
SnailJobLog.LOCAL.error("Client failed to send component exception alert.", e1);

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.common.core.alarm;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -10,6 +11,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @author: opensnail
* @date : 2021-11-25 09:20
*/
@Getter
@Component
public class SnailJobAlarmFactory {
@ -22,10 +24,6 @@ public class SnailJobAlarmFactory {
}
}
public Map<Integer, Alarm> getAlarmMap() {
return alarmMap;
}
public Alarm getAlarmType(Integer alarmType) {
return alarmMap.get(alarmType);
}

View File

@ -26,11 +26,6 @@ public class ConfigDTO {
*/
private List<Notify> notifyList;
/**
* 场景黑名单
*/
private Set<String> sceneBlacklist;
/**
* 版本号
*/
@ -54,14 +49,9 @@ public class ConfigDTO {
public static class Notify {
/**
* 通知类型 {@link AlarmTypeEnum}
* 通知
*/
private Integer notifyType;
/**
* 通知地址
*/
private String notifyAttribute;
private List<Recipient> recipients;
/**
* 触发阈值
@ -72,6 +62,21 @@ public class ConfigDTO {
* 场景场景 {@link RetryNotifySceneEnum}
*/
private Integer notifyScene;
@Data
public static class Recipient {
/**
* 通知类型 {@link AlarmTypeEnum}
*/
private Integer notifyType;
/**
* 通知地址
*/
private String notifyAttribute;
}
}

View File

@ -20,12 +20,6 @@ import java.util.Set;
*/
public interface ConfigAccess<T> extends Access<T> {
/**
* 获取所有组id列表
*
* @return 组id列表
*/
Set<String> getGroupNameList(String namespaceId);
/**
* 根据组id获取缓存上下文
@ -44,26 +38,6 @@ public interface ConfigAccess<T> extends Access<T> {
*/
RetrySceneConfig getSceneConfigByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId);
/**
* 获取通知配置
*
* @param groupName 组名称
* @param notifyScene {@link RetryNotifySceneEnum} 场景类型
* @return {@link NotifyConfig} 场景配置
*/
List<NotifyConfig> getNotifyConfigByGroupName(String groupName,Integer notifyScene, String namespaceId);
/**
* 获取通知配置
*
* @param groupName 组名称
* @param groupName 场景名称
* @param notifyScene {@link RetryNotifySceneEnum} 场景类型
* @return {@link NotifyConfig} 场景配置
*/
@Deprecated
List<NotifyConfig> getNotifyConfigByGroupNameAndSceneName(String groupName, String sceneName, Integer notifyScene);
/**
* 获取通知配置
@ -82,12 +56,6 @@ public interface ConfigAccess<T> extends Access<T> {
*/
List<RetrySceneConfig> getSceneConfigByGroupName(String groupName);
/**
* 获取已开启的组配置信息
*
*/
List<GroupConfig> getAllOpenGroupConfig(String namespaceId);
/**
* 场景黑名单
*

View File

@ -2,15 +2,20 @@ package com.aizuda.snailjob.template.datasource.access.config;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import com.aizuda.snailjob.server.model.dto.ConfigDTO.Notify;
import com.aizuda.snailjob.server.model.dto.ConfigDTO.Notify.Recipient;
import com.aizuda.snailjob.template.datasource.access.ConfigAccess;
import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.SceneConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.utils.DbUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -34,6 +39,8 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
protected SceneConfigMapper sceneConfigMapper;
@Autowired
protected GroupConfigMapper groupConfigMapper;
@Autowired
protected NotifyRecipientMapper notifyRecipientMapper;
protected static final List<String> ALLOW_DB = Arrays.asList(
DbTypeEnum.MYSQL.getDb(),
@ -46,22 +53,6 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
return DbUtils.getDbType();
}
protected List<NotifyConfig> getByGroupIdAndNotifyScene(String groupName, Integer notifyScene, String namespaceId) {
return notifyConfigMapper.selectList(
new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNamespaceId, namespaceId)
.eq(NotifyConfig::getGroupName, groupName)
.eq(NotifyConfig::getNotifyScene, notifyScene));
}
private List<NotifyConfig> getByGroupIdAndSceneIdAndNotifyScene(String groupName, String sceneName,
Integer notifyScene) {
return notifyConfigMapper.selectList(
new LambdaQueryWrapper<NotifyConfig>().eq(NotifyConfig::getGroupName, groupName)
.eq(NotifyConfig::getBusinessId, sceneName)
.eq(NotifyConfig::getNotifyScene, notifyScene));
}
protected RetrySceneConfig getByGroupNameAndSceneName(String groupName, String sceneName, String namespaceId) {
return sceneConfigMapper.selectOne(new LambdaQueryWrapper<RetrySceneConfig>()
.eq(RetrySceneConfig::getNamespaceId, namespaceId)
@ -87,11 +78,6 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
.eq(NotifyConfig::getGroupName, groupName));
}
@Override
public Set<String> getGroupNameList(String namespaceId) {
List<GroupConfig> groupList = getAllConfigGroupList(namespaceId);
return groupList.stream().map(GroupConfig::getGroupName).collect(Collectors.toSet());
}
@Override
public GroupConfig getGroupConfigByGroupName(String groupName, String namespaceId) {
@ -103,17 +89,6 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
return getByGroupNameAndSceneName(groupName, sceneName, namespaceId);
}
@Override
public List<NotifyConfig> getNotifyConfigByGroupName(String groupName, Integer notifyScene, String namespaceId) {
return getByGroupIdAndNotifyScene(groupName, notifyScene, namespaceId);
}
@Override
public List<NotifyConfig> getNotifyConfigByGroupNameAndSceneName(String groupName, String sceneName,
Integer notifyScene) {
return getByGroupIdAndSceneIdAndNotifyScene(groupName, sceneName, notifyScene);
}
@Override
public List<NotifyConfig> getNotifyListConfigByGroupName(String groupName, String namespaceId) {
return getNotifyConfigs(groupName, namespaceId);
@ -124,18 +99,13 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
return getSceneConfigs(groupName);
}
@Override
public List<GroupConfig> getAllOpenGroupConfig(String namespaceId) {
return getAllConfigGroupList(namespaceId).stream().filter(i -> StatusEnum.YES.getStatus().equals(i.getGroupStatus()))
.collect(Collectors.toList());
}
@Override
public Set<String> getBlacklist(String groupName, String namespaceId) {
GroupConfig groupConfig = getByGroupName(groupName, namespaceId);
if (Objects.isNull(groupConfig)) {
return Collections.EMPTY_SET;
return new HashSet<>();
}
LambdaQueryWrapper<RetrySceneConfig> sceneConfigLambdaQueryWrapper = new LambdaQueryWrapper<RetrySceneConfig>()
@ -148,7 +118,7 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
List<RetrySceneConfig> retrySceneConfigs = sceneConfigMapper.selectList(sceneConfigLambdaQueryWrapper);
if (CollectionUtils.isEmpty(retrySceneConfigs)) {
return Collections.EMPTY_SET;
return new HashSet<>();
}
return retrySceneConfigs.stream().map(RetrySceneConfig::getSceneName).collect(Collectors.toSet());
@ -161,7 +131,7 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
.eq(GroupConfig::getNamespaceId, namespaceId)
.orderByAsc(GroupConfig::getId));
if (CollectionUtils.isEmpty(allSystemConfigGroupList)) {
return Collections.EMPTY_LIST;
return new ArrayList<>();
}
return allSystemConfigGroupList;
@ -172,7 +142,7 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
List<RetrySceneConfig> allSystemConfigSceneList = sceneConfigMapper.selectList(
new LambdaQueryWrapper<RetrySceneConfig>().orderByAsc(RetrySceneConfig::getId));
if (CollectionUtils.isEmpty(allSystemConfigSceneList)) {
return Collections.EMPTY_LIST;
return new ArrayList<>();
}
return allSystemConfigSceneList;
}
@ -191,7 +161,6 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
public ConfigDTO getConfigInfo(String groupName, final String namespaceId) {
ConfigDTO configDTO = new ConfigDTO();
configDTO.setSceneBlacklist(getBlacklist(groupName, namespaceId));
configDTO.setVersion(getConfigVersion(groupName, namespaceId));
List<NotifyConfig> notifyList = getNotifyListConfigByGroupName(groupName, namespaceId);
@ -206,12 +175,10 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
continue;
}
ConfigDTO.Notify notify = new ConfigDTO.Notify();
notify.setNotifyScene(notifyConfig.getNotifyScene());
notify.setNotifyType(notifyConfig.getNotifyType());
notify.setNotifyThreshold(notifyConfig.getNotifyThreshold());
notify.setNotifyAttribute(notifyConfig.getNotifyAttribute());
notifies.add(notify);
String recipientIds = notifyConfig.getRecipientIds();
List<NotifyRecipient> notifyRecipients = notifyRecipientMapper.selectBatchIds(
JsonUtil.parseList(recipientIds, Long.class));
notifies.add(getNotify(notifyConfig, notifyRecipients));
}
configDTO.setNotifyList(notifies);
@ -229,4 +196,20 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
configDTO.setSceneList(sceneList);
return configDTO;
}
private static Notify getNotify(final NotifyConfig notifyConfig, final List<NotifyRecipient> notifyRecipients) {
List<Recipient> recipients = new ArrayList<>();
for (final NotifyRecipient notifyRecipient : notifyRecipients) {
Recipient recipient = new Recipient();
recipient.setNotifyAttribute(notifyRecipient.getNotifyAttribute());
recipient.setNotifyType(notifyRecipient.getNotifyType());
recipients.add(recipient);
}
Notify notify = new Notify();
notify.setNotifyScene(notifyConfig.getNotifyScene());
notify.setNotifyThreshold(notifyConfig.getNotifyThreshold());
notify.setRecipients(recipients);
return notify;
}
}

View File

@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
@ -12,6 +13,7 @@ import java.time.LocalDateTime;
@TableName("sj_notify_config")
public class NotifyConfig implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
@TableId(value = "id", type = IdType.AUTO)
@ -33,9 +35,7 @@ public class NotifyConfig implements Serializable {
private Integer notifyStatus;
private Integer notifyType;
private String notifyAttribute;
private String recipientIds;
private Integer notifyThreshold;

View File

@ -1,5 +1,7 @@
package com.aizuda.snailjob.server.common;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
@ -16,7 +18,9 @@ import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @author xiaowoniu
@ -42,6 +46,19 @@ public interface AlarmInfoConverter {
List<NotifyConfigInfo> retryToNotifyConfigInfos(List<NotifyConfig> notifyConfigs);
@Mappings({
@Mapping(target = "recipientIds", expression = "java(AlarmInfoConverter.toNotifyRecipientIds(notifyConfig.getRecipientIds()))")
})
NotifyConfigInfo retryToNotifyConfigInfos(NotifyConfig notifyConfig);
static Set<Long> toNotifyRecipientIds(String notifyRecipientIdsStr) {
if (StrUtil.isBlank(notifyRecipientIdsStr)) {
return new HashSet<>();
}
return new HashSet<>(JsonUtil.parseList(notifyRecipientIdsStr, Long.class));
}
List<NotifyConfigInfo> jobToNotifyConfigInfos(List<JobNotifyConfig> notifyConfigs);
List<JobAlarmInfo> toJobAlarmInfos(List<JobBatchResponseDO> jobBatchResponse);

View File

@ -4,21 +4,26 @@ import com.aizuda.snailjob.common.core.alarm.Alarm;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.cache.CacheNotifyRateLimiter;
import com.aizuda.snailjob.server.common.dto.AlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo.RecipientInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.triple.Triple;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.server.common.triple.Triple;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -42,9 +47,11 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
@Autowired
private SnailJobAlarmFactory snailJobAlarmFactory;
@Autowired
protected AccessTemplate accessTemplate;
@Autowired
protected NotifyRecipientMapper recipientMapper;
@Override
public void run() {
@ -103,15 +110,40 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
return Maps.newHashMap();
}
Set<Long> recipientIds = notifyConfigs.stream()
.map(config -> {
return new HashSet<>(JsonUtil.parseList(config.getRecipientIds(), Long.class));
})
.reduce((a, b) -> {
HashSet<Long> set = Sets.newHashSet();
set.addAll(a);
set.addAll(b);
return set;
}).orElse(new HashSet<>());
List<NotifyRecipient> notifyRecipients = recipientMapper.selectBatchIds(recipientIds);
Map<Long, NotifyRecipient> recipientMap = notifyRecipients.stream()
.collect(Collectors.toMap(NotifyRecipient::getId, i->i));
List<NotifyConfigInfo> notifyConfigInfos = AlarmInfoConverter.INSTANCE.retryToNotifyConfigInfos(notifyConfigs);
return notifyConfigInfos.stream()
.collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
List<RecipientInfo> recipients = i.getRecipientIds().stream().map(recipientId -> {
NotifyRecipient notifyRecipient = recipientMap.get(recipientId);
if (Objects.isNull(notifyRecipient)) {
return null;
}
return ImmutableTriple.of(namespaceId, groupName, i.getBusinessId());
RecipientInfo notifyConfigInfo = new RecipientInfo();
notifyConfigInfo.setNotifyAttribute(notifyRecipient.getNotifyAttribute());
notifyConfigInfo.setNotifyType(notifyRecipient.getNotifyType());
return notifyConfigInfo;
}).collect(Collectors.toList());
i.setRecipientInfos(recipients);
return ImmutableTriple.of(i.getNamespaceId(), i.getGroupName(), i.getBusinessId());
}));
}
@ -161,10 +193,17 @@ public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmI
}
}
AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig);
Alarm<AlarmContext> alarmType = snailJobAlarmFactory.getAlarmType(
notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
for (final RecipientInfo recipientInfo : notifyConfig.getRecipientInfos()) {
if (Objects.isNull(recipientInfo)) {
continue;
}
AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig);
context.setNotifyAttribute(recipientInfo.getNotifyAttribute());
Alarm<AlarmContext> alarmType = snailJobAlarmFactory.getAlarmType(
recipientInfo.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
}

View File

@ -2,6 +2,9 @@ package com.aizuda.snailjob.server.common.dto;
import lombok.Data;
import java.util.List;
import java.util.Set;
/**
* @author xiaowoniu
* @date 2023-12-03 10:02:43
@ -19,15 +22,13 @@ public class NotifyConfigInfo {
// 业务id (scene_name或job_id或workflow_id)
private String businessId;
private Set<Long> recipientIds;
// 任务类型 1重试任务 2回调任务 3JOB任务 4WORKFLOW任务
private Integer systemTaskType;
private Integer notifyStatus;
private Integer notifyType;
private String notifyAttribute;
private Integer notifyThreshold;
private Integer notifyScene;
@ -36,4 +37,14 @@ public class NotifyConfigInfo {
private Integer rateLimiterThreshold;
private List<RecipientInfo> recipientInfos;
@Data
public static class RecipientInfo {
private Integer notifyType;
private String notifyAttribute;
}
}

View File

@ -79,8 +79,7 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmE
alarmDTO.getExecutorInfo(),
alarmDTO.getArgsStr(),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN))
.title("{}环境 JOB任务失败", EnvironmentUtils.getActiveProfile())
.notifyAttribute(notifyConfig.getNotifyAttribute());
.title("{}环境 JOB任务失败", EnvironmentUtils.getActiveProfile());
}
@Override

View File

@ -31,7 +31,8 @@ import java.util.concurrent.TimeUnit;
*/
@Component
@Slf4j
public class RetryTaskFailDeadLetterAlarmListener extends AbstractRetryAlarm<RetryTaskFailDeadLetterAlarmEvent> implements Runnable, Lifecycle {
public class RetryTaskFailDeadLetterAlarmListener extends
AbstractRetryAlarm<RetryTaskFailDeadLetterAlarmEvent> implements Runnable, Lifecycle {
/**
* 死信告警数据
@ -39,13 +40,13 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractRetryAlarm<Ret
private LinkedBlockingQueue<List<RetryDeadLetter>> queue = new LinkedBlockingQueue<>(1000);
private static String retryTaskDeadTextMessagesFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败进入死信队列</font> \n" +
"> 空间ID:{} \n" +
"> 组名称:{} \n" +
"> 执行器名称:{} \n" +
"> 场景名称:{} \n" +
"> 业务数据:{} \n" +
"> 时间:{} \n";
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败进入死信队列</font> \n" +
"> 空间ID:{} \n" +
"> 组名称:{} \n" +
"> 执行器名称:{} \n" +
"> 场景名称:{} \n" +
"> 业务数据:{} \n" +
"> 时间:{} \n";
@Override
protected List<SyetemTaskTypeEnum> getSystemTaskType() {
@ -66,7 +67,7 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractRetryAlarm<Ret
@Override
public void onApplicationEvent(RetryTaskFailDeadLetterAlarmEvent event) {
if (!queue.offer(event.getRetryDeadLetters())) {
SnailJobLog.LOCAL.warn("任务重试失败进入死信队列告警队列已满");
SnailJobLog.LOCAL.warn("任务重试失败进入死信队列告警队列已满");
}
}
@ -75,21 +76,20 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractRetryAlarm<Ret
// 预警
return AlarmContext.build().text(retryTaskDeadTextMessagesFormatter,
EnvironmentUtils.getActiveProfile(),
retryAlarmInfo.getNamespaceId(),
retryAlarmInfo.getGroupName(),
retryAlarmInfo.getExecutorName(),
retryAlarmInfo.getSceneName(),
retryAlarmInfo.getArgsStr(),
DateUtils.format(retryAlarmInfo.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("组:[{}] 场景:[{}] 环境重试任务失败进入死信队列",
retryAlarmInfo.getGroupName(), retryAlarmInfo.getSceneName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
EnvironmentUtils.getActiveProfile(),
retryAlarmInfo.getNamespaceId(),
retryAlarmInfo.getGroupName(),
retryAlarmInfo.getExecutorName(),
retryAlarmInfo.getSceneName(),
retryAlarmInfo.getArgsStr(),
DateUtils.format(retryAlarmInfo.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("组:[{}] 场景:[{}] 环境重试任务失败进入死信队列",
retryAlarmInfo.getGroupName(), retryAlarmInfo.getSceneName());
}
@Override
protected void startLog() {
SnailJobLog.LOCAL.info("RetryTaskFailDeadLetterAlarmListener started");
SnailJobLog.LOCAL.info("RetryTaskFailDeadLetterAlarmListener started");
}
@Override

View File

@ -30,21 +30,21 @@ import java.util.concurrent.LinkedBlockingQueue;
@Component
@Slf4j
public class RetryTaskFailMoreThresholdAlarmListener extends
AbstractRetryAlarm<RetryTaskFailMoreThresholdAlarmEvent> implements Runnable, Lifecycle {
AbstractRetryAlarm<RetryTaskFailMoreThresholdAlarmEvent> implements Runnable, Lifecycle {
/**
* 重试任务失败数量超过阈值告警数据
*/
private LinkedBlockingQueue<RetryTask> queue = new LinkedBlockingQueue<>(1000);
private static String retryTaskFailMoreThresholdMessagesFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 任务重试次数超过{}个</font> \n" +
"> 空间ID:{} \n" +
"> 组名称:{} \n" +
"> 执行器名称:{} \n" +
"> 场景名称:{} \n" +
"> 重试次数:{} \n" +
"> 业务数据:{} \n" +
"> 时间:{} \n";
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 任务重试次数超过{}个</font> \n" +
"> 空间ID:{} \n" +
"> 组名称:{} \n" +
"> 执行器名称:{} \n" +
"> 场景名称:{} \n" +
"> 重试次数:{} \n" +
"> 业务数据:{} \n" +
"> 时间:{} \n";
@Override
protected List<RetryAlarmInfo> poll() throws InterruptedException {
@ -61,7 +61,7 @@ public class RetryTaskFailMoreThresholdAlarmListener extends
@Override
public void onApplicationEvent(RetryTaskFailMoreThresholdAlarmEvent event) {
if (!queue.offer(event.getRetryTask())) {
SnailJobLog.LOCAL.warn("任务失败数量超过阈值告警队列已满");
SnailJobLog.LOCAL.warn("任务失败数量超过阈值告警队列已满");
}
}
@ -70,23 +70,22 @@ public class RetryTaskFailMoreThresholdAlarmListener extends
// 预警
return AlarmContext.build().text(retryTaskFailMoreThresholdMessagesFormatter,
EnvironmentUtils.getActiveProfile(),
notifyConfig.getNotifyThreshold(),
retryAlarmInfo.getNamespaceId(),
retryAlarmInfo.getGroupName(),
retryAlarmInfo.getExecutorName(),
retryAlarmInfo.getSceneName(),
retryAlarmInfo.getRetryCount(),
retryAlarmInfo.getArgsStr(),
DateUtils.format(retryAlarmInfo.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("组:[{}] 场景:[{}] 环境任务重试次数超过阈值", retryAlarmInfo.getGroupName(),
retryAlarmInfo.getSceneName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
EnvironmentUtils.getActiveProfile(),
notifyConfig.getNotifyThreshold(),
retryAlarmInfo.getNamespaceId(),
retryAlarmInfo.getGroupName(),
retryAlarmInfo.getExecutorName(),
retryAlarmInfo.getSceneName(),
retryAlarmInfo.getRetryCount(),
retryAlarmInfo.getArgsStr(),
DateUtils.format(retryAlarmInfo.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("组:[{}] 场景:[{}] 环境任务重试次数超过阈值", retryAlarmInfo.getGroupName(),
retryAlarmInfo.getSceneName());
}
@Override
protected void startLog() {
SnailJobLog.LOCAL.info("RetryTaskFailMoreThresholdAlarmListener started");
SnailJobLog.LOCAL.info("RetryTaskFailMoreThresholdAlarmListener started");
}
@Override

View File

@ -1,53 +0,0 @@
package com.aizuda.snailjob.server.web.controller;
import com.aizuda.snailjob.server.web.annotation.LoginRequired;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.JobNotifyConfigQueryVO;
import com.aizuda.snailjob.server.web.model.request.JobNotifyConfigRequestVO;
import com.aizuda.snailjob.server.web.model.response.JobNotifyConfigResponseVO;
import com.aizuda.snailjob.server.web.service.JobNotifyConfigService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* Job通知配置接口
*
* @author: zuoJunLin
* @date : 2023-12-02 11:32
* @since 2.5.0
*/
@RestController
@RequestMapping("/job/notify/config")
public class JobNotifyConfigController {
@Autowired
private JobNotifyConfigService jobNotifyConfigService;
@LoginRequired
@GetMapping("/page/list")
public PageResult<List<JobNotifyConfigResponseVO>> getNotifyConfigList(JobNotifyConfigQueryVO queryVO) {
return jobNotifyConfigService.getJobNotifyConfigList(queryVO);
}
@LoginRequired
@GetMapping("{id}")
public JobNotifyConfigResponseVO getJobNotifyConfigDetail(@PathVariable("id") Long id) {
return jobNotifyConfigService.getJobNotifyConfigDetail(id);
}
@LoginRequired
@PostMapping
public Boolean saveNotify(@RequestBody @Validated JobNotifyConfigRequestVO requestVO) {
return jobNotifyConfigService.saveJobNotify(requestVO);
}
@LoginRequired
@PutMapping
public Boolean updateNotify(@RequestBody @Validated JobNotifyConfigRequestVO requestVO) {
return jobNotifyConfigService.updateJobNotify(requestVO);
}
}

View File

@ -1,11 +1,15 @@
package com.aizuda.snailjob.server.web.model.request;
import jakarta.validation.constraints.NotEmpty;
import lombok.Data;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import java.util.List;
import java.util.Set;
/**
* @author opensnail
* @date 2023-10-25 08:40:10
@ -35,11 +39,8 @@ public class NotifyConfigRequestVO {
@NotNull(message = "通知状态不能为空")
private Integer notifyStatus;
@NotNull(message = "通知类型不能为空")
private Integer notifyType;
@NotBlank(message = "通知属性不能为空")
private String notifyAttribute;
@NotEmpty(message = "通知人列表")
private Set<Long> notifyRecipientIds;
private Integer notifyThreshold;

View File

@ -2,8 +2,11 @@ package com.aizuda.snailjob.server.web.model.response;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;
@Data
public class NotifyConfigResponseVO implements Serializable {
@ -12,15 +15,25 @@ public class NotifyConfigResponseVO implements Serializable {
private String groupName;
private String sceneName;
/**
* 业务id (scene_name或job_id或workflow_id)
*/
private String businessId;
private String businessName;
/**
* 任务类型 1重试任务 2回调任务 3JOB任务 4WORKFLOW任务
*/
private Integer systemTaskType;
private Integer notifyStatus;
private String notifyName;
private Integer notifyType;
private Set<Long> recipientIds;
private String notifyAttribute;
private Set<String> recipientNames;
private Integer notifyThreshold;
@ -36,6 +49,7 @@ public class NotifyConfigResponseVO implements Serializable {
private LocalDateTime updateDt;
@Serial
private static final long serialVersionUID = 1L;

View File

@ -24,4 +24,7 @@ public interface NotifyConfigService {
Boolean updateNotify(NotifyConfigRequestVO requestVO);
NotifyConfigResponseVO getNotifyConfigDetail(Long id);
Boolean updateStatus(Long id, Integer status);
}

View File

@ -1,9 +1,15 @@
package com.aizuda.snailjob.server.web.service.convert;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.web.model.request.NotifyConfigRequestVO;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import org.springframework.util.CollectionUtils;
import java.util.Set;
/**
* @author: opensnail
@ -14,5 +20,16 @@ public interface NotifyConfigConverter {
NotifyConfigConverter INSTANCE = Mappers.getMapper(NotifyConfigConverter.class);
@Mappings({
@Mapping(target = "recipientIds", expression = "java(NotifyConfigConverter.toNotifyRecipientIdsStr(notifyConfigVO.getNotifyRecipientIds()))")
})
NotifyConfig toNotifyConfig(NotifyConfigRequestVO notifyConfigVO);
static String toNotifyRecipientIdsStr(Set<Long> notifyRecipientIds) {
if (CollectionUtils.isEmpty(notifyRecipientIds)) {
return null;
}
return JsonUtil.toJsonString(notifyRecipientIds);
}
}

View File

@ -1,11 +1,17 @@
package com.aizuda.snailjob.server.web.service.convert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.server.web.model.response.NotifyConfigResponseVO;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @author: opensnail
@ -16,7 +22,18 @@ public interface NotifyConfigResponseVOConverter {
NotifyConfigResponseVOConverter INSTANCE = Mappers.getMapper(NotifyConfigResponseVOConverter.class);
@Mappings({
@Mapping(target = "recipientIds", expression = "java(NotifyConfigResponseVOConverter.toNotifyRecipientIds(notifyConfig.getRecipientIds()))")
})
NotifyConfigResponseVO convert(NotifyConfig notifyConfig);
List<NotifyConfigResponseVO> batchConvert(List<NotifyConfig> notifyConfigs);
static Set<Long> toNotifyRecipientIds(String notifyRecipientIdsStr) {
if (StrUtil.isBlank(notifyRecipientIdsStr)) {
return new HashSet<>();
}
return new HashSet<>(JsonUtil.parseList(notifyRecipientIdsStr, Long.class));
}
}

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.web.service.impl;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.NotifyConfigQueryVO;
@ -15,14 +16,31 @@ import com.aizuda.snailjob.server.web.service.convert.NotifyConfigResponseVOConv
import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.ConfigAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient;
import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author: opensnail
@ -31,7 +49,11 @@ import java.util.List;
@Service
@RequiredArgsConstructor
public class NotifyConfigServiceImpl implements NotifyConfigService {
private final AccessTemplate accessTemplate;
private final NotifyRecipientMapper notifyRecipientMapper;
private final JobMapper jobMapper;
private final WorkflowMapper workflowMapper;
@Override
public PageResult<List<NotifyConfigResponseVO>> getNotifyConfigList(NotifyConfigQueryVO queryVO) {
@ -53,19 +75,92 @@ public class NotifyConfigServiceImpl implements NotifyConfigService {
}
queryWrapper.orderByDesc(NotifyConfig::getId);
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().listPage(pageDTO, queryWrapper).getRecords();
return new PageResult<>(pageDTO, NotifyConfigResponseVOConverter.INSTANCE.batchConvert(notifyConfigs));
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().listPage(pageDTO, queryWrapper)
.getRecords();
if (CollectionUtils.isEmpty(notifyConfigs)) {
return new PageResult<>(pageDTO, Lists.newArrayList());
}
List<NotifyConfigResponseVO> notifyConfigResponseVOS = NotifyConfigResponseVOConverter.INSTANCE.batchConvert(
notifyConfigs);
Map<Long, String> recipientNameMap = getRecipientNameMap(notifyConfigResponseVOS);
Map<Long, String> jobNameMap = getJobNameMap(notifyConfigResponseVOS);
Map<Long, String> workflowNameMap = getWorkflowNameMap(notifyConfigResponseVOS);
for (final NotifyConfigResponseVO notifyConfigResponseVO : notifyConfigResponseVOS) {
notifyConfigResponseVO.setRecipientNames(notifyConfigResponseVO.getRecipientIds()
.stream().map(recipientId -> recipientNameMap.getOrDefault(recipientId, StrUtil.EMPTY))
.collect(Collectors.toSet()));
if (Objects.equals(notifyConfigResponseVO.getSystemTaskType(), SyetemTaskTypeEnum.RETRY.getType()) ||
Objects.equals(notifyConfigResponseVO.getSystemTaskType(), SyetemTaskTypeEnum.CALLBACK.getType())) {
notifyConfigResponseVO.setBusinessName(notifyConfigResponseVO.getBusinessId());
} else if (Objects.equals(notifyConfigResponseVO.getSystemTaskType(), SyetemTaskTypeEnum.JOB.getType())) {
notifyConfigResponseVO.setBusinessName(
jobNameMap.get(Long.parseLong(notifyConfigResponseVO.getBusinessId())));
} else if (Objects.equals(notifyConfigResponseVO.getSystemTaskType(),
SyetemTaskTypeEnum.WORKFLOW.getType())) {
notifyConfigResponseVO.setBusinessName(
workflowNameMap.get(Long.parseLong(notifyConfigResponseVO.getBusinessId())));
}
}
return new PageResult<>(pageDTO, notifyConfigResponseVOS);
}
private Map<Long, String> getWorkflowNameMap(final List<NotifyConfigResponseVO> notifyConfigResponseVOS) {
Set<Long> workflowIds = notifyConfigResponseVOS.stream().filter(responseVO ->
responseVO.getSystemTaskType().equals(SyetemTaskTypeEnum.WORKFLOW.getType()))
.map(responseVO -> Long.parseLong(responseVO.getBusinessId()))
.collect(Collectors.toSet());
if (!CollectionUtils.isEmpty(workflowIds)) {
List<Workflow> workflows = workflowMapper.selectBatchIds(workflowIds);
return workflows.stream().collect(Collectors.toMap(Workflow::getId, Workflow::getWorkflowName));
}
return new HashMap<>();
}
private Map<Long, String> getJobNameMap(final List<NotifyConfigResponseVO> notifyConfigResponseVOS) {
Set<Long> jobIds = notifyConfigResponseVOS.stream().filter(responseVO ->
responseVO.getSystemTaskType().equals(SyetemTaskTypeEnum.JOB.getType()))
.map(responseVO -> Long.parseLong(responseVO.getBusinessId()))
.collect(Collectors.toSet());
if (!CollectionUtils.isEmpty(jobIds)) {
List<Job> jobs = jobMapper.selectBatchIds(jobIds);
return jobs.stream().collect(Collectors.toMap(Job::getId, Job::getJobName));
}
return new HashMap<>();
}
@NotNull
private Map<Long, String> getRecipientNameMap(final List<NotifyConfigResponseVO> notifyConfigResponseVOS) {
Set<Long> recipientIds = notifyConfigResponseVOS.stream()
.map(NotifyConfigResponseVO::getRecipientIds)
.reduce((a, b) -> {
HashSet<Long> set = Sets.newHashSet();
set.addAll(a);
set.addAll(b);
return set;
}).orElse(new HashSet<>());
List<NotifyRecipient> notifyRecipients = notifyRecipientMapper.selectBatchIds(recipientIds);
return notifyRecipients.stream()
.collect(Collectors.toMap(NotifyRecipient::getId, NotifyRecipient::getRecipientName));
}
@Override
public Boolean saveNotify(NotifyConfigRequestVO requestVO) {
NotifyConfig notifyConfig = NotifyConfigConverter.INSTANCE.toNotifyConfig(requestVO);
notifyConfig.setCreateDt(LocalDateTime.now());
notifyConfig.setRecipientIds(JsonUtil.toJsonString(requestVO.getNotifyRecipientIds()));
notifyConfig.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId());
ConfigAccess<NotifyConfig> notifyConfigAccess = accessTemplate.getNotifyConfigAccess();
Assert.isTrue(1 == notifyConfigAccess.insert(notifyConfig),
() -> new SnailJobServerException("failed to insert notify. sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfig)));
() -> new SnailJobServerException("failed to insert notify. sceneConfig:[{}]",
JsonUtil.toJsonString(notifyConfig)));
return Boolean.TRUE;
}
@ -73,17 +168,32 @@ public class NotifyConfigServiceImpl implements NotifyConfigService {
public Boolean updateNotify(NotifyConfigRequestVO requestVO) {
Assert.notNull(requestVO.getId(), () -> new SnailJobServerException("参数异常"));
NotifyConfig notifyConfig = NotifyConfigConverter.INSTANCE.toNotifyConfig(requestVO);
notifyConfig.setRecipientIds(JsonUtil.toJsonString(requestVO.getNotifyRecipientIds()));
// 防止被覆盖
notifyConfig.setNamespaceId(null);
Assert.isTrue(1 == accessTemplate.getNotifyConfigAccess().updateById(notifyConfig),
() -> new SnailJobServerException("failed to update notify. sceneConfig:[{}]", JsonUtil.toJsonString(notifyConfig)));
() -> new SnailJobServerException("failed to update notify. sceneConfig:[{}]",
JsonUtil.toJsonString(notifyConfig)));
return Boolean.TRUE;
}
@Override
public NotifyConfigResponseVO getNotifyConfigDetail(Long id) {
NotifyConfig notifyConfig = accessTemplate.getNotifyConfigAccess().one(new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getId, id));
.eq(NotifyConfig::getId, id));
return NotifyConfigResponseVOConverter.INSTANCE.convert(notifyConfig);
}
@Override
public Boolean updateStatus(final Long id, final Integer status) {
NotifyConfig config = new NotifyConfig();
config.setNotifyStatus(status);
config.setUpdateDt(LocalDateTime.now());
return 1 == accessTemplate.getNotifyConfigAccess()
.update(config, new LambdaUpdateWrapper<NotifyConfig>()
.eq(NotifyConfig::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())
.eq(NotifyConfig::getId, id)
);
}
}