refactor:2.5.0

1.优化场景重试数据到达阈值告警和优化场景重试失败数据到达阈值
2.修复查询重试数据无传入namespaceId错误
This commit is contained in:
byteblogs168 2023-12-02 20:13:30 +08:00
parent b253df6d7f
commit 4d2c1ebfe5
32 changed files with 454 additions and 329 deletions

View File

@ -16,22 +16,22 @@ import java.util.List;
*/
public interface TaskAccess<T> extends Access<T> {
List<T> list(String groupName, LambdaQueryWrapper<T> query);
List<T> list(String groupName, String namespaceId, LambdaQueryWrapper<T> query);
T one(String groupName, LambdaQueryWrapper<T> query);
T one(String groupName, String namespaceId, LambdaQueryWrapper<T> query);
int update(String groupName, T t, LambdaUpdateWrapper<T> query);
int update(String groupName, String namespaceId, T t, LambdaUpdateWrapper<T> query);
int updateById(String groupName, T t);
int updateById(String groupName, String namespaceId, T t);
int delete(String groupName, LambdaQueryWrapper<T> query);
int delete(String groupName, String namespaceId, LambdaQueryWrapper<T> query);
int insert(String groupName, T t);
int insert(String groupName, String namespaceId, T t);
int batchInsert(String groupName, List<T> list);
int batchInsert(String groupName, String namespaceId, List<T> list);
PageDTO<T> listPage(String groupName, PageDTO<T> iPage, LambdaQueryWrapper<T> query);
PageDTO<T> listPage(String groupName, String namespaceId, PageDTO<T> iPage, LambdaQueryWrapper<T> query);
long count(String groupName, LambdaQueryWrapper<T> query);
long count(String groupName, String namespaceId, LambdaQueryWrapper<T> query);
}

View File

