新增链路重试流量管控
This commit is contained in:
byteblogs168 2023-01-14 19:59:45 +08:00
parent 80af094d2d
commit 45c460e307
15 changed files with 254 additions and 25 deletions

View File

@ -38,7 +38,7 @@
<dependency> <dependency>
<groupId>com.x.retry</groupId> <groupId>com.x.retry</groupId>
<artifactId>x-retry-client-starter</artifactId> <artifactId>x-retry-client-starter</artifactId>
<version>0.0.1.0-SNAPSHOT</version> <version>0.0.1.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.baomidou</groupId> <groupId>com.baomidou</groupId>
@ -67,6 +67,11 @@
<version>2.0.0</version> <version>2.0.0</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.2.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,30 @@
package com.example;
import com.x.retry.client.core.intercepter.RetrySiteSnapshot;
import com.x.retry.common.core.constant.SystemConstants;
import com.x.retry.common.core.model.XRetryHeaders;
import com.x.retry.common.core.util.JsonUtil;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import java.io.IOException;
import java.util.Objects;
/**
* @author: shuguang.zhang
* @date : 2022-04-17 15:22
*/
public class ExampleClientHttpRequestInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
XRetryHeaders retryHeader = RetrySiteSnapshot.getRetryHeader();
if (Objects.nonNull(retryHeader)) {
request.getHeaders().add(SystemConstants.X_RETRY_HEAD, JsonUtil.toJsonString(retryHeader));
}
return execution.execute(request, body);
}
}

View File

@ -0,0 +1,37 @@
package com.example.config;
import com.example.ExampleClientHttpRequestInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.OkHttp3ClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
import java.util.Collections;
/**
* @author: www.byteblogs.com
* @date : 2022-03-09 14:19
*/
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate(ClientHttpRequestFactory factory){
RestTemplate restTemplate = new RestTemplate(factory);
restTemplate.setInterceptors(Collections.singletonList(new ExampleClientHttpRequestInterceptor()));
return restTemplate;
}
@Bean
public ClientHttpRequestFactory okHttp3ClientHttpRequestFactory(){
OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
factory.setReadTimeout(5000);
factory.setConnectTimeout(5000);
factory.setConnectTimeout(3000);
factory.setWriteTimeout(5000);
return factory;
}
}

View File

@ -1,9 +1,14 @@
package com.example.controller; package com.example.controller;
import com.x.retry.common.core.constant.SystemConstants;
import com.x.retry.common.core.model.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.stereotype.Controller; import javax.servlet.http.HttpServletRequest;
/** /**
* <p> * <p>
@ -13,8 +18,15 @@ import org.springframework.stereotype.Controller;
* @author www.byteblogs.com * @author www.byteblogs.com
* @since 2022-03-24 * @since 2022-03-24
*/ */
@Controller @RestController
@RequestMapping("/school") @RequestMapping("/school")
public class SchoolController { public class SchoolController {
@GetMapping("/id")
public Result getSchool(HttpServletRequest request) {
String header = request.getHeader(SystemConstants.X_RETRY_HEAD);
System.out.println(header);
return new Result("school");
}
} }

View File

@ -1,7 +1,9 @@
package com.example.demo; package com.example.demo;
import com.x.retry.common.core.model.Result; import com.x.retry.common.core.model.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
/** /**
* @author: www.byteblogs.com * @author: www.byteblogs.com
@ -10,7 +12,11 @@ import org.springframework.stereotype.Component;
@Component @Component
public class RemoteService { public class RemoteService {
@Autowired
private RestTemplate restTemplate;
public Result call() { public Result call() {
// restTemplate.getForObject("http://127.0.0.1:8088/school/id", Result.class);
return new Result(); return new Result();
} }
} }

View File

@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.web.client.RestTemplate;
@SpringBootTest @SpringBootTest
@Slf4j @Slf4j
@ -14,6 +15,9 @@ public class ExampleApplicationTests {
@Autowired @Autowired
private SchoolMapper schoolMapper; private SchoolMapper schoolMapper;
@Autowired
private RestTemplate restTemplate;
@Test @Test
public void test() { public void test() {
School school = new School(); School school = new School();
@ -22,5 +26,12 @@ public class ExampleApplicationTests {
schoolMapper.insert(school); schoolMapper.insert(school);
} }
@Test
public void test1() {
String template = restTemplate.getForObject("http://127.0.0.1:8088/school/id", String.class);
System.out.println(template);
}
} }

View File

@ -89,7 +89,7 @@ public class ExistsTransactionalRetryServiceTest {
.thenReturn(new Result(0, "5")) .thenReturn(new Result(0, "5"))
; ;
try { try {
for (int i = 0; i < 300; i++) { for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(() -> testExistsTransactionalRetryService.testSimpleInsert(UUID.randomUUID().toString())); threadPoolExecutor.execute(() -> testExistsTransactionalRetryService.testSimpleInsert(UUID.randomUUID().toString()));
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -21,7 +21,7 @@
<java.version>1.8</java.version> <java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<revision>0.0.1.0</revision> <revision>0.0.1.0-SNAPSHOT</revision>
<dingding-talk.version>1.0.0</dingding-talk.version> <dingding-talk.version>1.0.0</dingding-talk.version>
<feign.version>11.7</feign.version> <feign.version>11.7</feign.version>
<hibernate-validator.version>5.4.2.Final</hibernate-validator.version> <hibernate-validator.version>5.4.2.Final</hibernate-validator.version>

View File

@ -0,0 +1,27 @@
package com.x.retry.client.core.config;
import com.x.retry.client.core.intercepter.HeadersInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* @author www.byteblogs.com
* @date 2022-03-06
* @since 2.0
*/
@Configuration
public class XRetryWebMvcConfigurerAdapter implements WebMvcConfigurer {
@Autowired
private HeadersInterceptor headersInterceptor;
@Override
public void addInterceptors(InterceptorRegistry registry) {
// 注册拦截器
registry.addInterceptor(headersInterceptor).addPathPatterns("/retry/**");
}
}

