feat:2.4.0
1. 重试任务优化完成
This commit is contained in:
parent
01fb0cda20
commit
fdbf9493ae
@ -14,7 +14,7 @@ public interface FilterStrategy {
|
||||
* 过滤器执行器
|
||||
*
|
||||
* @param retryContext {@link RetryContext} 重试上下文
|
||||
* @return true- 符合重试条 false- 不满足重试条件
|
||||
* @return true- 符合重试条件 false- 不满足重试条件
|
||||
*/
|
||||
Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext);
|
||||
|
||||
|
@ -35,7 +35,7 @@ public class TimerWheelHandler implements Lifecycle {
|
||||
// tickDuration 和 timeUnit 一格的时间长度
|
||||
// ticksPerWheel 一圈有多少格
|
||||
timer = new HashedWheelTimer(
|
||||
new CustomizableThreadFactory("retry_task_timer_wheel_"), 100,
|
||||
new CustomizableThreadFactory("retry-task-timer-wheel-"), 100,
|
||||
TimeUnit.MILLISECONDS, 1024);
|
||||
|
||||
timer.start();
|
||||
@ -80,6 +80,10 @@ public class TimerWheelHandler implements Lifecycle {
|
||||
return cache.getIfPresent(getKey(groupName, uniqueId));
|
||||
}
|
||||
|
||||
public static boolean isExisted(String groupName, String uniqueId) {
|
||||
return Objects.nonNull(cache.getIfPresent(getKey(groupName, uniqueId)));
|
||||
}
|
||||
|
||||
public static boolean cancel(String groupName, String uniqueId) {
|
||||
String key = getKey(groupName, uniqueId);
|
||||
Timeout timeout = cache.getIfPresent(key);
|
||||
|
@ -7,6 +7,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
|
||||
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
||||
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuator;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum;
|
||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||
@ -74,6 +75,12 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
||||
putLastId(scanTask.getGroupName(), list.get(list.size() - 1).getId());
|
||||
|
||||
for (RetryTask retryTask : list) {
|
||||
// 已经存在时间轮里面的任务由时间轮负责调度
|
||||
boolean existed = TimerWheelHandler.isExisted(retryTask.getGroupName(), retryTask.getUniqueId());
|
||||
if (existed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (TaskActuator taskActuator : taskActuators) {
|
||||
if (taskActuatorScene().getScene() == taskActuator.getTaskType().getScene()) {
|
||||
taskActuator.actuator(retryTask);
|
||||
|
@ -50,11 +50,8 @@ public abstract class AbstractTaskActuator implements TaskActuator, Initializing
|
||||
return;
|
||||
}
|
||||
|
||||
Timeout timeout = TimerWheelHandler.getTimeout(retryTask.getGroupName(), retryTask.getUniqueId());
|
||||
if (Objects.isNull(timeout)) {
|
||||
productExecUnitActor(executor);
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean preCheck(RetryContext retryContext, RetryExecutor executor) {
|
||||
Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> pair = executor.filter();
|
||||
@ -73,7 +70,7 @@ public abstract class AbstractTaskActuator implements TaskActuator, Initializing
|
||||
retryTask.setRetryCount(++retryCount);
|
||||
}
|
||||
|
||||
private void productExecUnitActor(RetryExecutor retryExecutor) {
|
||||
protected void productExecUnitActor(RetryExecutor retryExecutor) {
|
||||
String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
|
||||
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
|
||||
idempotentStrategy.set(groupIdHash, retryId.intValue());
|
||||
|
Loading…
Reference in New Issue
Block a user