fix(1.5.0-beta1): 修复重试传播机制重试次数错误问题
This commit is contained in:
parent
53530517f4
commit
310d9bc580
@ -8,9 +8,7 @@ import com.aizuda.snailjob.common.core.model.SnailJobHeaders;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
|
||||||
import java.util.Deque;
|
import java.util.*;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -21,6 +19,10 @@ import java.util.concurrent.LinkedBlockingDeque;
|
|||||||
*/
|
*/
|
||||||
public class RetrySiteSnapshot {
|
public class RetrySiteSnapshot {
|
||||||
|
|
||||||
|
private static final String RETRY_STAGE_KEY = "RETRY_STAGE";
|
||||||
|
private static final String RETRY_CLASS_METHOD_ENTRANCE_KEY = "RETRY_CLASS_METHOD_ENTRANCE";
|
||||||
|
private static final String RETRY_STATUS_KEY = "RETRY_STATUS";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 重试阶段,1-内存重试阶段,2-服务端重试阶段
|
* 重试阶段,1-内存重试阶段,2-服务端重试阶段
|
||||||
*/
|
*/
|
||||||
@ -46,11 +48,33 @@ public class RetrySiteSnapshot {
|
|||||||
*/
|
*/
|
||||||
private static final RetrySiteSnapshotContext<String> RETRY_STATUS_CODE = SnailRetrySpiLoader.loadRetrySiteSnapshotContext();
|
private static final RetrySiteSnapshotContext<String> RETRY_STATUS_CODE = SnailRetrySpiLoader.loadRetrySiteSnapshotContext();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 挂起重试的内存状态
|
||||||
|
*/
|
||||||
|
private static final RetrySiteSnapshotContext<Map<String, Object>> SUSPEND = SnailRetrySpiLoader.loadRetrySiteSnapshotContext();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 进入方法入口时间标记
|
* 进入方法入口时间标记
|
||||||
*/
|
*/
|
||||||
private static final RetrySiteSnapshotContext<Long> ENTRY_METHOD_TIME = SnailRetrySpiLoader.loadRetrySiteSnapshotContext();
|
private static final RetrySiteSnapshotContext<Long> ENTRY_METHOD_TIME = SnailRetrySpiLoader.loadRetrySiteSnapshotContext();
|
||||||
|
|
||||||
|
public static void suspend() {
|
||||||
|
SUSPEND.set(new HashMap<>(){{
|
||||||
|
put(RETRY_STAGE_KEY, RETRY_STAGE.get());
|
||||||
|
put(RETRY_STATUS_KEY, RETRY_STATUS.get());
|
||||||
|
put(RETRY_CLASS_METHOD_ENTRANCE_KEY, RETRY_CLASS_METHOD_ENTRANCE.get());
|
||||||
|
}});
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void restore() {
|
||||||
|
Optional.ofNullable(SUSPEND.get()).ifPresent(map -> {
|
||||||
|
RETRY_STAGE.set((Integer) map.get(RETRY_STAGE_KEY));
|
||||||
|
RETRY_STATUS.set((Integer) map.get(RETRY_STATUS_KEY));
|
||||||
|
RETRY_CLASS_METHOD_ENTRANCE.set((Deque<MethodEntranceMeta>) map.get(RETRY_CLASS_METHOD_ENTRANCE_KEY));
|
||||||
|
SUSPEND.remove();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public static Integer getStage() {
|
public static Integer getStage() {
|
||||||
return RETRY_STAGE.get();
|
return RETRY_STAGE.get();
|
||||||
}
|
}
|
||||||
|
@ -74,7 +74,18 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se
|
|||||||
String methodEntrance = getMethodEntrance(retryable, executorClassName);
|
String methodEntrance = getMethodEntrance(retryable, executorClassName);
|
||||||
|
|
||||||
if (Propagation.REQUIRES_NEW.equals(retryable.propagation())) {
|
if (Propagation.REQUIRES_NEW.equals(retryable.propagation())) {
|
||||||
|
// 如果已经是入口了就不需要继续添加入口了
|
||||||
|
if (!RetrySiteSnapshot.isMethodEntrance(methodEntrance)) {
|
||||||
|
// 这里需要挂起外部重试的内存的信息
|
||||||
|
if (RetrySiteSnapshot.isRunning()
|
||||||
|
&& RetrySiteSnapshot.getStage() == RetrySiteSnapshot.EnumStage.LOCAL.getStage()) {
|
||||||
|
RetrySiteSnapshot.suspend();
|
||||||
|
// 清除线程信息
|
||||||
|
RetrySiteSnapshot.removeAll();
|
||||||
|
}
|
||||||
|
// 设置新的内容信息
|
||||||
RetrySiteSnapshot.setMethodEntrance(methodEntrance);
|
RetrySiteSnapshot.setMethodEntrance(methodEntrance);
|
||||||
|
}
|
||||||
} else if (!RetrySiteSnapshot.existedMethodEntrance()) {
|
} else if (!RetrySiteSnapshot.existedMethodEntrance()) {
|
||||||
RetrySiteSnapshot.setMethodEntrance(methodEntrance);
|
RetrySiteSnapshot.setMethodEntrance(methodEntrance);
|
||||||
} else {
|
} else {
|
||||||
@ -195,7 +206,11 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se
|
|||||||
sendMessage(e);
|
sendMessage(e);
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
|
// 清除当前重试的信息
|
||||||
RetrySiteSnapshot.removeAll();
|
RetrySiteSnapshot.removeAll();
|
||||||
|
// 还原挂起的信息
|
||||||
|
RetrySiteSnapshot.restore();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
|
Loading…
Reference in New Issue
Block a user