feat: 2.4.0

1. 完成任务调度流程设计
This commit is contained in:
byteblogs168 2023-09-26 15:55:16 +08:00
parent 90cfbf687c
commit 8fe5392144
100 changed files with 721 additions and 136 deletions

View File

@ -1,33 +0,0 @@
package com.aizuda.easy.retry.client.core.client;
import com.aizuda.easy.retry.client.core.annotation.Mapping;
import com.aizuda.easy.retry.client.core.client.netty.RequestMethod;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import java.util.List;
import static com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH.BATCH_REPORT;
import static com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH.BEAT;
import static com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH.CONFIG;
/**
* netty 客户端请求类
*
* @author: www.byteblogs.com
* @date : 2023-05-11 21:28
* @since 1.3.0
*/
public interface NettyClient {
@Mapping(method = RequestMethod.GET, path = CONFIG)
Result getConfig(Integer version);
@Mapping(method = RequestMethod.GET, path = BEAT)
Result beat(String mark);
@Mapping(method = RequestMethod.POST, path = BATCH_REPORT)
NettyResult reportRetryInfo(List<RetryTaskDTO> list);
}

View File

@ -1,31 +0,0 @@
{
"groups": [
{
"name": "easy-retry",
"type": "com.aizuda.easy.retry.client.core.config.EasyRetryProperties",
"sourceType": "com.aizuda.easy.retry.client.core.config.EasyRetryProperties"
},
{
"name": "easy-retry.server",
"sourceMethod": "getServer()",
"type": "com.aizuda.easy.retry.client.core.config.EasyRetryProperties$ServerConfig",
"sourceType": "com.aizuda.easy.retry.client.core.config.EasyRetryProperties"
}
],
"properties": [
{
"name": "easy-retry.server.host",
"type": "java.lang.String",
"defaultValue": "127.0.0.1",
"description": "服务端的地址,若服务端集群部署则此处配置域名",
"sourceType": "com.aizuda.easy.retry.client.core.config.EasyRetryProperties$ServerConfig"
},
{
"name": "easy-retry.server.port",
"type": "com.aizuda.easy.retry.client.core.config.EasyRetryProperties.ServerConfig",
"sourceType": "com.aizuda.easy.retry.client.core.config.EasyRetryProperties",
"description": "服务端netty的端口号",
"defaultValue": "1788"
}
]
}

38
easy-retry-client/.gitignore vendored Normal file
View File

@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

View File

@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-client</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>easy-retry-client-common</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-common-core</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,4 +1,4 @@
package com.aizuda.easy.retry.client.core;
package com.aizuda.easy.retry.client.common.client;
/**
* 组件生命周期

View File

@ -0,0 +1,28 @@
package com.aizuda.easy.retry.client.common.client;
import com.aizuda.easy.retry.client.common.client.annotation.Mapping;
import com.aizuda.easy.retry.client.common.client.netty.RequestMethod;
import com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.model.Result;
import java.util.List;
/**
* netty 客户端请求类
*
* @author: www.byteblogs.com
* @date : 2023-05-11 21:28
* @since 1.3.0
*/
public interface NettyClient {
@Mapping(method = RequestMethod.GET, path = HTTP_PATH.CONFIG)
Result getConfig(Integer version);
@Mapping(method = RequestMethod.GET, path = HTTP_PATH.BEAT)
Result beat(String mark);
}

View File

@ -1,6 +1,6 @@
package com.aizuda.easy.retry.client.core.annotation;
package com.aizuda.easy.retry.client.common.client.annotation;
import com.aizuda.easy.retry.client.core.client.netty.RequestMethod;
import com.aizuda.easy.retry.client.common.client.netty.RequestMethod;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;

View File

@ -1,4 +1,4 @@
package com.aizuda.easy.retry.client.core.cache;
package com.aizuda.easy.retry.client.common.client.cache;
import com.aizuda.easy.retry.client.core.Lifecycle;
import com.aizuda.easy.retry.client.core.client.NettyClient;

