feat:2.4.0

1. Wire up communication between the client and the server
This commit is contained in:
byteblogs168 2023-09-29 23:13:45 +08:00
parent 0e94f87742
commit a5ea2a6651
143 changed files with 3705 additions and 878 deletions

View File

@ -15,6 +15,7 @@ CREATE TABLE `group_config`
`route_key` tinyint(4) NOT NULL COMMENT '路由策略',
`id_generator_mode` tinyint(4) NOT NULL DEFAULT '1' COMMENT '唯一id生成模式 默认号段模式',
`init_scene` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否初始化场景 0:否 1:是',
`bucket_index` int(11) DEFAULT NULL COMMENT 'bucket',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
@ -134,7 +135,6 @@ CREATE TABLE `scene_config`
`back_off` tinyint(4) NOT NULL DEFAULT '1' COMMENT '1、默认等级 2、固定间隔时间 3、CRON 表达式',
`trigger_interval` varchar(16) NOT NULL DEFAULT '' COMMENT '间隔时长',
`deadline_request` bigint(20) unsigned NOT NULL DEFAULT '60000' COMMENT 'Deadline Request 调用链超时 单位毫秒',
`bucket_index` int(11) DEFAULT NULL COMMENT 'bucket',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
@ -215,64 +215,76 @@ CREATE TABLE `sequence_alloc`
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='号段模式序号ID分配表';
-- 分布式调度DDL
CREATE TABLE `job` (
`id` BIGINT ( 20 ) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` VARCHAR ( 64 ) NOT NULL COMMENT '组名称',
`job_name` VARCHAR ( 64 ) NOT NULL COMMENT '名称',
`args_str` TEXT NOT NULL COMMENT '执行方法参数',
`args_type` VARCHAR ( 16 ) NOT NULL DEFAULT '' COMMENT '参数类型 text/json',
`ext_attrs` TEXT NOT NULL COMMENT '扩展字段',
`next_trigger_at` DATETIME NOT NULL COMMENT '下次触发时间',
`job_status` TINYINT ( 4 ) NOT NULL DEFAULT '1' COMMENT '重试状态 0、关闭、1、开启',
`route_key` VARCHAR ( 50 ) DEFAULT NULL COMMENT '执行器路由策略',
`executor_type` TINYINT ( 4 ) NOT NULL DEFAULT '1' COMMENT '执行器类型 1、Java',
`executor_name` VARCHAR ( 255 ) DEFAULT NULL COMMENT '执行器名称',
`block_strategy` VARCHAR ( 50 ) DEFAULT NULL COMMENT '阻塞策略 1、丢弃 2、覆盖 3、并行',
`executor_timeout` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`max_retry_times` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '最大重试次数',
`retry_interval` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '重试间隔(s)',
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_name` varchar(64) NOT NULL COMMENT '名称',
`args_str` text NOT NULL COMMENT '执行方法参数',
`args_type` varchar(16) NOT NULL DEFAULT '' COMMENT '参数类型 text/json',
`ext_attrs` text NOT NULL COMMENT '扩展字段',
`next_trigger_at` datetime NOT NULL COMMENT '下次触发时间',
`job_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '重试状态 0、关闭、1、开启',
`task_type` varchar(255) DEFAULT NULL COMMENT '任务类型 1、集群 2、广播 3、切片',
`route_key` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
`executor_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '执行器类型 1、Java',
`executor_name` varchar(255) DEFAULT NULL COMMENT '执行器名称',
`trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间',
`trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
`block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞策略 1、丢弃 2、覆盖 3、并行',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`max_retry_times` int(11) NOT NULL DEFAULT '0' COMMENT '最大重试次数',
`parallel_num` int(11) NOT NULL DEFAULT '1' COMMENT '并行数',
`retry_interval` int(11) NOT NULL DEFAULT '0' COMMENT '重试间隔(s)',
`bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` TINYINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
PRIMARY KEY ( `id` ),
KEY `idx_group_name` ( `group_name` )
) ENGINE = INNODB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT = '任务信息';
CREATE TABLE `job_task` (
`id` BIGINT ( 20 ) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` VARCHAR ( 64 ) NOT NULL COMMENT '组名称',
`job_id` BIGINT ( 20 ) NOT NULL COMMENT '任务id',
`retry_count` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '重试次数',
`task_status` TINYINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '任务状态 0、失败 1、成功',
`create_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` TINYINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
PRIMARY KEY ( `id` )
) ENGINE = INNODB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT = '调度任务';
CREATE TABLE `job_task_instance` (
`id` BIGINT ( 20 ) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` VARCHAR ( 64 ) NOT NULL COMMENT '组名称',
`job_id` BIGINT ( 20 ) NOT NULL COMMENT '任务信息id',
`task_id` BIGINT ( 20 ) NOT NULL COMMENT '调度任务id',
`parent_id` BIGINT ( 20 ) NOT NULL DEFAULT '0' COMMENT '父执行器id',
`execute_status` TINYINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '执行的状态 0、失败 1、成功',
`result_message` TEXT NOT NULL COMMENT '执行结果',
`create_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY ( `id` )
) ENGINE = INNODB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT = '任务实例';
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
PRIMARY KEY (`id`),
KEY `idx_group_name` (`group_name`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='任务信息';
CREATE TABLE `job_log_message` (
`id` BIGINT ( 20 ) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` VARCHAR ( 64 ) NOT NULL COMMENT '组名称',
`job_id` BIGINT ( 20 ) NOT NULL COMMENT '任务信息id',
`task_id` BIGINT ( 20 ) NOT NULL COMMENT '任务实例id',
`task_instance_id` BIGINT ( 20 ) NOT NULL COMMENT '调度任务id',
`create_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`message` TEXT NOT NULL COMMENT '调度信息',
PRIMARY KEY ( `id` )
) ENGINE = INNODB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT = '调度日志';
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务信息id',
`task_batch_id` bigint(20) NOT NULL COMMENT '任务批次id',
`task_id` bigint(20) NOT NULL COMMENT '调度任务id',
`client_address` varchar(255) DEFAULT NULL COMMENT '客户端地址',
`message` text NOT NULL COMMENT '调度信息',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`ext_attrs` varchar(256) NULL DEFAULT '' COMMENT '扩展字段',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='调度日志';
CREATE TABLE `job_task` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务信息id',
`task_batch_id` bigint(20) NOT NULL COMMENT '调度任务id',
`parent_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '父执行器id',
`execute_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行的状态 0、失败 1、成功',
`retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`client_id` varchar(255) DEFAULT NULL COMMENT '客户端地址',
`result_message` text NOT NULL COMMENT '执行结果',
`args_str` text NOT NULL COMMENT '执行方法参数',
`args_type` varchar(16) NOT NULL DEFAULT '' COMMENT '参数类型 text/json',
`ext_attrs` text NOT NULL COMMENT '扩展字段',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='任务实例';
CREATE TABLE `job_task_batch` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务id',
`task_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务状态 0、失败 1、成功',
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
`execution_at` datetime DEFAULT NULL COMMENT '任务执行时间',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
`ext_attrs` varchar(256) NULL DEFAULT '' COMMENT '扩展字段',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4 COMMENT='任务批次';

View File

@ -26,6 +26,14 @@
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-client-core</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-client-job-core</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>

View File

@ -1,11 +1,13 @@
package com.aizuda.easy.retry.client.starter;
import com.aizuda.easy.retry.client.core.annotation.Retryable;
import com.aizuda.easy.retry.client.core.intercepter.EasyRetryInterceptor;
import com.aizuda.easy.retry.client.core.intercepter.EasyRetryPointcutAdvisor;
import com.aizuda.easy.retry.client.core.strategy.RetryStrategy;
import org.aopalliance.intercept.MethodInterceptor;
import org.springframework.aop.Advisor;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
@ -16,20 +18,21 @@ import org.springframework.core.env.StandardEnvironment;
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@ComponentScan("com.aizuda.easy.retry.client.core")
@ConditionalOnClass(Retryable.class)
@ComponentScan({"com.aizuda.easy.retry.client.core", "com.aizuda.easy.retry.client.common"})
@ConditionalOnProperty(prefix = "easy-retry", name = "enabled", havingValue = "true")
public class EasyRetryClientAutoConfiguration {
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public Advisor easyRetryPointcutAdvisor (MethodInterceptor easyRetryInterceptor) {
public Advisor easyRetryPointcutAdvisor(MethodInterceptor easyRetryInterceptor) {
return new EasyRetryPointcutAdvisor(easyRetryInterceptor);
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public MethodInterceptor easyRetryInterceptor(StandardEnvironment standardEnvironment,
@Lazy RetryStrategy localRetryStrategies) {
@Lazy RetryStrategy localRetryStrategies) {
return new EasyRetryInterceptor(standardEnvironment, localRetryStrategies);
}

View File

@ -0,0 +1,23 @@
package com.aizuda.easy.retry.client.starter;
import com.aizuda.easy.retry.client.core.annotation.Retryable;
import com.aizuda.easy.retry.client.core.intercepter.EasyRetryInterceptor;
import com.aizuda.easy.retry.client.core.intercepter.EasyRetryPointcutAdvisor;
import com.aizuda.easy.retry.client.core.strategy.RetryStrategy;
import com.aizuda.easy.retry.client.job.core.annotation.JobExecutor;
import org.aopalliance.intercept.MethodInterceptor;
import org.springframework.aop.Advisor;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.*;
import org.springframework.core.env.StandardEnvironment;
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@ComponentScan({"com.aizuda.easy.retry.client.job.core", "com.aizuda.easy.retry.client.common"})
@ConditionalOnClass(JobExecutor.class)
@ConditionalOnProperty(prefix = "easy-retry", name = "enabled", havingValue = "true")
public class EasyRetryJobClientAutoConfiguration {
}
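For illustration only (not part of this commit): a minimal sketch of an executor bean that the new auto-configuration and JobExecutorScanner would pick up through the class-level @JobExecutor annotation. The bean class, executor name and method name below are assumptions; the annotation attributes (name, method), the JobContext parameter and the ExecuteResult return type follow the scanner and AnnotationJobExecutor elsewhere in this diff.

package com.example.demo;

import com.aizuda.easy.retry.client.job.core.annotation.JobExecutor;
import com.aizuda.easy.retry.client.job.core.dto.JobContext;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import org.springframework.stereotype.Component;

@Component
@JobExecutor(name = "demoJobExecutor", method = "doJobExecute")
public class DemoJobExecutor {

    // Invoked reflectively by AnnotationJobExecutor with the dispatched JobContext
    public ExecuteResult doJobExecute(JobContext jobContext) {
        // business logic for one task of the batch goes here
        return ExecuteResult.success("processed batch " + jobContext.getTaskBatchId());
    }
}

The server then dispatches to it by executor name via /job/dispatch/v1, and AnnotationJobExecutor invokes the configured method reflectively.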

View File

@ -1,2 +1,3 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.aizuda.easy.retry.client.starter.EasyRetryClientAutoConfiguration
com.aizuda.easy.retry.client.starter.EasyRetryClientAutoConfiguration,\
com.aizuda.easy.retry.client.starter.EasyRetryJobClientAutoConfiguration

View File

@ -37,6 +37,10 @@
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-common-core</artifactId>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-common-server-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,4 +1,4 @@
package com.aizuda.easy.retry.client.core.cache;
package com.aizuda.easy.retry.client.common.cache;
import com.aizuda.easy.retry.client.common.Lifecycle;
import com.aizuda.easy.retry.client.common.NettyClient;

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.client.common.netty;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.common.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.HeadersEnum;
@ -93,8 +94,7 @@ public class NettyChannel {
.set(HeadersEnum.GROUP_NAME.getKey(), EasyRetryProperties.getGroup())
.set(HeadersEnum.CONTEXT_PATH.getKey(), Optional.ofNullable(serverProperties.getServlet().getContextPath()).orElse("/"))
.set(HeadersEnum.HOST_PORT.getKey(), port)
// TODO 待办
// .set(HeadersEnum.VERSION.getKey(), GroupVersionCache.getVersion())
.set(HeadersEnum.VERSION.getKey(), GroupVersionCache.getVersion())
;
//发送数据

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.client.core.client;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.core.IdempotentIdGenerate;
import com.aizuda.easy.retry.client.core.RetryArgSerializer;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.common.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.cache.RetryerInfoCache;
import com.aizuda.easy.retry.client.core.callback.RetryCompleteCallback;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;

View File

@ -4,7 +4,7 @@ import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.core.annotation.Retryable;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.common.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.cache.RetryerInfoCache;
import com.aizuda.easy.retry.client.core.intercepter.RetrySiteSnapshot.EnumStage;
import com.aizuda.easy.retry.client.core.retryer.RetryerInfo;

View File

@ -4,7 +4,7 @@ import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.common.proxy.RequestBuilder;
import com.aizuda.easy.retry.client.core.RetryExecutor;
import com.aizuda.easy.retry.client.core.RetryExecutorParameter;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.common.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.client.NettyClient;
import com.aizuda.easy.retry.client.core.executor.GuavaRetryExecutor;
import com.aizuda.easy.retry.common.core.alarm.Alarm;

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.client.core.strategy;
import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.core.RetryExecutor;
import com.aizuda.easy.retry.client.core.RetryExecutorParameter;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.common.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.event.EasyRetryListener;
import com.aizuda.easy.retry.client.core.intercepter.RetrySiteSnapshot;
import com.aizuda.easy.retry.client.core.Report;

View File

@ -2,7 +2,6 @@ package com.aizuda.easy.retry.client.core.strategy;
import com.aizuda.easy.retry.client.core.RetryExecutor;
import com.aizuda.easy.retry.client.core.RetryExecutorParameter;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.core.intercepter.RetrySiteSnapshot;
import com.aizuda.easy.retry.client.core.retryer.RetryType;

View File

@ -1,8 +1,7 @@
package com.aizuda.easy.retry.client.job.core;
import com.aizuda.easy.retry.client.job.core.dto.ExecuteResult;
import java.util.concurrent.Callable;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import com.aizuda.easy.retry.client.job.core.dto.JobContext;
/**
* job执行者
@ -11,7 +10,6 @@ import java.util.concurrent.Callable;
* @date : 2023-09-27 09:38
* @since 2.4.0
*/
public interface IJobExecutor extends Callable<ExecuteResult> {
String getName();
public interface IJobExecutor {
void jobExecute(JobContext jobContext);
}

View File

@ -1,9 +0,0 @@
package com.aizuda.easy.retry.client.job.core;
/**
* @author www.byteblogs.com
* @date 2023-09-27 22:31:59
* @since
*/
public class JobScanner {
}

View File

@ -0,0 +1,40 @@
package com.aizuda.easy.retry.client.job.core.cache;
import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.google.common.collect.Tables;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author www.byteblogs.com
* @date 2023-10-08 23:03:53
* @since 2.4.0
*/
public class FutureCache {
private static final Table<Long, Long, ListenableFuture<ExecuteResult>> futureCache = HashBasedTable.create();
public static void addFuture(Long taskBatchId, Long taskId, ListenableFuture<ExecuteResult> future) {
futureCache.put(taskBatchId, taskId, future);
}
public static void remove(Long taskBatchId, Long taskId) {
ListenableFuture<ExecuteResult> future = futureCache.get(taskBatchId, taskId);
if (future != null) {
future.cancel(true);
}
futureCache.remove(taskBatchId, taskId);
}
public static void remove(Long taskBatchId) {
Map<Long, ListenableFuture<ExecuteResult>> futureMap = futureCache.row(taskBatchId);
futureMap.forEach((taskId, future) -> {
future.cancel(true);
futureCache.remove(taskBatchId, taskId);
});
}
}

View File

@ -8,9 +8,12 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* @author: www.byteblogs.com
* @date : 2022-03-03 17:43
* @since : 2.4.0
*/
public class JobExecutorInfoCache {
private JobExecutorInfoCache() {}
private static final ConcurrentHashMap<String, JobExecutorInfo> JOB_EXECUTOR_REPOSITORY = new ConcurrentHashMap<>();
public static void put(JobExecutorInfo jobExecutorInfo) {
@ -22,7 +25,7 @@ public class JobExecutorInfoCache {
}
public static boolean isExisted(String executorName) {
return !Objects.nonNull(JOB_EXECUTOR_REPOSITORY.get(executorName));
return Objects.nonNull(JOB_EXECUTOR_REPOSITORY.get(executorName));
}
}

View File

@ -9,22 +9,38 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
* TODO 任务执行完成了该如何优雅的终止线程池?????
*
* @author: www.byteblogs.com
* @date : 2023-09-27 17:12
* @since : 2.4.0
*/
@Component
public class ThreadPoolCache {
private static final ConcurrentHashMap<Long, ThreadPoolExecutor> CACHE_THREAD_POOL = new ConcurrentHashMap<>();
public static ThreadPoolExecutor createThreadPool(Long taskId, int parallelNum) {
Supplier<ThreadPoolExecutor> supplier = () -> new ThreadPoolExecutor(
parallelNum, parallelNum, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>()
);
return CACHE_THREAD_POOL.putIfAbsent(taskId, supplier.get());
public static ThreadPoolExecutor createThreadPool(Long taskBatchId, int parallelNum) {
Supplier<ThreadPoolExecutor> supplier = () -> {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
parallelNum, parallelNum, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
threadPoolExecutor.allowCoreThreadTimeOut(true);
return threadPoolExecutor;
};
// cache and return the same pool instance; reuse an existing one for this batch
return CACHE_THREAD_POOL.computeIfAbsent(taskBatchId, id -> supplier.get());
}
public static void getThreadPool(Long taskId, int parallelNum) {
public static ThreadPoolExecutor getThreadPool(Long taskBatchId) {
return CACHE_THREAD_POOL.get(taskBatchId);
}
public static void stopThreadPool(Long taskBatchId) {
FutureCache.remove(taskBatchId);
// evict the entry so it does not leak, then stop any in-flight work
ThreadPoolExecutor threadPoolExecutor = CACHE_THREAD_POOL.remove(taskBatchId);
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdownNow();
}
}
}

View File

@ -1,24 +1,24 @@
package com.aizuda.easy.retry.client.job.core.client;
import com.aizuda.easy.retry.client.job.core.IJobExecutor;
import com.aizuda.easy.retry.client.job.core.cache.JobExecutorInfoCache;
import com.aizuda.easy.retry.client.job.core.cache.ThreadPoolCache;
import com.aizuda.easy.retry.client.job.core.dto.ExecuteResult;
import com.aizuda.easy.retry.client.job.core.executor.AnnotationJobExecutor;
import com.aizuda.easy.retry.client.job.core.executor.NormalJobExecutor;
import com.aizuda.easy.retry.client.job.core.dto.JobContext;
import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo;
import com.aizuda.easy.retry.client.model.DispatchJobDTO;
import com.aizuda.easy.retry.client.model.InterruptJobDTO;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.client.model.StopJobDTO;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.model.Result;
import com.google.common.util.concurrent.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ReflectionUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Callable;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
/**
@ -29,46 +29,40 @@ import java.util.concurrent.ThreadPoolExecutor;
@RequestMapping("/job")
@Slf4j
public class JobEndPoint {
@PostMapping("/dispatch/v1")
public Result<Boolean> dispatchJob(@RequestBody @Validated DispatchJobDTO dispatchJob) {
// 创建可执行的任务
ThreadPoolExecutor threadPool = ThreadPoolCache.createThreadPool(dispatchJob.getTaskId(), dispatchJob.getParallelNum());
ListeningExecutorService decorator = MoreExecutors.listeningDecorator(threadPool);
public Result<Boolean> dispatchJob(@RequestBody @Validated DispatchJobRequest dispatchJob) {
JobContext jobContext = new JobContext();
jobContext.setJobId(dispatchJob.getJobId());
jobContext.setTaskId(dispatchJob.getTaskId());
jobContext.setTaskBatchId(dispatchJob.getTaskBatchId());
jobContext.setGroupName(dispatchJob.getGroupName());
jobContext.setExecutorName(dispatchJob.getExecutorName());
jobContext.setParallelNum(dispatchJob.getParallelNum());
jobContext.setTaskType(dispatchJob.getTaskType());
jobContext.setExecutorTimeout(dispatchJob.getExecutorTimeout());
String executorName = dispatchJob.getExecutorName();
JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(executorName);
// 执行任务
ListenableFuture<ExecuteResult> submit = decorator.submit(() -> {
return (ExecuteResult) ReflectionUtils.invokeMethod(jobExecutorInfo.getMethod(), jobExecutorInfo.getExecutor(), jobContext);
});
Futures.addCallback(submit, new FutureCallback<ExecuteResult>() {
@Override
public void onSuccess(ExecuteResult result) {
// 上报执行成功
}
@Override
public void onFailure(Throwable t) {
// 上报执行失败
}
}, threadPool);
JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorName());
if (jobExecutorInfo.isAnnotation()) {
IJobExecutor iJobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class);
iJobExecutor.jobExecute(jobContext);
} else {
NormalJobExecutor normalJobExecutor = (NormalJobExecutor) jobExecutorInfo.getExecutor();
normalJobExecutor.jobExecute(jobContext);
}
return new Result<>(Boolean.TRUE);
}
@PostMapping("/interrupt/v1")
public Result<Boolean> dispatchJob(@RequestBody @Validated InterruptJobDTO interruptJob) {
@PostMapping("/stop/v1")
public Result<Boolean> stopJob(@RequestBody @Validated StopJobDTO interruptJob) {
ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskId());
if (Objects.isNull(threadPool) || threadPool.isShutdown() || threadPool.isTerminated()) {
return new Result<>(Boolean.TRUE);
}
return new Result<>(Boolean.TRUE);
ThreadPoolCache.stopThreadPool(interruptJob.getTaskId());
return new Result<>(threadPool.isShutdown() || threadPool.isTerminated());
}
}

View File

@ -0,0 +1,22 @@
package com.aizuda.easy.retry.client.job.core.client;
import com.aizuda.easy.retry.client.common.annotation.Mapping;
import com.aizuda.easy.retry.client.common.netty.RequestMethod;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.client.model.request.DispatchJobResultRequest;
import com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.easy.retry.common.core.model.Result;
/**
* netty 客户端请求类
*
* @author: www.byteblogs.com
* @date : 2023-05-11 21:28
* @since 2.4.0
*/
public interface JobNettyClient {
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.REPORT_JOB_DISPATCH_RESULT)
Result dispatchResult(DispatchJobResultRequest request);
}

View File

@ -1,9 +0,0 @@
package com.aizuda.easy.retry.client.job.core.dto;
/**
* @author: www.byteblogs.com
* @date : 2023-09-27 09:43
*/
public class ExecuteResult {
}

View File

@ -11,7 +11,24 @@ public class JobContext {
private Long jobId;
private Long taskBatchId;
private Long taskId;
private String groupName;
private String executorName;
/**
* 任务类型
*/
private Integer taskType;
private Integer parallelNum;
private Integer shardingTotal;
private Integer shardingIndex;
private Integer executorTimeout;
}

View File

@ -18,5 +18,8 @@ public class JobExecutorInfo {
private final Method method;
Object executor;
private Object executor;
private boolean isAnnotation;
}

View File

@ -0,0 +1,48 @@
package com.aizuda.easy.retry.client.job.core.executor;
import com.aizuda.easy.retry.client.job.core.IJobExecutor;
import com.aizuda.easy.retry.client.job.core.cache.FutureCache;
import com.aizuda.easy.retry.client.job.core.cache.ThreadPoolCache;
import com.aizuda.easy.retry.client.job.core.dto.JobContext;
import com.aizuda.easy.retry.client.job.core.timer.StopTaskTimerTask;
import com.aizuda.easy.retry.client.job.core.timer.TimerManager;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 广播模式
*
* @author: www.byteblogs.com
* @date : 2023-09-27 09:48
* @since 2.4.0
*/
abstract class AbstractJobExecutor implements IJobExecutor {
@Override
public void jobExecute(JobContext jobContext) {
// 创建可执行的任务
ThreadPoolExecutor threadPool = ThreadPoolCache.createThreadPool(jobContext.getTaskBatchId(), jobContext.getParallelNum());
ListeningExecutorService decorator = MoreExecutors.listeningDecorator(threadPool);
// 将任务添加到时间轮中到期停止任务
TimerManager.add(new StopTaskTimerTask(jobContext.getTaskBatchId()), jobContext.getExecutorTimeout(), TimeUnit.SECONDS);
// 执行任务
ListenableFuture<ExecuteResult> submit = decorator.submit(() -> doJobExecute(jobContext));
FutureCache.addFuture(jobContext.getTaskBatchId(), jobContext.getTaskId(), submit);
Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext), decorator);
}
protected abstract ExecuteResult doJobExecute(JobContext jobContext);
}

View File

@ -0,0 +1,25 @@
package com.aizuda.easy.retry.client.job.core.executor;
import com.aizuda.easy.retry.client.job.core.cache.JobExecutorInfoCache;
import com.aizuda.easy.retry.client.job.core.dto.JobContext;
import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
/**
* 基于注解的执行器
*
* @author www.byteblogs.com
* @date 2023-09-27 22:20:36
* @since 2.4.0
*/
@Component
public class AnnotationJobExecutor extends AbstractJobExecutor {
@Override
protected ExecuteResult doJobExecute(final JobContext jobContext) {
JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorName());
return (ExecuteResult) ReflectionUtils.invokeMethod(jobExecutorInfo.getMethod(), jobExecutorInfo.getExecutor(), jobContext);
}
}

View File

@ -0,0 +1,23 @@
package com.aizuda.easy.retry.client.job.core.executor;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import java.util.concurrent.Callable;
/**
* @author www.byteblogs.com
* @date 2023-10-08 22:48:44
* @since 2.4.0
*/
public class JobExecutorCallable implements Callable<ExecuteResult> {
public JobExecutorCallable(ExecuteResult executeResult) {
}
@Override
public ExecuteResult call() throws Exception {
return null;
}
}

View File

@ -0,0 +1,68 @@
package com.aizuda.easy.retry.client.job.core.executor;
import com.aizuda.easy.retry.client.common.proxy.RequestBuilder;
import com.aizuda.easy.retry.client.job.core.client.JobNettyClient;
import com.aizuda.easy.retry.client.job.core.dto.JobContext;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import com.aizuda.easy.retry.client.model.request.DispatchJobResultRequest;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.google.common.util.concurrent.FutureCallback;
import lombok.extern.slf4j.Slf4j;
/**
* @author: www.byteblogs.com
* @date : 2023-10-08 16:44
* @since : 2.4.0
*/
@Slf4j
public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult> {
private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, NettyResult>newBuilder()
.client(JobNettyClient.class)
.callback(nettyResult -> LogUtils.info(log, "Data report successfully requestId:[{}]", nettyResult.getRequestId())).build();
private JobContext jobContext;
public JobExecutorFutureCallback(final JobContext jobContext) {
this.jobContext = jobContext;
}
@Override
public void onSuccess(final ExecuteResult result) {
// 上报执行成功
log.info("任务执行成功 [{}]", JsonUtil.toJsonString(result));
DispatchJobResultRequest dispatchJobRequest = new DispatchJobResultRequest();
if (result.getStatus() == StatusEnum.NO.getStatus()) {
dispatchJobRequest.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
} else {
dispatchJobRequest.setTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus());
}
dispatchJobRequest.setTaskBatchId(jobContext.getTaskBatchId());
dispatchJobRequest.setGroupName(jobContext.getGroupName());
dispatchJobRequest.setJobId(jobContext.getJobId());
dispatchJobRequest.setTaskId(jobContext.getTaskId());
dispatchJobRequest.setTaskType(jobContext.getTaskType());
dispatchJobRequest.setExecuteResult(result);
CLIENT.dispatchResult(dispatchJobRequest);
}
@Override
public void onFailure(final Throwable t) {
// 上报执行失败
log.error("任务执行失败 jobTask:[{}]", jobContext.getTaskId(), t);
DispatchJobResultRequest dispatchJobRequest = new DispatchJobResultRequest();
dispatchJobRequest.setTaskBatchId(jobContext.getTaskBatchId());
dispatchJobRequest.setGroupName(jobContext.getGroupName());
dispatchJobRequest.setJobId(jobContext.getJobId());
dispatchJobRequest.setTaskId(jobContext.getTaskId());
dispatchJobRequest.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
dispatchJobRequest.setTaskType(jobContext.getTaskType());
dispatchJobRequest.setExecuteResult(ExecuteResult.failure(t.getMessage()));
CLIENT.dispatchResult(dispatchJobRequest);
}
}

View File

@ -0,0 +1,10 @@
package com.aizuda.easy.retry.client.job.core.executor;
/**
* @author: www.byteblogs.com
* @date : 2023-10-08 17:02
* @since : 2.4.0
*/
public abstract class NormalJobExecutor extends AbstractJobExecutor {
}
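For illustration only (not part of this commit): a minimal sketch of the non-annotation path, assuming a consumer bean simply extends NormalJobExecutor; the scanner appears to register such beans under their fully qualified class name, and JobEndPoint then calls jobExecute on the bean directly. The class name and body below are made up.

package com.example.demo;

import com.aizuda.easy.retry.client.job.core.dto.JobContext;
import com.aizuda.easy.retry.client.job.core.executor.NormalJobExecutor;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import org.springframework.stereotype.Component;

@Component
public class SyncOrderJobExecutor extends NormalJobExecutor {

    // AbstractJobExecutor submits this to the per-batch thread pool and
    // reports the outcome through JobExecutorFutureCallback
    @Override
    protected ExecuteResult doJobExecute(JobContext jobContext) {
        if (jobContext.getTaskId() == null) {
            return ExecuteResult.failure(null, "missing taskId");
        }
        return ExecuteResult.success();
    }
}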

View File

@ -1,26 +0,0 @@
package com.aizuda.easy.retry.client.job.core.handler;
import com.aizuda.easy.retry.client.job.core.IJobExecutor;
import com.aizuda.easy.retry.client.job.core.dto.ExecuteResult;
import com.aizuda.easy.retry.client.job.core.dto.JobContext;
/**
* 广播模式
*
* @author: www.byteblogs.com
* @date : 2023-09-27 09:48
* @since 2.4.0
*/
public abstract class AbstractIJobExecutor implements IJobExecutor {
@Override
public ExecuteResult call() throws Exception {
JobContext jobContext = new JobContext();
ExecuteResult executeResult = jobExecute(jobContext);
return executeResult;
}
protected abstract ExecuteResult jobExecute(JobContext jobContext);
}

View File

@ -1,17 +0,0 @@
package com.aizuda.easy.retry.client.job.core.handler;
import com.aizuda.easy.retry.client.job.core.dto.ExecuteResult;
import com.aizuda.easy.retry.client.job.core.dto.JobContext;
/**
* @author www.byteblogs.com
* @date 2023-09-27 22:20:36
* @since 2.4.0
*/
public abstract class SimpleIJobExecutor extends AbstractIJobExecutor {
@Override
public ExecuteResult jobExecute(JobContext jobContext) {
return null;
}
}

View File

@ -4,6 +4,7 @@ import com.aizuda.easy.retry.client.job.core.IJobExecutor;
import com.aizuda.easy.retry.client.job.core.Scanner;
import com.aizuda.easy.retry.client.job.core.annotation.JobExecutor;
import com.aizuda.easy.retry.client.job.core.cache.JobExecutorInfoCache;
import com.aizuda.easy.retry.client.job.core.dto.JobContext;
import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import lombok.extern.slf4j.Slf4j;
@ -13,6 +14,7 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
@ -52,18 +54,12 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
LogUtils.error(log, "{} JobExecutor加载异常{}", beanDefinitionName, ex);
}
if (annotatedMethods == null || annotatedMethods.isEmpty()) {
continue;
}
String executorClassName = bean.getClass().getName();
// 通过实现接口进行注册
if (bean.getClass().isAssignableFrom(IJobExecutor.class)) {
IJobExecutor iJobExecutor = (IJobExecutor) bean;
String executorName = iJobExecutor.getName();
if (JobExecutorInfoCache.isExisted(executorName)) {
retryerInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean));
if (!JobExecutorInfoCache.isExisted(executorClassName)) {
retryerInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean, false));
}
}
@ -72,18 +68,22 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
JobExecutor jobExecutor = bean.getClass().getAnnotation(JobExecutor.class);
if (Objects.nonNull(jobExecutor)) {
String executorName = jobExecutor.name();
if (JobExecutorInfoCache.isExisted(executorName)) {
if (!JobExecutorInfoCache.isExisted(executorName)) {
JobExecutorInfo jobExecutorInfo =
new JobExecutorInfo(
executorName,
ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method()),
bean
ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method(), JobContext.class),
bean, true
);
retryerInfoList.add(jobExecutorInfo);
}
}
if (CollectionUtils.isEmpty(annotatedMethods)) {
continue;
}
// 扫描方法上的注解
for (Map.Entry<Method, JobExecutor> methodEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodEntry.getKey();
@ -96,7 +96,7 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
new JobExecutorInfo(
jobExecutor.name(),
executeMethod,
bean
bean, true
);
retryerInfoList.add(jobExecutorInfo);
}

View File

@ -0,0 +1,24 @@
package com.aizuda.easy.retry.client.job.core.timer;
import com.aizuda.easy.retry.client.job.core.cache.ThreadPoolCache;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
/**
* @author www.byteblogs.com
* @date 2023-10-08 22:28:53
* @since 2.4.0
*/
public class StopTaskTimerTask implements TimerTask {
private Long taskBatchId;
public StopTaskTimerTask(Long taskBatchId) {
this.taskBatchId = taskBatchId;
}
@Override
public void run(Timeout timeout) throws Exception {
ThreadPoolCache.stopThreadPool(taskBatchId);
}
}

View File

@ -0,0 +1,30 @@
package com.aizuda.easy.retry.client.job.core.timer;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* @author www.byteblogs.com
* @date 2023-10-08 22:23:57
* @since 2.4.0
*/
public class TimerManager {
private static final HashedWheelTimer wheelTimer;
static {
wheelTimer = new HashedWheelTimer(
new CustomizableThreadFactory("job-task-timer-wheel-"), 1,
TimeUnit.SECONDS, 1024);
}
private TimerManager() {
}
public static Timeout add(TimerTask task, long delay, TimeUnit unit) {
return wheelTimer.newTimeout(task, delay, unit);
}
}

View File

@ -0,0 +1,67 @@
package com.aizuda.easy.retry.client.model;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
/**
* @author: www.byteblogs.com
* @date : 2023-09-27 09:43
* @since 2.4.0
*/
public class ExecuteResult {
private int status;
private Object result;
private String message;
public ExecuteResult() {
}
public ExecuteResult(int status, Object result, String message) {
this.status = status;
this.result = result;
this.message = message;
}
public static ExecuteResult success(Object result) {
return new ExecuteResult(StatusEnum.YES.getStatus(), result, "任务执行成功");
}
public static ExecuteResult success() {
return success(null);
}
public static ExecuteResult failure() {
return failure(null);
}
public static ExecuteResult failure(Object result) {
return new ExecuteResult(StatusEnum.NO.getStatus(), result, "任务执行失败");
}
public static ExecuteResult failure(Object result, String message) {
return new ExecuteResult(StatusEnum.NO.getStatus(), result, message);
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@ -10,7 +10,7 @@ import javax.validation.constraints.NotNull;
* @date : 2023-09-26 15:10
*/
@Data
public class InterruptJobDTO {
public class StopJobDTO {
@NotNull(message = "jobId 不能为空")
private Long jobId;

View File

@ -1,4 +1,4 @@
package com.aizuda.easy.retry.client.model;
package com.aizuda.easy.retry.client.model.request;
import lombok.Data;
@ -10,14 +10,20 @@ import javax.validation.constraints.NotNull;
* @date : 2023-09-26 15:10
*/
@Data
public class DispatchJobDTO {
public class DispatchJobRequest {
@NotNull(message = "jobId 不能为空")
private Long jobId;
@NotNull(message = "taskBatchId 不能为空")
private Long taskBatchId;
@NotNull(message = "taskId 不能为空")
private Long taskId;
@NotNull(message = "taskType 不能为空")
private Integer taskType;
@NotBlank(message = "group 不能为空")
private String groupName;
@ -30,5 +36,10 @@ public class DispatchJobDTO {
@NotBlank(message = "executorName 不能为空")
private String executorName;
private Integer shardingTotal;
private Integer shardingIndex;
private Integer executorTimeout;
}

View File

@ -0,0 +1,30 @@
package com.aizuda.easy.retry.client.model.request;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import lombok.Data;
/**
* @author: www.byteblogs.com
* @date : 2023-09-26 15:10
*/
@Data
public class DispatchJobResultRequest {
private Long jobId;
private Long taskBatchId;
private Long taskId;
/**
* 任务类型
*/
private Integer taskType;
private String groupName;
private Integer taskStatus;
private ExecuteResult executeResult;
}

View File

@ -66,6 +66,10 @@ public interface SystemConstants {
*/
String BATCH_REPORT = "/batch/report";
/**
* 上报job的运行结果
*/
String REPORT_JOB_DISPATCH_RESULT = "/report/dispatch/result";
}
String LOGO = " ___ ___ _ \n" +

View File

@ -0,0 +1,26 @@
package com.aizuda.easy.retry.common.core.enums;
import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 标识某个操作的具体原因
*
* @author www.byteblogs.com
* @date 2023-10-07 23:05:50
* @since 2.4.0
*/
@AllArgsConstructor
@Getter
public enum JobOperationReasonEnum {
NONE(0, StrUtil.EMPTY),
EXECUTE_TIMEOUT(1, "执行超时"),
NOT_CLIENT(2, "无客户端节点"),
;
private final int reason;
private final String desc;
}

View File

@ -0,0 +1,55 @@
package com.aizuda.easy.retry.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
/**
* @author: www.byteblogs.com
* @date : 2023-09-26 14:26
* @since : 2.4.0
*/
@AllArgsConstructor
@Getter
public enum JobTaskBatchStatusEnum {
/**
* 待处理
*/
WAITING(1),
/**
* 运行中
*/
RUNNING(2),
/**
* 处理成功
*/
SUCCESS(3),
/**
* 处理失败
*/
FAIL(4),
/**
* 任务停止
*/
STOP(5),
/**
* 取消
*/
CANCEL(6),
;
private final int status;
public static final List<Integer> NOT_COMPLETE = Arrays.asList(WAITING.status, RUNNING.status);
public static final List<Integer> COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status, CANCEL.status);
}

View File

@ -0,0 +1,50 @@
package com.aizuda.easy.retry.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
/**
* @author: www.byteblogs.com
* @date : 2023-09-26 14:26
*/
@AllArgsConstructor
@Getter
public enum JobTaskStatusEnum {
/**
* 待处理
*/
WAITING(1),
/**
* 处理中
*/
RUNNING(2),
/**
* 处理成功
*/
SUCCESS(3),
/**
* 处理失败
*/
FAIL(4),
/**
* 任务停止成功
*/
STOP(5),
;
private final int status;
public static final List<Integer> NOT_COMPLETE = Arrays.asList(WAITING.status, RUNNING.status);
public static final List<Integer> COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status);
}

View File

@ -29,7 +29,12 @@ import java.util.stream.Collectors;
* @author: byteblogs
* @date: 2019/09/30 17:02
*/
@ControllerAdvice(basePackages = {"com.aizuda.easy.retry.client.core", "com.aizuda.easy.retry.server"} )
@ControllerAdvice(basePackages = {
"com.aizuda.easy.retry.client.core",
"com.aizuda.easy.retry.client.job.core",
"com.aizuda.easy.retry.client.common",
"com.aizuda.easy.retry.server",
} )
@Slf4j
@ResponseBody
public class RestExceptionHandler {

View File

@ -1,18 +1,20 @@
package com.aizuda.easy.retry.template.datasource.persistence.mapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskInstance;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
* <p>
* 任务实例 Mapper 接口
* 调度任务 Mapper 接口
* </p>
*
* @author www.byteblogs.com
* @since 2023-09-24
*/
@Mapper
public interface JobTaskInstanceMapper extends BaseMapper<JobTaskInstance> {
public interface JobTaskBatchMapper extends BaseMapper<JobTaskBatch> {
int updateJobTaskBatchStatus(@Param("taskBatchId") Long taskBatchId, @Param("taskStatus") Integer taskStatus);
}

View File

@ -6,7 +6,7 @@ import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* 调度任务 Mapper 接口
* 任务实例 Mapper 接口
* </p>
*
* @author www.byteblogs.com

View File

@ -27,6 +27,8 @@ public class GroupConfig implements Serializable {
private Integer initScene;
private Integer bucketIndex;
private String description;
private LocalDateTime createDt;

View File

@ -109,6 +109,16 @@ public class Job implements Serializable {
*/
private Integer retryInterval;
/**
* 任务类型
*/
private Integer taskType;
/**
* 并行数
*/
private Integer parallelNum;
/**
* bucket
*/

View File

@ -3,8 +3,10 @@ package com.aizuda.easy.retry.template.datasource.persistence.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
@ -26,7 +28,7 @@ public class JobLogMessage implements Serializable {
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
@ -42,18 +44,23 @@ public class JobLogMessage implements Serializable {
/**
* 任务实例id
*/
private Long taskId;
private Long taskBatchId;
/**
* 调度任务id
*/
private Long taskInstanceId;
private Long taskId;
/**
* 创建时间
*/
private LocalDateTime createDt;
/**
* 客户端信息
*/
private String clientAddress;
/**
* 调度信息
*/

View File

@ -12,7 +12,7 @@ import lombok.Setter;
/**
* <p>
* 调度任务
* 任务实例
* </p>
*
* @author www.byteblogs.com
@ -37,24 +37,54 @@ public class JobTask implements Serializable {
private String groupName;
/**
* 任务id
* 任务信息id
*/
private Long jobId;
/**
* 调度任务id
*/
private Long taskBatchId;
/**
* 父执行器id
*/
private Long parentId;
/**
* 执行的状态 0失败 1成功
*/
private Integer executeStatus;
/**
* 重试次数
*/
private Integer retryCount;
/**
* 任务状态 0失败 1成功
* 执行结果
*/
private Integer taskStatus;
private String resultMessage;
/**
* 客户端节点id
* 客户端ID
*/
private String hostId;
private String clientId;
/**
* 执行方法参数
*/
private String argsStr;
/**
* 参数类型 text/json
*/
private String argsType;
/**
* 扩展字段
*/
private String extAttrs;
/**
* 创建时间
@ -66,10 +96,5 @@ public class JobTask implements Serializable {
*/
private LocalDateTime updateDt;
/**
* 逻辑删除 1删除
*/
private Integer deleted;
}

View File

@ -3,14 +3,16 @@ package com.aizuda.easy.retry.template.datasource.persistence.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
/**
* <p>
* 任务实例
* 调度任务
* </p>
*
* @author www.byteblogs.com
@ -18,15 +20,15 @@ import lombok.Setter;
*/
@Getter
@Setter
@TableName("job_task_instance")
public class JobTaskInstance implements Serializable {
@TableName("job_task_batch")
public class JobTaskBatch implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
@ -40,34 +42,34 @@ public class JobTaskInstance implements Serializable {
private Long jobId;
/**
* 调度任务id
* 任务状态 0失败 1成功
*/
private Long taskId;
/**
* 父执行器id
*/
private Long parentId;
/**
* 执行的状态 0失败 1成功
*/
private Integer executeStatus;
/**
* 执行结果
*/
private String resultMessage;
private Integer taskStatus;
/**
* 创建时间
*/
private LocalDateTime createDt;
/**
* 任务执行时间
*/
private LocalDateTime executionAt;
/**
* 操作原因
*/
private Integer operationReason;
/**
* 修改时间
*/
private LocalDateTime updateDt;
/**
* 逻辑删除 1删除
*/
private Integer deleted;
}

View File

@ -29,8 +29,6 @@ public class SceneConfig implements Serializable {
private Long deadlineRequest;
private Integer bucketIndex;
private LocalDateTime createDt;
private LocalDateTime updateDt;

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="task_status" property="taskStatus" />
<result column="create_dt" property="createDt" />
<result column="update_dt" property="updateDt" />
<result column="deleted" property="deleted" />
</resultMap>
<update id="updateJobTaskBatchStatus">
update job_task_batch
set task_status = #{taskStatus}
<where>
id = #{taskBatchId}
<if test="taskStatus == 4">
and EXISTS (select id from job_task where task_batch_id = #{taskBatchId} and execute_status = 4)
</if>
<if test="taskStatus == 3">
and NOT EXISTS (select id from job_task where task_batch_id = #{taskBatchId} and execute_status IN(1, 2, 4, 5))
</if>
<if test="taskStatus == 5">
and NOT EXISTS (select id from job_task where task_batch_id = #{taskBatchId} and execute_status IN(1, 2))
</if>
</where>
</update>
</mapper>
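For illustration only (not part of this commit): a minimal sketch of how server-side code might drive the guarded update above, which only moves a batch to a final status when the sibling job_task rows allow it. The completer class and method name are assumptions; JobTaskBatchMapper and JobTaskBatchStatusEnum come from this diff.

package com.example.demo;

import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class JobTaskBatchCompleter {

    @Autowired
    private JobTaskBatchMapper jobTaskBatchMapper;

    // Try to mark the batch SUCCESS; 0 updated rows means some task is still
    // WAITING/RUNNING/FAIL/STOP, so the batch keeps its current status.
    public boolean tryComplete(Long taskBatchId) {
        return jobTaskBatchMapper.updateJobTaskBatchStatus(
                taskBatchId, JobTaskBatchStatusEnum.SUCCESS.getStatus()) > 0;
    }
}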

View File

@ -1,18 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskInstanceMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskInstance">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="task_id" property="taskId" />
<result column="parent_id" property="parentId" />
<result column="execute_status" property="executeStatus" />
<result column="result_message" property="resultMessage" />
<result column="create_dt" property="createDt" />
<result column="update_dt" property="updateDt" />
</resultMap>
</mapper>

View File

@ -7,11 +7,12 @@
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="retry_count" property="retryCount" />
<result column="task_status" property="taskStatus" />
<result column="task_batch_id" property="taskBatchId" />
<result column="parent_id" property="parentId" />
<result column="execute_status" property="executeStatus" />
<result column="result_message" property="resultMessage" />
<result column="create_dt" property="createDt" />
<result column="update_dt" property="updateDt" />
<result column="deleted" property="deleted" />
</resultMap>
</mapper>

View File

@ -86,6 +86,10 @@
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -28,6 +28,10 @@ public class ActorGenerator {
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
public static final String JOB_EXECUTOR_RESULT_ACTOR = "JobExecutorResultActor";
public static final String JOB_LOG_ACTOR = "JobLogActor";
public static final String REAL_JOB_EXECUTOR_ACTOR = "RealJobExecutorActor";
public static final String REAL_STOP_TASK_INSTANCE_ACTOR = "RealStopTaskInstanceActor";
private ActorGenerator() {}
@ -149,6 +153,42 @@ public class ActorGenerator {
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_EXECUTOR_ACTOR));
}
/**
* Job任务执行结果actor
*
* @return actor 引用
*/
public static ActorRef jobTaskExecutorResultActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_EXECUTOR_RESULT_ACTOR));
}
/**
* Job任务向客户端发起请求阶段actor
*
* @return actor 引用
*/
public static ActorRef jobRealTaskExecutorActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(REAL_JOB_EXECUTOR_ACTOR));
}
/**
* Job任务向客户端发起请求阶段actor
*
* @return actor 引用
*/
public static ActorRef jobRealStopTaskInstanceActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(REAL_STOP_TASK_INSTANCE_ACTOR));
}
/**
* Job日志actor
*
* @return actor 引用
*/
public static ActorRef jobLogActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_LOG_ACTOR));
}
public static SpringExtension getSpringExtension() {
return SpringContext.getBeanByType(SpringExtension.class);
}

View File

@ -1,9 +1,9 @@
package com.aizuda.easy.retry.server.common.client;
import com.aizuda.easy.retry.client.model.DispatchJobDTO;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.InterruptJobDTO;
import com.aizuda.easy.retry.client.model.StopJobDTO;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.model.Result;
@ -26,10 +26,10 @@ public interface RpcClient {
@Mapping(path = "/retry/callback/v1", method = RequestMethod.POST)
Result callback(@Body RetryCallbackDTO retryCallbackDTO);
@Mapping(path = "/job/interrupt/v1", method = RequestMethod.POST)
Result<Boolean> interrupt(@Body InterruptJobDTO interruptJobDTO);
@Mapping(path = "/job/stop/v1", method = RequestMethod.POST)
Result<Boolean> stop(@Body StopJobDTO stopJobDTO);
@Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST)
Result dispatch(@Body DispatchJobDTO dispatchJobDTO);
Result<Boolean> dispatch(@Body DispatchJobRequest dispatchJobRequest);
}

View File

@ -4,6 +4,7 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.URLUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
@ -15,13 +16,23 @@ import com.aizuda.easy.retry.server.common.client.annotation.Param;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
@ -34,6 +45,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
* 请求处理器
@ -71,11 +84,23 @@ public class RpcClientInvokeHandler implements InvocationHandler {
Mapping annotation = method.getAnnotation(Mapping.class);
Assert.notNull(annotation, () -> new EasyRetryServerException("@Mapping cannot be null"));
if (annotation.failover()) {
return doFailoverHandler(method, args, annotation);
}
return requestRemote(method, args, annotation, 0);
}
@NotNull
private Result doFailoverHandler(final Method method, final Object[] args, final Mapping annotation)
throws Throwable {
Set<RegisterNodeInfo> serverNodeSet = CacheRegisterTable.getServerNodeSet(groupName);
// 最多调用size次
int size = serverNodeSet.size();
for (int count = 1; count <= size; count++) {
log.info("Start request client. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId, hostIp, hostPort, HostUtils.getIp());
log.info("Start request client. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId,
hostIp, hostPort, HostUtils.getIp());
Result result = requestRemote(method, args, annotation, count);
if (Objects.nonNull(result)) {
return result;
@ -85,7 +110,7 @@ public class RpcClientInvokeHandler implements InvocationHandler {
throw new EasyRetryServerException("No available nodes.");
}
private Result requestRemote(Method method, Object[] args, Mapping mapping, int count) {
private Result requestRemote(Method method, Object[] args, Mapping mapping, int count) throws Throwable {
try {
@ -99,23 +124,31 @@ public class RpcClientInvokeHandler implements InvocationHandler {
RestTemplate restTemplate = SpringContext.CONTEXT.getBean(RestTemplate.class);
ResponseEntity<Result> response = restTemplate.exchange(
// 拼接 url?a=1&b=1
getUrl(mapping, parasResult.paramMap).toString(),
// post or get
HttpMethod.valueOf(mapping.method().name()),
// body
new HttpEntity<>(parasResult.body, parasResult.requestHeaders),
// 返回值类型
Result.class);
Retryer<Result> retryer = buildResultRetryer(mapping);
log.info("Request client success. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId, hostIp, hostPort, HostUtils.getIp());
Result result = retryer.call(() -> {
ResponseEntity<Result> response = restTemplate.exchange(
// 拼接 url?a=1&b=1
getUrl(mapping, parasResult.paramMap).toString(),
// post or get
HttpMethod.valueOf(mapping.method().name()),
// body
new HttpEntity<>(parasResult.body, parasResult.requestHeaders),
// 返回值类型
Result.class);
return Objects.requireNonNull(response.getBody());
return Objects.requireNonNull(response.getBody());
});
log.info("Request client success. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId,
hostIp, hostPort, HostUtils.getIp());
return result;
} catch (RestClientException ex) {
// 网络异常
if (ex instanceof ResourceAccessException) {
log.error("request client I/O error, count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId, hostIp, hostPort, HostUtils.getIp(), ex);
if (ex instanceof ResourceAccessException && mapping.failover()) {
log.error("request client I/O error, count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count,
hostId, hostIp, hostPort, HostUtils.getIp(), ex);
// 进行路由剔除处理
CacheRegisterTable.remove(groupName, hostId);
@ -135,15 +168,43 @@ public class RpcClientInvokeHandler implements InvocationHandler {
} else {
// 其他异常继续抛出
log.error("request client error.count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId, hostIp, hostPort, HostUtils.getIp(), ex);
log.error("request client error.count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count,
hostId, hostIp, hostPort, HostUtils.getIp(), ex);
throw ex;
}
} catch (Exception ex) {
log.error("request client unknown exception. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId, hostIp, hostPort, HostUtils.getIp(), ex);
throw ex;
log.error("request client unknown exception. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]",
count, hostId, hostIp, hostPort, HostUtils.getIp(), ex);
Throwable throwable = ex;
if (ex instanceof RetryException) {
RetryException re = (RetryException) ex;
throwable = re.getLastFailedAttempt().getExceptionCause();
}
throw throwable;
}
return null;
return null;
}
private Retryer<Result> buildResultRetryer(Mapping mapping) throws InstantiationException, IllegalAccessException, NoSuchMethodException {
Class<? extends RetryListener> retryListenerClazz = mapping.retryListener();
RetryListener retryListener = retryListenerClazz.newInstance();
Method method = retryListenerClazz.getMethod("onRetry", Attempt.class);
Retryer<Result> retryer = RetryerBuilder.<Result>newBuilder()
.retryIfException(throwable -> mapping.failRetry())
.withStopStrategy(StopStrategies.stopAfterAttempt(mapping.retryTimes()))
.withWaitStrategy(WaitStrategies.fixedWait(mapping.retryInterval(), TimeUnit.SECONDS))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
ReflectionUtils.invokeMethod(method, retryListener, attempt);
}
})
.build();
return retryer;
}
private StringBuilder getUrl(Mapping mapping, Map<String, Object> paramMap) {

View File

@ -0,0 +1,17 @@
package com.aizuda.easy.retry.server.common.client;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
/**
* @author: www.byteblogs.com
* @date : 2023-10-09 09:24
* @since : 2.4.0
*/
public class SimpleRetryListener implements RetryListener {
@Override
public <V> void onRetry(final Attempt<V> attempt) {
}
}
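As a point of reference, a minimal sketch (not part of this commit) of a custom listener that could be supplied through the retryListener attribute added to @Mapping below; LoggingRetryListener is a hypothetical name. It only needs a public no-arg constructor, since RpcClientInvokeHandler instantiates it reflectively.
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class LoggingRetryListener implements RetryListener {

    @Override
    public <V> void onRetry(final Attempt<V> attempt) {
        // Called by the Retryer after every attempt; only log the ones that failed.
        if (attempt.hasException()) {
            log.warn("rpc attempt [{}] failed", attempt.getAttemptNumber(), attempt.getExceptionCause());
        }
    }
}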

View File

@ -1,6 +1,8 @@
package com.aizuda.easy.retry.server.common.client.annotation;
import com.aizuda.easy.retry.server.common.client.RequestMethod;
import com.aizuda.easy.retry.server.common.client.SimpleRetryListener;
import com.github.rholder.retry.RetryListener;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
@ -24,4 +26,16 @@ public @interface Mapping {
String path() default "";
boolean failover() default false;
boolean failRetry() default false;
int retryTimes() default 3;
int retryInterval() default 1;
Class<? extends RetryListener> retryListener() default SimpleRetryListener.class;
}
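For orientation, a hedged usage sketch of the new attributes on a server-to-client call. The interface name is hypothetical, the path is reused from the dispatch endpoint above purely for illustration, and @Body is assumed to sit in the same annotation package as @Param.
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.client.RequestMethod;
import com.aizuda.easy.retry.server.common.client.annotation.Body;
import com.aizuda.easy.retry.server.common.client.annotation.Mapping;

public interface ExampleJobRpcClient {

    // failRetry/retryTimes/retryInterval: retry the same node up to 3 times, 1s apart;
    // failover: on an I/O failure, evict the node and move on to the next registered client.
    @Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST,
            failover = true, failRetry = true, retryTimes = 3, retryInterval = 1)
    Result<Boolean> dispatch(@Body DispatchJobRequest dispatchJobRequest);
}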

View File

@ -0,0 +1,14 @@
package com.aizuda.easy.retry.server.common.dto;
import lombok.Data;
/**
* @author: www.byteblogs.com
* @date : 2022-10-28 18:45
*/
@Data
public class PartitionTask {
protected Long id;
}

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.common.dto;
import lombok.Data;
import java.text.MessageFormat;
import java.time.LocalDateTime;
/**
@ -13,6 +14,8 @@ import java.time.LocalDateTime;
@Data
public class RegisterNodeInfo implements Comparable<RegisterNodeInfo> {
private static final String URL = "http://{0}:{1}/{2}";
private String groupName;
private String hostId;
@ -27,6 +30,10 @@ public class RegisterNodeInfo implements Comparable<RegisterNodeInfo> {
private String contextPath;
public String fullUrl() {
return MessageFormat.format(URL, hostIp, hostPort.toString(), contextPath);
}
@Override
public int compareTo(RegisterNodeInfo info) {
return hostId.compareTo(info.hostId);

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.common.dto;
import lombok.Data;
import java.util.List;
import java.util.Set;
/**

View File

@ -222,7 +222,6 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
try {
TimeUnit.SECONDS.sleep(systemProperties.getLoadBalanceCycleTime());
} catch (InterruptedException e) {
LogUtils.error(log, "check balance interrupt");
Thread.currentThread().interrupt();
}
}

View File

@ -1,4 +1,4 @@
package com.aizuda.easy.retry.server.retry.task.support.listener;
package com.aizuda.easy.retry.server.common.listener;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;

View File

@ -1,4 +1,4 @@
package com.aizuda.easy.retry.server.retry.task.support.listener;
package com.aizuda.easy.retry.server.common.listener;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.log.LogUtils;

View File

@ -0,0 +1,28 @@
package com.aizuda.easy.retry.server.job;
import com.aizuda.easy.retry.server.common.Lifecycle;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author www.byteblogs.com
* @date 2023-09-29 23:29:44
* @since 2.4.0
*/
@Component
@Slf4j
public class EasyRetryJobTaskStarter implements Lifecycle {
@Override
public void start() {
// 检查是否还有未执行的任务如果有则直接失败
log.info("easy-retry-job-task starting...");
log.info("easy-retry-job-task completed");
}
@Override
public void close() {
// 关闭还未执行的任务
}
}

View File

@ -7,6 +7,5 @@ import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrat
* @date : 2023-09-25 17:53
*/
public interface BlockStrategy {
boolean block(BlockStrategyContext context);
void block(BlockStrategyContext context);
}

View File

@ -0,0 +1,79 @@
package com.aizuda.easy.retry.server.job.task;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.client.model.request.DispatchJobResultRequest;
import com.aizuda.easy.retry.server.job.task.dto.*;
import com.aizuda.easy.retry.server.job.task.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.generator.task.JobTaskGenerateContext;
import com.aizuda.easy.retry.server.job.task.handler.callback.ClientCallbackContext;
import com.aizuda.easy.retry.server.job.task.handler.executor.JobExecutorContext;
import com.aizuda.easy.retry.server.job.task.handler.stop.TaskStopJobContext;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
* @author: www.byteblogs.com
* @date : 2021-11-26 15:22
*/
@Mapper
public interface JobTaskConverter {
JobTaskConverter INSTANCE = Mappers.getMapper(JobTaskConverter.class);
@Mappings(
@Mapping(source = "id", target = "jobId")
)
JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTask job);
JobTaskBatchGeneratorContext toJobTaskGeneratorContext(JobTaskPrepareDTO jobTaskPrepareDTO);
JobTaskBatchGeneratorContext toJobTaskGeneratorContext(BlockStrategies.BlockStrategyContext context);
JobTaskGenerateContext toJobTaskInstanceGenerateContext(JobExecutorContext context);
JobTask toJobTaskInstance(JobTaskGenerateContext context);
BlockStrategies.BlockStrategyContext toBlockStrategyContext(JobTaskPrepareDTO prepareDTO);
TaskStopJobContext toStopJobContext(BlockStrategies.BlockStrategyContext context);
JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO);
JobLogDTO toJobLogDTO(JobExecutorContext context);
JobLogDTO toJobLogDTO(JobExecutorResultDTO resultDTO);
JobLogDTO toJobLogDTO(BaseDTO baseDTO);
JobLogDTO toJobLogDTO(DispatchJobResultRequest request);
ClientCallbackContext toClientCallbackContext(DispatchJobResultRequest request);
DispatchJobRequest toDispatchJobRequest(RealJobExecutorDTO realJobExecutorDTO);
@Mappings({
@Mapping(source = "job.groupName", target = "groupName"),
@Mapping(source = "jobTask.id", target = "taskId"),
@Mapping(source = "jobTask.argsStr", target = "argsStr"),
@Mapping(source = "jobTask.argsType", target = "argsType"),
@Mapping(source = "jobTask.extAttrs", target = "extAttrs")
})
RealJobExecutorDTO toRealJobExecutorDTO(Job job, JobTask jobTask);
JobExecutorResultDTO toJobExecutorResultDTO(ClientCallbackContext context);
JobExecutorResultDTO toJobExecutorResultDTO(JobTask jobTask);
RealStopTaskInstanceDTO toRealStopTaskInstanceDTO(TaskStopJobContext context);
List<JobPartitionTask> toJobPartitionTasks(List<Job> jobs);
}

View File

@ -0,0 +1,55 @@
package com.aizuda.easy.retry.server.job.task.dispatch;
import akka.actor.AbstractActor;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.handler.executor.JobExecutor;
import com.aizuda.easy.retry.server.job.task.handler.executor.JobExecutorContext;
import com.aizuda.easy.retry.server.job.task.handler.executor.JobExecutorFactory;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
* @author: www.byteblogs.com
* @date : 2023-09-25 17:41
* @since : 2.4.0
*/
@Component(ActorGenerator.JOB_EXECUTOR_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class JobExecutorActor extends AbstractActor {
@Autowired
private JobMapper jobMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> {
try {
doExecute(taskExecute);
} catch (Exception e) {
LogUtils.error(log, "job executor exception. [{}]", taskExecute, e);
} finally {
getContext().stop(getSelf());
}
}).build();
}
private void doExecute(final TaskExecuteDTO taskExecute) {
Job job = jobMapper.selectById(taskExecute.getJobId());
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
JobExecutorContext context = new JobExecutorContext();
context.setTaskBatchId(taskExecute.getTaskBatchId());
context.setGroupName(taskExecute.getGroupName());
context.setJobId(job.getId());
context.setTaskType(job.getTaskType());
jobExecutor.execute(context);
}
}

View File

@ -0,0 +1,76 @@
package com.aizuda.easy.retry.server.job.task.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.handler.helper.JobTaskBatchHelper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.Objects;
/**
* @author www.byteblogs.com
* @date 2023-10-05 17:16:35
* @since 2.4.0
*/
@Component(ActorGenerator.JOB_EXECUTOR_RESULT_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class JobExecutorResultActor extends AbstractActor {
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
private JobTaskBatchHelper jobTaskBatchHelper;
@Override
public Receive createReceive() {
return receiveBuilder().match(JobExecutorResultDTO.class, result -> {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
JobTask jobTask = new JobTask();
jobTask.setExecuteStatus(result.getTaskStatus());
if (Objects.nonNull(result.getResult())) {
jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult()));
}
Assert.isTrue(1 == jobTaskMapper.update(jobTask,
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
()-> new EasyRetryServerException("更新任务实例失败"));
// 更新批次上的状态
jobTaskBatchHelper.complete(result.getTaskBatchId());
}
});
JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result);
jobLogDTO.setMessage(result.getMessage());
jobLogDTO.setClientId(result.getClientId());
jobLogDTO.setTaskId(result.getTaskId());
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef);
}).build();
}
}

View File

@ -0,0 +1,61 @@
package com.aizuda.easy.retry.server.job.task.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Optional;
/**
* @author www.byteblogs.com
* @date 2023-10-03 22:32:30
* @since 2.4.0
*/
@Component(ActorGenerator.JOB_LOG_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class JobLogActor extends AbstractActor {
@Autowired
private JobLogMessageMapper jobLogMessageMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(JobLogDTO.class, (jobLogDTO -> {
try {
saveLogMessage(jobLogDTO);
} catch (Exception e) {
log.error("保存日志异常.", e);
} finally {
getContext().stop(getSelf());
}
})).build();
}
private void saveLogMessage(JobLogDTO jobLogDTO) {
JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(jobLogDTO);
if (Objects.nonNull(jobLogDTO.getClientId())) {
Optional.ofNullable(CacheRegisterTable.getServerNode(jobLogDTO.getGroupName(), jobLogDTO.getClientId())).ifPresent(registerNodeInfo -> {
jobLogMessage.setClientAddress(registerNodeInfo.fullUrl());
});
}
jobLogMessage.setCreateDt(LocalDateTime.now());
jobLogMessage.setMessage(Optional.ofNullable(jobLogDTO.getMessage()).orElse(StrUtil.EMPTY));
jobLogMessage.setTaskId(Optional.ofNullable(jobLogMessage.getTaskId()).orElse(0L));
jobLogMessageMapper.insert(jobLogMessage);
}
}

View File

@ -0,0 +1,75 @@
package com.aizuda.easy.retry.server.job.task.dispatch;
import akka.actor.AbstractActor;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.handler.prepare.JobPrePareHandler;
import com.aizuda.easy.retry.server.job.task.handler.prepare.TerminalJobPrepareHandler;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE;
/**
* 调度任务准备阶段
*
* @author www.byteblogs.com
* @date 2023-09-25 22:20:53
* @since
*/
@Component(ActorGenerator.JOB_TASK_PREPARE_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class JobTaskPrepareActor extends AbstractActor {
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@Autowired
private List<JobPrePareHandler> prePareHandlers;
@Override
public Receive createReceive() {
return receiveBuilder().match(JobTaskPrepareDTO.class, job -> {
try {
doPrepare(job);
} catch (Exception e) {
log.error("预处理节点异常", e);
} finally {
getContext().stop(getSelf());
}
}).build();
}
private void doPrepare(JobTaskPrepareDTO prepare) {
List<JobTaskBatch> notCompleteJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getJobId, prepare.getJobId())
.in(JobTaskBatch::getTaskStatus, NOT_COMPLETE));
// 说明所有任务已经完成
if (CollectionUtils.isEmpty(notCompleteJobTaskBatchList)) {
TerminalJobPrepareHandler terminalJobPrepareHandler = SpringContext.getBeanByType(TerminalJobPrepareHandler.class);
terminalJobPrepareHandler.handler(prepare);
} else {
for (JobTaskBatch jobTaskBatch : notCompleteJobTaskBatchList) {
prepare.setExecutionAt(jobTaskBatch.getExecutionAt());
prepare.setTaskBatchId(jobTaskBatch.getId());
for (JobPrePareHandler prePareHandler : prePareHandlers) {
if (prePareHandler.matches(jobTaskBatch.getTaskStatus())) {
prePareHandler.handler(prepare);
break;
}
}
}
}
}
}

View File

@ -0,0 +1,137 @@
package com.aizuda.easy.retry.server.job.task.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import static com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies.*;
/**
* JOB任务扫描
*
* @author: www.byteblogs.com
* @date : 2023-09-22 09:08
* @since 2.4.0
*/
@Component(ActorGenerator.SCAN_JOB_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ScanJobTaskActor extends AbstractActor {
@Autowired
private JobMapper jobMapper;
private static final AtomicLong lastId = new AtomicLong(0L);
@Override
public Receive createReceive() {
return receiveBuilder().match(ScanTask.class, config -> {
try {
doScan(config);
} catch (Exception e) {
LogUtils.error(log, "Data scanner processing exception. [{}]", config, e);
}
}).build();
}
private void doScan(final ScanTask scanTask) {
log.info("job scan start");
long total = process(startId -> listAvailableJobs(startId, scanTask), partitionTasks -> {
for (final JobPartitionTask partitionTask : (List<JobPartitionTask>) partitionTasks) {
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask);
// 更新下次触发时间
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(partitionTask.getTriggerType());
waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt());
Job job = new Job();
job.setId(partitionTask.getId());
job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext));
Assert.isTrue(1 == jobMapper.updateById(job),
() -> new EasyRetryServerException("更新job下次触发时间失败.jobId:[{}]", job.getId()));
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
actorRef.tell(jobTaskPrepare, actorRef);
}
}, 0);
log.info("job scan end. total:[{}]", total);
}
private List<JobPartitionTask> listAvailableJobs(Long startId, ScanTask scanTask) {
List<Job> jobs = jobMapper.selectPage(new PageDTO<Job>(0, 1000),
new LambdaQueryWrapper<Job>()
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.in(Job::getBucketIndex, scanTask.getBuckets())
.le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(6))
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
.gt(Job::getId, startId)
).getRecords();
return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs);
}
public long process(
Function<Long, List<? extends PartitionTask>> dataSource, Consumer<List<? extends PartitionTask>> task, long startId) {
int total = 0;
do {
List<? extends PartitionTask> products = dataSource.apply(startId);
if (CollectionUtils.isEmpty(products)) {
// 没有查询到数据直接退出
break;
}
total += products.size();
task.accept(products);
startId = maxId(products);
} while (startId > 0);
return total;
}
private static long maxId(List<? extends PartitionTask> products) {
Optional<Long> max = products.stream().map(PartitionTask::getId).max(Long::compareTo);
return max.orElse(-1L);
}
}
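A standalone sketch of the cursor-style scan that process() implements, with an in-memory list standing in for the paged jobMapper query; the class name and values are illustrative. Each pass fetches a page of ids strictly greater than the cursor, hands it to the consumer, advances the cursor to the page's max id, and stops once a page comes back empty.
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class PartitionScanSketch {

    public static void main(String[] args) {
        List<Long> ids = LongStream.rangeClosed(1, 2500).boxed().collect(Collectors.toList());
        long startId = 0;
        long total = 0;
        while (true) {
            final long cursor = startId;
            // page of at most 1000 ids strictly greater than the cursor, mirroring gt(Job::getId, startId)
            List<Long> page = ids.stream().filter(id -> id > cursor).limit(1000).collect(Collectors.toList());
            if (page.isEmpty()) {
                break;
            }
            total += page.size();
            startId = page.stream().mapToLong(Long::longValue).max().orElse(-1L);
        }
        System.out.println("scanned " + total + " rows"); // scanned 2500 rows
    }
}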

View File

@ -0,0 +1,19 @@
package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
/**
* @author www.byteblogs.com
* @date 2023-10-06 17:05:04
* @since 2.4.0
*/
@Data
public class BaseDTO {
private Long jobId;
private Long taskBatchId;
private String groupName;
private Long taskId;
private Integer taskType;
private String clientId;
}

View File

@ -0,0 +1,31 @@
package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
/**
* @author www.byteblogs.com
* @date 2023-10-05 17:18:38
* @since 2.4.0
*/
@Data
public class JobExecutorResultDTO {
private Long jobId;
private Long taskBatchId;
private Long taskId;
private String groupName;
private Integer taskStatus;
private String message;
private String clientId;
private Integer taskType;
private Object result;
}

View File

@ -0,0 +1,44 @@
package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
/**
* @author www.byteblogs.com
* @date 2023-10-03 22:34:14
* @since 2.4.0
*/
@Data
public class JobLogDTO {
/**
* 组名称
*/
private String groupName;
/**
* 任务信息id
*/
private Long jobId;
/**
* 任务实例id
*/
private Long taskBatchId;
/**
* 调度任务id
*/
private Long taskId;
/**
* 执行的客户端信息
*/
private String clientId;
/**
* 调度信息
*/
private String message;
}

View File

@ -0,0 +1,52 @@
package com.aizuda.easy.retry.server.job.task.dto;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.LocalDateTime;
/**
* @author: www.byteblogs.com
* @date : 2023-10-10 17:52
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class JobPartitionTask extends PartitionTask {
/**
* 组名称
*/
private String groupName;
/**
* 名称
*/
private String jobName;
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
/**
* 阻塞策略 1丢弃 2覆盖 3并行
*/
private Integer blockStrategy;
/**
* 触发类型 1.CRON 表达式 2. 固定时间
*/
private Integer triggerType;
/**
* 间隔时长
*/
private String triggerInterval;
/**
* 任务执行超时时间单位秒
*/
private Integer executorTimeout;
}

View File

@ -24,79 +24,33 @@ public class JobTaskPrepareDTO {
*/
private String jobName;
/**
* 执行方法参数
*/
private String argsStr;
/**
* 参数类型 text/json
*/
private String argsType;
/**
* 扩展字段
*/
private String extAttrs;
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
/**
* 重试状态 0关闭1开启
*/
private Integer jobStatus;
/**
* 执行器路由策略
*/
private String routeKey;
/**
* 执行器类型 1Java
*/
private Integer executorType;
/**
* 执行器名称
*/
private String executorName;
/**
* 触发类型 1.CRON 表达式 2. 固定时间
*/
private Integer triggerType;
/**
* 间隔时长
*/
private String triggerInterval;
/**
* 阻塞策略 1丢弃 2覆盖 3并行
*/
private Integer blockStrategy;
/**
* 任务类型
*/
private Integer taskType;
/**
* 任务执行超时时间单位秒
*/
private Integer executorTimeout;
/**
* 最大重试次数
*/
private Integer maxRetryTimes;
private Long taskBatchId;
private String clientId;
/**
* 重试间隔(s)
* 任务执行时间
*/
private Integer retryInterval;
/**
* bucket
*/
private Integer bucketIndex;
private LocalDateTime executionAt;
}

View File

@ -0,0 +1,14 @@
package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
/**
* @author www.byteblogs.com
* @date 2023-09-30 23:19:39
* @since 2.4.0
*/
@Data
public class JobTimerTaskDTO {
private Long taskBatchId;
}

View File

@ -0,0 +1,71 @@
package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author www.byteblogs.com
* @date 2023-10-06 16:45:13
* @since 2.4.0
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class RealJobExecutorDTO extends BaseDTO {
private Long jobId;
/**
* 名称
*/
private String jobName;
/**
* 执行方法参数
*/
private String argsStr;
/**
* 参数类型 text/json
*/
private String argsType;
/**
* 扩展字段
*/
private String extAttrs;
private Long taskBatchId;
private Long taskId;
private Integer taskType;
private String groupName;
private Integer parallelNum;
private Integer executorType;
private String executorName;
private String clientId;
/**
* 最大重试次数
*/
private Integer maxRetryTimes;
/**
* 重试间隔(s)
*/
private Integer retryInterval;
private Integer shardingTotal;
private Integer shardingIndex;
private Integer executorTimeout;
}

View File

@ -0,0 +1,20 @@
package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.LocalDateTime;
/**
* @author: www.byteblogs.com
* @date : 2023-10-07 10:50
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class RealStopTaskInstanceDTO extends BaseDTO {
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
}

View File

@ -9,5 +9,7 @@ import lombok.Data;
@Data
public class TaskExecuteDTO {
private Long taskId;
private Long jobId;
private Long taskBatchId;
private String groupName;
}

View File

@ -1,9 +0,0 @@
package com.aizuda.easy.retry.server.job.task.enums;
/**
* @author www.byteblogs.com
* @date 2023-09-24 12:01:31
* @since 2.4.0
*/
public enum BlockStrategyEnum {
}

View File

@ -1,52 +0,0 @@
package com.aizuda.easy.retry.server.job.task.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author: www.byteblogs.com
* @date : 2023-09-26 14:26
*/
@AllArgsConstructor
@Getter
public enum TaskStatusEnum {
/**
* 待处理
*/
WAIT(10),
/**
* 处理中
*/
PROCESSING(20),
/**
* 处理中
*/
PROCESSED_SUCCESS(21),
/**
* 处理中
*/
PROCESSED_FAIL(22),
/**
* 中断中
*/
INTERRUPTING(30),
/**
* 中断成功
*/
INTERRUPT_SUCCESS(31),
/**
* 中断失败
*/
INTERRUPT_FAIL(32),
;
private final int status;
}

View File

@ -0,0 +1,31 @@
package com.aizuda.easy.retry.server.job.task.enums;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author www.byteblogs.com
* @date 2023-10-02 10:39:22
* @since 2.4.0
*/
@AllArgsConstructor
@Getter
public enum TaskTypeEnum {
CLUSTER(1),
BROADCAST(2),
SHARDING(3);
private final int type;
public static TaskTypeEnum valueOf(int type) {
for (TaskTypeEnum value : TaskTypeEnum.values()) {
if (value.getType() == type) {
return value;
}
}
throw new EasyRetryServerException("未知类型");
}
}

View File

@ -1,9 +0,0 @@
package com.aizuda.easy.retry.server.job.task.executor;
/**
* @author www.byteblogs.com
* @date 2023-09-24 11:40:21
* @since 2.4.0
*/
public class JobExecutor {
}

View File

@ -0,0 +1,65 @@
package com.aizuda.easy.retry.server.job.task.generator.batch;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.job.task.handler.timer.JobTimerTask;
import com.aizuda.easy.retry.server.job.task.handler.timer.JobTimerWheelHandler;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;
/**
* @author www.byteblogs.com
* @date 2023-10-02 10:22:26
* @since 2.4.0
*/
@Component
@Slf4j
public class JobTaskBatchGenerator {
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@Transactional
public void generateJobTaskBatch(JobTaskBatchGeneratorContext context) {
// 生成一个新的任务
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setJobId(context.getJobId());
jobTaskBatch.setGroupName(context.getGroupName());
// 无执行的节点
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName()))) {
jobTaskBatch.setTaskStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
jobTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason());
Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
return;
}
// 生成一个新的任务
jobTaskBatch.setTaskStatus(JobTaskBatchStatusEnum.WAITING.getStatus());
Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
// 进入时间轮
long delay = context.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
- System.currentTimeMillis();
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId());
JobTimerWheelHandler.register(context.getGroupName(), context.getJobId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}
}
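A minimal sketch of the delay calculation used when registering with the timer wheel: the batch's nextTriggerAt is converted to epoch milliseconds and the current time is subtracted. The class name and the 30-second offset are illustrative only.
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimerDelaySketch {

    public static void main(String[] args) {
        LocalDateTime nextTriggerAt = LocalDateTime.now().plusSeconds(30);
        // Same conversion as generateJobTaskBatch: LocalDateTime -> epoch millis -> offset from now
        long delay = nextTriggerAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
                - System.currentTimeMillis();
        System.out.println("timer wheel delay ms = " + delay); // roughly 30000
    }
}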

View File

@ -0,0 +1,36 @@
package com.aizuda.easy.retry.server.job.task.generator.batch;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author www.byteblogs.com
* @date 2023-10-02 13:12:48
* @since 2.4.0
*/
@Data
public class JobTaskBatchGeneratorContext {
/**
* 组名称
*/
private String groupName;
/**
* 任务id
*/
private Long jobId;
/**
* 任务类型
*/
private Integer taskInstanceType;
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
}

View File

@ -0,0 +1,26 @@
package com.aizuda.easy.retry.server.job.task.generator.task;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import org.springframework.beans.factory.InitializingBean;
import java.util.List;
/**
* @author www.byteblogs.com
* @date 2023-10-02 13:08:14
* @since 2.4.0
*/
public abstract class AbstractJobTaskGenerator implements JobTaskGenerator, InitializingBean {
@Override
public List<JobTask> generate(JobTaskGenerateContext context) {
return doGenerate(context);
}
protected abstract List<JobTask> doGenerate(JobTaskGenerateContext context);
@Override
public void afterPropertiesSet() throws Exception {
JobTaskGeneratorFactory.registerTaskInstance(getTaskInstanceType(), this);
}
}

View File

@ -0,0 +1,73 @@
package com.aizuda.easy.retry.server.job.task.generator.task;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* @author www.byteblogs.com
* @date 2023-10-02 21:25:08
* @since 2.4.0
*/
@Component
@Slf4j
public class BroadcastTaskGenerator extends AbstractJobTaskGenerator {
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private JobMapper jobMapper;
@Override
public TaskTypeEnum getTaskInstanceType() {
return TaskTypeEnum.BROADCAST;
}
@Override
@Transactional
public List<JobTask> doGenerate(JobTaskGenerateContext context) {
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName());
if (CollectionUtils.isEmpty(serverNodes)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
Job job = jobMapper.selectById(context.getJobId());
List<JobTask> jobTasks = new ArrayList<>(serverNodes.size());
for (RegisterNodeInfo serverNode : serverNodes) {
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientId(serverNode.getHostId());
jobTask.setArgsType(job.getArgsType());
jobTask.setArgsStr(job.getArgsStr());
jobTask.setExtAttrs(job.getExtAttrs());
jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
jobTasks.add(jobTask);
}
return jobTasks;
}
}

View File

@ -0,0 +1,69 @@
package com.aizuda.easy.retry.server.job.task.generator.task;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* @author www.byteblogs.com
* @date 2023-10-02 12:59:53
* @since 2.4.0
*/
@Component
@Slf4j
public class ClusterTaskGenerator extends AbstractJobTaskGenerator {
@Autowired
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private JobMapper jobMapper;
@Override
public TaskTypeEnum getTaskInstanceType() {
return TaskTypeEnum.CLUSTER;
}
@Override
public List<JobTask> doGenerate(JobTaskGenerateContext context) {
// 生成可执行任务
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getGroupName());
if (Objects.isNull(serverNode)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
Job job = jobMapper.selectById(context.getJobId());
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientId(serverNode.getHostId());
jobTask.setArgsType(job.getArgsType());
jobTask.setArgsStr(job.getArgsStr());
jobTask.setExtAttrs(job.getExtAttrs());
jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
return Lists.newArrayList(jobTask);
}
}

View File

@ -0,0 +1,15 @@
package com.aizuda.easy.retry.server.job.task.generator.task;
import lombok.Data;
/**
* @author www.byteblogs.com
* @date 2023-10-02 13:02:57
* @since 2.4.0
*/
@Data
public class JobTaskGenerateContext {
private Long taskBatchId;
private String groupName;
private Long jobId;
}

View File

@ -0,0 +1,19 @@
package com.aizuda.easy.retry.server.job.task.generator.task;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import java.util.List;
/**
* @author www.byteblogs.com
* @date 2023-10-02 10:43:58
* @since 2.4.0
*/
public interface JobTaskGenerator {
TaskTypeEnum getTaskInstanceType();
List<JobTask> generate(JobTaskGenerateContext context);
}

View File

@ -0,0 +1,23 @@
package com.aizuda.easy.retry.server.job.task.generator.task;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author www.byteblogs.com
* @date 2023-10-02 13:04:09
* @since 2.4.0
*/
public class JobTaskGeneratorFactory {
private static final ConcurrentHashMap<TaskTypeEnum, JobTaskGenerator> CACHE = new ConcurrentHashMap<>();
public static void registerTaskInstance(TaskTypeEnum taskInstanceType, JobTaskGenerator generator) {
CACHE.put(taskInstanceType, generator);
}
public static JobTaskGenerator getTaskInstance(Integer type) {
return CACHE.get(TaskTypeEnum.valueOf(type));
}
}

View File

@ -0,0 +1,80 @@
package com.aizuda.easy.retry.server.job.task.generator.task;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.*;
/**
* 分片参数格式
* 0=参数1;1=参数2;
*
* @author www.byteblogs.com
* @date 2023-10-02 21:37:22
* @since 2.4.0
*/
@Component
@Slf4j
public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
@Autowired
private JobMapper jobMapper;
@Autowired
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
@Autowired
private JobTaskMapper jobTaskMapper;
@Override
public TaskTypeEnum getTaskInstanceType() {
return TaskTypeEnum.SHARDING;
}
@Override
public List<JobTask> doGenerate(JobTaskGenerateContext context) {
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName());
if (CollectionUtils.isEmpty(serverNodes)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
Job job = jobMapper.selectById(context.getJobId());
String argsStr = job.getArgsStr();
Map<String, String> split = Splitter.on(";").omitEmptyStrings().withKeyValueSeparator('=').split(argsStr);
List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
List<JobTask> jobTasks = new ArrayList<>(split.size());
split.forEach((key, value) -> {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(Integer.parseInt(key) % serverNodes.size());
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientId(registerNodeInfo.getHostId());
jobTask.setArgsType(job.getArgsType());
jobTask.setArgsStr(job.getArgsStr());
jobTask.setExtAttrs(job.getExtAttrs());
jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
jobTasks.add(jobTask);
});
return jobTasks;
}
}
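A standalone sketch of how the sharding argument string is expected to be parsed and routed, using the same Guava Splitter call as above; the argument string and node names are illustrative.
import com.google.common.base.Splitter;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ShardingArgsSketch {

    public static void main(String[] args) {
        String argsStr = "0=paramA;1=paramB;2=paramC";
        // shard index -> argument, e.g. {0=paramA, 1=paramB, 2=paramC}
        Map<String, String> shards = Splitter.on(";").omitEmptyStrings().withKeyValueSeparator('=').split(argsStr);
        List<String> nodes = Arrays.asList("client-1", "client-2");
        // each shard goes to nodes[index % nodeCount], matching the modulo routing in doGenerate
        shards.forEach((index, arg) ->
                System.out.println("shard " + index + " -> " + nodes.get(Integer.parseInt(index) % nodes.size())
                        + " args=" + arg));
    }
}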

View File

@ -0,0 +1,25 @@
package com.aizuda.easy.retry.server.job.task.handler.callback;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.annotation.Transactional;
/**
* @author www.byteblogs.com
* @date 2023-10-03 23:12:33
* @since 2.4.0
*/
public abstract class AbstractClientCallbackHandler implements ClientCallbackHandler, InitializingBean {
@Override
@Transactional
public void callback(ClientCallbackContext context) {
doCallback(context);
}
protected abstract void doCallback(ClientCallbackContext context);
@Override
public void afterPropertiesSet() throws Exception {
ClientCallbackFactory.registerJobExecutor(getTaskInstanceType(), this);
}
}

View File

@ -0,0 +1,40 @@
package com.aizuda.easy.retry.server.job.task.handler.callback;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Collections;
/**
* @author: www.byteblogs.com
* @date : 2023-10-07 10:24
* @since : 2.4.0
*/
@Component
@Slf4j
public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandler {
@Override
public TaskTypeEnum getTaskInstanceType() {
return TaskTypeEnum.BROADCAST;
}
@Override
protected void doCallback(final ClientCallbackContext context) {
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
jobExecutorResultDTO.setTaskId(context.getTaskId());
jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage());
jobExecutorResultDTO.setResult(context.getExecuteResult().getResult());
jobExecutorResultDTO.setTaskType(getTaskInstanceType().getType());
ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
actorRef.tell(jobExecutorResultDTO, actorRef);
}
}

View File

@ -0,0 +1,27 @@
package com.aizuda.easy.retry.server.job.task.handler.callback;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import lombok.Data;
/**
* @author www.byteblogs.com
* @date 2023-10-03 23:13:05
* @since 2.4.0
*/
@Data
public class ClientCallbackContext {
private Long jobId;
private Long taskBatchId;
private Long taskId;
private String groupName;
private Integer taskStatus;
private ExecuteResult executeResult;
}

View File

@ -0,0 +1,23 @@
package com.aizuda.easy.retry.server.job.task.handler.callback;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author www.byteblogs.com
* @date 2023-10-02 13:04:09
* @since 2.4.0
*/
public class ClientCallbackFactory {
private static final ConcurrentHashMap<TaskTypeEnum, ClientCallbackHandler> CACHE = new ConcurrentHashMap<>();
public static void registerJobExecutor(TaskTypeEnum taskInstanceType, ClientCallbackHandler callbackHandler) {
CACHE.put(taskInstanceType, callbackHandler);
}
public static ClientCallbackHandler getClientCallback(Integer type) {
return CACHE.get(TaskTypeEnum.valueOf(type));
}
}

Some files were not shown because too many files have changed in this diff.