@ -33,60 +33,61 @@ public abstract class AbstractTaskAccess<T> implements TaskAccess<T> {
* 设置分区
*
* @param groupName 组名称
* @param namespaceId 命名空间id
*/
public void setPartition(String groupName) {
RequestDataHelper.setPartition(groupName);
public void setPartition(String groupName, String namespaceId) {
RequestDataHelper.setPartition(groupName, namespaceId);
}
@Override
public List<T> list(String groupName, LambdaQueryWrapper<T> query) {
setPartition(groupName);
public List<T> list(String groupName, String namespaceId, LambdaQueryWrapper<T> query) {
setPartition(groupName, namespaceId);
return doList(query);
}
@Override
public int update(String groupName, T t, LambdaUpdateWrapper<T> query) {
setPartition(groupName);
public int update(String groupName, String namespaceId, T t, LambdaUpdateWrapper<T> query) {
setPartition(groupName, namespaceId);
return doUpdate(t, query);
}
protected abstract int doUpdate(T t, LambdaUpdateWrapper<T> query);
@Override
public int updateById(String groupName, T t) {
setPartition(groupName);
public int updateById(String groupName, String namespaceId, T t) {
setPartition(groupName, namespaceId);
return doUpdateById(t);
}
@Override
public int delete(String groupName, LambdaQueryWrapper<T> query) {
setPartition(groupName);
public int delete(String groupName, String namespaceId, LambdaQueryWrapper<T> query) {
setPartition(groupName, namespaceId);
return doDelete(query);
}
@Override
public int insert(String groupName, T t) {
setPartition(groupName);
public int insert(String groupName, String namespaceId, T t) {
setPartition(groupName, namespaceId);
return doInsert(t);
}
@Override
public int batchInsert(String groupName, List<T> list) {
setPartition(groupName);
public int batchInsert(String groupName, String namespaceId, List<T> list) {
setPartition(groupName, namespaceId);
return doBatchInsert(list);
}
protected abstract int doBatchInsert(List<T> list);
@Override
public PageDTO<T> listPage(final String groupName, final PageDTO<T> iPage, final LambdaQueryWrapper<T> query) {
setPartition(groupName);
public PageDTO<T> listPage(String groupName, String namespaceId, final PageDTO<T> iPage, final LambdaQueryWrapper<T> query) {
setPartition(groupName, namespaceId);
return doListPage(iPage, query);
}
@Override
public T one(String groupName, LambdaQueryWrapper<T> query) {
setPartition(groupName);
public T one(String groupName, String namespaceId, LambdaQueryWrapper<T> query) {
setPartition(groupName, namespaceId);
return doOne(query);
}
@ -95,8 +96,8 @@ public abstract class AbstractTaskAccess<T> implements TaskAccess<T> {
protected abstract PageDTO<T> doListPage(final PageDTO<T> iPage, final LambdaQueryWrapper<T> query);
@Override
public long count(final String groupName, final LambdaQueryWrapper<T> query) {
setPartition(groupName);
public long count(String groupName, String namespaceId, final LambdaQueryWrapper<T> query) {
setPartition(groupName, namespaceId);
return doCount(query);
}

View File

@ -55,16 +55,19 @@ public class RequestDataHelper {
*
* @param groupName 组名称
*/
public static void setPartition(String groupName) {
public static void setPartition(String groupName, String namespaceId) {
if (StrUtil.isBlank(groupName)) {
throw new EasyRetryDatasourceException("组名称不能为空");
if (StrUtil.isBlank(groupName) && StrUtil.isNotBlank(namespaceId)) {
throw new EasyRetryDatasourceException("组名称或者命名空间ID不能为空");
}
GroupConfigMapper groupConfigMapper = SpringContext.getBeanByType(GroupConfigMapper.class);
GroupConfig groupConfig = groupConfigMapper.selectOne(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, groupName));
GroupConfig groupConfig = groupConfigMapper.selectOne(
new LambdaQueryWrapper<GroupConfig>()
.select(GroupConfig::getGroupPartition)
.eq(GroupConfig::getNamespaceId, namespaceId)
.eq(GroupConfig::getGroupName, groupName));
if (Objects.isNull(groupConfig)) {
throw new EasyRetryDatasourceException("groupName:[{}]不存在", groupName);
}
@ -105,7 +108,6 @@ public class RequestDataHelper {
}
public static void remove() {
REQUEST_DATA.remove();
}

View File

@ -22,10 +22,10 @@
insert into server_node (namespace_id, group_name, host_id, host_ip, host_port,
expire_at, node_type, ext_attrs, context_path, create_dt)
values
<foreach collection="records" item="item" index="index" separator="," open="(" close=")">
#{item.namespaceId,jdbcType=VARCHAR}, #{item.groupName,jdbcType=VARCHAR}, #{item.hostId,jdbcType=VARCHAR}, #{item.hostIp,jdbcType=VARCHAR},
<foreach collection="records" item="item" index="index" separator=",">
(#{item.namespaceId,jdbcType=VARCHAR}, #{item.groupName,jdbcType=VARCHAR}, #{item.hostId,jdbcType=VARCHAR}, #{item.hostIp,jdbcType=VARCHAR},
#{item.hostPort,jdbcType=INTEGER}, #{item.expireAt,jdbcType=TIMESTAMP}, #{item.nodeType,jdbcType=TINYINT},
#{item.extAttrs,jdbcType=VARCHAR}, #{item.contextPath,jdbcType=VARCHAR}, #{item.createDt,jdbcType=TIMESTAMP}
#{item.extAttrs,jdbcType=VARCHAR}, #{item.contextPath,jdbcType=VARCHAR}, #{item.createDt,jdbcType=TIMESTAMP})
</foreach>
ON DUPLICATE KEY UPDATE
expire_at = values(`expire_at`)

View File

@ -0,0 +1,39 @@
package com.aizuda.easy.retry.server.retry.task.dto;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.LocalDateTime;
/**
* @author xiaowoniu
* @date 2023-10-25 22:23:24
* @since 2.5.0
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class NotifyConfigPartitionTask extends PartitionTask {
private String namespaceId;
private String groupName;
private String sceneName;
private Integer notifyStatus;
private Integer notifyType;
private String notifyAttribute;
private Integer notifyThreshold;
private Integer notifyScene;
private Integer rateLimiterStatus;
private Integer rateLimiterThreshold;
}

View File

@ -61,34 +61,33 @@ public abstract class AbstractGenerator implements TaskGenerator {
//客户端上报任务根据幂等id去重
List<TaskContext.TaskInfo> taskInfos = taskContext.getTaskInfos().stream().collect(Collectors.collectingAndThen(
Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(TaskContext.TaskInfo::getIdempotentId))),
ArrayList::new));
Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(TaskContext.TaskInfo::getIdempotentId))),
ArrayList::new));
Set<String> idempotentIdSet = taskInfos.stream().map(TaskContext.TaskInfo::getIdempotentId)
.collect(Collectors.toSet());
.collect(Collectors.toSet());
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
// 获取相关的任务用户幂等校验
RequestDataHelper.setPartition(taskContext.getGroupName());
List<RetryTask> retryTasks = retryTaskAccess.list(taskContext.getGroupName(),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, taskContext.getNamespaceId())
.eq(RetryTask::getGroupName, taskContext.getGroupName())
.eq(RetryTask::getSceneName, taskContext.getSceneName())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType())
.in(RetryTask::getIdempotentId, idempotentIdSet));
List<RetryTask> retryTasks = retryTaskAccess.list(taskContext.getGroupName(), taskContext.getNamespaceId(),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, taskContext.getNamespaceId())
.eq(RetryTask::getGroupName, taskContext.getGroupName())
.eq(RetryTask::getSceneName, taskContext.getSceneName())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType())
.in(RetryTask::getIdempotentId, idempotentIdSet));
Map<String/*幂等ID*/, List<RetryTask>> retryTaskMap = retryTasks.stream()
.collect(Collectors.groupingBy(RetryTask::getIdempotentId));
.collect(Collectors.groupingBy(RetryTask::getIdempotentId));
List<RetryTask> waitInsertTasks = new ArrayList<>();
List<RetryTaskLog> waitInsertTaskLogs = new ArrayList<>();
LocalDateTime now = LocalDateTime.now();
for (TaskContext.TaskInfo taskInfo : taskInfos) {
Pair<List<RetryTask>, List<RetryTaskLog>> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo,
sceneConfig);
sceneConfig);
waitInsertTasks.addAll(pair.getKey());
waitInsertTaskLogs.addAll(pair.getValue());
}
@ -98,10 +97,11 @@ public abstract class AbstractGenerator implements TaskGenerator {
}
Assert.isTrue(
waitInsertTasks.size() == retryTaskAccess.batchInsert(taskContext.getGroupName(), waitInsertTasks),
() -> new EasyRetryServerException("failed to report data"));
waitInsertTasks.size() == retryTaskAccess.batchInsert(taskContext.getGroupName(),
taskContext.getNamespaceId(), waitInsertTasks),
() -> new EasyRetryServerException("failed to report data"));
Assert.isTrue(waitInsertTaskLogs.size() == retryTaskLogMapper.batchInsert(waitInsertTaskLogs),
() -> new EasyRetryServerException("新增重试日志失败"));
() -> new EasyRetryServerException("新增重试日志失败"));
}
/**
@ -111,17 +111,17 @@ public abstract class AbstractGenerator implements TaskGenerator {
* @param sceneConfig
*/
private Pair<List<RetryTask>, List<RetryTaskLog>> doConvertTask(Map<String/*幂等ID*/, List<RetryTask>> retryTaskMap,
TaskContext taskContext, LocalDateTime now,
TaskInfo taskInfo, SceneConfig sceneConfig) {
TaskContext taskContext, LocalDateTime now,
TaskInfo taskInfo, SceneConfig sceneConfig) {
List<RetryTask> waitInsertTasks = new ArrayList<>();
List<RetryTaskLog> waitInsertTaskLogs = new ArrayList<>();
// 判断是否存在与幂等ID相同的任务
List<RetryTask> list = retryTaskMap.getOrDefault(taskInfo.getIdempotentId(), new ArrayList<>()).stream()
.filter(retryTask ->
taskContext.getGroupName().equals(retryTask.getGroupName())
&& taskContext.getNamespaceId().equals(retryTask.getNamespaceId())
&& taskContext.getSceneName().equals(retryTask.getSceneName())).collect(Collectors.toList());
.filter(retryTask ->
taskContext.getGroupName().equals(retryTask.getGroupName())
&& taskContext.getNamespaceId().equals(retryTask.getNamespaceId())
&& taskContext.getSceneName().equals(retryTask.getSceneName())).collect(Collectors.toList());
// 说明存在相同的任务
if (!CollectionUtils.isEmpty(list)) {
LogUtils.warn(log, "interrupted reporting in retrying task. [{}]", JsonUtil.toJsonString(taskInfo));
@ -164,21 +164,21 @@ public abstract class AbstractGenerator implements TaskGenerator {
private SceneConfig checkAndInitScene(TaskContext taskContext) {
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneName(taskContext.getGroupName(), taskContext.getSceneName(),
taskContext.getNamespaceId());
.getSceneConfigByGroupNameAndSceneName(taskContext.getGroupName(), taskContext.getSceneName(),
taskContext.getNamespaceId());
if (Objects.isNull(sceneConfig)) {
GroupConfig groupConfig = accessTemplate.getGroupConfigAccess()
.getGroupConfigByGroupName(taskContext.getGroupName(), taskContext.getNamespaceId());
.getGroupConfigByGroupName(taskContext.getGroupName(), taskContext.getNamespaceId());
if (Objects.isNull(groupConfig)) {
throw new EasyRetryServerException(
"failed to report data, no group configuration found. groupName:[{}]", taskContext.getGroupName());
"failed to report data, no group configuration found. groupName:[{}]", taskContext.getGroupName());
}
if (groupConfig.getInitScene().equals(StatusEnum.NO.getStatus())) {
throw new EasyRetryServerException(
"failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]",
taskContext.getGroupName(), taskContext.getSceneName());
"failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]",
taskContext.getGroupName(), taskContext.getSceneName());
} else {
// 若配置了默认初始化场景配置则发现上报数据的时候未配置场景默认生成一个场景
sceneConfig = initScene(taskContext.getGroupName(), taskContext.getSceneName(), taskContext.getNamespaceId());
@ -206,7 +206,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel());
sceneConfig.setDescription("自动初始化场景");
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().insert(sceneConfig),
() -> new EasyRetryServerException("init scene error"));
() -> new EasyRetryServerException("init scene error"));
return sceneConfig;
}

View File

@ -17,8 +17,9 @@ public interface RetryService {
* 迁移到达最大重试次数到死信队列
* 删除重试完成的数据
*
* @param groupId 组id
* @param groupName 组id
* @param namespaceId 命名空间id
* @return true- 处理成功 false- 处理失败
*/
Boolean moveDeadLetterAndDelFinish(String groupId);
Boolean moveDeadLetterAndDelFinish(String groupName, String namespaceId);
}

View File

@ -1,4 +1,5 @@
package com.aizuda.easy.retry.server.retry.task.service.impl;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
@ -20,6 +21,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
@ -42,16 +44,17 @@ public class RetryServiceImpl implements RetryService {
@Transactional
@Override
public Boolean moveDeadLetterAndDelFinish(String groupName) {
public Boolean moveDeadLetterAndDelFinish(String groupName, String namespaceId) {
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
RequestDataHelper.setPartition(groupName);
List<RetryTask> callbackRetryTasks = retryTaskAccess.listPage(groupName, new PageDTO<>(0, 100),
new LambdaQueryWrapper<RetryTask>()
.in(RetryTask::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus(),
RetryStatusEnum.FINISH.getStatus())
.eq(RetryTask::getTaskType, TaskTypeEnum.CALLBACK.getType())
.eq(RetryTask::getGroupName, groupName)).getRecords();
RequestDataHelper.setPartition(groupName, namespaceId);
List<RetryTask> callbackRetryTasks = retryTaskAccess.listPage(groupName, namespaceId, new PageDTO<>(0, 100),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.in(RetryTask::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus(),
RetryStatusEnum.FINISH.getStatus())
.eq(RetryTask::getTaskType, TaskTypeEnum.CALLBACK.getType())
.eq(RetryTask::getGroupName, groupName)).getRecords();
if (CollectionUtils.isEmpty(callbackRetryTasks)) {
return Boolean.TRUE;
@ -62,44 +65,46 @@ public class RetryServiceImpl implements RetryService {
return callbackTaskUniqueId.substring(callbackTaskUniqueId.lastIndexOf(StrUtil.UNDERLINE) + 1);
}).collect(Collectors.toSet());
List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess().list(groupName, new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType())
.in(RetryTask::getUniqueId, uniqueIdSet)
);
List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess()
.list(groupName, namespaceId, new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType())
.in(RetryTask::getUniqueId, uniqueIdSet)
);
// 迁移重试失败的数据
List<RetryTask> waitMoveDeadLetters = new ArrayList<>();
List<RetryTask> maxCountRetryTaskList = retryTasks.stream()
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect(
Collectors.toList());
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect(
Collectors.toList());
if (!CollectionUtils.isEmpty(maxCountRetryTaskList)) {
waitMoveDeadLetters.addAll(maxCountRetryTaskList);
}
List<RetryTask> maxCountCallbackRetryTaskList = callbackRetryTasks.stream()
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect(
Collectors.toList());
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.MAX_COUNT.getStatus())).collect(
Collectors.toList());
if (!CollectionUtils.isEmpty(maxCountRetryTaskList)) {
waitMoveDeadLetters.addAll(maxCountCallbackRetryTaskList);
}
moveDeadLetters(groupName, waitMoveDeadLetters);
moveDeadLetters(groupName, namespaceId, waitMoveDeadLetters);
// 删除重试完成的数据
Set<Long> waitDelRetryFinishSet = new HashSet<>();
Set<Long> finishRetryIdList = retryTasks.stream()
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus()))
.map(RetryTask::getId)
.collect(Collectors.toSet());
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus()))
.map(RetryTask::getId)
.collect(Collectors.toSet());
if (!CollectionUtils.isEmpty(finishRetryIdList)) {
waitDelRetryFinishSet.addAll(finishRetryIdList);
}
Set<Long> finishCallbackRetryIdList = callbackRetryTasks.stream()
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus()))
.map(RetryTask::getId)
.collect(Collectors.toSet());
.filter(retryTask -> retryTask.getRetryStatus().equals(RetryStatusEnum.FINISH.getStatus()))
.map(RetryTask::getId)
.collect(Collectors.toSet());
// 迁移重试失败的数据
if (!CollectionUtils.isEmpty(finishCallbackRetryIdList)) {
@ -111,9 +116,11 @@ public class RetryServiceImpl implements RetryService {
}
Assert.isTrue(waitDelRetryFinishSet.size() == accessTemplate.getRetryTaskAccess()
.delete(groupName, new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getGroupName, groupName).in(RetryTask::getId, waitDelRetryFinishSet)),
() -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
.delete(groupName, namespaceId, new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getGroupName, groupName)
.in(RetryTask::getId, waitDelRetryFinishSet)),
() -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
return Boolean.TRUE;
}
@ -123,7 +130,7 @@ public class RetryServiceImpl implements RetryService {
* @param groupName 组id
* @param retryTasks 待迁移数据
*/
private void moveDeadLetters(String groupName, List<RetryTask> retryTasks) {
private void moveDeadLetters(String groupName, String namespaceId, List<RetryTask> retryTasks) {
if (CollectionUtils.isEmpty(retryTasks)) {
return;
}
@ -134,14 +141,17 @@ public class RetryServiceImpl implements RetryService {
retryDeadLetter.setCreateDt(now);
}
Assert.isTrue(retryDeadLetters.size() == accessTemplate
.getRetryDeadLetterAccess().batchInsert(groupName, retryDeadLetters),
() -> new EasyRetryServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters)));
.getRetryDeadLetterAccess().batchInsert(groupName, namespaceId, retryDeadLetters),
() -> new EasyRetryServerException("插入死信队列失败 [{}]", JsonUtil.toJsonString(retryDeadLetters)));
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
List<Long> ids = retryTasks.stream().map(RetryTask::getId).collect(Collectors.toList());
Assert.isTrue(retryTasks.size() == retryTaskAccess.delete(groupName, new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getGroupName, groupName).in(RetryTask::getId, ids)),
() -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
Assert.isTrue(retryTasks.size() == retryTaskAccess.delete(groupName, namespaceId,
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getGroupName, groupName)
.in(RetryTask::getId, ids)),
() -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
context.publishEvent(new RetryTaskFailDeadLetterAlarmEvent(retryDeadLetters));
}

View File

@ -1,9 +1,11 @@
package com.aizuda.easy.retry.server.retry.task.support;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.retry.task.dto.NotifyConfigPartitionTask;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext;
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
@ -41,4 +43,6 @@ public interface RetryTaskConverter {
List<RetryPartitionTask> toRetryTaskLogPartitionTasks(List<RetryTaskLog> retryTaskLogList);
RetryTimerContext toRetryTimerContext(RetryPartitionTask retryPartitionTask);
List<NotifyConfigPartitionTask> toNotifyConfigPartitionTask(List<NotifyConfig> notifyConfigs);
}

View File

@ -116,7 +116,7 @@ public class ExecCallbackUnitActor extends AbstractActor {
String retryTaskUniqueId = callbackRetryTaskHandler.getRetryTaskUniqueId(callbackTask.getUniqueId());
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
RetryTask retryTask = retryTaskAccess.one(callbackTask.getGroupName(),
RetryTask retryTask = retryTaskAccess.one(callbackTask.getGroupName(), callbackTask.getNamespaceId(),
new LambdaQueryWrapper<RetryTask>()
.select(RetryTask::getRetryStatus)
.eq(RetryTask::getNamespaceId, serverNode.getNamespaceId())

View File

@ -87,7 +87,7 @@ public class FailureActor extends AbstractActor {
retryTask.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
.updateById(retryTask.getGroupName(), retryTask),
.updateById(retryTask.getGroupName(), retryTask.getNamespaceId(), retryTask),
() -> new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
retryTask.getGroupName(), retryTask.getUniqueId()));

View File

@ -38,7 +38,7 @@ import java.time.LocalDateTime;
@Component(ActorGenerator.FINISH_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class FinishActor extends AbstractActor {
public class FinishActor extends AbstractActor {
@Autowired
private AccessTemplate accessTemplate;
@ -52,28 +52,28 @@ public class FinishActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryTask.class, retryTask->{
return receiveBuilder().match(RetryTask.class, retryTask -> {
LogUtils.info(log, "FinishActor params:[{}]", retryTask);
retryTask.setRetryStatus(RetryStatusEnum.FINISH.getStatus());
try {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
retryTask.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
.updateById(retryTask.getGroupName(), retryTask),
() -> new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
retryTask.getGroupName(), retryTask.getUniqueId()));
retryTask.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
.updateById(retryTask.getGroupName(), retryTask.getNamespaceId(), retryTask),
() -> new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
retryTask.getGroupName(), retryTask.getUniqueId()));
// 创建一个回调任务
callbackRetryTaskHandler.create(retryTask);
}
});
// 创建一个回调任务
callbackRetryTaskHandler.create(retryTask);
}
});
}catch (Exception e) {
} catch (Exception e) {
LogUtils.error(log, "更新重试任务失败", e);
} finally {
// 清除幂等标识位

View File

@ -50,11 +50,11 @@ public class NoRetryActor extends AbstractActor {
try {
retryTask.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
.updateById(retryTask.getGroupName(),retryTask), () ->
new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
retryTask.getGroupName(), retryTask.getUniqueId()));
}catch (Exception e) {
LogUtils.error(log,"更新重试任务失败", e);
.updateById(retryTask.getGroupName(), retryTask.getNamespaceId() , retryTask), () ->
new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
retryTask.getGroupName(), retryTask.getUniqueId()));
} catch (Exception e) {
LogUtils.error(log, "更新重试任务失败", e);
} finally {
// 清除幂等标识位
idempotentStrategy.clear(Pair.of(retryTask.getGroupName(), retryTask.getNamespaceId()), retryTask.getId());

View File

@ -183,7 +183,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
public List<RetryPartitionTask> listAvailableTasks(String groupName, String namespaceId, Long lastId, Integer taskType) {
List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess()
.listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()),
.listPage(groupName, namespaceId, new PageDTO<>(0, systemProperties.getRetryPullPageSize()),
new LambdaQueryWrapper<RetryTask>()
.select(RetryTask::getId, RetryTask::getNextTriggerAt, RetryTask::getUniqueId,
RetryTask::getGroupName, RetryTask::getRetryCount, RetryTask::getSceneName,

View File

@ -81,14 +81,13 @@ public class CallbackRetryTaskHandler {
try {
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
.insert(callbackRetryTask.getGroupName(), callbackRetryTask),
.insert(callbackRetryTask.getGroupName(), callbackRetryTask.getNamespaceId(), callbackRetryTask),
() -> new EasyRetryServerException("failed to report data"));
} catch (DuplicateKeyException e) {
log.warn("回调数据重复新增. [{}]", JsonUtil.toJsonString(retryTask));
return;
}
// 初始化回调日志
RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(callbackRetryTask);
// 记录重试日志

View File

@ -4,22 +4,33 @@ import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.retry.task.dto.NotifyConfigPartitionTask;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.access.TaskAccess;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
@ -36,11 +47,11 @@ import java.util.List;
@Slf4j
public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule implements Lifecycle {
private static String retryErrorMoreThresholdTextMessageFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试失败数据监控</font> \n" +
"> 组名称:{} \n" +
"> 场景名称:{} \n" +
"> 时间窗口:{} ~ {} \n" +
"> **共计:{}** \n";
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试失败数据监控</font> \n" +
"> 组名称:{} \n" +
"> 场景名称:{} \n" +
"> 时间窗口:{} ~ {} \n" +
"> **共计:{}** \n";
@Autowired
private EasyRetryAlarmFactory easyRetryAlarmFactory;
@ -60,35 +71,51 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple
@Override
protected void doExecute() {
LogUtils.info(log, "retryErrorMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
for (SceneConfig sceneConfig : accessTemplate.getSceneConfigAccess().getAllConfigSceneList()) {
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().getNotifyConfigByGroupNameAndSceneName(sceneConfig.getGroupName(),sceneConfig.getSceneName(), NotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene());
if (CollectionUtils.isEmpty(notifyConfigs)) {
continue;
}
// x分钟内x组x场景进入死信队列的数据量
LocalDateTime now = LocalDateTime.now();
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
long count = retryDeadLetterAccess.count(sceneConfig.getGroupName(), new LambdaQueryWrapper<RetryDeadLetter>().
PartitionTaskUtils.process(this::getNotifyConfigPartitions, this::doHandler, 0);
}
private void doHandler(List<? extends PartitionTask> partitionTasks) {
for (PartitionTask partitionTask : partitionTasks) {
doSendAlarm((NotifyConfigPartitionTask) partitionTask);
}
}
private void doSendAlarm(NotifyConfigPartitionTask notifyConfigPartitionTask) {
// x分钟内x组x场景进入死信队列的数据量
LocalDateTime now = LocalDateTime.now();
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
long count = retryDeadLetterAccess.count(notifyConfigPartitionTask.getGroupName(), notifyConfigPartitionTask.getNamespaceId(),
new LambdaQueryWrapper<RetryDeadLetter>().
between(RetryDeadLetter::getCreateDt, now.minusMinutes(30), now)
.eq(RetryDeadLetter::getGroupName,sceneConfig.getGroupName())
.eq(RetryDeadLetter::getSceneName,sceneConfig.getSceneName()));
for (NotifyConfig notifyConfig : notifyConfigs) {
if (count > notifyConfig.getNotifyThreshold()) {
// 预警
AlarmContext context = AlarmContext.build().text(retryErrorMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
sceneConfig.getGroupName(),
sceneConfig.getSceneName(),
DateUtils.format(now.minusMinutes(30),
DateUtils.NORM_DATETIME_PATTERN),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN), count)
.title("{}环境 场景重试失败数量超过阈值", EnvironmentUtils.getActiveProfile())
.notifyAttribute(notifyConfig.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
}
.eq(RetryDeadLetter::getGroupName, notifyConfigPartitionTask.getGroupName())
.eq(RetryDeadLetter::getSceneName, notifyConfigPartitionTask.getSceneName()));
if (count > notifyConfigPartitionTask.getNotifyThreshold()) {
// 预警
AlarmContext context = AlarmContext.build()
.text(retryErrorMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
notifyConfigPartitionTask.getGroupName(),
notifyConfigPartitionTask.getSceneName(),
DateUtils.format(now.minusMinutes(30),
DateUtils.NORM_DATETIME_PATTERN),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN), count)
.title("{}环境 场景重试失败数量超过阈值", EnvironmentUtils.getActiveProfile())
.notifyAttribute(notifyConfigPartitionTask.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfigPartitionTask.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
private List<NotifyConfigPartitionTask> getNotifyConfigPartitions(Long startId) {
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess()
.listPage(new PageDTO<>(startId, 1000), new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(NotifyConfig::getNotifyScene, NotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene()))
.getRecords();
return RetryTaskConverter.INSTANCE.toNotifyConfigPartitionTask(notifyConfigs);
}
@Override

View File

@ -5,17 +5,23 @@ import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.retry.task.dto.NotifyConfigPartitionTask;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -38,11 +44,11 @@ import java.util.List;
@RequiredArgsConstructor
public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implements Lifecycle {
private static String retryTaskMoreThresholdTextMessageFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试数据监控</font> \n" +
"> 组名称:{} \n" +
"> 场景名称:{} \n" +
"> 告警时间:{} \n" +
"> **共计:{}** \n";
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试数据监控</font> \n" +
"> 组名称:{} \n" +
"> 场景名称:{} \n" +
"> 告警时间:{} \n" +
"> **共计:{}** \n";
private final EasyRetryAlarmFactory easyRetryAlarmFactory;
private final AccessTemplate accessTemplate;
@ -60,33 +66,49 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem
@Override
protected void doExecute() {
LogUtils.info(log, "retryTaskMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
for (SceneConfig sceneConfig : accessTemplate.getSceneConfigAccess().getAllConfigSceneList()) {
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().getNotifyConfigByGroupNameAndSceneName(sceneConfig.getGroupName(),sceneConfig.getSceneName(), NotifySceneEnum.MAX_RETRY.getNotifyScene());
if (CollectionUtils.isEmpty(notifyConfigs)) {
continue;
}
long count = accessTemplate.getRetryTaskAccess().count(sceneConfig.getGroupName(), new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getGroupName, sceneConfig.getGroupName())
.eq(RetryTask::getSceneName,sceneConfig.getSceneName())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
for (NotifyConfig notifyConfig : notifyConfigs) {
if (count > notifyConfig.getNotifyThreshold()) {
// 预警
AlarmContext context = AlarmContext.build()
.text(retryTaskMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
sceneConfig.getGroupName(),
sceneConfig.getSceneName(),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN),
count)
.title("{}环境 场景重试数量超过阈值", EnvironmentUtils.getActiveProfile())
.notifyAttribute(notifyConfig.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
}
PartitionTaskUtils.process(this::getNotifyConfigPartitions, this::doHandler, 0);
}
private void doHandler(List<? extends PartitionTask> partitionTasks) {
for (PartitionTask partitionTask : partitionTasks) {
doSendAlarm((NotifyConfigPartitionTask) partitionTask);
}
}
private void doSendAlarm(NotifyConfigPartitionTask partitionTask) {
long count = accessTemplate.getRetryTaskAccess()
.count(partitionTask.getGroupName(), partitionTask.getNamespaceId(),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, partitionTask.getNamespaceId())
.eq(RetryTask::getGroupName, partitionTask.getGroupName())
.eq(RetryTask::getSceneName, partitionTask.getSceneName())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
if (count > partitionTask.getNotifyThreshold()) {
// 预警
AlarmContext context = AlarmContext.build()
.text(retryTaskMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
partitionTask.getGroupName(),
partitionTask.getSceneName(),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN),
count)
.title("{}环境 场景重试数量超过阈值", EnvironmentUtils.getActiveProfile())
.notifyAttribute(partitionTask.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(partitionTask.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
private List<NotifyConfigPartitionTask> getNotifyConfigPartitions(Long startId) {
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess()
.listPage(new PageDTO<>(startId, 1000), new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(NotifyConfig::getNotifyScene, NotifySceneEnum.MAX_RETRY.getNotifyScene()))
.getRecords();
return RetryTaskConverter.INSTANCE.toNotifyConfigPartitionTask(notifyConfigs);
}
@Override
public String lockName() {

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.retry.task.support.schedule;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
@ -47,14 +48,18 @@ public class RetryTaskSchedule extends AbstractSchedule implements Lifecycle {
@Override
protected void doExecute() {
try {
Set<String> groupNameList = accessTemplate.getGroupConfigAccess()
Set<Pair<String/*groupName*/, String/*namespaceId*/>> groupNameList = accessTemplate.getGroupConfigAccess()
.list(new LambdaQueryWrapper<GroupConfig>()
.select(GroupConfig::getGroupName)
.select(GroupConfig::getGroupName, GroupConfig::getNamespaceId)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()))
.stream().map(GroupConfig::getGroupName).collect(Collectors.toSet());
.stream().map(groupConfig -> {
for (String groupName : groupNameList) {
retryService.moveDeadLetterAndDelFinish(groupName);
return Pair.of(groupConfig.getGroupName(), groupConfig.getNamespaceId());
}).collect(Collectors.toSet());
for (Pair<String/*groupName*/, String/*namespaceId*/> pair : groupNameList) {
retryService.moveDeadLetterAndDelFinish(pair.getKey(), pair.getValue());
}
} catch (Exception e) {

View File

@ -36,11 +36,12 @@ public class CallbackTimerTask extends AbstractTimerTask {
log.info("回调任务执行 {}", LocalDateTime.now());
AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class);
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, context.getNamespaceId())
.eq(RetryTask::getGroupName, context.getGroupName())
.eq(RetryTask::getUniqueId, context.getUniqueId())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), context.getNamespaceId(),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, context.getNamespaceId())
.eq(RetryTask::getGroupName, context.getGroupName())
.eq(RetryTask::getUniqueId, context.getUniqueId())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
if (Objects.isNull(retryTask)) {
return;
}

View File

@ -31,10 +31,11 @@ public class RetryTimerTask extends AbstractTimerTask {
}
@Override
public void doRun(final Timeout timeout){
public void doRun(final Timeout timeout) {
AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class);
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), new LambdaQueryWrapper<RetryTask>()
RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), context.getNamespaceId(),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, context.getNamespaceId())
.eq(RetryTask::getGroupName, context.getGroupName())
.eq(RetryTask::getUniqueId, context.getUniqueId())

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -13,6 +13,6 @@ import lombok.EqualsAndHashCode;
@Data
public class NamespaceQueryVO extends BaseQueryVO {
private String name;
private String keyword;
}

View File

@ -69,8 +69,9 @@ public class GroupConfigServiceImpl implements GroupConfigService {
ConfigAccess<GroupConfig> groupConfigAccess = accessTemplate.getGroupConfigAccess();
Assert.isTrue(groupConfigAccess.count(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, groupConfigRequestVO.getGroupName())) == 0,
() -> new EasyRetryServerException("GroupName已经存在 {}", groupConfigRequestVO.getGroupName()));
.eq(GroupConfig::getNamespaceId, systemUser.getNamespaceId())
.eq(GroupConfig::getGroupName, groupConfigRequestVO.getGroupName())) == 0,
() -> new EasyRetryServerException("GroupName已经存在 {}", groupConfigRequestVO.getGroupName()));
// 保存组配置
doSaveGroupConfig(systemUser, groupConfigRequestVO);
@ -94,8 +95,8 @@ public class GroupConfigServiceImpl implements GroupConfigService {
sequenceAlloc.setStep(systemProperties.getStep());
sequenceAlloc.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == sequenceAllocMapper.insert(sequenceAlloc),
() -> new EasyRetryServerException("failed to save sequence generation rule configuration [{}].",
groupConfigRequestVO.getGroupName()));
() -> new EasyRetryServerException("failed to save sequence generation rule configuration [{}].",
groupConfigRequestVO.getGroupName()));
}
@Override
@ -107,7 +108,9 @@ public class GroupConfigServiceImpl implements GroupConfigService {
ConfigAccess<GroupConfig> groupConfigAccess = accessTemplate.getGroupConfigAccess();
long count = groupConfigAccess.count(
new LambdaQueryWrapper<GroupConfig>().eq(GroupConfig::getGroupName, groupName));
new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId, namespaceId)
.eq(GroupConfig::getGroupName, groupName));
if (count <= 0) {
return false;
}
@ -117,20 +120,19 @@ public class GroupConfigServiceImpl implements GroupConfigService {
// 使用@TableField(value = "version", update= "%s+1") 进行更新version, 这里必须初始化一个值
groupConfig.setVersion(1);
Assert.isTrue(systemProperties.getTotalPartition() > groupConfigRequestVO.getGroupPartition(),
() -> new EasyRetryServerException("分区超过最大分区. [{}]", systemProperties.getTotalPartition() - 1));
() -> new EasyRetryServerException("分区超过最大分区. [{}]", systemProperties.getTotalPartition() - 1));
Assert.isTrue(groupConfigRequestVO.getGroupPartition() >= 0,
() -> new EasyRetryServerException("分区不能是负数."));
() -> new EasyRetryServerException("分区不能是负数."));
// 校验retry_task_x和retry_dead_letter_x是否存在
checkGroupPartition(groupConfig);
checkGroupPartition(groupConfig, namespaceId);
Assert.isTrue(1 == groupConfigAccess.update(groupConfig,
new LambdaUpdateWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId,namespaceId)
.eq(GroupConfig::getGroupName, groupName)),
() -> new EasyRetryServerException("exception occurred while adding group. groupConfigVO[{}]",
groupConfigRequestVO));
new LambdaUpdateWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId, namespaceId)
.eq(GroupConfig::getGroupName, groupName)),
() -> new EasyRetryServerException("exception occurred while adding group. groupConfigVO[{}]",
groupConfigRequestVO));
if (SystemModeEnum.isRetry(systemProperties.getMode())) {
// 同步版本 版本为0代表需要同步到客户端
@ -150,9 +152,9 @@ public class GroupConfigServiceImpl implements GroupConfigService {
groupConfig.setGroupStatus(status);
ConfigAccess<GroupConfig> groupConfigAccess = accessTemplate.getGroupConfigAccess();
return groupConfigAccess.update(groupConfig,
new LambdaUpdateWrapper<GroupConfig>()
new LambdaUpdateWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, groupName)) == 1;
.eq(GroupConfig::getGroupName, groupName)) == 1;
}
@Override
@ -160,7 +162,7 @@ public class GroupConfigServiceImpl implements GroupConfigService {
LambdaQueryWrapper<GroupConfig> groupConfigLambdaQueryWrapper = new LambdaQueryWrapper<>();
groupConfigLambdaQueryWrapper.eq(GroupConfig::getNamespaceId,
UserSessionUtils.currentUserSession().getNamespaceId());
UserSessionUtils.currentUserSession().getNamespaceId());
if (StrUtil.isNotBlank(queryVO.getGroupName())) {
groupConfigLambdaQueryWrapper.like(GroupConfig::getGroupName, queryVO.getGroupName() + "%");
}
@ -169,24 +171,24 @@ public class GroupConfigServiceImpl implements GroupConfigService {
groupConfigLambdaQueryWrapper.orderByDesc(GroupConfig::getId);
PageDTO<GroupConfig> groupConfigPageDTO = groupConfigAccess.listPage(
new PageDTO<>(queryVO.getPage(), queryVO.getSize()), groupConfigLambdaQueryWrapper);
new PageDTO<>(queryVO.getPage(), queryVO.getSize()), groupConfigLambdaQueryWrapper);
List<GroupConfig> records = groupConfigPageDTO.getRecords();
if (CollectionUtils.isEmpty(records)) {
return new PageResult<>(groupConfigPageDTO.getCurrent(), groupConfigPageDTO.getSize(),
groupConfigPageDTO.getTotal());
groupConfigPageDTO.getTotal());
}
PageResult<List<GroupConfigResponseVO>> pageResult = new PageResult<>(groupConfigPageDTO.getCurrent(),
groupConfigPageDTO.getSize(), groupConfigPageDTO.getTotal());
groupConfigPageDTO.getSize(), groupConfigPageDTO.getTotal());
List<GroupConfigResponseVO> responseVOList = GroupConfigResponseVOConverter.INSTANCE.toGroupConfigResponseVO(
records);
records);
for (GroupConfigResponseVO groupConfigResponseVO : responseVOList) {
Optional.ofNullable(IdGeneratorMode.modeOf(groupConfigResponseVO.getIdGeneratorMode()))
.ifPresent(idGeneratorMode -> {
groupConfigResponseVO.setIdGeneratorModeName(idGeneratorMode.getDesc());
});
.ifPresent(idGeneratorMode -> {
groupConfigResponseVO.setIdGeneratorModeName(idGeneratorMode.getDesc());
});
}
pageResult.setData(responseVOList);
@ -203,50 +205,50 @@ public class GroupConfigServiceImpl implements GroupConfigService {
groupConfig.setDescription(Optional.ofNullable(groupConfigRequestVO.getDescription()).orElse(StrUtil.EMPTY));
if (Objects.isNull(groupConfigRequestVO.getGroupPartition())) {
groupConfig.setGroupPartition(
HashUtil.bkdrHash(groupConfigRequestVO.getGroupName()) % systemProperties.getTotalPartition());
HashUtil.bkdrHash(groupConfigRequestVO.getGroupName()) % systemProperties.getTotalPartition());
} else {
Assert.isTrue(systemProperties.getTotalPartition() > groupConfigRequestVO.getGroupPartition(),
() -> new EasyRetryServerException("分区超过最大分区. [{}]", systemProperties.getTotalPartition() - 1));
() -> new EasyRetryServerException("分区超过最大分区. [{}]", systemProperties.getTotalPartition() - 1));
Assert.isTrue(groupConfigRequestVO.getGroupPartition() >= 0,
() -> new EasyRetryServerException("分区不能是负数."));
() -> new EasyRetryServerException("分区不能是负数."));
}
groupConfig.setBucketIndex(
HashUtil.bkdrHash(groupConfigRequestVO.getGroupName()) % systemProperties.getBucketTotal());
HashUtil.bkdrHash(groupConfigRequestVO.getGroupName()) % systemProperties.getBucketTotal());
ConfigAccess<GroupConfig> groupConfigAccess = accessTemplate.getGroupConfigAccess();
Assert.isTrue(1 == groupConfigAccess.insert(groupConfig),
() -> new EasyRetryServerException("新增组异常异常 groupConfigVO[{}]", groupConfigRequestVO));
() -> new EasyRetryServerException("新增组异常异常 groupConfigVO[{}]", groupConfigRequestVO));
// 校验retry_task_x和retry_dead_letter_x是否存在
checkGroupPartition(groupConfig);
checkGroupPartition(groupConfig, systemUser.getNamespaceId());
}
/**
* 校验retry_task_x和retry_dead_letter_x是否存在
*/
private void checkGroupPartition(GroupConfig groupConfig) {
private void checkGroupPartition(GroupConfig groupConfig, String namespaceId) {
try {
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
retryTaskAccess.one(groupConfig.getGroupName(),
new LambdaQueryWrapper<RetryTask>().eq(RetryTask::getId, 1));
retryTaskAccess.count(groupConfig.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>().eq(RetryTask::getId, 1));
} catch (BadSqlGrammarException e) {
Optional.ofNullable(e.getMessage()).ifPresent(s -> {
if (s.contains("retry_task_" + groupConfig.getGroupPartition()) && s.contains("doesn't exist")) {
throw new EasyRetryServerException("分区:[{}] '未配置表retry_task_{}', 请联系管理员进行配置",
groupConfig.getGroupPartition(), groupConfig.getGroupPartition());
groupConfig.getGroupPartition(), groupConfig.getGroupPartition());
}
});
}
try {
TaskAccess<RetryDeadLetter> retryTaskAccess = accessTemplate.getRetryDeadLetterAccess();
retryTaskAccess.one(groupConfig.getGroupName(),
new LambdaQueryWrapper<RetryDeadLetter>().eq(RetryDeadLetter::getId, 1));
retryTaskAccess.one(groupConfig.getGroupName(), groupConfig.getNamespaceId(),
new LambdaQueryWrapper<RetryDeadLetter>().eq(RetryDeadLetter::getId, 1));
} catch (BadSqlGrammarException e) {
Optional.ofNullable(e.getMessage()).ifPresent(s -> {
if (s.contains("retry_dead_letter_" + groupConfig.getGroupPartition()) && s.contains("doesn't exist")) {
throw new EasyRetryServerException("分区:[{}] '未配置表retry_dead_letter_{}', 请联系管理员进行配置",
groupConfig.getGroupPartition(), groupConfig.getGroupPartition());
groupConfig.getGroupPartition(), groupConfig.getGroupPartition());
}
});
}
@ -257,10 +259,12 @@ public class GroupConfigServiceImpl implements GroupConfigService {
ConfigAccess<GroupConfig> groupConfigAccess = accessTemplate.getGroupConfigAccess();
GroupConfig groupConfig = groupConfigAccess.one(
new LambdaQueryWrapper<GroupConfig>().eq(GroupConfig::getGroupName, groupName));
new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())
.eq(GroupConfig::getGroupName, groupName));
GroupConfigResponseVO groupConfigResponseVO = GroupConfigResponseVOConverter.INSTANCE.toGroupConfigResponseVO(
groupConfig);
groupConfig);
Optional.ofNullable(IdGeneratorMode.modeOf(groupConfig.getIdGeneratorMode())).ifPresent(idGeneratorMode -> {
groupConfigResponseVO.setIdGeneratorModeName(idGeneratorMode.getDesc());
@ -271,7 +275,7 @@ public class GroupConfigServiceImpl implements GroupConfigService {
@Override
public List<GroupConfigResponseVO> getAllGroupConfigList(final List<String> namespaceIds) {
if(CollectionUtils.isEmpty(namespaceIds)) {
if (CollectionUtils.isEmpty(namespaceIds)) {
return new ArrayList<>();
}
@ -288,10 +292,10 @@ public class GroupConfigServiceImpl implements GroupConfigService {
ConfigAccess<GroupConfig> groupConfigAccess = accessTemplate.getGroupConfigAccess();
List<GroupConfig> groupConfigs = groupConfigAccess.list(new LambdaQueryWrapper<GroupConfig>()
.in(GroupConfig::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())
.select(GroupConfig::getGroupName))
.stream()
.collect(Collectors.toList());
.in(GroupConfig::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())
.select(GroupConfig::getGroupName))
.stream()
.collect(Collectors.toList());
return groupConfigs.stream().map(GroupConfig::getGroupName).collect(Collectors.toList());
}

View File

@ -67,8 +67,10 @@ public class NamespaceServiceImpl implements NamespaceService {
PageDTO<Namespace> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
LambdaQueryWrapper<Namespace> queryWrapper = new LambdaQueryWrapper<>();
if (StrUtil.isNotBlank(queryVO.getName())) {
queryWrapper.like(Namespace::getName, queryVO.getName() + "%");
if (StrUtil.isNotBlank(queryVO.getKeyword())) {
queryWrapper.like(Namespace::getName, queryVO.getKeyword().trim() + "%")
.or().like(Namespace::getUniqueId, queryVO.getKeyword().trim() + "%")
;
}
queryWrapper.eq(Namespace::getDeleted, StatusEnum.NO);

View File

@ -67,8 +67,9 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
return new PageResult<>(pageDTO, new ArrayList<>());
}
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
LambdaQueryWrapper<RetryDeadLetter> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(RetryDeadLetter::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId());
queryWrapper.eq(RetryDeadLetter::getNamespaceId, namespaceId);
queryWrapper.eq(RetryDeadLetter::getGroupName, queryVO.getGroupName());
if (StrUtil.isNotBlank(queryVO.getSceneName())) {
@ -88,17 +89,19 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
}
PageDTO<RetryDeadLetter> retryDeadLetterPageDTO = accessTemplate.getRetryDeadLetterAccess()
.listPage(queryVO.getGroupName(), pageDTO, queryWrapper);
.listPage(queryVO.getGroupName(), namespaceId, pageDTO, queryWrapper);
return new PageResult<>(retryDeadLetterPageDTO,
RetryDeadLetterResponseVOConverter.INSTANCE.batchConvert(retryDeadLetterPageDTO.getRecords()));
RetryDeadLetterResponseVOConverter.INSTANCE.batchConvert(retryDeadLetterPageDTO.getRecords()));
}
@Override
public RetryDeadLetterResponseVO getRetryDeadLetterById(String groupName, Long id) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
RetryDeadLetter retryDeadLetter = retryDeadLetterAccess.one(groupName,
new LambdaQueryWrapper<RetryDeadLetter>().eq(RetryDeadLetter::getId, id));
RetryDeadLetter retryDeadLetter = retryDeadLetterAccess.one(groupName, namespaceId,
new LambdaQueryWrapper<RetryDeadLetter>().eq(RetryDeadLetter::getId, id));
return RetryDeadLetterResponseVOConverter.INSTANCE.convert(retryDeadLetter);
}
@ -106,32 +109,32 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
@Transactional
public int rollback(BatchRollBackRetryDeadLetterVO rollBackRetryDeadLetterVO) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
String groupName = rollBackRetryDeadLetterVO.getGroupName();
List<Long> ids = rollBackRetryDeadLetterVO.getIds();
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
List<RetryDeadLetter> retryDeadLetterList = retryDeadLetterAccess.list(groupName,
new LambdaQueryWrapper<RetryDeadLetter>().in(RetryDeadLetter::getId, ids));
List<RetryDeadLetter> retryDeadLetterList = retryDeadLetterAccess.list(groupName, namespaceId,
new LambdaQueryWrapper<RetryDeadLetter>().in(RetryDeadLetter::getId, ids));
Assert.notEmpty(retryDeadLetterList, () -> new EasyRetryServerException("数据不存在"));
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
ConfigAccess<SceneConfig> sceneConfigAccess = accessTemplate.getSceneConfigAccess();
Set<String> sceneNameSet = retryDeadLetterList.stream().map(RetryDeadLetter::getSceneName)
.collect(Collectors.toSet());
.collect(Collectors.toSet());
List<SceneConfig> sceneConfigs = sceneConfigAccess.list(new LambdaQueryWrapper<SceneConfig>()
.eq(SceneConfig::getNamespaceId, namespaceId)
.in(SceneConfig::getSceneName, sceneNameSet));
.eq(SceneConfig::getNamespaceId, namespaceId)
.in(SceneConfig::getSceneName, sceneNameSet));
Map<String, SceneConfig> sceneConfigMap = sceneConfigs.stream().collect(Collectors.toMap((sceneConfig) ->
sceneConfig.getGroupName() + sceneConfig.getSceneName(), Function.identity()));
sceneConfig.getGroupName() + sceneConfig.getSceneName(), Function.identity()));
List<RetryTask> waitRollbackList = new ArrayList<>();
for (RetryDeadLetter retryDeadLetter : retryDeadLetterList) {
SceneConfig sceneConfig = sceneConfigMap.get(
retryDeadLetter.getGroupName() + retryDeadLetter.getSceneName());
retryDeadLetter.getGroupName() + retryDeadLetter.getSceneName());
Assert.notNull(sceneConfig,
() -> new EasyRetryServerException("未查询到场景. [{}]", retryDeadLetter.getSceneName()));
() -> new EasyRetryServerException("未查询到场景. [{}]", retryDeadLetter.getSceneName()));
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryDeadLetter);
retryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus());
@ -148,16 +151,16 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
}
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
Assert.isTrue(waitRollbackList.size() == retryTaskAccess.batchInsert(groupName, waitRollbackList),
() -> new EasyRetryServerException("新增重试任务失败"));
Assert.isTrue(waitRollbackList.size() == retryTaskAccess.batchInsert(groupName, namespaceId, waitRollbackList),
() -> new EasyRetryServerException("新增重试任务失败"));
Set<Long> waitDelRetryDeadLetterIdSet = retryDeadLetterList.stream().map(RetryDeadLetter::getId)
.collect(Collectors.toSet());
Assert.isTrue(waitDelRetryDeadLetterIdSet.size() == retryDeadLetterAccess.delete(groupName,
new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getGroupName, groupName)
.in(RetryDeadLetter::getId, waitDelRetryDeadLetterIdSet)),
() -> new EasyRetryServerException("删除死信队列数据失败"))
.collect(Collectors.toSet());
Assert.isTrue(waitDelRetryDeadLetterIdSet.size() == retryDeadLetterAccess.delete(groupName, namespaceId,
new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getGroupName, groupName)
.in(RetryDeadLetter::getId, waitDelRetryDeadLetterIdSet)),
() -> new EasyRetryServerException("删除死信队列数据失败"))
;
// 变更日志的状态
@ -166,11 +169,11 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
Set<String> uniqueIdSet = waitRollbackList.stream().map(RetryTask::getUniqueId).collect(Collectors.toSet());
int update = retryTaskLogMapper.update(retryTaskLog, new LambdaUpdateWrapper<RetryTaskLog>()
.eq(RetryTaskLog::getNamespaceId, namespaceId)
.in(RetryTaskLog::getUniqueId, uniqueIdSet)
.eq(RetryTaskLog::getGroupName, groupName));
.eq(RetryTaskLog::getNamespaceId, namespaceId)
.in(RetryTaskLog::getUniqueId, uniqueIdSet)
.eq(RetryTaskLog::getGroupName, groupName));
Assert.isTrue(update == uniqueIdSet.size(),
() -> new EasyRetryServerException("回滚日志状态失败, 可能原因: 日志信息缺失或存在多个相同uniqueId"));
() -> new EasyRetryServerException("回滚日志状态失败, 可能原因: 日志信息缺失或存在多个相同uniqueId"));
return update;
}
@ -179,10 +182,10 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
public int batchDelete(BatchDeleteRetryDeadLetterVO deadLetterVO) {
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
return retryDeadLetterAccess.delete(deadLetterVO.getGroupName(),
new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getNamespaceId, namespaceId)
.eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName())
.in(RetryDeadLetter::getId, deadLetterVO.getIds()));
return retryDeadLetterAccess.delete(deadLetterVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getNamespaceId, namespaceId)
.eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName())
.in(RetryDeadLetter::getId, deadLetterVO.getIds()));
}
}