View File

@ -1,4 +1,4 @@
package com.aizuda.easy.retry.client.core.config;
package com.aizuda.easy.retry.client.common.client.config;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import lombok.Data;

View File

@ -0,0 +1,38 @@
package com.aizuda.easy.retry.client.common.client.exception;
import com.aizuda.easy.retry.common.core.exception.BaseEasyRetryException;
/**
* @author: www.byteblogs.com
* @date : 2022-03-03 14:49
*/
public class EasyRetryClientException extends BaseEasyRetryException {
public EasyRetryClientException(String message) {
super(message);
}
public EasyRetryClientException(String message, Throwable cause) {
super(message, cause);
}
public EasyRetryClientException(Throwable cause) {
super(cause);
}
public EasyRetryClientException(String message, Object... arguments) {
super(message, arguments);
}
public EasyRetryClientException(String message, Object[] arguments, Throwable cause) {
super(message, arguments, cause);
}
public EasyRetryClientException(String message, Object argument, Throwable cause) {
super(message, argument, cause);
}
public EasyRetryClientException(String message, Object argument) {
super(message, argument);
}
}

View File

@ -0,0 +1,38 @@
package com.aizuda.easy.retry.client.common.client.exception;
import com.aizuda.easy.retry.common.core.exception.BaseEasyRetryException;
/**
* @author: www.byteblogs.com
* @date : 2022-03-03 14:49
*/
public class EasyRetryClientTimeOutException extends BaseEasyRetryException {
public EasyRetryClientTimeOutException(String message) {
super(message);
}
public EasyRetryClientTimeOutException(String message, Throwable cause) {
super(message, cause);
}
public EasyRetryClientTimeOutException(Throwable cause) {
super(cause);
}
public EasyRetryClientTimeOutException(String message, Object... arguments) {
super(message, arguments);
}
public EasyRetryClientTimeOutException(String message, Object[] arguments, Throwable cause) {
super(message, arguments, cause);
}
public EasyRetryClientTimeOutException(String message, Object argument, Throwable cause) {
super(message, argument, cause);
}
public EasyRetryClientTimeOutException(String message, Object argument) {
super(message, argument);
}
}

View File

