修复多线程未上传异常数据

This commit is contained in:
byteblogs168 2023-01-14 19:58:22 +08:00
parent b6e0c72453
commit 689ff7a9d2
3 changed files with 37 additions and 11 deletions

View File

@ -38,4 +38,4 @@ logging:
x-retry: x-retry:
server: server:
host: 192.168.100.3 host: 127.0.0.1

View File

@ -13,8 +13,9 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.test.mock.mockito.MockBean;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import static org.awaitility.Awaitility.await; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* @author: www.byteblogs.com * @author: www.byteblogs.com
@ -74,4 +75,28 @@ public class ExistsTransactionalRetryServiceTest {
Thread.sleep(90000); Thread.sleep(90000);
} }
@SneakyThrows
@Test
public void syncTestSimpleInsert() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Mockito.when(remoteService.call())
.thenReturn(new Result(0, "1"))
.thenReturn(new Result(0, "2"))
.thenReturn(new Result(0, "3"))
.thenReturn(new Result(0, "4"))
.thenReturn(new Result(0, "5"))
;
try {
for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(() -> testExistsTransactionalRetryService.testSimpleInsert(UUID.randomUUID().toString()));
}
} catch (Exception e) {
log.error("", e);
}
Thread.sleep(900000);
}
} }

View File

@ -74,13 +74,13 @@ public class RetryAspect {
private void doHandlerRetry(ProceedingJoinPoint point, String traceId, Retryable retryable, String executorClassName, String methodEntrance, Throwable throwable) { private void doHandlerRetry(ProceedingJoinPoint point, String traceId, Retryable retryable, String executorClassName, String methodEntrance, Throwable throwable) {
if (!RetrySiteSnapshot.isMethodEntrance(methodEntrance) || RetrySiteSnapshot.isRunning() || Objects.isNull(throwable)) { if (!RetrySiteSnapshot.isMethodEntrance(methodEntrance)) {
return; return;
} }
if (!TransactionSynchronizationManager.isActualTransactionActive()) { if (!TransactionSynchronizationManager.isActualTransactionActive()) {
// 无事务, 开启重试 // 无事务, 开启重试
openRetry(point, traceId, retryable, executorClassName); openRetry(point, traceId, retryable, executorClassName, throwable);
return; return;
} }
@ -89,17 +89,18 @@ public class RetryAspect {
@Override @Override
public void afterCompletion(int status) { public void afterCompletion(int status) {
if (STATUS_ROLLED_BACK == status) { // 有事务开启重试
openRetry(point, traceId, retryable, executorClassName, throwable);
// 有事务开启重试
openRetry(point, traceId, retryable, executorClassName);
}
} }
}); });
} }
private void openRetry(ProceedingJoinPoint point, String traceId, Retryable retryable, String executorClassName) { private void openRetry(ProceedingJoinPoint point, String traceId, Retryable retryable, String executorClassName, Throwable throwable) {
try { try {
if (Objects.isNull(throwable) || RetrySiteSnapshot.isRunning()) {
return;
}
RetryerResultContext context = retryStrategy.openRetry(retryable.scene(), executorClassName, point.getArgs()); RetryerResultContext context = retryStrategy.openRetry(retryable.scene(), executorClassName, point.getArgs());
if (RetryResultStatusEnum.SUCCESS.getStatus().equals(context.getRetryResultStatusEnum().getStatus())) { if (RetryResultStatusEnum.SUCCESS.getStatus().equals(context.getRetryResultStatusEnum().getStatus())) {
LogUtils.debug("aop 结果成功 traceId:[{}] result:[{}]", traceId, context.getResult()); LogUtils.debug("aop 结果成功 traceId:[{}] result:[{}]", traceId, context.getResult());