diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java index 8cecee14..852eb7f1 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java @@ -2,8 +2,10 @@ package com.aizuda.easy.retry.template.datasource.persistence.po; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; + import java.io.Serializable; import java.time.LocalDateTime; + import lombok.Getter; import lombok.Setter; @@ -24,7 +26,7 @@ public class Job implements Serializable { /** * 主键 */ - @TableId(value = "id", type = IdType.AUTO) + @TableId(value = "id", type = IdType.AUTO) private Long id; /** @@ -77,6 +79,16 @@ public class Job implements Serializable { */ private String executorName; + /** + * 触发类型 1.CRON 表达式 2. 固定时间 + */ + private Integer triggerType; + + /** + * 间隔时长 + */ + private String triggerInterval; + /** * 阻塞策略 1、丢弃 2、覆盖 3、并行 */ diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java index 639764ec..888eda4f 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java @@ -15,7 +15,6 @@ public class ActorGenerator { public static final String SCAN_CALLBACK_GROUP_ACTOR = "ScanCallbackGroupActor"; public static final String SCAN_RETRY_GROUP_ACTOR = "ScanGroupActor"; - public static final String SCAN_JOB_ACTOR = "ScanJobActor"; public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor"; public static final String FINISH_ACTOR = "FinishActor"; public static final String FAILURE_ACTOR = "FailureActor"; @@ -25,6 +24,10 @@ public class ActorGenerator { public static final String LOG_ACTOR = "LogActor"; public static final String REQUEST_HANDLER_ACTOR = "RequestHandlerActor"; + /*----------------------------------------分布式任务调度----------------------------------------*/ + public static final String SCAN_JOB_ACTOR = "ScanJobActor"; + + public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor"; private ActorGenerator() {} @@ -127,6 +130,15 @@ public class ActorGenerator { return getNettyActorSystem().actorOf(getSpringExtension().props(REQUEST_HANDLER_ACTOR)); } + /** + * Job调度准备阶段actor + * + * @return actor 引用 + */ + public static ActorRef jobTaskPrepareActor() { + return getJobActorSystem().actorOf(getSpringExtension().props(JOB_TASK_PREPARE_ACTOR)); + } + public static SpringExtension getSpringExtension() { return SpringContext.getBeanByType(SpringExtension.class); } @@ -173,4 +185,15 @@ public class ActorGenerator { public static ActorSystem getNettyActorSystem() { return SpringContext.getBean("nettyActorSystem", ActorSystem.class); } + + /** + * 处理Job调度 + * + * @return + */ + public static ActorSystem getJobActorSystem() { + return SpringContext.getBean("jobActorSystem", ActorSystem.class); + } + + } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/AkkaConfiguration.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/AkkaConfiguration.java index 2864b868..5fdde59b 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/AkkaConfiguration.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/AkkaConfiguration.java @@ -20,6 +20,7 @@ public class AkkaConfiguration { private static final String DISPATCH_RESULT_ACTOR_SYSTEM = "DISPATCH_RESULT_ACTOR_SYSTEM"; private static final String LOG_ACTOR_SYSTEM = "LOG_ACTOR_SYSTEM"; private static final String NETTY_ACTOR_SYSTEM = "NETTY_ACTOR_SYSTEM"; + private static final String JOB_ACTOR_SYSTEM = "JOB_ACTOR_SYSTEM"; @Autowired private ApplicationContext applicationContext; @@ -86,4 +87,16 @@ public class AkkaConfiguration { springExtension.initialize(applicationContext); return system; } + + /** + * 处理job调度 + * + * @return {@link ActorSystem} 顶级actor + */ + @Bean("jobActorSystem") + public ActorSystem jobActorSystem() { + ActorSystem system = ActorSystem.create(JOB_ACTOR_SYSTEM); + springExtension.initialize(applicationContext); + return system; + } } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/FilterStrategyContext.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/FilterStrategyContext.java new file mode 100644 index 00000000..5e564236 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/FilterStrategyContext.java @@ -0,0 +1,32 @@ +package com.aizuda.easy.retry.server.common.dto; + +import lombok.Data; + +import java.time.LocalDateTime; +import java.util.List; + +/** + * @author www.byteblogs.com + * @date 2023-09-25 22:48:36 + * @since 2.4.0 + */ +@Data +public class FilterStrategyContext { + + private Long id; + + private RegisterNodeInfo registerNodeInfo; + + private String groupName; + + private LocalDateTime nextTriggerAt; + + + private String uniqueId; + + private List sceneBlacklist; + + private String sceneName; + + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/WaitStrategy.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/WaitStrategy.java new file mode 100644 index 00000000..d5617d81 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/WaitStrategy.java @@ -0,0 +1,23 @@ +package com.aizuda.easy.retry.server.job.task; + +import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies; + +import java.time.LocalDateTime; + +/** + * 等待策略(退避策略) + * + * @author: www.byteblogs.com + * @date : 2021-11-29 18:18 + */ +public interface WaitStrategy { + + /** + * 计算下次重试触发时间 + * + * @param context {@link WaitStrategies.WaitStrategyContext} 重试上下文 + * @return 下次触发时间 + */ + LocalDateTime computeRetryTime(WaitStrategies.WaitStrategyContext context); + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java new file mode 100644 index 00000000..702b462e --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java @@ -0,0 +1,102 @@ +package com.aizuda.easy.retry.server.job.task.dto; + +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * @author www.byteblogs.com + * @date 2023-09-25 22:42:21 + * @since 2.4.0 + */ +@Data +public class JobTaskPrepareDTO { + + private Long jobId; + + /** + * 组名称 + */ + private String groupName; + + /** + * 名称 + */ + private String jobName; + + /** + * 执行方法参数 + */ + private String argsStr; + + /** + * 参数类型 text/json + */ + private String argsType; + + /** + * 扩展字段 + */ + private String extAttrs; + + /** + * 下次触发时间 + */ + private LocalDateTime nextTriggerAt; + + /** + * 重试状态 0、关闭、1、开启 + */ + private Integer jobStatus; + + /** + * 执行器路由策略 + */ + private String routeKey; + + /** + * 执行器类型 1、Java + */ + private Integer executorType; + + /** + * 执行器名称 + */ + private String executorName; + + /** + * 触发类型 1.CRON 表达式 2. 固定时间 + */ + private Integer triggerType; + + /** + * 间隔时长 + */ + private String triggerInterval; + + /** + * 阻塞策略 1、丢弃 2、覆盖 3、并行 + */ + private Integer blockStrategy; + + /** + * 任务执行超时时间,单位秒 + */ + private Integer executorTimeout; + + /** + * 最大重试次数 + */ + private Integer maxRetryTimes; + + /** + * 重试间隔(s) + */ + private Integer retryInterval; + + /** + * bucket + */ + private Integer bucketIndex; + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobExecutorActor.java index fa76d52a..ba07fb6f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobExecutorActor.java @@ -29,7 +29,6 @@ public class JobExecutorActor extends AbstractActor { } private void doExecute(final JobContext jobContext) { - // 调度客户端 } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTaskPrepareActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTaskPrepareActor.java new file mode 100644 index 00000000..d9c9ccd0 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTaskPrepareActor.java @@ -0,0 +1,73 @@ +package com.aizuda.easy.retry.server.job.task.scan; + +import akka.actor.AbstractActor; +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.easy.retry.server.job.task.BlockStrategy; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * @author www.byteblogs.com + * @date 2023-09-25 22:20:53 + * @since + */ +@Component(ActorGenerator.SCAN_JOB_ACTOR) +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +public class JobTaskPrepareActor extends AbstractActor { + @Autowired + private JobMapper jobMapper; + @Autowired + private JobTaskMapper jobTaskMapper; + @Autowired + protected ClientNodeAllocateHandler clientNodeAllocateHandler; + + @Override + public Receive createReceive() { + return receiveBuilder().match(JobTaskPrepareDTO.class, job -> { + try { + doPrepare(job); + } catch (Exception e) { + + } + }).build(); + } + + private void doPrepare(JobTaskPrepareDTO prepare) { + + Long count = jobTaskMapper.selectCount(new LambdaQueryWrapper().eq(JobTask::getTaskStatus, StatusEnum.YES.getStatus())); + if (count <= 0) { + // 生成可执行任务 + JobTask jobTask = new JobTask(); + jobTask.setJobId(prepare.getJobId()); + jobTask.setGroupName(prepare.getGroupName()); + Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", prepare.getJobId())); + + JobContext jobContext = new JobContext(); + // 进入时间轮 + JobTimerWheelHandler.register(prepare.getGroupName(), prepare.getJobId().toString(), new JobTimerTask(jobContext), 1, TimeUnit.MILLISECONDS); + } else { + + BlockStrategies.BlockStrategyContext blockStrategyContext = new BlockStrategies.BlockStrategyContext(); + BlockStrategy blockStrategy = BlockStrategies.BlockStrategyEnum.getBlockStrategy(prepare.getBlockStrategy()); + blockStrategy.block(blockStrategyContext); + } + + + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerTask.java index 1ff6cf8a..8b78141d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerTask.java @@ -7,7 +7,13 @@ import io.netty.util.TimerTask; * @author: www.byteblogs.com * @date : 2023-09-25 17:28 */ -public class JobTimerTask implements TimerTask { +public class JobTimerTask implements TimerTask { + + private JobContext jobContext; + + public JobTimerTask(JobContext jobContext) { + this.jobContext = jobContext; + } @Override public void run(final Timeout timeout) throws Exception { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/ScanJobTaskActor.java index 561e8628..5eef4fb1 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/ScanJobTaskActor.java @@ -1,16 +1,22 @@ package com.aizuda.easy.retry.server.job.task.scan; import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.dto.ScanTask; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; import com.aizuda.easy.retry.server.job.task.BlockStrategy; +import com.aizuda.easy.retry.server.job.task.WaitStrategy; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies; import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyContext; import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyEnum; +import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; @@ -34,6 +40,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies.*; + /** * JOB任务扫描 * @@ -88,41 +96,23 @@ public class ScanJobTaskActor extends AbstractActor { lastId.set(jobs.get(jobs.size() - 1).getId()); for (Job job : jobs) { - JobContext jobContext = new JobContext(); - // 选择客户端节点 - // TODO 校验一下客户端是否存活 - jobContext.setRegisterNodeInfo(clientNodeAllocateHandler.getServerNode(job.getGroupName())); - Long count = jobTaskMapper.selectCount(new LambdaQueryWrapper().eq(JobTask::getTaskStatus, StatusEnum.YES.getStatus())); - if (count <= 0) { - // 生成可执行任务 - JobTask jobTask = new JobTask(); - jobTask.setJobId(job.getId()); - jobTask.setGroupName(job.getGroupName()); - jobTaskMapper.insert(jobTask); + // 更新下次触发时间 + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); + WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); + waitStrategyContext.setTriggerType(job.getTriggerType()); + waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(job.getNextTriggerAt()); + job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); + Assert.isTrue(1 == jobMapper.updateById(job), () -> new EasyRetryServerException("更新job下次触发时间失败.jobId:[{}]", job.getId())); - // 更新下次触发时间 - // ToDo 根据CRON表达式计算 - job.setNextTriggerAt(LocalDateTime.now().plusSeconds(50)); - jobMapper.updateById(job); - } else { - BlockStrategyContext blockStrategyContext = new BlockStrategyContext(); - - BlockStrategy blockStrategy = BlockStrategyEnum.getBlockStrategy(job.getBlockStrategy()); - blockStrategy.block(blockStrategyContext); - } - - - - // 进入时间轮 - JobTimerWheelHandler.register(job.getGroupName(), job.getId().toString(), new JobTimerTask(), 1, TimeUnit.MILLISECONDS); - // 校验是否存在已经在执行的任务了 -// boolean isExist = true; -// if (isExist) { -// // 选择丢弃策略 -// String blockStrategy = job.getBlockStrategy(); -// -// } + // 执行预处理阶段 + ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); + JobTaskPrepareDTO jobTaskPrepareDTO = new JobTaskPrepareDTO(); + jobTaskPrepareDTO.setJobId(job.getId()); + jobTaskPrepareDTO.setTriggerType(job.getTriggerType()); + jobTaskPrepareDTO.setNextTriggerAt(job.getNextTriggerAt()); + actorRef.tell(jobTaskPrepareDTO, actorRef); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java index 61b6687b..f00674e8 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java @@ -1,7 +1,12 @@ package com.aizuda.easy.retry.server.job.task.strategy; +import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.BlockStrategy; +import com.aizuda.easy.retry.server.job.task.scan.JobContext; +import com.aizuda.easy.retry.server.job.task.scan.JobTimerTask; +import com.aizuda.easy.retry.server.job.task.scan.JobTimerWheelHandler; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; @@ -10,6 +15,8 @@ import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.TimeUnit; + /** * @author: www.byteblogs.com * @date : 2023-09-25 17:52 @@ -78,9 +85,12 @@ public class BlockStrategies { JobTask jobTask = new JobTask(); jobTask.setJobId(job.getId()); jobTask.setGroupName(job.getGroupName()); - JobTaskMapper jobTaskMapper = SpringContext.getBeanByType(JobTaskMapper.class); - jobTaskMapper.insert(jobTask); + Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", job.getId())); + + JobContext jobContext = new JobContext(); + // 进入时间轮 + JobTimerWheelHandler.register(job.getGroupName(), job.getId().toString(), new JobTimerTask(jobContext), 1, TimeUnit.MILLISECONDS); return false; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/WaitStrategies.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/WaitStrategies.java new file mode 100644 index 00000000..f4c4b85b --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/WaitStrategies.java @@ -0,0 +1,155 @@ +package com.aizuda.easy.retry.server.job.task.strategy; + +import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.common.core.util.CronExpression; +import com.aizuda.easy.retry.server.common.config.SystemProperties; +import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.job.task.WaitStrategy; +import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; + +import java.text.ParseException; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Date; + +/** + * 生成 {@link WaitStrategy} 实例. + * + * @author: www.byteblogs.com + * @date : 2021-11-29 18:19 + */ +public class WaitStrategies { + + private WaitStrategies() { + } + + @Data + public static class WaitStrategyContext { + /** + * 触发类型 1.CRON 表达式 2. 固定时间 + */ + private Integer triggerType; + + /** + * 间隔时长 + */ + private String triggerInterval; + + /** + * 下次触发时间 + */ + private LocalDateTime nextTriggerAt; + + } + + @Getter + @AllArgsConstructor + public enum WaitStrategyEnum { + CRON(1, cronWait()), + FIXED(2, fixedWait()), + ; + + private final int triggerType; + private final WaitStrategy waitStrategy; + + + /** + * 获取退避策略 + * + * @param triggerType 触发类型 + * @return 退避策略 + */ + public static WaitStrategy getWaitStrategy(int triggerType) { + + WaitStrategyEnum waitStrategy = getWaitStrategyEnum(triggerType); + switch (waitStrategy) { + case CRON: + return cronWait(); + case FIXED: + return fixedWait(); + default: + return waitStrategy.waitStrategy; + } + + } + + /** + * 获取退避策略枚举对象 + * + * @param backOff 退避策略 + * @return 退避策略枚举对象 + */ + public static WaitStrategyEnum getWaitStrategyEnum(int backOff) { + + for (WaitStrategyEnum value : WaitStrategyEnum.values()) { + if (value.triggerType == backOff) { + return value; + } + } + + // TODO 兜底为默认等级策略 + return null; + } + + } + + + /** + * 固定定时间等待策略 + * + * @return {@link FixedWaitStrategy} 固定定时间等待策略 + */ + public static WaitStrategy fixedWait() { + return new FixedWaitStrategy(); + } + + /** + * cron等待策略 + * + * @return {@link CronWaitStrategy} cron等待策略 + */ + public static WaitStrategy cronWait() { + return new CronWaitStrategy(); + } + + /** + * 固定定时间等待策略 + */ + private static final class FixedWaitStrategy implements WaitStrategy { + + @Override + public LocalDateTime computeRetryTime(WaitStrategyContext context) { + long triggerInterval = Long.parseLong(context.triggerInterval); + + return context.getNextTriggerAt().plusSeconds(triggerInterval); + } + } + + /** + * Cron等待策略 + */ + private static final class CronWaitStrategy implements WaitStrategy { + + @Override + public LocalDateTime computeRetryTime(WaitStrategyContext context) { + + Date nextValidTime; + try { + ZonedDateTime zdt = context.getNextTriggerAt().atZone(ZoneOffset.ofHours(8)); + nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant())); + } catch (ParseException e) { + throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e); + } + + return LocalDateTime.ofEpochSecond( nextValidTime.getTime() / 1000,0, ZoneOffset.ofHours(8)); + } + } + +}