@ -1,8 +1,8 @@
package com.aizuda.easy.retry.client.core.client.netty;
package com.aizuda.easy.retry.client.common.client.netty;
import cn.hutool.core.util.IdUtil;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.config.EasyRetryProperties;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.HeadersEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
@ -16,7 +16,6 @@ import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import java.nio.charset.StandardCharsets;
@ -79,7 +78,7 @@ public class NettyChannel {
String host = easyRetryProperties.getHost();
// 获取客户端指定的IP地址
if (StringUtils.isBlank(host)) {
if (StrUtil.isBlank(host)) {
host = HOST;
}

View File

@ -1,10 +1,8 @@
package com.aizuda.easy.retry.client.core.client.netty;
package com.aizuda.easy.retry.client.common.client.netty;
import com.aizuda.easy.retry.client.core.client.NettyClient;
import com.aizuda.easy.retry.client.core.client.proxy.RequestBuilder;
import com.aizuda.easy.retry.client.core.config.EasyRetryProperties;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.client.common.client.proxy.RequestBuilder;
import com.aizuda.easy.retry.client.common.client.NettyClient;
import com.aizuda.easy.retry.common.core.constant.SystemConstants.BEAT;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
@ -112,7 +110,7 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttp
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
LogUtils.debug(log,"userEventTriggered");
if (evt instanceof IdleStateEvent) {
client.beat(PING);
client.beat(BEAT.PING);
} else {
super.userEventTriggered(ctx, evt);
}

View File

@ -1,7 +1,7 @@
package com.aizuda.easy.retry.client.core.client.netty;
package com.aizuda.easy.retry.client.common.client.netty;
import com.aizuda.easy.retry.client.core.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.core.Lifecycle;
import com.aizuda.easy.retry.client.common.client.Lifecycle;
import com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;

View File

@ -1,4 +1,4 @@
package com.aizuda.easy.retry.client.core.client.netty;
package com.aizuda.easy.retry.client.common.client.netty;
/**
* 请求类型

View File

@ -1,4 +1,4 @@
package com.aizuda.easy.retry.client.core.client.netty;
package com.aizuda.easy.retry.client.common.client.netty;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;

View File

@ -1,15 +1,14 @@
package com.aizuda.easy.retry.client.core.client.proxy;
package com.aizuda.easy.retry.client.common.client.proxy;
import cn.hutool.core.date.StopWatch;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.core.annotation.Mapping;
import com.aizuda.easy.retry.client.core.client.netty.NettyChannel;
import com.aizuda.easy.retry.client.core.client.netty.RpcContext;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientTimeOutException;
import com.aizuda.easy.retry.client.common.client.annotation.Mapping;
import com.aizuda.easy.retry.client.common.client.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.common.client.exception.EasyRetryClientTimeOutException;
import com.aizuda.easy.retry.client.common.client.netty.NettyChannel;
import com.aizuda.easy.retry.client.common.client.netty.RpcContext;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import io.netty.handler.codec.http.HttpMethod;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,6 +1,6 @@
package com.aizuda.easy.retry.client.core.client.proxy;
package com.aizuda.easy.retry.client.common.client.proxy;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.common.client.exception.EasyRetryClientException;
import java.lang.reflect.Proxy;
import java.util.Objects;

View File

@ -0,0 +1,31 @@
{
"groups": [
{
"name": "easy-retry",
"type": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties",
"sourceType": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties"
},
{
"name": "easy-retry.server",
"sourceMethod": "getServer()",
"type": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties$ServerConfig",
"sourceType": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties"
}
],
"properties": [
{
"name": "easy-retry.server.host",
"type": "java.lang.String",
"defaultValue": "127.0.0.1",
"description": "服务端的地址,若服务端集群部署则此处配置域名",
"sourceType": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties$ServerConfig"
},
{
"name": "easy-retry.server.port",
"type": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties.ServerConfig",
"sourceType": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties",
"description": "服务端netty的端口号",
"defaultValue": "1788"
}
]
}

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry</artifactId>
<artifactId>easy-retry-client</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
@ -75,6 +75,10 @@
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-common-client-api</artifactId>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-client-common</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,27 @@
package com.aizuda.easy.retry.client.core.client;
import com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import org.springframework.web.bind.annotation.RequestMethod;
import java.util.List;
/**
* netty 客户端请求类
*
* @author: www.byteblogs.com
* @date : 2023-05-11 21:28
* @since 1.3.0
*/
public interface NettyClient {
@Mapping(method = RequestMethod.GET, path = HTTP_PATH.CONFIG)
Result getConfig(Integer version);
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.BATCH_REPORT)
NettyResult reportRetryInfo(List<RetryTaskDTO> list);
}

View File

@ -8,7 +8,6 @@ import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**

View File

@ -8,8 +8,6 @@ import org.slf4j.helpers.MessageFormatter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.ConfigurableBootstrapContext;
import org.springframework.boot.SpringApplicationRunListener;
import org.springframework.stereotype.Component;
import java.util.List;

View File

@ -25,8 +25,6 @@ import lombok.extern.slf4j.Slf4j;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.aop.AfterAdvice;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.env.StandardEnvironment;

View File

@ -2,7 +2,6 @@ package com.aizuda.easy.retry.client.core.report;
import com.aizuda.easy.retry.client.core.client.NettyClient;
import com.aizuda.easy.retry.client.core.client.proxy.RequestBuilder;
import com.aizuda.easy.retry.client.core.intercepter.RetrySiteSnapshot;
import com.aizuda.easy.retry.client.core.retryer.RetryerInfo;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.NettyResult;
@ -11,7 +10,6 @@ import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

View File

@ -1,9 +1,7 @@
package com.aizuda.easy.retry.client.core.strategy;
import cn.hutool.core.util.IdUtil;
import com.aizuda.easy.retry.client.core.RetryExecutor;
import com.aizuda.easy.retry.client.core.RetryExecutorParameter;
import com.aizuda.easy.retry.client.core.annotation.Retryable;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.core.intercepter.RetrySiteSnapshot;
@ -12,14 +10,12 @@ import com.aizuda.easy.retry.client.core.retryer.RetryerInfo;
import com.aizuda.easy.retry.client.core.retryer.RetryerResultContext;
import com.aizuda.easy.retry.common.core.enums.RetryResultStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.StopStrategy;
import com.github.rholder.retry.WaitStrategies;
import com.github.rholder.retry.WaitStrategy;
import com.google.common.base.Predicate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

View File

@ -0,0 +1,31 @@
{
"groups": [
{
"name": "easy-retry",
"type": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties",
"sourceType": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties"
},
{
"name": "easy-retry.server",
"sourceMethod": "getServer()",
"type": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties$ServerConfig",
"sourceType": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties"
}
],
"properties": [
{
"name": "easy-retry.server.host",
"type": "java.lang.String",
"defaultValue": "127.0.0.1",
"description": "服务端的地址,若服务端集群部署则此处配置域名",
"sourceType": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties$ServerConfig"
},
{
"name": "easy-retry.server.port",
"type": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties.ServerConfig",
"sourceType": "com.aizuda.easy.retry.client.common.client.config.EasyRetryProperties",
"description": "服务端netty的端口号",
"defaultValue": "1788"
}
]
}

View File

@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-client</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>easy-retry-client-job-core</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

37
easy-retry-client/pom.xml Normal file
View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>easy-retry-client</artifactId>
<packaging>pom</packaging>
<modules>
<module>easy-retry-client-common</module>
<module>easy-retry-client-job-core</module>
<module>easy-retry-client-core</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-client-common</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -0,0 +1,24 @@
package com.aizuda.easy.retry.client.model;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
/**
* @author: www.byteblogs.com
* @date : 2023-09-26 15:10
*/
@Data
public class DispatchJobDTO {
@NotNull(message = "jobId 不能为空")
private Long jobId;
@NotNull(message = "taskId 不能为空")
private Long taskId;
@NotBlank(message = "group 不能为空")
private String groupName;
}

View File

@ -0,0 +1,24 @@
package com.aizuda.easy.retry.client.model;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
/**
* @author: www.byteblogs.com
* @date : 2023-09-26 15:10
*/
@Data
public class InterruptJobDTO {
@NotNull(message = "jobId 不能为空")
private Long jobId;
@NotNull(message = "taskId 不能为空")
private Long taskId;
@NotBlank(message = "group 不能为空")
private String groupName;
}

View File

@ -51,6 +51,11 @@ public class JobTask implements Serializable {
*/
private Integer taskStatus;
/**
* 客户端节点id
*/
private String hostId;
/**
* 创建时间
*/

View File

@ -26,8 +26,8 @@ public class ActorGenerator {
/*----------------------------------------分布式任务调度----------------------------------------*/
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
private ActorGenerator() {}
@ -130,6 +130,7 @@ public class ActorGenerator {
return getNettyActorSystem().actorOf(getSpringExtension().props(REQUEST_HANDLER_ACTOR));
}
/**
* Job调度准备阶段actor
*
@ -139,6 +140,15 @@ public class ActorGenerator {
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_TASK_PREPARE_ACTOR));
}
/**
* Job任务执行阶段actor
*
* @return actor 引用
*/
public static ActorRef jobTaskExecutorActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_EXECUTOR_ACTOR));
}
public static SpringExtension getSpringExtension() {
return SpringContext.getBeanByType(SpringExtension.class);
}

