feat(sj_1.0.0): 优化同步配置逻辑

This commit is contained in:
opensnail 2024-05-08 18:18:01 +08:00
parent d5b5c1cef2
commit 0005ce430d
13 changed files with 202 additions and 197 deletions

View File

@ -1,11 +1,8 @@
package com.aizuda.snailjob.client.common.intercepter;
package com.aizuda.snailjob.client.common;
import com.aizuda.snailjob.client.common.rpc.supports.scan.EndPointInfo;
import com.aizuda.snailjob.client.common.rpc.supports.http.HttpRequest;
import com.aizuda.snailjob.client.common.rpc.supports.http.HttpResponse;
import com.aizuda.snailjob.client.common.rpc.supports.http.HttpRequest;
import com.aizuda.snailjob.client.common.rpc.supports.http.HttpResponse;
import com.aizuda.snailjob.client.common.rpc.supports.scan.EndPointInfo;
/**
* @author: opensnail

View File

@ -1,15 +1,7 @@
package com.aizuda.snailjob.client.common.cache;
import com.aizuda.snailjob.client.common.Lifecycle;
import com.aizuda.snailjob.client.common.NettyClient;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
@ -19,9 +11,8 @@ import java.util.Objects;
* @author: opensnail
* @date : 2022-05-02 21:06
*/
@Component
@Order
public class GroupVersionCache implements Lifecycle {
public final class GroupVersionCache {
private GroupVersionCache() {}
private static ConfigDTO CONFIG;
@ -78,30 +69,4 @@ public class GroupVersionCache implements Lifecycle {
return null;
}
@Override
public void start() {
try {
NettyClient client = RequestBuilder.<NettyClient, NettyResult>newBuilder()
.client(NettyClient.class)
.callback(nettyResult -> {
if (Objects.isNull(nettyResult.getData())) {
SnailJobLog.LOCAL.error("获取配置结果为null");
return;
}
GroupVersionCache.setConfig(JsonUtil.parseObject(nettyResult.getData().toString(), ConfigDTO.class));
})
.build();
client.getConfig(0);
} catch (Exception e) {
SnailJobLog.LOCAL.error("同步版本失败", e);
}
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,60 @@
package com.aizuda.snailjob.client.common.handler;
import com.aizuda.snailjob.client.common.Lifecycle;
import com.aizuda.snailjob.client.common.NettyClient;
import com.aizuda.snailjob.client.common.cache.GroupVersionCache;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author: opensnail
* @date : 2024-05-08
* @since : sj_1.0.0
*/
@Component
public class SyncRemoteConfig implements Lifecycle {
private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newSingleThreadScheduledExecutor(
r -> new Thread(r, "sync-remote-config"));
@Override
public void start() {
SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
try {
try {
NettyClient client = RequestBuilder.<NettyClient, NettyResult>newBuilder()
.client(NettyClient.class)
.callback(nettyResult -> {
if (Objects.isNull(nettyResult.getData())) {
SnailJobLog.LOCAL.error("获取配置结果为null");
return;
}
GroupVersionCache.setConfig(
JsonUtil.parseObject(nettyResult.getData().toString(), ConfigDTO.class));
})
.build();
client.getConfig(0);
} catch (Exception e) {
SnailJobLog.LOCAL.error("同步版本失败", e);
}
} catch (Exception e) {
SnailJobLog.LOCAL.error("通知配置失败", e);
}
}, 0, 1, TimeUnit.MINUTES);
}
@Override
public void close() {
}
}

View File

@ -5,7 +5,7 @@ import cn.hutool.core.util.ServiceLoaderUtil;
import com.aizuda.snailjob.client.common.cache.EndPointInfoCache;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.common.intercepter.HandlerInterceptor;
import com.aizuda.snailjob.client.common.HandlerInterceptor;
import com.aizuda.snailjob.client.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.client.common.rpc.supports.scan.EndPointInfo;
import com.aizuda.snailjob.client.common.rpc.supports.http.HttpRequest;

View File

@ -76,7 +76,9 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
return notifyConfigMapper.selectList(
new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNamespaceId, namespaceId)
.eq(NotifyConfig::getGroupName, groupName));
.eq(NotifyConfig::getGroupName, groupName)
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
);
}

View File

@ -2,12 +2,10 @@ package com.aizuda.snailjob.server.common.register;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Register;
import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.config.SystemProperties;

View File

