From a9b5ba3ffd51759f061e51793652cbd72826deea Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 3 Nov 2023 00:07:50 +0800 Subject: [PATCH] =?UTF-8?q?feat:2.4.0=201.=20=E4=BC=98=E5=8C=96JOB?= =?UTF-8?q?=E7=9A=84=E8=A7=A6=E5=8F=91=E6=97=B6=E9=97=B4wei=20long=202.=20?= =?UTF-8?q?=E6=8A=BD=E7=A6=BBWaitStrategy=E4=B8=BA=E5=85=AC=E5=85=B1?= =?UTF-8?q?=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/easy_retry_mysql.sql | 2 +- .../datasource/persistence/po/Job.java | 2 +- .../retry/server/common}/WaitStrategy.java | 8 +- .../common}/strategy/WaitStrategies.java | 75 ++++----- .../retry/server/common/util/CronUtils.java | 49 ++++++ .../retry/server/common/util/DateUtil.java | 25 +++ .../server/job/task/dto/JobPartitionTask.java | 2 +- .../job/task/dto/JobTaskPrepareDTO.java | 2 +- .../server/job/task/support/WaitStrategy.java | 23 --- .../task/support/cache/ResidentTaskCache.java | 8 +- .../support/dispatch/JobExecutorActor.java | 16 +- .../support/dispatch/ScanJobTaskActor.java | 42 +++-- .../task/support/strategy/WaitStrategies.java | 150 ------------------ .../support/timer/ResidentJobTimerTask.java | 20 --- .../generator/task/AbstractGenerator.java | 14 +- .../task/service/impl/RetryServiceImpl.java | 6 +- .../retry/task/support/RetryContext.java | 1 + .../support/context/CallbackRetryContext.java | 2 +- .../MaxAttemptsPersistenceRetryContext.java | 2 +- .../dispatch/actor/result/NoRetryActor.java | 1 - .../actor/scan/ScanCallbackTaskActor.java | 11 +- .../actor/scan/ScanRetryTaskActor.java | 14 +- .../dispatch/task/CallbackTaskExecutor.java | 6 +- .../task/ManualCallbackTaskExecutor.java | 6 +- .../task/ManualRetryTaskExecutor.java | 4 +- .../dispatch/task/RetryTaskExecutor.java | 4 +- .../handler/CallbackRetryTaskHandler.java | 17 +- .../task/support/retry/RetryBuilder.java | 2 +- .../task/support/retry/RetryExecutor.java | 2 +- .../web/service/impl/JobServiceImpl.java | 61 ++----- .../impl/RetryDeadLetterServiceImpl.java | 4 +- .../service/impl/RetryTaskServiceImpl.java | 7 +- .../service/impl/SceneConfigServiceImpl.java | 24 ++- frontend/src/views/job/JobInfo.vue | 2 +- frontend/src/views/job/from/JobFrom.vue | 43 +++-- frontend/src/views/task/SceneList.vue | 3 +- frontend/src/views/task/form/SceneFrom.vue | 6 +- 37 files changed, 257 insertions(+), 409 deletions(-) rename easy-retry-server/{easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support => easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common}/WaitStrategy.java (54%) rename easy-retry-server/{easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support => easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common}/strategy/WaitStrategies.java (67%) create mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/CronUtils.java create mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/DateUtil.java delete mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WaitStrategy.java delete mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index 22781878..4eca24ae 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -222,7 +222,7 @@ CREATE TABLE `job` ( `job_name` varchar(64) NOT NULL COMMENT '名称', `args_str` text DEFAULT NULL COMMENT '执行方法参数', `args_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '参数类型 ', - `next_trigger_at` datetime NOT NULL COMMENT '下次触发时间', + `next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间', `job_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '重试状态 0、关闭、1、开启', `task_type` varchar(255) DEFAULT NULL COMMENT '任务类型 1、集群 2、广播 3、切片', `route_key` tinyint(4) NOT NULL DEFAULT '4' COMMENT '路由策略', diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java index db97aa23..46c84412 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java @@ -57,7 +57,7 @@ public class Job implements Serializable { /** * 下次触发时间 */ - private LocalDateTime nextTriggerAt; + private Long nextTriggerAt; /** * 重试状态 0、关闭、1、开启 diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/WaitStrategy.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/WaitStrategy.java similarity index 54% rename from easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/WaitStrategy.java rename to easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/WaitStrategy.java index a02f74b4..0361c936 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/WaitStrategy.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/WaitStrategy.java @@ -1,8 +1,6 @@ -package com.aizuda.easy.retry.server.retry.task.support; +package com.aizuda.easy.retry.server.common; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext; - -import java.time.LocalDateTime; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext; /** * 等待策略(退避策略) @@ -18,6 +16,6 @@ public interface WaitStrategy { * @param waitStrategyContext {@link WaitStrategyContext} 重试上下文 * @return 下次触发时间 */ - LocalDateTime computeRetryTime(WaitStrategyContext waitStrategyContext); + Long computeTriggerTime(WaitStrategyContext waitStrategyContext); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/WaitStrategies.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/strategy/WaitStrategies.java similarity index 67% rename from easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/WaitStrategies.java rename to easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/strategy/WaitStrategies.java index 3a2058da..59ad40a0 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/WaitStrategies.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/strategy/WaitStrategies.java @@ -1,18 +1,17 @@ -package com.aizuda.easy.retry.server.retry.task.support.strategy; +package com.aizuda.easy.retry.server.common.strategy; -import cn.hutool.core.date.DateUtil; +import com.aizuda.easy.retry.common.core.exception.EasyRetryCommonException; import com.aizuda.easy.retry.common.core.util.CronExpression; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.util.DateUtil; import com.google.common.base.Preconditions; import lombok.Data; import lombok.Getter; import java.text.ParseException; -import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; +import java.time.Duration; import java.util.Date; import java.util.Objects; import java.util.Random; @@ -31,10 +30,6 @@ public class WaitStrategies { @Data public static class WaitStrategyContext { -// /** -// * 触发类型 1.CRON 表达式 2. 固定时间 -// */ -// private Integer triggerType; /** * 间隔时长 @@ -44,12 +39,13 @@ public class WaitStrategies { /** * 下次触发时间 */ - private LocalDateTime nextTriggerAt; + private long nextTriggerAt; /** - * 触发次数 + * 延迟等级 + * 仅在选择 DELAY_LEVEL时使用 {@link DelayLevelEnum} */ - private Integer triggerCount; + private Integer delayLevel; } @@ -60,11 +56,11 @@ public class WaitStrategies { CRON(3, cronWait()), RANDOM(4, randomWait()); - private final int backOff; + private final int type; private final WaitStrategy waitStrategy; - WaitStrategyEnum(int backOff, WaitStrategy waitStrategy) { - this.backOff = backOff; + WaitStrategyEnum(int type, WaitStrategy waitStrategy) { + this.type = type; this.waitStrategy = waitStrategy; } @@ -75,35 +71,26 @@ public class WaitStrategies { * @return 退避策略 */ public static WaitStrategy getWaitStrategy(int backOff) { - - WaitStrategyEnum waitStrategy = getWaitStrategyEnum(backOff); - switch (waitStrategy) { - case RANDOM: - return randomWait(); - case CRON: - return cronWait(); - default: - return waitStrategy.waitStrategy; - } + return getWaitStrategyEnum(backOff).getWaitStrategy(); } /** - * 获取退避策略枚举对象 + * 获取等待策略类型枚举对象 * - * @param backOff 退避策略 - * @return 退避策略枚举对象 + * @param type 等待策略类型 + * @return 等待策略类型枚举对象 */ - public static WaitStrategyEnum getWaitStrategyEnum(int backOff) { + public static WaitStrategyEnum getWaitStrategyEnum(int type) { for (WaitStrategyEnum value : WaitStrategyEnum.values()) { - if (value.backOff == backOff) { + if (value.type == type) { return value; } } // 兜底为默认等级策略 - return WaitStrategyEnum.DELAY_LEVEL; + throw new EasyRetryCommonException("等待策略类型不存在. [{}]", type); } } @@ -159,9 +146,10 @@ public class WaitStrategies { private static final class DelayLevelWaitStrategy implements WaitStrategy { @Override - public LocalDateTime computeRetryTime(WaitStrategyContext context) { - DelayLevelEnum levelEnum = DelayLevelEnum.getDelayLevelByLevel(context.getTriggerCount()); - return context.getNextTriggerAt().plus(levelEnum.getTime(), levelEnum.getUnit()); + public Long computeTriggerTime(WaitStrategyContext context) { + DelayLevelEnum levelEnum = DelayLevelEnum.getDelayLevelByLevel(context.getDelayLevel()); + Duration of = Duration.of(levelEnum.getTime(), levelEnum.getUnit()); + return context.getNextTriggerAt() + of.toMillis(); } } @@ -171,8 +159,8 @@ public class WaitStrategies { private static final class FixedWaitStrategy implements WaitStrategy { @Override - public LocalDateTime computeRetryTime(WaitStrategyContext retryContext) { - return retryContext.getNextTriggerAt().plusSeconds(Integer.parseInt(retryContext.getTriggerInterval())); + public Long computeTriggerTime(WaitStrategyContext retryContext) { + return retryContext.getNextTriggerAt() + Integer.parseInt(retryContext.getTriggerInterval()); } } @@ -182,12 +170,11 @@ public class WaitStrategies { private static final class CronWaitStrategy implements WaitStrategy { @Override - public LocalDateTime computeRetryTime(WaitStrategyContext context) { + public Long computeTriggerTime(WaitStrategyContext context) { try { - ZonedDateTime zdt = context.getNextTriggerAt().atZone(ZoneOffset.ofHours(8)); - Date nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant())); - return DateUtil.toLocalDateTime(nextValidTime); + Date nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(new Date(context.getNextTriggerAt())); + return DateUtil.toEpochMilli(nextValidTime); } catch (ParseException e) { throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e); } @@ -217,7 +204,7 @@ public class WaitStrategies { } @Override - public LocalDateTime computeRetryTime(WaitStrategyContext retryContext) { + public Long computeTriggerTime(WaitStrategyContext retryContext) { if (Objects.nonNull(retryContext)) { if (maximum == 0) { @@ -228,9 +215,7 @@ public class WaitStrategies { Preconditions.checkArgument(maximum > minimum, "maximum must be > minimum but maximum is %d and minimum is", maximum, minimum); long t = Math.abs(RANDOM.nextLong()) % (maximum - minimum); - long now = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); - - return LocalDateTime.ofEpochSecond( (t + minimum + now) / 1000,0, ZoneOffset.ofHours(8)); + return (t + minimum + System.currentTimeMillis()) / 1000; } } } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/CronUtils.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/CronUtils.java new file mode 100644 index 00000000..b973a004 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/CronUtils.java @@ -0,0 +1,49 @@ +package com.aizuda.easy.retry.server.common.util; + +import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.util.CronExpression; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; + +import java.text.ParseException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +/** + * @author www.byteblogs.com + * @date 2023-11-02 22:52:10 + * @since 2.4.0 + */ +public class CronUtils { + + public static List getExecuteTimeByCron(String cron, int nums) { + + List list = new ArrayList<>(); + LocalDateTime now = LocalDateTime.now(); + for (int i = 0; i < nums; i++) { + Date nextValidTime; + try { + ZonedDateTime zdt = now.atZone(ZoneOffset.ofHours(8)); + nextValidTime = new CronExpression(cron).getNextValidTimeAfter(Date.from(zdt.toInstant())); + now = LocalDateTime.ofEpochSecond(nextValidTime.getTime() / 1000, 0, ZoneOffset.ofHours(8)); + list.add(SystemConstants.DATE_FORMAT.YYYYMMDDHHMMSS.format(now)); + } catch (ParseException ignored) { + } + } + + return list; + } + + public static long getExecuteInterval(String cron) { + List executeTimeByCron = getExecuteTimeByCron(cron, 2); + LocalDateTime first = LocalDateTime.parse(executeTimeByCron.get(0), SystemConstants.DATE_FORMAT.YYYYMMDDHHMMSS); + LocalDateTime second = LocalDateTime.parse(executeTimeByCron.get(1), SystemConstants.DATE_FORMAT.YYYYMMDDHHMMSS); + Duration duration = Duration.between(first, second); + return duration.toMillis(); + } + +} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/DateUtil.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/DateUtil.java new file mode 100644 index 00000000..cb571635 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/DateUtil.java @@ -0,0 +1,25 @@ +package com.aizuda.easy.retry.server.common.util; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Date; + +/** + * @author www.byteblogs.com + * @date 2023-11-02 23:42:53 + * @since 2.4.0 + */ +public class DateUtil extends cn.hutool.core.date.DateUtil { + + public static long toEpochMilli(Date date) { + return DateUtil.toLocalDateTime(date).toInstant(ZoneOffset.of("+8")).toEpochMilli(); + } + + public static long toEpochMilli(LocalDateTime date) { + return date.toInstant(ZoneOffset.of("+8")).toEpochMilli(); + } + + public static LocalDateTime toEpochMilli(long milli) { + return LocalDateTime.ofEpochSecond(milli / 1000, 0, ZoneOffset.ofHours(8)); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java index 8450464d..4e173ee5 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java @@ -27,7 +27,7 @@ public class JobPartitionTask extends PartitionTask { /** * 下次触发时间 */ - private LocalDateTime nextTriggerAt; + private long nextTriggerAt; /** * 阻塞策略 1、丢弃 2、覆盖 3、并行 diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java index fbb31d3a..abf82b43 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java @@ -27,7 +27,7 @@ public class JobTaskPrepareDTO { /** * 下次触发时间 */ - private LocalDateTime nextTriggerAt; + private long nextTriggerAt; /** * 阻塞策略 1、丢弃 2、覆盖 3、并行 diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WaitStrategy.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WaitStrategy.java deleted file mode 100644 index 685b2cc9..00000000 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WaitStrategy.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.aizuda.easy.retry.server.job.task.support; - -import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies; - -import java.time.LocalDateTime; - -/** - * 等待策略(退避策略) - * - * @author: www.byteblogs.com - * @date : 2021-11-29 18:18 - */ -public interface WaitStrategy { - - /** - * 计算下次重试触发时间 - * - * @param context {@link WaitStrategies.WaitStrategyContext} 重试上下文 - * @return 下次触发时间 - */ - LocalDateTime computeRetryTime(WaitStrategies.WaitStrategyContext context); - -} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/ResidentTaskCache.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/ResidentTaskCache.java index a17ba0b7..8f163eb6 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/ResidentTaskCache.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/ResidentTaskCache.java @@ -14,7 +14,7 @@ import java.util.concurrent.TimeUnit; */ public class ResidentTaskCache { - private static final Cache cache; + private static final Cache cache; static { cache = CacheBuilder.newBuilder() @@ -23,15 +23,15 @@ public class ResidentTaskCache { .build(); } - public static void refresh(Long jobId, LocalDateTime nextTriggerTime) { + public static void refresh(Long jobId, Long nextTriggerTime) { cache.put(jobId, nextTriggerTime); } - public static LocalDateTime getOrDefault(Long jobId, LocalDateTime nextTriggerTime) { + public static Long getOrDefault(Long jobId, Long nextTriggerTime) { return Optional.ofNullable(cache.getIfPresent(jobId)).orElse(nextTriggerTime); } - public static LocalDateTime get(Long jobId) { + public static Long get(Long jobId) { return getOrDefault(jobId, null); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index d1a1a05a..428a1df3 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -2,23 +2,22 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch; import akka.actor.AbstractActor; import cn.hutool.core.lang.Assert; -import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.support.JobExecutor; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; -import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; import com.aizuda.easy.retry.server.job.task.support.executor.JobExecutorContext; import com.aizuda.easy.retry.server.job.task.support.executor.JobExecutorFactory; -import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies; import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel; import com.aizuda.easy.retry.server.job.task.support.timer.ResidentJobTimerTask; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; @@ -38,7 +37,6 @@ import org.springframework.transaction.support.TransactionSynchronizationManager import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; -import java.time.Duration; import java.time.LocalDateTime; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -154,20 +152,18 @@ public class JobExecutorActor extends AbstractActor { ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job); WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); - LocalDateTime preTriggerAt = ResidentTaskCache.get(job.getId()); - if (Objects.isNull(preTriggerAt) || preTriggerAt.isBefore(job.getNextTriggerAt())) { + Long preTriggerAt = ResidentTaskCache.get(job.getId()); + if (Objects.isNull(preTriggerAt) || preTriggerAt < job.getNextTriggerAt()) { preTriggerAt = job.getNextTriggerAt(); } WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); - waitStrategyContext.setTriggerType(job.getTriggerType()); waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); waitStrategyContext.setNextTriggerAt(preTriggerAt); - LocalDateTime nextTriggerAt = waitStrategy.computeRetryTime(waitStrategyContext); + Long nextTriggerAt = waitStrategy.computeTriggerTime(waitStrategyContext); // 获取时间差的毫秒数 - Duration duration = Duration.between(preTriggerAt, nextTriggerAt); - long milliseconds = duration.toMillis(); + long milliseconds = nextTriggerAt - preTriggerAt; log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, System.currentTimeMillis() % 1000); job.setNextTriggerAt(nextTriggerAt); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java index db13fb64..72ddd09e 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java @@ -2,25 +2,22 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch; import akka.actor.AbstractActor; import akka.actor.ActorRef; -import cn.hutool.core.date.DateUnit; -import cn.hutool.core.date.DateUtil; import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; import com.aizuda.easy.retry.server.common.dto.PartitionTask; import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; -import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; -import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask; -import com.aizuda.easy.retry.server.job.task.support.timer.ResidentJobTimerTask; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -32,12 +29,10 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.time.LocalDateTime; import java.util.Collections; import java.util.List; import java.util.Objects; -import static com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.*; /** * JOB任务扫描 @@ -93,16 +88,16 @@ public class ScanJobTaskActor extends AbstractActor { job.setId(partitionTask.getId()); boolean triggerTask = true; - LocalDateTime nextTriggerAt = ResidentTaskCache.get(partitionTask.getId()); - if (needCalculateNextTriggerTime(partitionTask, nextTriggerAt)) { + Long nextTriggerAt = ResidentTaskCache.get(partitionTask.getId()); + if (needCalculateNextTriggerTime(partitionTask)) { // 更新下次触发时间 - nextTriggerAt = calculateNextTriggerTime(partitionTask); + nextTriggerAt = calculateNextTriggerTime(partitionTask); } else { // 若常驻任务的缓存时间为空则触发一次任务调度,说明常驻任务长时间未更新或者是系统刚刚启动 triggerTask = Objects.isNull(nextTriggerAt); // 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now - LocalDateTime now = LocalDateTime.now(); - if (Objects.isNull(nextTriggerAt) || nextTriggerAt.plusSeconds(SystemConstants.SCHEDULE_PERIOD).isBefore(now)) { + long now = System.currentTimeMillis(); + if (Objects.isNull(nextTriggerAt) || (nextTriggerAt + SystemConstants.SCHEDULE_PERIOD * 1000) < now) { nextTriggerAt = now; } } @@ -124,26 +119,25 @@ public class ScanJobTaskActor extends AbstractActor { * 2、常驻任务缓存的触发任务为空 * 3、常驻任务中的触发时间不是最新的 */ - private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask, LocalDateTime nextTriggerAt) { + private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask) { return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident()); } - private LocalDateTime calculateNextTriggerTime(JobPartitionTask partitionTask) { - // 更新下次触发时间 - WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType()); - WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); - waitStrategyContext.setTriggerType(partitionTask.getTriggerType()); - waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval()); + private Long calculateNextTriggerTime(JobPartitionTask partitionTask) { - LocalDateTime now = LocalDateTime.now(); - LocalDateTime nextTriggerAt = partitionTask.getNextTriggerAt(); - if (nextTriggerAt.plusSeconds(SystemConstants.SCHEDULE_PERIOD).isBefore(now)) { + long now = System.currentTimeMillis(); + long nextTriggerAt = partitionTask.getNextTriggerAt(); + if ((nextTriggerAt + SystemConstants.SCHEDULE_PERIOD * 1000) < now) { nextTriggerAt = now; } + // 更新下次触发时间 + WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType()); + WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); + waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval()); waitStrategyContext.setNextTriggerAt(nextTriggerAt); - return waitStrategy.computeRetryTime(waitStrategyContext); + return waitStrategy.computeTriggerTime(waitStrategyContext); } private List listAvailableJobs(Long startId, ScanTask scanTask) { @@ -155,7 +149,7 @@ public class ScanJobTaskActor extends AbstractActor { new LambdaQueryWrapper() .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) .in(Job::getBucketIndex, scanTask.getBuckets()) - .le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD)) + .le(Job::getNextTriggerAt, System.currentTimeMillis() + SystemConstants.SCHEDULE_PERIOD * 1000) .eq(Job::getDeleted, StatusEnum.NO.getStatus()) .ge(Job::getId, startId) ).getRecords(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java deleted file mode 100644 index b592d539..00000000 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.aizuda.easy.retry.server.job.task.support.strategy; - -import com.aizuda.easy.retry.common.core.util.CronExpression; -import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; -import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.Getter; - -import java.text.ParseException; -import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.util.Date; - -/** - * 生成 {@link WaitStrategy} 实例. - * - * @author: www.byteblogs.com - * @date : 2021-11-29 18:19 - */ -public class WaitStrategies { - - private WaitStrategies() { - } - - @Data - public static class WaitStrategyContext { - /** - * 触发类型 1.CRON 表达式 2. 固定时间 - */ - private Integer triggerType; - - /** - * 间隔时长 - */ - private String triggerInterval; - - /** - * 下次触发时间 - */ - private LocalDateTime nextTriggerAt; - - } - - @Getter - @AllArgsConstructor - public enum WaitStrategyEnum { - CRON(1, cronWait()), - FIXED(2, fixedWait()), - ; - - private final int triggerType; - private final WaitStrategy waitStrategy; - - - /** - * 获取退避策略 - * - * @param triggerType 触发类型 - * @return 退避策略 - */ - public static WaitStrategy getWaitStrategy(int triggerType) { - - WaitStrategyEnum waitStrategy = getWaitStrategyEnum(triggerType); - switch (waitStrategy) { - case CRON: - return cronWait(); - case FIXED: - return fixedWait(); - default: - return waitStrategy.waitStrategy; - } - - } - - /** - * 获取退避策略枚举对象 - * - * @param backOff 退避策略 - * @return 退避策略枚举对象 - */ - public static WaitStrategyEnum getWaitStrategyEnum(int backOff) { - - for (WaitStrategyEnum value : WaitStrategyEnum.values()) { - if (value.triggerType == backOff) { - return value; - } - } - - // TODO 兜底为默认等级策略 - return null; - } - - } - - - /** - * 固定定时间等待策略 - * - * @return {@link FixedWaitStrategy} 固定定时间等待策略 - */ - public static WaitStrategy fixedWait() { - return new FixedWaitStrategy(); - } - - /** - * cron等待策略 - * - * @return {@link CronWaitStrategy} cron等待策略 - */ - public static WaitStrategy cronWait() { - return new CronWaitStrategy(); - } - - /** - * 固定定时间等待策略 - */ - private static final class FixedWaitStrategy implements WaitStrategy { - - @Override - public LocalDateTime computeRetryTime(WaitStrategyContext context) { - long triggerInterval = Long.parseLong(context.triggerInterval); - - - return context.nextTriggerAt.plusSeconds(triggerInterval); - } - } - - /** - * Cron等待策略 - */ - private static final class CronWaitStrategy implements WaitStrategy { - - @Override - public LocalDateTime computeRetryTime(WaitStrategyContext context) { - - Date nextValidTime; - try { - ZonedDateTime zdt = context.nextTriggerAt.atZone(ZoneOffset.ofHours(8)); - nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant())); - } catch (ParseException e) { - throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e); - } - - return LocalDateTime.ofEpochSecond( nextValidTime.getTime() / 1000,0, ZoneOffset.ofHours(8)); - } - } - -} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java index 6ef72e0e..d60fd8e1 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java @@ -1,36 +1,16 @@ package com.aizuda.easy.retry.server.job.task.support.timer; import akka.actor.ActorRef; -import cn.hutool.core.lang.Assert; -import com.aizuda.easy.retry.common.core.context.SpringContext; -import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; -import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; -import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; -import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; -import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; -import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; -import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies; -import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; -import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; -import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import io.netty.util.Timeout; import io.netty.util.TimerTask; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import java.time.LocalDateTime; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - /** * @author www.byteblogs.com * @date 2023-10-20 23:09:13 diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/generator/task/AbstractGenerator.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/generator/task/AbstractGenerator.java index cf81239c..5d22c2f2 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/generator/task/AbstractGenerator.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/generator/task/AbstractGenerator.java @@ -14,10 +14,10 @@ import com.aizuda.easy.retry.server.common.generator.id.IdGenerator; import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext.TaskInfo; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum; +import com.aizuda.easy.retry.server.common.WaitStrategy; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.access.TaskAccess; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper; @@ -134,9 +134,9 @@ public abstract class AbstractGenerator implements TaskGenerator { WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); waitStrategyContext.setNextTriggerAt(now); waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval()); - waitStrategyContext.setTriggerCount(1); + waitStrategyContext.setDelayLevel(1); WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff()); - retryTask.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); + retryTask.setNextTriggerAt(waitStrategy.computeTriggerTime(waitStrategyContext)); waitInsertTasks.add(retryTask); // 初始化日志 @@ -185,7 +185,7 @@ public abstract class AbstractGenerator implements TaskGenerator { sceneConfig.setGroupName(groupName); sceneConfig.setSceneName(sceneName); sceneConfig.setSceneStatus(StatusEnum.YES.getStatus()); - sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getBackOff()); + sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType()); sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel()); sceneConfig.setDescription("自动初始化场景"); Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().insert(sceneConfig), () -> new EasyRetryServerException("init scene error")); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/service/impl/RetryServiceImpl.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/service/impl/RetryServiceImpl.java index f46c5f0d..a6bef813 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/service/impl/RetryServiceImpl.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/service/impl/RetryServiceImpl.java @@ -15,7 +15,7 @@ import com.aizuda.easy.retry.server.retry.task.service.RetryDeadLetterConverter; import com.aizuda.easy.retry.server.retry.task.service.RetryService; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.access.ConfigAccess; import com.aizuda.easy.retry.template.datasource.access.TaskAccess; @@ -103,7 +103,7 @@ public class RetryServiceImpl implements RetryService { } retryTask.setNextTriggerAt( - WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); + WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null)); Assert.isTrue(1 == retryTaskAccess.insert(retryTaskDTO.getGroupName(), retryTask), () -> new EasyRetryServerException("failed to report data")); @@ -130,7 +130,7 @@ public class RetryServiceImpl implements RetryService { sceneConfig.setGroupName(retryTaskDTO.getGroupName()); sceneConfig.setSceneName(retryTaskDTO.getSceneName()); sceneConfig.setSceneStatus(StatusEnum.YES.getStatus()); - sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getBackOff()); + sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType()); sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel()); sceneConfig.setDescription("自动初始化场景"); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryContext.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryContext.java index 71986169..3182fcba 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryContext.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryContext.java @@ -1,5 +1,6 @@ package com.aizuda.easy.retry.server.retry.task.support; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/context/CallbackRetryContext.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/context/CallbackRetryContext.java index 369e0307..c2b73829 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/context/CallbackRetryContext.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/context/CallbackRetryContext.java @@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.support.context; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import lombok.Data; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/context/MaxAttemptsPersistenceRetryContext.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/context/MaxAttemptsPersistenceRetryContext.java index b1e9b40e..e8c23c94 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/context/MaxAttemptsPersistenceRetryContext.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/context/MaxAttemptsPersistenceRetryContext.java @@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.support.context; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import lombok.Data; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/NoRetryActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/NoRetryActor.java index 307cc8ec..8d89d0c2 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/NoRetryActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/NoRetryActor.java @@ -6,7 +6,6 @@ import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java index 358835bb..cfc1c7d6 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java @@ -2,11 +2,10 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.server.retry.task.support.timer.CallbackTimerTask; import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext; import io.netty.util.TimerTask; @@ -59,13 +58,13 @@ public class ScanCallbackTaskActor extends AbstractScanGroup { protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask) { long triggerInterval = systemProperties.getCallback().getTriggerInterval(); - WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getBackOff()); + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getType()); WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt()); waitStrategyContext.setTriggerInterval(String.valueOf(triggerInterval)); // 更新触发时间, 任务进入时间轮 - return waitStrategy.computeRetryTime(waitStrategyContext); + return waitStrategy.computeTriggerTime(waitStrategyContext); } @Override diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java index c95625f9..b8a16218 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java @@ -2,11 +2,12 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.util.DateUtil; import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext; import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerTask; import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; @@ -16,6 +17,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import java.time.Duration; import java.time.LocalDateTime; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -69,12 +71,12 @@ public class ScanRetryTaskActor extends AbstractScanGroup { nextTriggerAt = now; } - waitStrategyContext.setNextTriggerAt(nextTriggerAt); + waitStrategyContext.setNextTriggerAt(DateUtil.toEpochMilli(nextTriggerAt)); waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval()); - waitStrategyContext.setTriggerCount(partitionTask.getRetryCount() + 1); + waitStrategyContext.setDelayLevel(partitionTask.getRetryCount() + 1); // 更新触发时间, 任务进入时间轮 WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff()); - return waitStrategy.computeRetryTime(waitStrategyContext); + return DateUtil.toEpochMilli(waitStrategy.computeTriggerTime(waitStrategyContext)); } @Override diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java index 2ea76de5..b4ef4c0d 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java @@ -4,13 +4,13 @@ import akka.actor.ActorRef; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies; import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import org.springframework.stereotype.Component; @@ -62,7 +62,7 @@ public class CallbackTaskExecutor extends AbstractTaskExecutor { private WaitStrategy getWaitWaitStrategy() { // 回调失败每15min重试一次 - return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff()); + return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getType()); } @Override diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java index d1cab008..e5ebee2c 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java @@ -7,13 +7,13 @@ import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies; import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import org.springframework.stereotype.Component; @@ -71,7 +71,7 @@ public class ManualCallbackTaskExecutor extends AbstractTaskExecutor { private WaitStrategy getWaitWaitStrategy() { // 回调失败每15min重试一次 - return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff()); + return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getType()); } @Override diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java index 1f27b828..6c51d31a 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java @@ -8,13 +8,13 @@ import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies; import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import org.springframework.stereotype.Component; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java index bf6fba94..9e28c70a 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java @@ -5,13 +5,13 @@ import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies; import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import org.springframework.stereotype.Component; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/CallbackRetryTaskHandler.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/CallbackRetryTaskHandler.java index 31ad604c..bba4a183 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/CallbackRetryTaskHandler.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/CallbackRetryTaskHandler.java @@ -6,9 +6,10 @@ import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.util.DateUtil; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; @@ -19,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import java.time.Instant; import java.time.LocalDateTime; import java.util.concurrent.TimeUnit; @@ -50,7 +52,7 @@ public class CallbackRetryTaskHandler { @Transactional public void create(RetryTask retryTask) { if (!TaskTypeEnum.RETRY.getType().equals(retryTask.getTaskType())) { - return; + return; } RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask); @@ -62,10 +64,11 @@ public class CallbackRetryTaskHandler { callbackRetryTask.setCreateDt(LocalDateTime.now()); callbackRetryTask.setUpdateDt(LocalDateTime.now()); - callbackRetryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); + Long triggerTime = WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null); + callbackRetryTask.setNextTriggerAt(DateUtil.toLocalDateTime(Instant.ofEpochMilli(triggerTime))); Assert.isTrue(1 == accessTemplate.getRetryTaskAccess() - .insert(callbackRetryTask.getGroupName(), callbackRetryTask), + .insert(callbackRetryTask.getGroupName(), callbackRetryTask), () -> new EasyRetryServerException("failed to report data")); // 初始化回调日志 @@ -73,8 +76,8 @@ public class CallbackRetryTaskHandler { // 记录重试日志 retryTaskLog.setTaskType(TaskTypeEnum.CALLBACK.getType()); retryTaskLog.setCreateDt(LocalDateTime.now()); - Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog), - () -> new EasyRetryServerException("新增重试日志失败")); + Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog), + () -> new EasyRetryServerException("新增重试日志失败")); } @@ -87,7 +90,7 @@ public class CallbackRetryTaskHandler { public String generatorCallbackUniqueId(String uniqueId) { // eg: CB_202307180949471 FormattingTuple callbackUniqueId = MessageFormatter.arrayFormat(CALLBACK_UNIQUE_ID_RULE, - new Object[]{systemProperties.getCallback().getPrefix(), uniqueId}); + new Object[]{systemProperties.getCallback().getPrefix(), uniqueId}); return callbackUniqueId.getMessage(); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryBuilder.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryBuilder.java index 71298e4a..7da1b2b5 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryBuilder.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryBuilder.java @@ -4,7 +4,7 @@ import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; import com.aizuda.easy.retry.server.retry.task.support.StopStrategy; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.WaitStrategy; import org.springframework.util.CollectionUtils; import java.util.*; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java index d888866d..97984913 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java @@ -6,7 +6,7 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; import com.aizuda.easy.retry.server.retry.task.support.StopStrategy; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.WaitStrategy; import lombok.extern.slf4j.Slf4j; import java.util.List; diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java index 6cada535..b53d7dce 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java @@ -4,13 +4,12 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.HashUtil; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.enums.StatusEnum; -import com.aizuda.easy.retry.common.core.util.CronExpression; +import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; -import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.util.CronUtils; import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; -import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyContext; -import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.server.web.model.base.PageResult; import com.aizuda.easy.retry.server.web.model.request.JobQueryVO; import com.aizuda.easy.retry.server.web.model.request.JobRequestVO; @@ -27,14 +26,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.text.ParseException; -import java.time.Duration; import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Date; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -91,28 +84,14 @@ public class JobServiceImpl implements JobService { @Override public List getTimeByCron(String cron) { - - List list = new ArrayList<>(); - LocalDateTime now = LocalDateTime.now(); - for (int i = 0; i < 5; i++) { - Date nextValidTime; - try { - ZonedDateTime zdt = now.atZone(ZoneOffset.ofHours(8)); - nextValidTime = new CronExpression(cron).getNextValidTimeAfter(Date.from(zdt.toInstant())); - now = LocalDateTime.ofEpochSecond(nextValidTime.getTime() / 1000, 0, ZoneOffset.ofHours(8)); - list.add(dateTimeFormatter.format(now)); - } catch (ParseException ignored) { - } - } - - return list; + return CronUtils.getExecuteTimeByCron(cron, 5); } @Override public List getJobNameList(String keywords, Long jobId) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .select(Job::getId, Job::getJobName); + .select(Job::getId, Job::getJobName); if (StrUtil.isNotBlank(keywords)) { queryWrapper.like(Job::getJobName, keywords.trim() + "%"); } @@ -132,8 +111,8 @@ public class JobServiceImpl implements JobService { // 判断常驻任务 Job job = updateJobResident(jobRequestVO); job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) - % systemProperties.getBucketTotal()); - calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()); + % systemProperties.getBucketTotal()); + job.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, LocalDateTime.now())); return 1 == jobMapper.insert(job); } @@ -148,17 +127,17 @@ public class JobServiceImpl implements JobService { Job updateJob = updateJobResident(jobRequestVO); // 非常驻任务 > 非常驻任务 if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(), - StatusEnum.NO.getStatus())) { + StatusEnum.NO.getStatus())) { updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, LocalDateTime.now())); } else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals( - updateJob.getResident(), StatusEnum.NO.getStatus())) { + updateJob.getResident(), StatusEnum.NO.getStatus())) { // 常驻任务的触发时间 LocalDateTime time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId())) - .orElse(LocalDateTime.now()); + .orElse(LocalDateTime.now()); updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, time)); // 老的是不是常驻任务 新的是常驻任务 需要使用当前时间计算下次触发时间 } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals( - updateJob.getResident(), StatusEnum.YES.getStatus())) { + updateJob.getResident(), StatusEnum.YES.getStatus())) { updateJob.setNextTriggerAt(LocalDateTime.now()); } @@ -166,29 +145,23 @@ public class JobServiceImpl implements JobService { } private static LocalDateTime calculateNextTriggerAt(final JobRequestVO jobRequestVO, LocalDateTime time) { - WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType()); - WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); - waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType()); + WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType()); + WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval()); waitStrategyContext.setNextTriggerAt(time); - return waitStrategy.computeRetryTime(waitStrategyContext); + return waitStrategy.computeTriggerTime(waitStrategyContext); } @Override public Job updateJobResident(JobRequestVO jobRequestVO) { Job job = JobConverter.INSTANCE.toJob(jobRequestVO); job.setResident(StatusEnum.NO.getStatus()); - if (jobRequestVO.getTriggerType() == WaitStrategyEnum.FIXED.getTriggerType()) { + if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.FIXED.getType()) { if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) { job.setResident(StatusEnum.YES.getStatus()); } - } else if (jobRequestVO.getTriggerType() == WaitStrategyEnum.CRON.getTriggerType()) { - List timeByCron = getTimeByCron(jobRequestVO.getTriggerInterval()); - LocalDateTime first = LocalDateTime.parse(timeByCron.get(0), dateTimeFormatter); - LocalDateTime second = LocalDateTime.parse(timeByCron.get(1), dateTimeFormatter); - Duration duration = Duration.between(first, second); - long milliseconds = duration.toMillis(); - if (milliseconds < 10 * 1000) { + } else if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.CRON.getType()) { + if (CronUtils.getExecuteInterval(jobRequestVO.getTriggerInterval()) < 10 * 1000) { job.setResident(StatusEnum.YES.getStatus()); } } else { diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryDeadLetterServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryDeadLetterServiceImpl.java index 98134f84..eab779cd 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryDeadLetterServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryDeadLetterServiceImpl.java @@ -6,7 +6,7 @@ import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.web.service.RetryDeadLetterService; import com.aizuda.easy.retry.server.web.service.convert.RetryDeadLetterResponseVOConverter; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter; @@ -107,7 +107,7 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService { RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryDeadLetter); retryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus()); retryTask.setTaskType(TaskTypeEnum.RETRY.getType()); - retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); + retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null)); retryTask.setCreateDt(LocalDateTime.now()); waitRollbackList.add(retryTask); } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java index bad62e47..d75b0458 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java @@ -19,7 +19,7 @@ import com.aizuda.easy.retry.server.retry.task.generator.task.TaskGenerator; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.web.model.base.PageResult; import com.aizuda.easy.retry.server.web.model.request.BatchDeleteRetryTaskVO; import com.aizuda.easy.retry.server.web.model.request.GenerateRetryIdempotentIdVO; @@ -47,13 +47,10 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; -import org.springframework.http.HttpEntity; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; -import org.springframework.web.client.RestTemplate; -import java.text.MessageFormat; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; @@ -158,7 +155,7 @@ public class RetryTaskServiceImpl implements RetryTaskService { // 若恢复重试则需要重新计算下次触发时间 if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) { retryTask.setNextTriggerAt( - WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); + WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null)); } if (RetryStatusEnum.FINISH.getStatus().equals(retryStatusEnum.getStatus())) { diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/SceneConfigServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/SceneConfigServiceImpl.java index cfea9f16..bba589b8 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/SceneConfigServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/SceneConfigServiceImpl.java @@ -4,6 +4,8 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.util.CronUtils; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.web.model.base.PageResult; import com.aizuda.easy.retry.server.web.model.request.SceneConfigQueryVO; import com.aizuda.easy.retry.server.web.model.request.SceneConfigRequestVO; @@ -17,11 +19,13 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import com.google.common.collect.Lists; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.util.List; +import java.util.Optional; /** * @author: www.byteblogs.com @@ -64,6 +68,9 @@ public class SceneConfigServiceImpl implements SceneConfigService { @Override public Boolean saveSceneConfig(SceneConfigRequestVO requestVO) { + + checkExecuteInterval(requestVO); + SceneConfig sceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO); sceneConfig.setCreateDt(LocalDateTime.now()); ConfigAccess sceneConfigAccess = accessTemplate.getSceneConfigAccess(); @@ -72,10 +79,25 @@ public class SceneConfigServiceImpl implements SceneConfigService { return Boolean.TRUE; } + private static void checkExecuteInterval(SceneConfigRequestVO requestVO) { + if (Lists.newArrayList(WaitStrategies.WaitStrategyEnum.FIXED.getType(), WaitStrategies.WaitStrategyEnum.RANDOM.getType()) + .contains(requestVO.getBackOff())) { + if (Integer.parseInt(requestVO.getTriggerInterval()) < 10) { + throw new EasyRetryServerException("间隔时间不得小于10"); + } + } else if (requestVO.getBackOff() == WaitStrategies.WaitStrategyEnum.CRON.getType()) { + if (CronUtils.getExecuteInterval(requestVO.getTriggerInterval()) < 10 * 1000) { + throw new EasyRetryServerException("间隔时间不得小于10"); + } + } + } + + @Override public Boolean updateSceneConfig(SceneConfigRequestVO requestVO) { + checkExecuteInterval(requestVO); SceneConfig sceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO); - + sceneConfig.setTriggerInterval(Optional.ofNullable(sceneConfig.getTriggerInterval()).orElse(StrUtil.EMPTY)); Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().update(sceneConfig, new LambdaUpdateWrapper() .eq(SceneConfig::getGroupName, sceneConfig.getGroupName()) diff --git a/frontend/src/views/job/JobInfo.vue b/frontend/src/views/job/JobInfo.vue index 11b7d6b9..22083f98 100644 --- a/frontend/src/views/job/JobInfo.vue +++ b/frontend/src/views/job/JobInfo.vue @@ -66,7 +66,7 @@ - {{ jobInfo.argsStr }} + {{ jobInfo.taskType === 3 ? JSON.parse(jobInfo.argsStr).map((item, index) => `分区:${index}=>${item}`).join('; ') : jobInfo.argsStr }} {{ jobInfo.extAttrs }} diff --git a/frontend/src/views/job/from/JobFrom.vue b/frontend/src/views/job/from/JobFrom.vue index 40e358a5..1d04c2be 100644 --- a/frontend/src/views/job/from/JobFrom.vue +++ b/frontend/src/views/job/from/JobFrom.vue @@ -357,7 +357,8 @@ export default { visible: false, count: 0, triggerTypeValue: '2', - taskTypeValue: '1' + taskTypeValue: '1', + argsStrValue: [] } }, beforeCreate () { @@ -432,35 +433,28 @@ export default { const taskType = this.form.getFieldValue('taskType') if (taskType === '3') { this.visible = !this.visible - + const { form } = this if (this.formType === 'create') { return } - const argsStr = this.form.getFieldValue('argsStr') + form.setFieldsValue({ + argsStr: '' + }) - console.log(argsStr.includes('#=@')) - if (!argsStr.includes('#=@')) { + console.log(this.argsStrValue) + if (this.argsStrValue.length === 0) { return } // 将字符串分割成键值对数组 - const keyValuePairs = argsStr.split('#;@') - console.log(keyValuePairs) - const restoredArray = keyValuePairs.map(pair => { - const [index, value] = pair.split('#=@') - console.log(value) + const keys = this.argsStrValue.map((item, index) => { this.count++ - return Number.parseInt(index) + this.dynamicForm.getFieldDecorator(`sharding[${index}]`, { initialValue: item, preserve: true }) + return index }) - this.dynamicForm.getFieldDecorator('keys', { initialValue: restoredArray, preserve: true }) - - keyValuePairs.map(pair => { - const [index, value] = pair.split('#=@') - this.dynamicForm.getFieldDecorator(`sharding[${index}]`, { initialValue: value, preserve: true }) - return value - }) + this.dynamicForm.getFieldDecorator('keys', { initialValue: keys, preserve: true }) } }, getCron (cron) { @@ -472,13 +466,10 @@ export default { const { form } = this e.preventDefault() this.dynamicForm.validateFields((err, values) => { - console.log() if (!err) { - console.log(values) - const arr = values['sharding'] - const formattedString = arr.map((item, index) => `${index}#=@${item}`).join('#;@') + this.argsStrValue = values['sharding'] form.setFieldsValue({ - argsStr: formattedString + argsStr: this.argsStrValue.map((item, index) => `分区:${index}=>${item}`).join('; ') }) this.visible = false } @@ -491,6 +482,10 @@ export default { e.preventDefault() this.form.validateFields((err, values) => { if (!err) { + if (this.taskTypeValue === '3') { + values['argsStr'] = this.argsStrValue + } + if (this.formType === 'create') { saveJob(values).then(res => { this.$message.success('任务新增完成') @@ -523,6 +518,8 @@ export default { formData.blockStrategy = formData.blockStrategy.toString() formData.triggerType = formData.triggerType.toString() this.triggerTypeValue = formData.triggerType + this.argsStrValue = JSON.parse(formData.argsStr) + formData.argsStr = this.argsStrValue.map((item, index) => `分区:${index}=>${item}`).join(';') form.setFieldsValue(formData) }) }, diff --git a/frontend/src/views/task/SceneList.vue b/frontend/src/views/task/SceneList.vue index b56f1b38..15d18336 100644 --- a/frontend/src/views/task/SceneList.vue +++ b/frontend/src/views/task/SceneList.vue @@ -64,7 +64,7 @@ - {{ text ? text : '无' }} + {{ text ? text : '10s,15s,30s,35s,40s,50s,1m,2m,4m,6m,8m,10m,20m,40m,1h,2h,3h,4h,5h,6h,7h,8h,9h,10h,11h,12h' }}