feat: 2.4.0

1. 优化JOB下次触发时间不是最新的时间,导致计算的下次触发时间不准
2. 修复更新触发时间呢,为重新计算下次触发时间
3. 重试的扫描任务的拉取数据的次数使用动态计算耗时进行计算
This commit is contained in:
byteblogs168 2023-10-30 17:37:55 +08:00
parent fcf360bbdd
commit 171951f09a
4 changed files with 89 additions and 40 deletions

View File

@ -7,6 +7,9 @@ import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongFunction;
import java.util.function.Predicate;
/**
* @author: www.byteblogs.com
@ -18,14 +21,38 @@ public class PartitionTaskUtils {
private PartitionTaskUtils() {
}
public static long process(LongFunction<List<? extends PartitionTask>> dataSource,
Consumer<List<? extends PartitionTask>> task,
long startId) {
return process(dataSource, task, curStartId -> {}, CollectionUtils::isEmpty, startId);
}
public static long process(LongFunction<List<? extends PartitionTask>> dataSource,
Consumer<List<? extends PartitionTask>> task,
Predicate<List<? extends PartitionTask>> stopCondition,
long startId) {
return process(dataSource, task, curStartId -> {}, stopCondition, startId);
}
public static long process(LongFunction<List<? extends PartitionTask>> dataSource,
Consumer<List<? extends PartitionTask>> task,
LongConsumer stopAfterProcessor,
long startId) {
return process(dataSource, task, stopAfterProcessor, CollectionUtils::isEmpty, startId);
}
public static long process(
Function<Long, List<? extends PartitionTask>> dataSource, Consumer<List<? extends PartitionTask>> task,
long startId) {
LongFunction<List<? extends PartitionTask>> dataSource,
Consumer<List<? extends PartitionTask>> task,
LongConsumer stopAfterProcessor,
Predicate<List<? extends PartitionTask>> stopCondition,
long startId) {
int total = 0;
do {
List<? extends PartitionTask> products = dataSource.apply(startId);
if (CollectionUtils.isEmpty(products)) {
if (stopCondition.test(products)) {
// 没有查询到数据直接退出
stopAfterProcessor.accept(startId);
break;
}

View File

@ -122,7 +122,12 @@ public class WaitStrategies {
public LocalDateTime computeRetryTime(WaitStrategyContext context) {
long triggerInterval = Long.parseLong(context.triggerInterval);
return context.getNextTriggerAt().plusSeconds(triggerInterval);
LocalDateTime nextTriggerAt = context.getNextTriggerAt();
if (nextTriggerAt.isBefore(LocalDateTime.now())) {
nextTriggerAt = LocalDateTime.now();
}
return nextTriggerAt.plusSeconds(triggerInterval);
}
}
@ -134,9 +139,14 @@ public class WaitStrategies {
@Override
public LocalDateTime computeRetryTime(WaitStrategyContext context) {
LocalDateTime nextTriggerAt = context.getNextTriggerAt();
if (nextTriggerAt.isBefore(LocalDateTime.now())) {
nextTriggerAt = LocalDateTime.now();
}
Date nextValidTime;
try {
ZonedDateTime zdt = context.getNextTriggerAt().atZone(ZoneOffset.ofHours(8));
ZonedDateTime zdt = nextTriggerAt.atZone(ZoneOffset.ofHours(8));
nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant()));
} catch (ParseException e) {
throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e);

View File

@ -32,6 +32,9 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* 数据扫描模板类
@ -55,8 +58,8 @@ public abstract class AbstractScanGroup extends AbstractActor {
@Autowired
protected List<TaskExecutor> taskExecutors;
private static long preCostTime = 0L;
private static long loopCount = 1L;
private static final AtomicLong preCostTime = new AtomicLong(0L);
private static final AtomicLong pullCount = new AtomicLong(1L);
@Override
public Receive createReceive() {
@ -74,8 +77,9 @@ public abstract class AbstractScanGroup extends AbstractActor {
// 获取结束时间
long endTime = System.nanoTime();
preCostTime = (endTime - startTime) / 1_000_000;
log.info("重试任务调度耗时:[{}]", preCostTime);
preCostTime.set((endTime - startTime) / 1_000_000);
log.info("重试任务调度耗时:[{}]", preCostTime.get());
}).build();
@ -84,42 +88,44 @@ public abstract class AbstractScanGroup extends AbstractActor {
protected void doScan(final ScanTask scanTask) {
// 计算循环拉取的次数
if (preCostTime > 0) {
loopCount = (10 * 1000) / preCostTime;
loopCount = loopCount == 0 ? 1 : loopCount;
if (preCostTime.get() > 0) {
long loopCount = Math.max((10 * 1000) / preCostTime.get(), 1);
// TODO 最大拉取次数支持可配置
loopCount = Math.min(loopCount, 10);
pullCount.set(loopCount);
}
String groupName = scanTask.getGroupName();
Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);
log.info("retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]",
groupName, lastId, pullCount.get(), preCostTime.get());
AtomicInteger count = new AtomicInteger(0);
PartitionTaskUtils.process(startId -> {
int i = count.getAndIncrement();
if (i > loopCount) {
// 为空则中断处理
return Lists.newArrayList();
PartitionTaskUtils.process(startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()),
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> {
if (CollectionUtils.isEmpty(partitionTasks)) {
putLastId(scanTask.getGroupName(), 0L);
return Boolean.TRUE;
}
return listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType());
},
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), lastId);
// 超过最大的拉取次数则中断
if (count.getAndIncrement() > pullCount.get()) {
putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId());
return Boolean.TRUE;
}
return false;
}, lastId);
}
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks, ScanTask scanTask) {
StopWatch watch = new StopWatch();
watch.start();
if (!CollectionUtils.isEmpty(partitionTasks)) {
putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId());
for (PartitionTask partitionTask : partitionTasks) {
processRetryTask((RetryPartitionTask) partitionTask);
}
} else {
putLastId(scanTask.getGroupName(), 0L);
for (PartitionTask partitionTask : partitionTasks) {
processRetryTask((RetryPartitionTask) partitionTask);
}
watch.getTotalTimeMillis();
}
private void processRetryTask(RetryPartitionTask partitionTask) {
@ -131,7 +137,8 @@ public abstract class AbstractScanGroup extends AbstractActor {
long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
- System.currentTimeMillis();
RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask), delay,
RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask),
delay,
TimeUnit.MILLISECONDS);
}
@ -151,7 +158,8 @@ public abstract class AbstractScanGroup extends AbstractActor {
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType)
.le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)).gt(RetryTask::getId, lastId)
.le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10))
.gt(RetryTask::getId, lastId)
.orderByAsc(RetryTask::getId))
.getRecords();

View File

@ -125,16 +125,10 @@ public class JobServiceImpl implements JobService {
@Override
public boolean saveJob(JobRequestVO jobRequestVO) {
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType());
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(LocalDateTime.now());
// 判断常驻任务
Job job = updateJobResident(jobRequestVO);
job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal());
job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext));
calculateNextTriggerAt(jobRequestVO, job);
return 1 == jobMapper.insert(job);
}
@ -145,9 +139,19 @@ public class JobServiceImpl implements JobService {
// 判断常驻任务
Job job = updateJobResident(jobRequestVO);
calculateNextTriggerAt(jobRequestVO, job);
return 1 == jobMapper.updateById(job);
}
private static void calculateNextTriggerAt(final JobRequestVO jobRequestVO, final Job job) {
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType());
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(LocalDateTime.now());
job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext));
}
@Override
public Job updateJobResident(JobRequestVO jobRequestVO) {
Job job = JobConverter.INSTANCE.toJob(jobRequestVO);