@ -71,9 +71,6 @@ public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler {
try {
// 同步版本
syncConfig(headers);
TaskGenerator taskGenerator = taskGenerators.stream()
.filter(t -> t.supports(TaskGeneratorSceneEnum.CLIENT_REPORT.getScene()))
.findFirst().orElseThrow(() -> new SnailJobServerException("没有匹配的任务生成器"));
@ -143,10 +140,4 @@ public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler {
}
}
private void syncConfig(HttpHeaders headers) {
ConfigVersionSyncHandler syncHandler = SpringContext.getBeanByType(ConfigVersionSyncHandler.class);
Integer clientVersion = headers.getInt(HeadersEnum.VERSION.getKey());
String namespace = headers.getAsString(HeadersEnum.NAMESPACE.getKey());
syncHandler.addSyncTask(headers.get(HeadersEnum.GROUP_NAME.getKey()), namespace, clientVersion);
}
}

View File

@ -13,7 +13,7 @@ import org.springframework.boot.test.context.SpringBootTest;
public class ConfigVersionSyncHandlerTest {
// @Autowired
// private ConfigVersionSyncHandler configVersionSyncHandler;
// private SyncConfigHandler configVersionSyncHandler;
//
// @SneakyThrows
// @Test

View File

@ -1,28 +0,0 @@
package com.aizuda.snailjob.server.web.service;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.JobNotifyConfigQueryVO;
import com.aizuda.snailjob.server.web.model.request.JobNotifyConfigRequestVO;
import com.aizuda.snailjob.server.web.model.response.JobNotifyConfigResponseVO;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.JobNotifyConfigQueryVO;
import com.aizuda.snailjob.server.web.model.request.JobNotifyConfigRequestVO;
import com.aizuda.snailjob.server.web.model.response.JobNotifyConfigResponseVO;
import java.util.List;
/**
* @author: zuoJunLin
* @date : 2023-12-02 12:54
* @since 2.5.0
*/
public interface JobNotifyConfigService {
PageResult<List<JobNotifyConfigResponseVO>> getJobNotifyConfigList(JobNotifyConfigQueryVO queryVO);
Boolean saveJobNotify(JobNotifyConfigRequestVO requestVO);
Boolean updateJobNotify(JobNotifyConfigRequestVO requestVO);
JobNotifyConfigResponseVO getJobNotifyConfigDetail(Long id);
}

View File

@ -0,0 +1,105 @@
package com.aizuda.snailjob.server.web.service.handler;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.client.CommonRpcClient;
import com.aizuda.snailjob.server.common.dto.ConfigSyncTask;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 版本同步
*
* @author opensnail
* @date 2023-06-10
* @since 1.6.0
*/
@Component
@RequiredArgsConstructor
public class SyncConfigHandler implements Lifecycle, Runnable {
private static final LinkedBlockingQueue<ConfigSyncTask> QUEUE = new LinkedBlockingQueue<>(256);
public Thread THREAD = null;
protected final AccessTemplate accessTemplate;
/**
* 添加任务
*
* @param groupName
* @return false-队列容量已满 true-添加成功
*/
public static boolean addSyncTask(String groupName, String namespaceId) {
ConfigSyncTask configSyncTask = new ConfigSyncTask();
configSyncTask.setNamespaceId(namespaceId);
configSyncTask.setGroupName(groupName);
return QUEUE.offer(configSyncTask);
}
/**
* 同步版本
*
* @param groupName
* @param namespaceId 空间id
*/
public void syncVersion(String groupName, final String namespaceId) {
try {
Set<RegisterNodeInfo> serverNodeSet = CacheRegisterTable.getServerNodeSet(groupName, namespaceId);
// 同步版本到每个客户端节点
for (final RegisterNodeInfo registerNodeInfo : serverNodeSet) {
ConfigDTO configDTO = accessTemplate.getGroupConfigAccess().getConfigInfo(groupName, namespaceId);
CommonRpcClient rpcClient = RequestBuilder.<CommonRpcClient, Result>newBuilder()
.nodeInfo(registerNodeInfo)
.client(CommonRpcClient.class)
.build();
SnailJobLog.LOCAL.info("同步结果 [{}]", rpcClient.syncConfig(configDTO));
}
} catch (Exception e) {
SnailJobLog.LOCAL.error("version sync error. groupName:[{}]", groupName, e);
}
}
@Override
public void start() {
THREAD = new Thread(this, "config-version-sync");
THREAD.start();
}
@Override
public void close() {
if (Objects.nonNull(THREAD)) {
THREAD.interrupt();
}
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
ConfigSyncTask task = QUEUE.take();
syncVersion(task.getGroupName(), task.getNamespaceId());
} catch (InterruptedException e) {
SnailJobLog.LOCAL.info("[{}] thread stop.", Thread.currentThread().getName());
} catch (Exception e) {
SnailJobLog.LOCAL.error("client refresh expireAt error.", e);
} finally {
try {
// 防止刷的过快休眠1s
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
}
}
}
}