View File

@ -1,7 +1,9 @@
package com.aizuda.easy.retry.server.common.client;
import com.aizuda.easy.retry.client.model.DispatchJobDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.InterruptJobDTO;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.model.Result;
@ -24,4 +26,10 @@ public interface RpcClient {
@Mapping(path = "/retry/callback/v1", method = RequestMethod.POST)
Result callback(@Body RetryCallbackDTO retryCallbackDTO);
@Mapping(path = "/job/interrupt/v1", method = RequestMethod.POST)
Result<Boolean> interrupt(@Body InterruptJobDTO interruptJobDTO);
@Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST)
Result dispatch(@Body DispatchJobDTO dispatchJobDTO);
}

View File

@ -0,0 +1,13 @@
package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
/**
* @author: www.byteblogs.com
* @date : 2023-09-26 15:39
*/
@Data
public class TaskExecuteDTO {
private Long taskId;
}

View File

@ -0,0 +1,52 @@
package com.aizuda.easy.retry.server.job.task.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author: www.byteblogs.com
* @date : 2023-09-26 14:26
*/
@AllArgsConstructor
@Getter
public enum TaskStatusEnum {
/**
* 待处理
*/
WAIT(10),
/**
* 处理中
*/
PROCESSING(20),
/**
* 处理中
*/
PROCESSED_SUCCESS(21),
/**
* 处理中
*/
PROCESSED_FAIL(22),
/**
* 中断中
*/
INTERRUPTING(30),
/**
* 中断成功
*/
INTERRUPT_SUCCESS(31),
/**
* 中断失败
*/
INTERRUPT_FAIL(32),
;
private final int status;
}

