feat: 2.3.0

1. 手动执行不符合条件时,提醒用户具体的失败原因
This commit is contained in:
byteblogs168 2023-09-12 18:16:20 +08:00
parent ab65e55fdc
commit 5e0e798629
5 changed files with 78 additions and 27 deletions

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.service.impl;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO; import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
@ -334,7 +335,8 @@ public class RetryTaskServiceImpl implements RetryTaskService {
.withRetryContext(retryContext) .withRetryContext(retryContext)
.build(); .build();
Assert.isTrue(executor.filter(), () -> new EasyRetryServerException("任务:{}不满足执行条件.具体原因请查看系统日志", retryTask.getUniqueId())); Pair<Boolean, StringBuilder> pair = executor.filter();
Assert.isTrue(pair.getKey(), () -> new EasyRetryServerException(pair.getValue().toString()));
productExecUnitActor(executor, ActorGenerator.execUnitActor()); productExecUnitActor(executor, ActorGenerator.execUnitActor());
} }
@ -373,8 +375,9 @@ public class RetryTaskServiceImpl implements RetryTaskService {
.withRetryContext(retryContext) .withRetryContext(retryContext)
.build(); .build();
Assert.isTrue(executor.filter(), () -> new EasyRetryServerException("任务:{}不满足执行条件.具体原因请查看系统日志", retryTask.getUniqueId())); Pair<Boolean, StringBuilder> pair = executor.filter();
Assert.isTrue(pair.getKey(), () -> new EasyRetryServerException(pair.getValue().toString()));
productExecUnitActor(executor, ActorGenerator.execCallbackUnitActor()); productExecUnitActor(executor, ActorGenerator.execCallbackUnitActor());
} }

View File