View File

@ -1,108 +0,0 @@
package com.aizuda.snailjob.server.web.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.JobNotifyConfigQueryVO;
import com.aizuda.snailjob.server.web.model.request.JobNotifyConfigRequestVO;
import com.aizuda.snailjob.server.web.model.request.UserSessionVO;
import com.aizuda.snailjob.server.web.model.response.JobNotifyConfigResponseVO;
import com.aizuda.snailjob.server.web.service.JobNotifyConfigService;
import com.aizuda.snailjob.server.web.service.convert.JobNotifyConfigConverter;
import com.aizuda.snailjob.server.web.service.convert.JobNotifyConfigResponseVOConverter;
import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobNotifyConfigQueryDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobNotifyConfigResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobNotifyConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobNotifyConfig;
import com.aizuda.snailjob.server.web.model.response.JobNotifyConfigResponseVO;
import com.aizuda.snailjob.server.web.service.convert.JobNotifyConfigConverter;
import com.aizuda.snailjob.server.web.service.convert.JobNotifyConfigResponseVOConverter;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
/**
* @author: zuoJunLin
* @date : 2023-12-02 12:54
* @since 2.5.0
*/
@Service
public class JobNotifyConfigServiceImpl implements JobNotifyConfigService {
@Autowired
private JobNotifyConfigMapper jobNotifyConfigMapper;
@Override
public PageResult<List<JobNotifyConfigResponseVO>> getJobNotifyConfigList(JobNotifyConfigQueryVO queryVO) {
PageDTO<JobNotifyConfig> pageDTO = new PageDTO<>();
UserSessionVO userSessionVO = UserSessionUtils.currentUserSession();
JobNotifyConfigQueryDO queryDO = new JobNotifyConfigQueryDO();
queryDO.setNamespaceId(userSessionVO.getNamespaceId());
List<String> groupNames = Lists.newArrayList();
if (userSessionVO.isUser()) {
groupNames = userSessionVO.getGroupNames();
}
if (StrUtil.isNotBlank(queryVO.getGroupName())) {
// 说明当前组不在用户配置的权限中
if (!CollectionUtils.isEmpty(groupNames) && !groupNames.contains(queryVO.getGroupName())) {
return new PageResult<>(pageDTO, Lists.newArrayList());
} else {
groupNames = Lists.newArrayList(queryVO.getGroupName());
}
}
queryDO.setGroupNames(groupNames);
if (Objects.nonNull(queryVO.getJobId())) {
queryDO.setJobId(queryVO.getJobId());
}
QueryWrapper<JobNotifyConfig> queryWrapper = new QueryWrapper<JobNotifyConfig>()
.eq("a.namespace_id", queryDO.getNamespaceId())
.eq(queryDO.getJobId() != null, "a.job_id", queryDO.getJobId())
.in(CollUtil.isNotEmpty(queryDO.getGroupNames()), "a.group_name", queryDO.getGroupNames())
.orderByDesc("a.id");
List<JobNotifyConfigResponseDO> batchResponseDOList = jobNotifyConfigMapper.selectJobNotifyConfigList(pageDTO, queryWrapper);
return new PageResult<>(pageDTO, JobNotifyConfigResponseVOConverter.INSTANCE.batchConvert(batchResponseDOList));
}
@Override
public Boolean saveJobNotify(JobNotifyConfigRequestVO requestVO) {
JobNotifyConfig jobNotifyConfig = JobNotifyConfigConverter.INSTANCE.toJobNotifyConfig(requestVO);
jobNotifyConfig.setCreateDt(LocalDateTime.now());
jobNotifyConfig.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId());
Assert.isTrue(1 == jobNotifyConfigMapper.insert(jobNotifyConfig),
() -> new SnailJobServerException("failed to insert jobNotifyConfig. sceneConfig:[{}]", JsonUtil.toJsonString(jobNotifyConfig)));
return Boolean.TRUE;
}
@Override
public Boolean updateJobNotify(JobNotifyConfigRequestVO requestVO) {
Assert.notNull(requestVO.getId(), () -> new SnailJobServerException("参数异常"));
JobNotifyConfig jobNotifyConfig = JobNotifyConfigConverter.INSTANCE.toJobNotifyConfig(requestVO);
// 防止被覆盖
jobNotifyConfig.setNamespaceId(null);
Assert.isTrue(1 == jobNotifyConfigMapper.updateById(jobNotifyConfig),
() -> new SnailJobServerException("failed to update jobNotifyConfig. sceneConfig:[{}]", JsonUtil.toJsonString(jobNotifyConfig)));
return Boolean.TRUE;
}
@Override
public JobNotifyConfigResponseVO getJobNotifyConfigDetail(Long id) {
JobNotifyConfig jobNotifyConfig = jobNotifyConfigMapper.selectOne(new LambdaQueryWrapper<JobNotifyConfig>()
.eq(JobNotifyConfig::getId, id));
return JobNotifyConfigResponseVOConverter.INSTANCE.convert(jobNotifyConfig);
}
}