View File

@ -1,9 +1,21 @@
package com.aizuda.easy.retry.server.job.task.scan;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.client.model.DispatchJobDTO;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.client.RpcClient;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@ -12,24 +24,43 @@ import org.springframework.stereotype.Component;
* @author: www.byteblogs.com
* @date : 2023-09-25 17:41
*/
@Component(ActorGenerator.SCAN_JOB_ACTOR)
@Component(ActorGenerator.JOB_EXECUTOR_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class JobExecutorActor extends AbstractActor {
@Autowired
private JobTaskMapper jobTaskMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(JobContext.class, jobContext -> {
return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> {
try {
doExecute(jobContext);
doExecute(taskExecute);
} catch (Exception e) {
LogUtils.error(log, "job executor exception. [{}]", jobContext, e);
LogUtils.error(log, "job executor exception. [{}]", taskExecute, e);
}
}).build();
}
private void doExecute(final JobContext jobContext) {
private void doExecute(final TaskExecuteDTO taskExecute) {
// 调度客户端
JobTask jobTask = jobTaskMapper.selectById(taskExecute.getTaskId());
RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode(jobTask.getGroupName(), jobTask.getHostId());
RpcClient rpcClient = RequestBuilder.<RpcClient, Result>newBuilder()
.hostPort(registerNodeInfo.getHostPort())
.groupName(registerNodeInfo.getGroupName())
.hostId(registerNodeInfo.getHostId())
.hostIp(registerNodeInfo.getHostIp())
.contextPath(registerNodeInfo.getContextPath())
.client(RpcClient.class)
.build();
DispatchJobDTO dispatchJobDTO = new DispatchJobDTO();
dispatchJobDTO.setJobId(jobTask.getJobId());
dispatchJobDTO.setTaskId(jobTask.getId());
dispatchJobDTO.setGroupName(jobTask.getGroupName());
Result dispatch = rpcClient.dispatch(dispatchJobDTO);
}
}

View File

@ -2,12 +2,14 @@ package com.aizuda.easy.retry.server.job.task.scan;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.job.task.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.enums.TaskStatusEnum;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
@ -19,9 +21,13 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.time.ZoneId;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 调度任务准备阶段
*
* @author www.byteblogs.com
* @date 2023-09-25 22:20:53
* @since
@ -30,6 +36,7 @@ import java.util.concurrent.TimeUnit;
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class JobTaskPrepareActor extends AbstractActor {
@Autowired
private JobMapper jobMapper;
@Autowired
@ -50,21 +57,39 @@ public class JobTaskPrepareActor extends AbstractActor {
private void doPrepare(JobTaskPrepareDTO prepare) {
Long count = jobTaskMapper.selectCount(new LambdaQueryWrapper<JobTask>().eq(JobTask::getTaskStatus, StatusEnum.YES.getStatus()));
if (count <= 0) {
JobTask jobTask = jobTaskMapper.selectOne(new LambdaQueryWrapper<JobTask>()
.eq(JobTask::getJobId, prepare.getJobId())
.in(JobTask::getTaskStatus,
TaskStatusEnum.WAIT.getStatus(), TaskStatusEnum.INTERRUPTING.getStatus(),
TaskStatusEnum.INTERRUPTING.getStatus()));
if (Objects.isNull(jobTask)) {
// 生成可执行任务
JobTask jobTask = new JobTask();
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(prepare.getGroupName());
if (Objects.isNull(serverNode)) {
log.error("无可执行的客户端信息. jobId:[{}]", prepare.getJobId());
return;
}
jobTask = new JobTask();
jobTask.setHostId(serverNode.getHostId());
jobTask.setJobId(prepare.getJobId());
jobTask.setGroupName(prepare.getGroupName());
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", prepare.getJobId()));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", prepare.getJobId()));
JobContext jobContext = new JobContext();
// 进入时间轮
JobTimerWheelHandler.register(prepare.getGroupName(), prepare.getJobId().toString(), new JobTimerTask(jobContext), 1, TimeUnit.MILLISECONDS);
JobContext jobContext = new JobContext();
jobContext.setRegisterNodeInfo(serverNode);
long delay = prepare.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
- System.currentTimeMillis();
JobTimerWheelHandler.register(prepare.getGroupName(), prepare.getJobId().toString(),
new JobTimerTask(jobTask.getId(), jobTask.getGroupName()), delay, TimeUnit.MILLISECONDS);
} else {
BlockStrategies.BlockStrategyContext blockStrategyContext = new BlockStrategies.BlockStrategyContext();
BlockStrategy blockStrategy = BlockStrategies.BlockStrategyEnum.getBlockStrategy(prepare.getBlockStrategy());
blockStrategyContext.setRegisterNodeInfo(CacheRegisterTable.getServerNode(jobTask.getGroupName(), jobTask.getHostId()));
BlockStrategy blockStrategy = BlockStrategies.BlockStrategyEnum.getBlockStrategy(
prepare.getBlockStrategy());
blockStrategy.block(blockStrategyContext);
}

View File

@ -1,23 +1,36 @@
package com.aizuda.easy.retry.server.job.task.scan;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
/**
* @author: www.byteblogs.com
* @date : 2023-09-25 17:28
*/
@AllArgsConstructor
@Slf4j
public class JobTimerTask implements TimerTask {
private JobContext jobContext;
public JobTimerTask(JobContext jobContext) {
this.jobContext = jobContext;
}
private Long taskId;
private String groupName;
@Override
public void run(final Timeout timeout) throws Exception {
// 执行任务调度
log.info("开始执行任务调度. 当前时间:[{}]", LocalDateTime.now());
// 先清除时间轮的缓存
JobTimerWheelHandler.clearCache(groupName, taskId.toString());
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
}
}

View File

@ -1,9 +1,16 @@
package com.aizuda.easy.retry.server.job.task.strategy;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.model.InterruptJobDTO;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.client.RpcClient;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.enums.TaskStatusEnum;
import com.aizuda.easy.retry.server.job.task.scan.JobContext;
import com.aizuda.easy.retry.server.job.task.scan.JobTimerTask;
import com.aizuda.easy.retry.server.job.task.scan.JobTimerWheelHandler;
@ -41,10 +48,9 @@ public class BlockStrategies {
}
}
return null;
throw new EasyRetryServerException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy);
}
}
@Data
@ -52,7 +58,11 @@ public class BlockStrategies {
private Long jobId;
private Job job;
private Long taskId;
private String groupName;
private RegisterNodeInfo registerNodeInfo;
}
private static final class DiscardBlockStrategy implements BlockStrategy {
@ -70,6 +80,46 @@ public class BlockStrategies {
public boolean block(final BlockStrategyContext context) {
log.warn("阻塞策略为覆盖. jobId:[{}]", context.getJobId());
// 向客户端发送中断执行指令
RegisterNodeInfo registerNodeInfo = context.registerNodeInfo;
RpcClient rpcClient = RequestBuilder.<RpcClient, Result>newBuilder()
.hostPort(registerNodeInfo.getHostPort())
.groupName(registerNodeInfo.getGroupName())
.hostId(registerNodeInfo.getHostId())
.hostIp(registerNodeInfo.getHostIp())
.contextPath(registerNodeInfo.getContextPath())
.client(RpcClient.class)
.build();
InterruptJobDTO interruptJobDTO = new InterruptJobDTO();
interruptJobDTO.setTaskId(context.getTaskId());
interruptJobDTO.setGroupName(context.getGroupName());
interruptJobDTO.setJobId(context.getJobId());
// TODO 处理结果
Result<Boolean> result = rpcClient.interrupt(interruptJobDTO);
Integer taskStatus;
if (result.getStatus() == StatusEnum.YES.getStatus() && Boolean.TRUE.equals(result.getData())) {
taskStatus = TaskStatusEnum.INTERRUPT_SUCCESS.getStatus();
// 生成一个新的任务
JobTask jobTask = new JobTask();
jobTask.setJobId(context.getJobId());
jobTask.setGroupName(context.getGroupName());
JobTaskMapper jobTaskMapper = SpringContext.getBeanByType(JobTaskMapper.class);
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
JobContext jobContext = new JobContext();
// 进入时间轮
JobTimerWheelHandler.register(context.getGroupName(), context.getJobId().toString(), new JobTimerTask(jobTask.getJobId(), jobTask.getGroupName()), 1, TimeUnit.MILLISECONDS);
} else {
taskStatus = TaskStatusEnum.INTERRUPT_FAIL.getStatus();
}
JobTaskMapper jobTaskMapper = SpringContext.getBeanByType(JobTaskMapper.class);
JobTask jobTask = new JobTask();
jobTask.setTaskStatus(taskStatus);
Assert.isTrue(1 == jobTaskMapper.updateById(jobTask), ()-> new EasyRetryServerException("更新调度任务失败. jopId:[{}]", context.getJobId()));
return true;
}
@ -80,17 +130,15 @@ public class BlockStrategies {
@Override
public boolean block(final BlockStrategyContext context) {
log.warn("阻塞策略为并行执行. jobId:[{}]", context.getJobId());
Job job = context.getJob();
JobTask jobTask = new JobTask();
jobTask.setJobId(job.getId());
jobTask.setGroupName(job.getGroupName());
jobTask.setJobId(context.getJobId());
jobTask.setGroupName(context.getGroupName());
JobTaskMapper jobTaskMapper = SpringContext.getBeanByType(JobTaskMapper.class);
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", job.getId()));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
JobContext jobContext = new JobContext();
// 进入时间轮
JobTimerWheelHandler.register(job.getGroupName(), job.getId().toString(), new JobTimerTask(jobContext), 1, TimeUnit.MILLISECONDS);
JobTimerWheelHandler.register(context.getGroupName(), context.getJobId().toString(), new JobTimerTask(jobTask.getJobId(), jobTask.getGroupName()), 1, TimeUnit.MILLISECONDS);
return false;
}

View File

@ -31,11 +31,11 @@
</properties>
<modules>
<module>easy-retry-client-core</module>
<module>easy-retry-common</module>
<module>easy-retry-server</module>
<module>easy-retry-client-starter</module>
<module>easy-retry-datasource</module>
<module>easy-retry-client</module>
</modules>
<dependencyManagement>