feat:2.4.0

1. 优化JOB的触发时间wei long
2. 抽离WaitStrategy为公共组件
This commit is contained in:
byteblogs168 2023-11-03 00:07:50 +08:00
parent a128b62ea6
commit a9b5ba3ffd
37 changed files with 257 additions and 409 deletions

View File

@ -222,7 +222,7 @@ CREATE TABLE `job` (
`job_name` varchar(64) NOT NULL COMMENT '名称',
`args_str` text DEFAULT NULL COMMENT '执行方法参数',
`args_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '参数类型 ',
`next_trigger_at` datetime NOT NULL COMMENT '下次触发时间',
`next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间',
`job_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '重试状态 0、关闭、1、开启',
`task_type` varchar(255) DEFAULT NULL COMMENT '任务类型 1、集群 2、广播 3、切片',
`route_key` tinyint(4) NOT NULL DEFAULT '4' COMMENT '路由策略',

View File

@ -57,7 +57,7 @@ public class Job implements Serializable {
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
private Long nextTriggerAt;
/**
* 重试状态 0关闭1开启

View File

@ -1,8 +1,6 @@
package com.aizuda.easy.retry.server.retry.task.support;
package com.aizuda.easy.retry.server.common;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext;
import java.time.LocalDateTime;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext;
/**
* 等待策略退避策略
@ -18,6 +16,6 @@ public interface WaitStrategy {
* @param waitStrategyContext {@link WaitStrategyContext} 重试上下文
* @return 下次触发时间
*/
LocalDateTime computeRetryTime(WaitStrategyContext waitStrategyContext);
Long computeTriggerTime(WaitStrategyContext waitStrategyContext);
}

View File

@ -1,18 +1,17 @@
package com.aizuda.easy.retry.server.retry.task.support.strategy;
package com.aizuda.easy.retry.server.common.strategy;
import cn.hutool.core.date.DateUtil;
import com.aizuda.easy.retry.common.core.exception.EasyRetryCommonException;
import com.aizuda.easy.retry.common.core.util.CronExpression;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.util.DateUtil;
import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.Getter;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.Duration;
import java.util.Date;
import java.util.Objects;
import java.util.Random;
@ -31,10 +30,6 @@ public class WaitStrategies {
@Data
public static class WaitStrategyContext {
// /**
// * 触发类型 1.CRON 表达式 2. 固定时间
// */
// private Integer triggerType;
/**
* 间隔时长
@ -44,12 +39,13 @@ public class WaitStrategies {
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
private long nextTriggerAt;
/**
* 触发次数
* 延迟等级
* 仅在选择 DELAY_LEVEL时使用 {@link DelayLevelEnum}
*/
private Integer triggerCount;
private Integer delayLevel;
}
@ -60,11 +56,11 @@ public class WaitStrategies {
CRON(3, cronWait()),
RANDOM(4, randomWait());
private final int backOff;
private final int type;
private final WaitStrategy waitStrategy;
WaitStrategyEnum(int backOff, WaitStrategy waitStrategy) {
this.backOff = backOff;
WaitStrategyEnum(int type, WaitStrategy waitStrategy) {
this.type = type;
this.waitStrategy = waitStrategy;
}
@ -75,35 +71,26 @@ public class WaitStrategies {
* @return 退避策略
*/
public static WaitStrategy getWaitStrategy(int backOff) {
WaitStrategyEnum waitStrategy = getWaitStrategyEnum(backOff);
switch (waitStrategy) {
case RANDOM:
return randomWait();
case CRON:
return cronWait();
default:
return waitStrategy.waitStrategy;
}
return getWaitStrategyEnum(backOff).getWaitStrategy();
}
/**
* 获取退避策略枚举对象
* 获取等待策略类型枚举对象
*
* @param backOff 退避策略
* @return 退避策略枚举对象
* @param type 等待策略类型
* @return 等待策略类型枚举对象
*/
public static WaitStrategyEnum getWaitStrategyEnum(int backOff) {
public static WaitStrategyEnum getWaitStrategyEnum(int type) {
for (WaitStrategyEnum value : WaitStrategyEnum.values()) {
if (value.backOff == backOff) {
if (value.type == type) {
return value;
}
}
// 兜底为默认等级策略
return WaitStrategyEnum.DELAY_LEVEL;
throw new EasyRetryCommonException("等待策略类型不存在. [{}]", type);
}
}
@ -159,9 +146,10 @@ public class WaitStrategies {
private static final class DelayLevelWaitStrategy implements WaitStrategy {
@Override
public LocalDateTime computeRetryTime(WaitStrategyContext context) {
DelayLevelEnum levelEnum = DelayLevelEnum.getDelayLevelByLevel(context.getTriggerCount());
return context.getNextTriggerAt().plus(levelEnum.getTime(), levelEnum.getUnit());
public Long computeTriggerTime(WaitStrategyContext context) {
DelayLevelEnum levelEnum = DelayLevelEnum.getDelayLevelByLevel(context.getDelayLevel());
Duration of = Duration.of(levelEnum.getTime(), levelEnum.getUnit());
return context.getNextTriggerAt() + of.toMillis();
}
}
@ -171,8 +159,8 @@ public class WaitStrategies {
private static final class FixedWaitStrategy implements WaitStrategy {
@Override
public LocalDateTime computeRetryTime(WaitStrategyContext retryContext) {
return retryContext.getNextTriggerAt().plusSeconds(Integer.parseInt(retryContext.getTriggerInterval()));
public Long computeTriggerTime(WaitStrategyContext retryContext) {
return retryContext.getNextTriggerAt() + Integer.parseInt(retryContext.getTriggerInterval());
}
}
@ -182,12 +170,11 @@ public class WaitStrategies {
private static final class CronWaitStrategy implements WaitStrategy {
@Override
public LocalDateTime computeRetryTime(WaitStrategyContext context) {
public Long computeTriggerTime(WaitStrategyContext context) {
try {
ZonedDateTime zdt = context.getNextTriggerAt().atZone(ZoneOffset.ofHours(8));
Date nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant()));
return DateUtil.toLocalDateTime(nextValidTime);
Date nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(new Date(context.getNextTriggerAt()));
return DateUtil.toEpochMilli(nextValidTime);
} catch (ParseException e) {
throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e);
}
@ -217,7 +204,7 @@ public class WaitStrategies {
}
@Override
public LocalDateTime computeRetryTime(WaitStrategyContext retryContext) {
public Long computeTriggerTime(WaitStrategyContext retryContext) {
if (Objects.nonNull(retryContext)) {
if (maximum == 0) {
@ -228,9 +215,7 @@ public class WaitStrategies {
Preconditions.checkArgument(maximum > minimum, "maximum must be > minimum but maximum is %d and minimum is", maximum, minimum);
long t = Math.abs(RANDOM.nextLong()) % (maximum - minimum);
long now = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
return LocalDateTime.ofEpochSecond( (t + minimum + now) / 1000,0, ZoneOffset.ofHours(8));
return (t + minimum + System.currentTimeMillis()) / 1000;
}
}
}

View File

@ -0,0 +1,49 @@
package com.aizuda.easy.retry.server.common.util;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.util.CronExpression;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import java.text.ParseException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author www.byteblogs.com
* @date 2023-11-02 22:52:10
* @since 2.4.0
*/
public class CronUtils {
public static List<String> getExecuteTimeByCron(String cron, int nums) {
List<String> list = new ArrayList<>();
LocalDateTime now = LocalDateTime.now();
for (int i = 0; i < nums; i++) {
Date nextValidTime;
try {
ZonedDateTime zdt = now.atZone(ZoneOffset.ofHours(8));
nextValidTime = new CronExpression(cron).getNextValidTimeAfter(Date.from(zdt.toInstant()));
now = LocalDateTime.ofEpochSecond(nextValidTime.getTime() / 1000, 0, ZoneOffset.ofHours(8));
list.add(SystemConstants.DATE_FORMAT.YYYYMMDDHHMMSS.format(now));
} catch (ParseException ignored) {
}
}
return list;
}
public static long getExecuteInterval(String cron) {
List<String> executeTimeByCron = getExecuteTimeByCron(cron, 2);
LocalDateTime first = LocalDateTime.parse(executeTimeByCron.get(0), SystemConstants.DATE_FORMAT.YYYYMMDDHHMMSS);
LocalDateTime second = LocalDateTime.parse(executeTimeByCron.get(1), SystemConstants.DATE_FORMAT.YYYYMMDDHHMMSS);
Duration duration = Duration.between(first, second);
return duration.toMillis();
}
}

View File

@ -0,0 +1,25 @@
package com.aizuda.easy.retry.server.common.util;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;
/**
* @author www.byteblogs.com
* @date 2023-11-02 23:42:53
* @since 2.4.0
*/
public class DateUtil extends cn.hutool.core.date.DateUtil {
public static long toEpochMilli(Date date) {
return DateUtil.toLocalDateTime(date).toInstant(ZoneOffset.of("+8")).toEpochMilli();
}
public static long toEpochMilli(LocalDateTime date) {
return date.toInstant(ZoneOffset.of("+8")).toEpochMilli();
}
public static LocalDateTime toEpochMilli(long milli) {
return LocalDateTime.ofEpochSecond(milli / 1000, 0, ZoneOffset.ofHours(8));
}
}

View File

@ -27,7 +27,7 @@ public class JobPartitionTask extends PartitionTask {
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
private long nextTriggerAt;
/**
* 阻塞策略 1丢弃 2覆盖 3并行

View File

@ -27,7 +27,7 @@ public class JobTaskPrepareDTO {
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
private long nextTriggerAt;
/**
* 阻塞策略 1丢弃 2覆盖 3并行

View File

@ -1,23 +0,0 @@
package com.aizuda.easy.retry.server.job.task.support;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies;
import java.time.LocalDateTime;
/**
* 等待策略退避策略
*
* @author: www.byteblogs.com
* @date : 2021-11-29 18:18
*/
public interface WaitStrategy {
/**
* 计算下次重试触发时间
*
* @param context {@link WaitStrategies.WaitStrategyContext} 重试上下文
* @return 下次触发时间
*/
LocalDateTime computeRetryTime(WaitStrategies.WaitStrategyContext context);
}

View File

@ -14,7 +14,7 @@ import java.util.concurrent.TimeUnit;
*/
public class ResidentTaskCache {
private static final Cache<Long, LocalDateTime> cache;
private static final Cache<Long, Long/*ms*/> cache;
static {
cache = CacheBuilder.newBuilder()
@ -23,15 +23,15 @@ public class ResidentTaskCache {
.build();
}
public static void refresh(Long jobId, LocalDateTime nextTriggerTime) {
public static void refresh(Long jobId, Long nextTriggerTime) {
cache.put(jobId, nextTriggerTime);
}
public static LocalDateTime getOrDefault(Long jobId, LocalDateTime nextTriggerTime) {
public static Long getOrDefault(Long jobId, Long nextTriggerTime) {
return Optional.ofNullable(cache.getIfPresent(jobId)).orElse(nextTriggerTime);
}
public static LocalDateTime get(Long jobId) {
public static Long get(Long jobId) {
return getOrDefault(jobId, null);
}

View File

@ -2,23 +2,22 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobExecutor;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.server.job.task.support.executor.JobExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.executor.JobExecutorFactory;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.easy.retry.server.job.task.support.timer.ResidentJobTimerTask;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
@ -38,7 +37,6 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@ -154,20 +152,18 @@ public class JobExecutorActor extends AbstractActor {
ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job);
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());
LocalDateTime preTriggerAt = ResidentTaskCache.get(job.getId());
if (Objects.isNull(preTriggerAt) || preTriggerAt.isBefore(job.getNextTriggerAt())) {
Long preTriggerAt = ResidentTaskCache.get(job.getId());
if (Objects.isNull(preTriggerAt) || preTriggerAt < job.getNextTriggerAt()) {
preTriggerAt = job.getNextTriggerAt();
}
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerType(job.getTriggerType());
waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(preTriggerAt);
LocalDateTime nextTriggerAt = waitStrategy.computeRetryTime(waitStrategyContext);
Long nextTriggerAt = waitStrategy.computeTriggerTime(waitStrategyContext);
// 获取时间差的毫秒数
Duration duration = Duration.between(preTriggerAt, nextTriggerAt);
long milliseconds = duration.toMillis();
long milliseconds = nextTriggerAt - preTriggerAt;
log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, System.currentTimeMillis() % 1000);
job.setNextTriggerAt(nextTriggerAt);

View File

@ -2,25 +2,22 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
import com.aizuda.easy.retry.server.job.task.support.timer.ResidentJobTimerTask;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -32,12 +29,10 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.*;
/**
* JOB任务扫描
@ -93,16 +88,16 @@ public class ScanJobTaskActor extends AbstractActor {
job.setId(partitionTask.getId());
boolean triggerTask = true;
LocalDateTime nextTriggerAt = ResidentTaskCache.get(partitionTask.getId());
if (needCalculateNextTriggerTime(partitionTask, nextTriggerAt)) {
Long nextTriggerAt = ResidentTaskCache.get(partitionTask.getId());
if (needCalculateNextTriggerTime(partitionTask)) {
// 更新下次触发时间
nextTriggerAt = calculateNextTriggerTime(partitionTask);
nextTriggerAt = calculateNextTriggerTime(partitionTask);
} else {
// 若常驻任务的缓存时间为空则触发一次任务调度说明常驻任务长时间未更新或者是系统刚刚启动
triggerTask = Objects.isNull(nextTriggerAt);
// 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now
LocalDateTime now = LocalDateTime.now();
if (Objects.isNull(nextTriggerAt) || nextTriggerAt.plusSeconds(SystemConstants.SCHEDULE_PERIOD).isBefore(now)) {
long now = System.currentTimeMillis();
if (Objects.isNull(nextTriggerAt) || (nextTriggerAt + SystemConstants.SCHEDULE_PERIOD * 1000) < now) {
nextTriggerAt = now;
}
}
@ -124,26 +119,25 @@ public class ScanJobTaskActor extends AbstractActor {
* 2常驻任务缓存的触发任务为空
* 3常驻任务中的触发时间不是最新的
*/
private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask, LocalDateTime nextTriggerAt) {
private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask) {
return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
}
private LocalDateTime calculateNextTriggerTime(JobPartitionTask partitionTask) {
// 更新下次触发时间
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(partitionTask.getTriggerType());
waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
private Long calculateNextTriggerTime(JobPartitionTask partitionTask) {
LocalDateTime now = LocalDateTime.now();
LocalDateTime nextTriggerAt = partitionTask.getNextTriggerAt();
if (nextTriggerAt.plusSeconds(SystemConstants.SCHEDULE_PERIOD).isBefore(now)) {
long now = System.currentTimeMillis();
long nextTriggerAt = partitionTask.getNextTriggerAt();
if ((nextTriggerAt + SystemConstants.SCHEDULE_PERIOD * 1000) < now) {
nextTriggerAt = now;
}
// 更新下次触发时间
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType());
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(nextTriggerAt);
return waitStrategy.computeRetryTime(waitStrategyContext);
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
private List<JobPartitionTask> listAvailableJobs(Long startId, ScanTask scanTask) {
@ -155,7 +149,7 @@ public class ScanJobTaskActor extends AbstractActor {
new LambdaQueryWrapper<Job>()
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.in(Job::getBucketIndex, scanTask.getBuckets())
.le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD))
.le(Job::getNextTriggerAt, System.currentTimeMillis() + SystemConstants.SCHEDULE_PERIOD * 1000)
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
.ge(Job::getId, startId)
).getRecords();

View File

@ -1,150 +0,0 @@
package com.aizuda.easy.retry.server.job.task.support.strategy;
import com.aizuda.easy.retry.common.core.util.CronExpression;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;
/**
* 生成 {@link WaitStrategy} 实例.
*
* @author: www.byteblogs.com
* @date : 2021-11-29 18:19
*/
public class WaitStrategies {
private WaitStrategies() {
}
@Data
public static class WaitStrategyContext {
/**
* 触发类型 1.CRON 表达式 2. 固定时间
*/
private Integer triggerType;
/**
* 间隔时长
*/
private String triggerInterval;
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
}
@Getter
@AllArgsConstructor
public enum WaitStrategyEnum {
CRON(1, cronWait()),
FIXED(2, fixedWait()),
;
private final int triggerType;
private final WaitStrategy waitStrategy;
/**
* 获取退避策略
*
* @param triggerType 触发类型
* @return 退避策略
*/
public static WaitStrategy getWaitStrategy(int triggerType) {
WaitStrategyEnum waitStrategy = getWaitStrategyEnum(triggerType);
switch (waitStrategy) {
case CRON:
return cronWait();
case FIXED:
return fixedWait();
default:
return waitStrategy.waitStrategy;
}
}
/**
* 获取退避策略枚举对象
*
* @param backOff 退避策略
* @return 退避策略枚举对象
*/
public static WaitStrategyEnum getWaitStrategyEnum(int backOff) {
for (WaitStrategyEnum value : WaitStrategyEnum.values()) {
if (value.triggerType == backOff) {
return value;
}
}
// TODO 兜底为默认等级策略
return null;
}
}
/**
* 固定定时间等待策略
*
* @return {@link FixedWaitStrategy} 固定定时间等待策略
*/
public static WaitStrategy fixedWait() {
return new FixedWaitStrategy();
}
/**
* cron等待策略
*
* @return {@link CronWaitStrategy} cron等待策略
*/
public static WaitStrategy cronWait() {
return new CronWaitStrategy();
}
/**
* 固定定时间等待策略
*/
private static final class FixedWaitStrategy implements WaitStrategy {
@Override
public LocalDateTime computeRetryTime(WaitStrategyContext context) {
long triggerInterval = Long.parseLong(context.triggerInterval);
return context.nextTriggerAt.plusSeconds(triggerInterval);
}
}
/**
* Cron等待策略
*/
private static final class CronWaitStrategy implements WaitStrategy {
@Override
public LocalDateTime computeRetryTime(WaitStrategyContext context) {
Date nextValidTime;
try {
ZonedDateTime zdt = context.nextTriggerAt.atZone(ZoneOffset.ofHours(8));
nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant()));
} catch (ParseException e) {
throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e);
}
return LocalDateTime.ofEpochSecond( nextValidTime.getTime() / 1000,0, ZoneOffset.ofHours(8));
}
}
}

View File

@ -1,36 +1,16 @@
package com.aizuda.easy.retry.server.job.task.support.timer;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author www.byteblogs.com
* @date 2023-10-20 23:09:13

View File

@ -14,10 +14,10 @@ import com.aizuda.easy.retry.server.common.generator.id.IdGenerator;
import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext.TaskInfo;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.access.TaskAccess;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
@ -134,9 +134,9 @@ public abstract class AbstractGenerator implements TaskGenerator {
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(now);
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
waitStrategyContext.setTriggerCount(1);
waitStrategyContext.setDelayLevel(1);
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
retryTask.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext));
retryTask.setNextTriggerAt(waitStrategy.computeTriggerTime(waitStrategyContext));
waitInsertTasks.add(retryTask);
// 初始化日志
@ -185,7 +185,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
sceneConfig.setGroupName(groupName);
sceneConfig.setSceneName(sceneName);
sceneConfig.setSceneStatus(StatusEnum.YES.getStatus());
sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getBackOff());
sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType());
sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel());
sceneConfig.setDescription("自动初始化场景");
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().insert(sceneConfig), () -> new EasyRetryServerException("init scene error"));

View File

@ -15,7 +15,7 @@ import com.aizuda.easy.retry.server.retry.task.service.RetryDeadLetterConverter;
import com.aizuda.easy.retry.server.retry.task.service.RetryService;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.access.ConfigAccess;
import com.aizuda.easy.retry.template.datasource.access.TaskAccess;
@ -103,7 +103,7 @@ public class RetryServiceImpl implements RetryService {
}
retryTask.setNextTriggerAt(
WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null));
Assert.isTrue(1 == retryTaskAccess.insert(retryTaskDTO.getGroupName(), retryTask),
() -> new EasyRetryServerException("failed to report data"));
@ -130,7 +130,7 @@ public class RetryServiceImpl implements RetryService {
sceneConfig.setGroupName(retryTaskDTO.getGroupName());
sceneConfig.setSceneName(retryTaskDTO.getSceneName());
sceneConfig.setSceneStatus(StatusEnum.YES.getStatus());
sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getBackOff());
sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType());
sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel());
sceneConfig.setDescription("自动初始化场景");

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.retry.task.support;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.support.context;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import lombok.Data;

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.support.context;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import lombok.Data;

View File

@ -6,7 +6,6 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;

View File

@ -2,11 +2,10 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.retry.task.support.timer.CallbackTimerTask;
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext;
import io.netty.util.TimerTask;
@ -59,13 +58,13 @@ public class ScanCallbackTaskActor extends AbstractScanGroup {
protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask) {
long triggerInterval = systemProperties.getCallback().getTriggerInterval();
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getBackOff());
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt());
waitStrategyContext.setTriggerInterval(String.valueOf(triggerInterval));
// 更新触发时间, 任务进入时间轮
return waitStrategy.computeRetryTime(waitStrategyContext);
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
@Override

View File

@ -2,11 +2,12 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.DateUtil;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext;
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
@ -16,6 +17,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -69,12 +71,12 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
nextTriggerAt = now;
}
waitStrategyContext.setNextTriggerAt(nextTriggerAt);
waitStrategyContext.setNextTriggerAt(DateUtil.toEpochMilli(nextTriggerAt));
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
waitStrategyContext.setTriggerCount(partitionTask.getRetryCount() + 1);
waitStrategyContext.setDelayLevel(partitionTask.getRetryCount() + 1);
// 更新触发时间, 任务进入时间轮
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
return waitStrategy.computeRetryTime(waitStrategyContext);
return DateUtil.toEpochMilli(waitStrategy.computeTriggerTime(waitStrategyContext));
}
@Override

View File

@ -4,13 +4,13 @@ import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import org.springframework.stereotype.Component;
@ -62,7 +62,7 @@ public class CallbackTaskExecutor extends AbstractTaskExecutor {
private WaitStrategy getWaitWaitStrategy() {
// 回调失败每15min重试一次
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff());
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getType());
}
@Override

View File

@ -7,13 +7,13 @@ import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import org.springframework.stereotype.Component;
@ -71,7 +71,7 @@ public class ManualCallbackTaskExecutor extends AbstractTaskExecutor {
private WaitStrategy getWaitWaitStrategy() {
// 回调失败每15min重试一次
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff());
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getType());
}
@Override

View File

@ -8,13 +8,13 @@ import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import org.springframework.stereotype.Component;

View File

@ -5,13 +5,13 @@ import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import org.springframework.stereotype.Component;

View File

@ -6,9 +6,10 @@ import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtil;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
@ -19,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
@ -50,7 +52,7 @@ public class CallbackRetryTaskHandler {
@Transactional
public void create(RetryTask retryTask) {
if (!TaskTypeEnum.RETRY.getType().equals(retryTask.getTaskType())) {
return;
return;
}
RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask);
@ -62,10 +64,11 @@ public class CallbackRetryTaskHandler {
callbackRetryTask.setCreateDt(LocalDateTime.now());
callbackRetryTask.setUpdateDt(LocalDateTime.now());
callbackRetryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
Long triggerTime = WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null);
callbackRetryTask.setNextTriggerAt(DateUtil.toLocalDateTime(Instant.ofEpochMilli(triggerTime)));
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
.insert(callbackRetryTask.getGroupName(), callbackRetryTask),
.insert(callbackRetryTask.getGroupName(), callbackRetryTask),
() -> new EasyRetryServerException("failed to report data"));
// 初始化回调日志
@ -73,8 +76,8 @@ public class CallbackRetryTaskHandler {
// 记录重试日志
retryTaskLog.setTaskType(TaskTypeEnum.CALLBACK.getType());
retryTaskLog.setCreateDt(LocalDateTime.now());
Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
() -> new EasyRetryServerException("新增重试日志失败"));
Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
() -> new EasyRetryServerException("新增重试日志失败"));
}
@ -87,7 +90,7 @@ public class CallbackRetryTaskHandler {
public String generatorCallbackUniqueId(String uniqueId) {
// eg: CB_202307180949471
FormattingTuple callbackUniqueId = MessageFormatter.arrayFormat(CALLBACK_UNIQUE_ID_RULE,
new Object[]{systemProperties.getCallback().getPrefix(), uniqueId});
new Object[]{systemProperties.getCallback().getPrefix(), uniqueId});
return callbackUniqueId.getMessage();
}

View File

@ -4,7 +4,7 @@ import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.StopStrategy;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import org.springframework.util.CollectionUtils;
import java.util.*;

View File

@ -6,7 +6,7 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.StopStrategy;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import lombok.extern.slf4j.Slf4j;
import java.util.List;

View File

@ -4,13 +4,12 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.util.CronExpression;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.CronUtils;
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.JobQueryVO;
import com.aizuda.easy.retry.server.web.model.request.JobRequestVO;
@ -27,14 +26,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.text.ParseException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -91,28 +84,14 @@ public class JobServiceImpl implements JobService {
@Override
public List<String> getTimeByCron(String cron) {
List<String> list = new ArrayList<>();
LocalDateTime now = LocalDateTime.now();
for (int i = 0; i < 5; i++) {
Date nextValidTime;
try {
ZonedDateTime zdt = now.atZone(ZoneOffset.ofHours(8));
nextValidTime = new CronExpression(cron).getNextValidTimeAfter(Date.from(zdt.toInstant()));
now = LocalDateTime.ofEpochSecond(nextValidTime.getTime() / 1000, 0, ZoneOffset.ofHours(8));
list.add(dateTimeFormatter.format(now));
} catch (ParseException ignored) {
}
}
return list;
return CronUtils.getExecuteTimeByCron(cron, 5);
}
@Override
public List<JobResponseVO> getJobNameList(String keywords, Long jobId) {
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<Job>()
.select(Job::getId, Job::getJobName);
.select(Job::getId, Job::getJobName);
if (StrUtil.isNotBlank(keywords)) {
queryWrapper.like(Job::getJobName, keywords.trim() + "%");
}
@ -132,8 +111,8 @@ public class JobServiceImpl implements JobService {
// 判断常驻任务
Job job = updateJobResident(jobRequestVO);
job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName())
% systemProperties.getBucketTotal());
calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
% systemProperties.getBucketTotal());
job.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()));
return 1 == jobMapper.insert(job);
}
@ -148,17 +127,17 @@ public class JobServiceImpl implements JobService {
Job updateJob = updateJobResident(jobRequestVO);
// 非常驻任务 > 非常驻任务
if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(),
StatusEnum.NO.getStatus())) {
StatusEnum.NO.getStatus())) {
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()));
} else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(
updateJob.getResident(), StatusEnum.NO.getStatus())) {
updateJob.getResident(), StatusEnum.NO.getStatus())) {
// 常驻任务的触发时间
LocalDateTime time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId()))
.orElse(LocalDateTime.now());
.orElse(LocalDateTime.now());
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, time));
// 老的是不是常驻任务 新的是常驻任务 需要使用当前时间计算下次触发时间
} else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(
updateJob.getResident(), StatusEnum.YES.getStatus())) {
updateJob.getResident(), StatusEnum.YES.getStatus())) {
updateJob.setNextTriggerAt(LocalDateTime.now());
}
@ -166,29 +145,23 @@ public class JobServiceImpl implements JobService {
}
private static LocalDateTime calculateNextTriggerAt(final JobRequestVO jobRequestVO, LocalDateTime time) {
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType());
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(time);
return waitStrategy.computeRetryTime(waitStrategyContext);
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
@Override
public Job updateJobResident(JobRequestVO jobRequestVO) {
Job job = JobConverter.INSTANCE.toJob(jobRequestVO);
job.setResident(StatusEnum.NO.getStatus());
if (jobRequestVO.getTriggerType() == WaitStrategyEnum.FIXED.getTriggerType()) {
if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.FIXED.getType()) {
if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) {
job.setResident(StatusEnum.YES.getStatus());
}
} else if (jobRequestVO.getTriggerType() == WaitStrategyEnum.CRON.getTriggerType()) {
List<String> timeByCron = getTimeByCron(jobRequestVO.getTriggerInterval());
LocalDateTime first = LocalDateTime.parse(timeByCron.get(0), dateTimeFormatter);
LocalDateTime second = LocalDateTime.parse(timeByCron.get(1), dateTimeFormatter);
Duration duration = Duration.between(first, second);
long milliseconds = duration.toMillis();
if (milliseconds < 10 * 1000) {
} else if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.CRON.getType()) {
if (CronUtils.getExecuteInterval(jobRequestVO.getTriggerInterval()) < 10 * 1000) {
job.setResident(StatusEnum.YES.getStatus());
}
} else {

View File

@ -6,7 +6,7 @@ import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.web.service.RetryDeadLetterService;
import com.aizuda.easy.retry.server.web.service.convert.RetryDeadLetterResponseVOConverter;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
@ -107,7 +107,7 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryDeadLetter);
retryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus());
retryTask.setTaskType(TaskTypeEnum.RETRY.getType());
retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null));
retryTask.setCreateDt(LocalDateTime.now());
waitRollbackList.add(retryTask);
}

View File

@ -19,7 +19,7 @@ import com.aizuda.easy.retry.server.retry.task.generator.task.TaskGenerator;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.BatchDeleteRetryTaskVO;
import com.aizuda.easy.retry.server.web.model.request.GenerateRetryIdempotentIdVO;
@ -47,13 +47,10 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.HttpEntity;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.RestTemplate;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
@ -158,7 +155,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
// 若恢复重试则需要重新计算下次触发时间
if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) {
retryTask.setNextTriggerAt(
WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null));
}
if (RetryStatusEnum.FINISH.getStatus().equals(retryStatusEnum.getStatus())) {

View File

@ -4,6 +4,8 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.CronUtils;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.SceneConfigQueryVO;
import com.aizuda.easy.retry.server.web.model.request.SceneConfigRequestVO;
@ -17,11 +19,13 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
/**
* @author: www.byteblogs.com
@ -64,6 +68,9 @@ public class SceneConfigServiceImpl implements SceneConfigService {
@Override
public Boolean saveSceneConfig(SceneConfigRequestVO requestVO) {
checkExecuteInterval(requestVO);
SceneConfig sceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO);
sceneConfig.setCreateDt(LocalDateTime.now());
ConfigAccess<SceneConfig> sceneConfigAccess = accessTemplate.getSceneConfigAccess();
@ -72,10 +79,25 @@ public class SceneConfigServiceImpl implements SceneConfigService {
return Boolean.TRUE;
}
private static void checkExecuteInterval(SceneConfigRequestVO requestVO) {
if (Lists.newArrayList(WaitStrategies.WaitStrategyEnum.FIXED.getType(), WaitStrategies.WaitStrategyEnum.RANDOM.getType())
.contains(requestVO.getBackOff())) {
if (Integer.parseInt(requestVO.getTriggerInterval()) < 10) {
throw new EasyRetryServerException("间隔时间不得小于10");
}
} else if (requestVO.getBackOff() == WaitStrategies.WaitStrategyEnum.CRON.getType()) {
if (CronUtils.getExecuteInterval(requestVO.getTriggerInterval()) < 10 * 1000) {
throw new EasyRetryServerException("间隔时间不得小于10");
}
}
}
@Override
public Boolean updateSceneConfig(SceneConfigRequestVO requestVO) {
checkExecuteInterval(requestVO);
SceneConfig sceneConfig = SceneConfigConverter.INSTANCE.toSceneConfigRequestVO(requestVO);
sceneConfig.setTriggerInterval(Optional.ofNullable(sceneConfig.getTriggerInterval()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().update(sceneConfig,
new LambdaUpdateWrapper<SceneConfig>()
.eq(SceneConfig::getGroupName, sceneConfig.getGroupName())

View File

@ -66,7 +66,7 @@
</a-tag>
</a-descriptions-item>
<a-descriptions-item label="参数" span="4">
{{ jobInfo.argsStr }}
{{ jobInfo.taskType === 3 ? JSON.parse(jobInfo.argsStr).map((item, index) => `分区:${index}=>${item}`).join('; ') : jobInfo.argsStr }}
</a-descriptions-item>
<a-descriptions-item label="描述" span="4">
{{ jobInfo.extAttrs }}

View File

@ -357,7 +357,8 @@ export default {
visible: false,
count: 0,
triggerTypeValue: '2',
taskTypeValue: '1'
taskTypeValue: '1',
argsStrValue: []
}
},
beforeCreate () {
@ -432,35 +433,28 @@ export default {
const taskType = this.form.getFieldValue('taskType')
if (taskType === '3') {
this.visible = !this.visible
const { form } = this
if (this.formType === 'create') {
return
}
const argsStr = this.form.getFieldValue('argsStr')
form.setFieldsValue({
argsStr: ''
})
console.log(argsStr.includes('#=@'))
if (!argsStr.includes('#=@')) {
console.log(this.argsStrValue)
if (this.argsStrValue.length === 0) {
return
}
//
const keyValuePairs = argsStr.split('#;@')
console.log(keyValuePairs)
const restoredArray = keyValuePairs.map(pair => {
const [index, value] = pair.split('#=@')
console.log(value)
const keys = this.argsStrValue.map((item, index) => {
this.count++
return Number.parseInt(index)
this.dynamicForm.getFieldDecorator(`sharding[${index}]`, { initialValue: item, preserve: true })
return index
})
this.dynamicForm.getFieldDecorator('keys', { initialValue: restoredArray, preserve: true })
keyValuePairs.map(pair => {
const [index, value] = pair.split('#=@')
this.dynamicForm.getFieldDecorator(`sharding[${index}]`, { initialValue: value, preserve: true })
return value
})
this.dynamicForm.getFieldDecorator('keys', { initialValue: keys, preserve: true })
}
},
getCron (cron) {
@ -472,13 +466,10 @@ export default {
const { form } = this
e.preventDefault()
this.dynamicForm.validateFields((err, values) => {
console.log()
if (!err) {
console.log(values)
const arr = values['sharding']
const formattedString = arr.map((item, index) => `${index}#=@${item}`).join('#;@')
this.argsStrValue = values['sharding']
form.setFieldsValue({
argsStr: formattedString
argsStr: this.argsStrValue.map((item, index) => `分区:${index}=>${item}`).join('; ')
})
this.visible = false
}
@ -491,6 +482,10 @@ export default {
e.preventDefault()
this.form.validateFields((err, values) => {
if (!err) {
if (this.taskTypeValue === '3') {
values['argsStr'] = this.argsStrValue
}
if (this.formType === 'create') {
saveJob(values).then(res => {
this.$message.success('任务新增完成')
@ -523,6 +518,8 @@ export default {
formData.blockStrategy = formData.blockStrategy.toString()
formData.triggerType = formData.triggerType.toString()
this.triggerTypeValue = formData.triggerType
this.argsStrValue = JSON.parse(formData.argsStr)
formData.argsStr = this.argsStrValue.map((item, index) => `分区:${index}=>${item}`).join(';')
form.setFieldsValue(formData)
})
},

View File

@ -64,7 +64,7 @@
</a-tag>
</span>
<span slot="triggerInterval" slot-scope="text">
{{ text ? text : '' }}
{{ text ? text : '10s,15s,30s,35s,40s,50s,1m,2m,4m,6m,8m,10m,20m,40m,1h,2h,3h,4h,5h,6h,7h,8h,9h,10h,11h,12h' }}
</span>
<span slot="action" slot-scope="record">
<template>
@ -125,6 +125,7 @@ export default {
title: '间隔时间',
dataIndex: 'triggerInterval',
key: 'triggerInterval',
ellipsis: true,
width: '15%',
scopedSlots: { customRender: 'triggerInterval' }
},

View File

@ -83,7 +83,7 @@
v-if="backOff === '2' || backOff === '4'"
style="width: -webkit-fill-available"
placeholder="请输入间隔时长(秒)"
:min="1"
:min="10"
v-decorator="[
'triggerInterval',
{initialValue: '60',
@ -273,13 +273,13 @@ export default {
if (!err) {
if (this.formType === 'create') {
saveScene(values).then(res => {
this.$message.success('任务新增完成')
this.$message.success('场景新增完成')
this.form.resetFields()
this.$router.go(-1)
})
} else {
updateScene(values).then(res => {
this.$message.success('任务更新完成')
this.$message.success('场景更新完成')
this.form.resetFields()
this.$router.go(-1)
})