View File

@ -13,12 +13,14 @@ import com.aizuda.snailjob.server.web.model.response.NotifyConfigResponseVO;
import com.aizuda.snailjob.server.web.service.NotifyConfigService;
import com.aizuda.snailjob.server.web.service.convert.NotifyConfigConverter;
import com.aizuda.snailjob.server.web.service.convert.NotifyConfigResponseVOConverter;
import com.aizuda.snailjob.server.web.service.handler.SyncConfigHandler;
import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.ConfigAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient;
@ -192,14 +194,28 @@ public class NotifyConfigServiceImpl implements NotifyConfigService {
@Override
public Boolean updateStatus(final Long id, final Integer status) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
NotifyConfig notifyConfig = accessTemplate.getNotifyConfigAccess().one(
new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getId, id)
.eq(NotifyConfig::getNamespaceId, namespaceId)
);
Assert.notNull(notifyConfig, () -> new SnailJobServerException("通知配置不存在"));
// 同步配置到客户端
SyncConfigHandler.addSyncTask(notifyConfig.getGroupName(), namespaceId);
NotifyConfig config = new NotifyConfig();
config.setNotifyStatus(status);
config.setUpdateDt(LocalDateTime.now());
return 1 == accessTemplate.getNotifyConfigAccess()
int update = accessTemplate.getNotifyConfigAccess()
.update(config, new LambdaUpdateWrapper<NotifyConfig>()
.eq(NotifyConfig::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())
.eq(NotifyConfig::getNamespaceId, namespaceId)
.eq(NotifyConfig::getId, id)
);
return 1 == update;
}
@Override

View File

@ -14,6 +14,7 @@ import com.aizuda.snailjob.server.web.model.response.SceneConfigResponseVO;
import com.aizuda.snailjob.server.web.service.SceneConfigService;
import com.aizuda.snailjob.server.web.service.convert.SceneConfigConverter;
import com.aizuda.snailjob.server.web.service.convert.SceneConfigResponseVOConverter;
import com.aizuda.snailjob.server.web.service.handler.SyncConfigHandler;
import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.ConfigAccess;
@ -22,7 +23,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
@ -34,10 +35,9 @@ import java.util.Optional;
* @date : 2022-03-03 10:55
*/
@Service
@RequiredArgsConstructor
public class SceneConfigServiceImpl implements SceneConfigService {
@Autowired
private AccessTemplate accessTemplate;
private final AccessTemplate accessTemplate;
@Override
public PageResult<List<SceneConfigResponseVO>> getSceneConfigPageList(SceneConfigQueryVO queryVO) {
@ -106,6 +106,10 @@ public class SceneConfigServiceImpl implements SceneConfigService {
Assert.isTrue(1 == sceneConfigAccess.insert(retrySceneConfig),
() -> new SnailJobServerException("failed to insert scene. retrySceneConfig:[{}]",
JsonUtil.toJsonString(retrySceneConfig)));
// 同步配置到客户端
SyncConfigHandler.addSyncTask(requestVO.getGroupName(), namespaceId);
return Boolean.TRUE;
}
@ -143,6 +147,9 @@ public class SceneConfigServiceImpl implements SceneConfigService {
.eq(RetrySceneConfig::getSceneName, requestVO.getSceneName())),
() -> new SnailJobServerException("failed to update scene. retrySceneConfig:[{}]",
JsonUtil.toJsonString(retrySceneConfig)));
// 同步配置到客户端
SyncConfigHandler.addSyncTask(requestVO.getGroupName(), namespaceId);
return Boolean.TRUE;
}