View File

@ -0,0 +1,46 @@
package com.x.retry.client.core.intercepter;
import com.x.retry.common.core.constant.SystemConstants;
import com.x.retry.common.core.log.LogUtils;
import com.x.retry.common.core.model.XRetryHeaders;
import com.x.retry.common.core.util.JsonUtil;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.ModelAndView;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Objects;
/**
* @Author:byteblogs
* @Date:2018/09/27 12:52
*/
@Component
public class HeadersInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object object) throws Exception {
String xRetry = httpServletRequest.getHeader(SystemConstants.X_RETRY_HEAD);
if (Objects.nonNull(xRetry)) {
LogUtils.info("x-retry 拦截器 xRetry:[{}]", xRetry);
RetrySiteSnapshot.setRetryHeader(JsonUtil.parseObject(xRetry, XRetryHeaders.class));
}
return true;
}
@Override
public void postHandle(HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse,
Object o, ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse,
Object o, Exception e) throws Exception {
}
}

View File

@ -35,8 +35,6 @@ public class RetryAspect {
@Autowired @Autowired
@Qualifier("localRetryStrategies") @Qualifier("localRetryStrategies")
private RetryStrategy retryStrategy; private RetryStrategy retryStrategy;
@Autowired(required = false)
private TransactionTemplate transactionTemplate;
@Around("@annotation(com.x.retry.client.core.annotation.Retryable)") @Around("@annotation(com.x.retry.client.core.annotation.Retryable)")
public Object around(ProceedingJoinPoint point) throws Throwable { public Object around(ProceedingJoinPoint point) throws Throwable {
@ -74,7 +72,7 @@ public class RetryAspect {
private void doHandlerRetry(ProceedingJoinPoint point, String traceId, Retryable retryable, String executorClassName, String methodEntrance, Throwable throwable) { private void doHandlerRetry(ProceedingJoinPoint point, String traceId, Retryable retryable, String executorClassName, String methodEntrance, Throwable throwable) {
if (!RetrySiteSnapshot.isMethodEntrance(methodEntrance)) { if (!RetrySiteSnapshot.isMethodEntrance(methodEntrance) || RetrySiteSnapshot.isRunning()) {
return; return;
} }
@ -97,7 +95,7 @@ public class RetryAspect {
private void openRetry(ProceedingJoinPoint point, String traceId, Retryable retryable, String executorClassName, Throwable throwable) { private void openRetry(ProceedingJoinPoint point, String traceId, Retryable retryable, String executorClassName, Throwable throwable) {
try { try {
if (Objects.isNull(throwable) || RetrySiteSnapshot.isRunning()) { if (Objects.isNull(throwable)) {
return; return;
} }

View File

@ -1,7 +1,10 @@
package com.x.retry.client.core.intercepter; package com.x.retry.client.core.intercepter;
import com.x.retry.common.core.model.XRetryHeaders;
import lombok.Getter; import lombok.Getter;
import java.util.Objects;
/** /**
* 重试现场记录器 * 重试现场记录器
* *
@ -25,6 +28,11 @@ public class RetrySiteSnapshot {
*/ */
private static final ThreadLocal<Integer> RETRY_STATUS = ThreadLocal.withInitial(EnumStatus.COMPLETE::getStatus); private static final ThreadLocal<Integer> RETRY_STATUS = ThreadLocal.withInitial(EnumStatus.COMPLETE::getStatus);
/**
* 重试请求头
*/
private static final ThreadLocal<XRetryHeaders> RETRY_HEADER = new ThreadLocal<>();
public static Integer getStage() { public static Integer getStage() {
return RETRY_STAGE.get(); return RETRY_STAGE.get();
} }
@ -57,6 +65,26 @@ public class RetrySiteSnapshot {
return EnumStatus.RUNNING.status == getStatus(); return EnumStatus.RUNNING.status == getStatus();
} }
public static XRetryHeaders getRetryHeader() {
return RETRY_HEADER.get();
}
public static void setRetryHeader(XRetryHeaders headers) {
RETRY_HEADER.set(headers);
}
/**
* 是否是重试流量
*/
public static boolean isRetryFlow(XRetryHeaders headers) {
XRetryHeaders retryHeader = getRetryHeader();
if (Objects.nonNull(retryHeader)) {
return retryHeader.isXRetry();
}
return false;
}
public static void removeAll() { public static void removeAll() {
RETRY_STATUS.remove(); RETRY_STATUS.remove();
RETRY_CLASS_METHOD_ENTRANCE.remove(); RETRY_CLASS_METHOD_ENTRANCE.remove();

View File

@ -0,0 +1,14 @@
package com.x.retry.common.core.constant;
/**
* @author: www.byteblogs.com
* @date : 2022-04-17 10:47
*/
public class SystemConstants {
/**
* 请求头
*/
public static final String X_RETRY_HEAD = "X-RETRY";
}

View File

@ -0,0 +1,16 @@
package com.x.retry.common.core.model;
import lombok.Data;
/**
* @author: shuguang.zhang
* @date : 2022-04-16 22:20
*/
@Data
public class XRetryHeaders {
/**
* 是否是重试流量
*/
private boolean xRetry;
}

View File

@ -1,26 +1,20 @@
package com.x.retry.server.support.dispatch.actor.exec; package com.x.retry.server.support.dispatch.actor.exec;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.util.concurrent.RateLimiter;
import com.x.retry.client.model.DispatchRetryDTO; import com.x.retry.client.model.DispatchRetryDTO;
import com.x.retry.client.model.DispatchRetryResultDTO; import com.x.retry.client.model.DispatchRetryResultDTO;
import com.x.retry.common.core.constant.SystemConstants;
import com.x.retry.common.core.log.LogUtils; import com.x.retry.common.core.log.LogUtils;
import com.x.retry.common.core.model.Result; import com.x.retry.common.core.model.Result;
import com.x.retry.common.core.model.XRetryHeaders;
import com.x.retry.common.core.util.Assert; import com.x.retry.common.core.util.Assert;
import com.x.retry.common.core.util.JsonUtil; import com.x.retry.common.core.util.JsonUtil;
import com.x.retry.server.exception.XRetryServerException; import com.x.retry.server.exception.XRetryServerException;
import com.x.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper; import com.x.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper;
import com.x.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.x.retry.server.persistence.mybatis.po.GroupConfig;
import com.x.retry.server.persistence.mybatis.po.RetryTask; import com.x.retry.server.persistence.mybatis.po.RetryTask;
import com.x.retry.server.persistence.mybatis.po.RetryTaskLog; import com.x.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.x.retry.server.persistence.mybatis.po.ServerNode; import com.x.retry.server.persistence.mybatis.po.ServerNode;
import com.x.retry.server.persistence.support.ConfigAccess;
import com.x.retry.server.support.ClientLoadBalance;
import com.x.retry.server.support.IdempotentStrategy; import com.x.retry.server.support.IdempotentStrategy;
import com.x.retry.server.support.allocate.client.ClientLoadBalanceManager;
import com.x.retry.server.support.cache.CacheGroupRateLimiter;
import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext; import com.x.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.x.retry.server.support.retry.RetryExecutor; import com.x.retry.server.support.retry.RetryExecutor;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -29,18 +23,15 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/** /**
* 重试结果执行器 * 重试结果执行器
@ -117,8 +108,16 @@ public class ExecUnitActor extends AbstractActor {
dispatchRetryDTO.setExecutorName(retryTask.getExecutorName()); dispatchRetryDTO.setExecutorName(retryTask.getExecutorName());
dispatchRetryDTO.setArgsStr(retryTask.getArgsStr()); dispatchRetryDTO.setArgsStr(retryTask.getArgsStr());
// 设置header
HttpHeaders requestHeaders = new HttpHeaders();
XRetryHeaders xRetryHeaders = new XRetryHeaders();
xRetryHeaders.setXRetry(Boolean.TRUE);
requestHeaders.add(SystemConstants.X_RETRY_HEAD, JsonUtil.toJsonString(xRetryHeaders));
HttpEntity<DispatchRetryDTO> requestEntity = new HttpEntity<>(dispatchRetryDTO, requestHeaders);
String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString()); String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString());
Result result = restTemplate.postForObject(format, dispatchRetryDTO, Result.class); Result result = restTemplate.postForObject(format, requestEntity, Result.class);
if (1 != result.getStatus() && StringUtils.isNotBlank(result.getMessage())) { if (1 != result.getStatus() && StringUtils.isNotBlank(result.getMessage())) {
retryTaskLog.setErrorMessage(result.getMessage()); retryTaskLog.setErrorMessage(result.getMessage());