View File

@ -119,49 +119,49 @@ public class RetryTaskServiceImpl implements RetryTaskService {
retryTaskLambdaQueryWrapper.eq(RetryTask::getRetryStatus, queryVO.getRetryStatus());
}
RequestDataHelper.setPartition(queryVO.getGroupName());
retryTaskLambdaQueryWrapper.select(RetryTask::getId, RetryTask::getBizNo, RetryTask::getIdempotentId,
RetryTask::getGroupName, RetryTask::getNextTriggerAt, RetryTask::getRetryCount,
RetryTask::getRetryStatus, RetryTask::getUpdateDt, RetryTask::getSceneName, RetryTask::getUniqueId,
RetryTask::getTaskType);
pageDTO = accessTemplate.getRetryTaskAccess().listPage(queryVO.getGroupName(), pageDTO,
retryTaskLambdaQueryWrapper.orderByDesc(RetryTask::getCreateDt));
pageDTO = accessTemplate.getRetryTaskAccess()
.listPage(queryVO.getGroupName(), namespaceId,
pageDTO,
retryTaskLambdaQueryWrapper.orderByDesc(RetryTask::getCreateDt));
return new PageResult<>(pageDTO,
RetryTaskResponseVOConverter.INSTANCE.toRetryTaskResponseVO(pageDTO.getRecords()));
}
@Override
public RetryTaskResponseVO getRetryTaskById(String groupName, Long id) {
RequestDataHelper.setPartition(groupName);
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
RetryTask retryTask = retryTaskAccess.one(groupName,
new LambdaQueryWrapper<RetryTask>().eq(RetryTask::getId, id));
RetryTask retryTask = retryTaskAccess.one(groupName, UserSessionUtils.currentUserSession().getNamespaceId(),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getId, id));
return RetryTaskResponseVOConverter.INSTANCE.toRetryTaskResponseVO(retryTask);
}
@Override
@Transactional
public int updateRetryTaskStatus(RetryTaskUpdateStatusRequestVO retryTaskUpdateStatusRequestVO) {
public int updateRetryTaskStatus(RetryTaskUpdateStatusRequestVO requestVO) {
RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(retryTaskUpdateStatusRequestVO.getRetryStatus());
RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(requestVO.getRetryStatus());
if (Objects.isNull(retryStatusEnum)) {
throw new EasyRetryServerException("重试状态错误. [{}]", retryTaskUpdateStatusRequestVO.getRetryStatus());
throw new EasyRetryServerException("重试状态错误. [{}]", requestVO.getRetryStatus());
}
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
RetryTask retryTask = retryTaskAccess.one(retryTaskUpdateStatusRequestVO.getGroupName(),
RetryTask retryTask = retryTaskAccess.one(requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getId, retryTaskUpdateStatusRequestVO.getId()));
.eq(RetryTask::getId, requestVO.getId()));
if (Objects.isNull(retryTask)) {
throw new EasyRetryServerException("未查询到重试任务");
}
retryTask.setRetryStatus(retryTaskUpdateStatusRequestVO.getRetryStatus());
retryTask.setGroupName(retryTaskUpdateStatusRequestVO.getGroupName());
retryTask.setRetryStatus(requestVO.getRetryStatus());
retryTask.setGroupName(requestVO.getGroupName());
// 若恢复重试则需要重新计算下次触发时间
if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) {
@ -194,7 +194,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
}
retryTask.setUpdateDt(LocalDateTime.now());
return retryTaskAccess.updateById(retryTaskUpdateStatusRequestVO.getGroupName(), retryTask);
return retryTaskAccess.updateById(requestVO.getGroupName(), namespaceId, retryTask);
}
@Override
@ -273,7 +273,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
// 根据重试数据id更新执行器名称
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
return retryTaskAccess.update(requestVO.getGroupName(), retryTask,
return retryTaskAccess.update(requestVO.getGroupName(), namespaceId, retryTask,
new LambdaUpdateWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getGroupName, requestVO.getGroupName())
@ -284,7 +284,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
public Integer deleteRetryTask(final BatchDeleteRetryTaskVO requestVO) {
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
return retryTaskAccess.delete(requestVO.getGroupName(),
return retryTaskAccess.delete(requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getGroupName, requestVO.getGroupName())
@ -355,7 +355,8 @@ public class RetryTaskServiceImpl implements RetryTaskService {
List<String> uniqueIds = requestVO.getUniqueIds();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
List<RetryTask> list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(),
List<RetryTask> list = accessTemplate.getRetryTaskAccess().list(
requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType())
@ -379,7 +380,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
List<RetryTask> list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(),
List<RetryTask> list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getTaskType, TaskTypeEnum.CALLBACK.getType())

View File

@ -54,7 +54,7 @@ public class SceneConfigServiceImpl implements SceneConfigService {
}
pageDTO = accessTemplate.getSceneConfigAccess()
.listPage(pageDTO, sceneConfigLambdaQueryWrapper.orderByDesc(SceneConfig::getCreateDt));
.listPage(pageDTO, sceneConfigLambdaQueryWrapper.orderByDesc(SceneConfig::getCreateDt));
return new PageResult<>(pageDTO, SceneConfigResponseVOConverter.INSTANCE.batchConvert(pageDTO.getRecords()));
@ -66,11 +66,11 @@ public class SceneConfigServiceImpl implements SceneConfigService {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
List<SceneConfig> sceneConfigs = accessTemplate.getSceneConfigAccess()
.list(new LambdaQueryWrapper<SceneConfig>()
.select(SceneConfig::getSceneName, SceneConfig::getDescription)
.eq(SceneConfig::getNamespaceId, namespaceId)
.eq(SceneConfig::getGroupName, groupName)
.orderByDesc(SceneConfig::getCreateDt));
.list(new LambdaQueryWrapper<SceneConfig>()
.select(SceneConfig::getSceneName, SceneConfig::getDescription)
.eq(SceneConfig::getNamespaceId, namespaceId)
.eq(SceneConfig::getGroupName, groupName)
.orderByDesc(SceneConfig::getCreateDt));
return SceneConfigResponseVOConverter.INSTANCE.batchConvert(sceneConfigs);
}
@ -79,23 +79,29 @@ public class SceneConfigServiceImpl implements SceneConfigService {
public Boolean saveSceneConfig(SceneConfigRequestVO requestVO) {
checkExecuteInterval(requestVO);
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
ConfigAccess<SceneConfig> sceneConfigAccess = accessTemplate.getSceneConfigAccess();
Assert.isTrue(0 == sceneConfigAccess.count(
new LambdaQueryWrapper<SceneConfig>()
.eq(SceneConfig::getNamespaceId, namespaceId)
.eq(SceneConfig::getGroupName, requestVO.getGroupName())
.eq(SceneConfig::getSceneName, requestVO.getSceneName())
), () -> new EasyRetryServerException("场景名称重复. {}", requestVO.getSceneName()));
SceneConfig sceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO);
sceneConfig.setCreateDt(LocalDateTime.now());
sceneConfig.setNamespaceId(namespaceId);
ConfigAccess<SceneConfig> sceneConfigAccess = accessTemplate.getSceneConfigAccess();
Assert.isTrue(1 == sceneConfigAccess.insert(sceneConfig),
() -> new EasyRetryServerException("failed to insert scene. sceneConfig:[{}]",
JsonUtil.toJsonString(sceneConfig)));
() -> new EasyRetryServerException("failed to insert scene. sceneConfig:[{}]",
JsonUtil.toJsonString(sceneConfig)));
return Boolean.TRUE;
}
private static void checkExecuteInterval(SceneConfigRequestVO requestVO) {
if (Lists.newArrayList(WaitStrategies.WaitStrategyEnum.FIXED.getType(),
WaitStrategies.WaitStrategyEnum.RANDOM.getType())
.contains(requestVO.getBackOff())) {
WaitStrategies.WaitStrategyEnum.RANDOM.getType())
.contains(requestVO.getBackOff())) {
if (Integer.parseInt(requestVO.getTriggerInterval()) < 10) {
throw new EasyRetryServerException("间隔时间不得小于10");
}
@ -120,19 +126,19 @@ public class SceneConfigServiceImpl implements SceneConfigService {
sceneConfig.setTriggerInterval(Optional.ofNullable(sceneConfig.getTriggerInterval()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().update(sceneConfig,
new LambdaUpdateWrapper<SceneConfig>()
.eq(SceneConfig::getNamespaceId, namespaceId)
.eq(SceneConfig::getGroupName, requestVO.getGroupName())
.eq(SceneConfig::getSceneName, requestVO.getSceneName())),
() -> new EasyRetryServerException("failed to update scene. sceneConfig:[{}]",
JsonUtil.toJsonString(sceneConfig)));
new LambdaUpdateWrapper<SceneConfig>()
.eq(SceneConfig::getNamespaceId, namespaceId)
.eq(SceneConfig::getGroupName, requestVO.getGroupName())
.eq(SceneConfig::getSceneName, requestVO.getSceneName())),
() -> new EasyRetryServerException("failed to update scene. sceneConfig:[{}]",
JsonUtil.toJsonString(sceneConfig)));
return Boolean.TRUE;
}
@Override
public SceneConfigResponseVO getSceneConfigDetail(Long id) {
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().one(new LambdaQueryWrapper<SceneConfig>()
.eq(SceneConfig::getId, id));
.eq(SceneConfig::getId, id));
return SceneConfigResponseVOConverter.INSTANCE.convert(sceneConfig);
}
}