@ -1,5 +1,7 @@
package com.aizuda.easy.retry.server.support; package com.aizuda.easy.retry.server.support;
import cn.hutool.core.lang.Pair;
/** /**
* 重试过滤策略为了判断哪些重试数据符合条件 * 重试过滤策略为了判断哪些重试数据符合条件
* *
@ -14,7 +16,7 @@ public interface FilterStrategy {
* @param retryContext {@link RetryContext} 重试上下文 * @param retryContext {@link RetryContext} 重试上下文
* @return true- 符合重试条 false- 不满足重试条件 * @return true- 符合重试条 false- 不满足重试条件
*/ */
boolean filter(RetryContext retryContext); Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext);
/** /**
* 按照正序排列重试过滤器 * 按照正序排列重试过滤器

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.scan;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.config.SystemProperties; import com.aizuda.easy.retry.server.config.SystemProperties;
@ -81,7 +82,11 @@ public abstract class AbstractScanGroup extends AbstractActor {
RetryContext retryContext = builderRetryContext(groupName, retryTask); RetryContext retryContext = builderRetryContext(groupName, retryTask);
RetryExecutor executor = builderResultRetryExecutor(retryContext); RetryExecutor executor = builderResultRetryExecutor(retryContext);
if (!executor.filter()) { Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> pair = executor.filter();
if (!pair.getKey()) {
log.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]",
retryContext.getRetryTask().getGroupName(),
retryContext.getRetryTask().getUniqueId(), pair.getValue().toString());
continue; continue;
} }

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.support.retry; package com.aizuda.easy.retry.server.support.retry;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.server.akka.ActorGenerator; import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.support.FilterStrategy; import com.aizuda.easy.retry.server.support.FilterStrategy;
import com.aizuda.easy.retry.server.support.RetryContext; import com.aizuda.easy.retry.server.support.RetryContext;
@ -36,18 +37,16 @@ public class RetryExecutor<V> {
this.retryContext = retryContext; this.retryContext = retryContext;
} }
public boolean filter() { public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter() {
for (FilterStrategy filterStrategy : filterStrategies) { for (FilterStrategy filterStrategy : filterStrategies) {
if (!filterStrategy.filter(retryContext)) { Pair<Boolean, StringBuilder> pair = filterStrategy.filter(retryContext);
log.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], filter:[{}]", if (!pair.getKey()) {
retryContext.getRetryTask().getGroupName(), return pair;
retryContext.getRetryTask().getUniqueId(), filterStrategy.getClass().getName());
return false;
} }
} }
return true; return Pair.of(Boolean.TRUE, new StringBuilder());
} }
/** /**

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.support.strategy; package com.aizuda.easy.retry.server.support.strategy;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.support.handler.ServerNodeBalance; import com.aizuda.easy.retry.server.support.handler.ServerNodeBalance;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -15,6 +16,7 @@ import com.aizuda.easy.retry.server.support.RetryContext;
import com.aizuda.easy.retry.server.support.cache.CacheGroupRateLimiter; import com.aizuda.easy.retry.server.support.cache.CacheGroupRateLimiter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -93,9 +95,17 @@ public class FilterStrategies {
private static final class TriggerAtFilterStrategies implements FilterStrategy { private static final class TriggerAtFilterStrategies implements FilterStrategy {
@Override @Override
public boolean filter(RetryContext retryContext) { public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
LocalDateTime nextTriggerAt = retryContext.getRetryTask().getNextTriggerAt(); RetryTask retryTask = retryContext.getRetryTask();
return nextTriggerAt.isBefore(LocalDateTime.now()); LocalDateTime nextTriggerAt =retryTask.getNextTriggerAt();
boolean result = nextTriggerAt.isBefore(LocalDateTime.now());
StringBuilder description = new StringBuilder();
if (!result) {
description.append(MessageFormat.format("未到触发时间. uniqueId:[{0}]", retryTask.getUniqueId()));
}
return Pair.of(result, description);
} }
@Override @Override
@ -118,9 +128,16 @@ public class FilterStrategies {
} }
@Override @Override
public boolean filter(RetryContext retryContext) { public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
RetryTask retryTask = retryContext.getRetryTask(); RetryTask retryTask = retryContext.getRetryTask();
return !idempotentStrategy.isExist(retryTask.getGroupName(), retryTask.getId().intValue());
boolean result = !idempotentStrategy.isExist(retryTask.getGroupName(), retryTask.getId().intValue());
StringBuilder description = new StringBuilder();
if (!result) {
description.append(MessageFormat.format("存在执行中的任务.uniqueId:[{0}]", retryTask.getUniqueId()));
}
return Pair.of(result, description);
} }
@Override @Override
@ -137,8 +154,17 @@ public class FilterStrategies {
private static final class SceneBlackFilterStrategies implements FilterStrategy { private static final class SceneBlackFilterStrategies implements FilterStrategy {
@Override @Override
public boolean filter(RetryContext retryContext) { public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
return !retryContext.getSceneBlacklist().contains(retryContext.getRetryTask().getSceneName()); RetryTask retryTask = retryContext.getRetryTask();
boolean result = !retryContext.getSceneBlacklist().contains(retryTask.getSceneName());
StringBuilder description = new StringBuilder();
if (!result) {
description.append(MessageFormat.format("场景:[{0}]在黑名单中, 不允许执行.", retryTask.getSceneName()));
}
return Pair.of(result, description);
} }
@Override @Override
@ -153,16 +179,22 @@ public class FilterStrategies {
private static final class CheckAliveClientPodFilterStrategies implements FilterStrategy { private static final class CheckAliveClientPodFilterStrategies implements FilterStrategy {
@Override @Override
public boolean filter(RetryContext retryContext) { public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
RetryTask retryTask = retryContext.getRetryTask();
RegisterNodeInfo serverNode = retryContext.getServerNode(); RegisterNodeInfo serverNode = retryContext.getServerNode();
StringBuilder description = new StringBuilder();
if (Objects.isNull(serverNode)) { if (Objects.isNull(serverNode)) {
return false; return Pair.of(Boolean.FALSE, description.append(MessageFormat.format("没有可执行的客户端节点. uniqueId:[{0}]", retryTask.getUniqueId())));
} }
ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class); ServerNodeMapper serverNodeMapper = SpringContext.getBeanByType(ServerNodeMapper.class);
boolean result = 1 == serverNodeMapper.selectCount(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getHostId, serverNode.getHostId()));
if (!result) {
description.append(MessageFormat.format("DB中未查询到客户端节点. hostId:[{0}] uniqueId:[{1}]", serverNode.getHostId(), retryTask.getUniqueId()));
}
return 1 == serverNodeMapper.selectCount(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getHostId, serverNode.getHostId())); return Pair.of(result, description);
} }
@Override @Override
@ -177,16 +209,20 @@ public class FilterStrategies {
private static final class RateLimiterFilterStrategies implements FilterStrategy { private static final class RateLimiterFilterStrategies implements FilterStrategy {
@Override @Override
public boolean filter(RetryContext retryContext) { public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
RegisterNodeInfo serverNode = retryContext.getServerNode(); RegisterNodeInfo serverNode = retryContext.getServerNode();
RetryTask retryTask = retryContext.getRetryTask();
StringBuilder description = new StringBuilder();
Boolean result = Boolean.TRUE;
RateLimiter rateLimiter = CacheGroupRateLimiter.getRateLimiterByKey(serverNode.getHostId()); RateLimiter rateLimiter = CacheGroupRateLimiter.getRateLimiterByKey(serverNode.getHostId());
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) { if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
LogUtils.error(log, "该POD:[{}]已到达最大限流阈值,本次重试不执行", serverNode.getHostId()); LogUtils.error(log, "该POD:[{}]已到达最大限流阈值,本次重试不执行", serverNode.getHostId());
return Boolean.FALSE; description.append(MessageFormat.format("该POD:[{0}]已到达最大限流阈值,本次重试不执行.uniqueId:[{1}]", serverNode.getHostId(), retryTask.getUniqueId()));
result = Boolean.FALSE;
} }
return Boolean.TRUE; return Pair.of(result, description);
} }
@Override @Override
@ -201,8 +237,14 @@ public class FilterStrategies {
private static final class ReBalanceFilterStrategies implements FilterStrategy { private static final class ReBalanceFilterStrategies implements FilterStrategy {
@Override @Override
public boolean filter(RetryContext retryContext) { public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
return !ServerNodeBalance.RE_BALANCE_ING.get(); RetryTask retryTask = retryContext.getRetryTask();
boolean result = !ServerNodeBalance.RE_BALANCE_ING.get();
StringBuilder description = new StringBuilder();
if (!result) {
description.append(MessageFormat.format("系统Rebalancing中数据无法重试.uniqueId:[{0}]", retryTask.getUniqueId()));
}
return Pair.of(result, description);
} }
@Override @Override