View File

@ -13,7 +13,7 @@
:disabled="isEdit"
v-decorator="[
'uniqueId',
{rules: [{ required: false, message: '请输入空间名称', whitespace: true},{required: true, max: 64, message: '最多支持64个字符'}, {validator: validate, trigger: ['change', 'blur']}]}
{rules: [{ required: false, message: '请输入空间名称', whitespace: true},{required: false, max: 64, message: '最多支持64个字符'}, {validator: validate, trigger: ['change', 'blur']}]}
]" />
</a-form-item>
<a-form-item

View File

@ -7,7 +7,7 @@
<a-row :gutter="48">
<a-col :md="8" :sm="24">
<a-form-item label="用户名">
<a-input v-model="queryParam.name" placeholder="请输入空间名称" allowClear/>
<a-input v-model="queryParam.keyword" placeholder="请输入空间名称/唯一标识" allowClear/>
</a-form-item>
</a-col>
<a-col :md="!advanced && 8 || 24" :sm="24">
@ -36,12 +36,6 @@
<span slot="serial" slot-scope="record">
{{ record.id }}
</span>
<!-- <span slot="groupNameList" slot-scope="text, record">-->
<!-- {{ record.role === 2 ? '所有组' : text.toString() }}-->
<!-- </span>-->
<!-- <span slot="role" slot-scope="text, record">-->
<!-- {{ record.role === 2 ? '管理员' : '普通用户' }}-->
<!-- </span>-->
<span slot="action" slot-scope="text, record">
<template>
<a @click="handleEdit(record)">编辑</a>

View File

@ -81,7 +81,7 @@
>
<a-descriptions :column="8" bordered>
<a-descriptions-item :label="item.namespaceName + ' (' + item.namespaceId + ')'" :key="item.namespaceId" :span="8" v-for="item in systemPermissions">
<a-tag v-for="item in item.groupNames" :key="item">
<a-tag color="#64a6ea" v-for="item in item.groupNames" :key="item">
{{ item }}
</a-tag>
</a-descriptions-item>