feat: 2.4.0

1. 优化定时任务
This commit is contained in:
byteblogs168 2023-10-19 18:46:56 +08:00
parent f3a2addfc5
commit e1c44c5606
7 changed files with 22 additions and 16 deletions

View File

@ -8,6 +8,7 @@ import com.google.common.collect.Tables;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -31,6 +32,10 @@ public class FutureCache {
public static void remove(Long taskBatchId) { public static void remove(Long taskBatchId) {
Map<Long, ListenableFuture<ExecuteResult>> futureMap = futureCache.row(taskBatchId); Map<Long, ListenableFuture<ExecuteResult>> futureMap = futureCache.row(taskBatchId);
if (Objects.isNull(futureMap)) {
return;
}
futureMap.forEach((taskId, future) -> { futureMap.forEach((taskId, future) -> {
future.cancel(true); future.cancel(true);
futureCache.remove(taskBatchId, taskId); futureCache.remove(taskBatchId, taskId);

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.client.job.core.cache;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -40,6 +41,10 @@ public class ThreadPoolCache {
public static void stopThreadPool(Long taskBatchId) { public static void stopThreadPool(Long taskBatchId) {
FutureCache.remove(taskBatchId); FutureCache.remove(taskBatchId);
ThreadPoolExecutor threadPoolExecutor = CACHE_THREAD_POOL.get(taskBatchId); ThreadPoolExecutor threadPoolExecutor = CACHE_THREAD_POOL.get(taskBatchId);
if (Objects.isNull(threadPoolExecutor)) {
return;
}
threadPoolExecutor.shutdownNow(); threadPoolExecutor.shutdownNow();
} }

View File

@ -39,7 +39,7 @@ public class JobEndPoint {
JobContext jobContext = buildJobContext(dispatchJob); JobContext jobContext = buildJobContext(dispatchJob);
JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo()); JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo());
if (Objects.isNull(jobExecutorInfo)) { if (Objects.isNull(jobExecutorInfo)) {
return new Result<>(Boolean.FALSE); return new Result<>("执行器配置有误", Boolean.FALSE);
} }
// 选择执行器 // 选择执行器

View File

@ -5,6 +5,7 @@ import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.dto.ScanTask;
@ -71,7 +72,7 @@ public class ScanJobTaskActor extends AbstractActor {
WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(partitionTask.getTriggerType()); waitStrategyContext.setTriggerType(partitionTask.getTriggerType());
waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval()); waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(LocalDateTime.now()); waitStrategyContext.setNextTriggerAt(jobTaskPrepare.getNextTriggerAt());
Job job = new Job(); Job job = new Job();
job.setId(partitionTask.getId()); job.setId(partitionTask.getId());

View File

@ -67,6 +67,7 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
jobTasks.add(jobTask);
}); });
return jobTasks; return jobTasks;

View File

@ -142,7 +142,7 @@ public class WaitStrategies {
throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e); throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e);
} }
return LocalDateTime.ofEpochSecond( nextValidTime.getTime() / 1000,0, ZoneOffset.ofHours(8)); return LocalDateTime.ofEpochSecond( nextValidTime.getTime() / 1000,0, ZoneOffset.ofHours(8));
} }
} }

View File

@ -5,6 +5,8 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.Lifecycle;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import io.netty.util.HashedWheelTimer; import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
@ -23,19 +25,18 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class JobTimerWheelHandler implements Lifecycle { public class JobTimerWheelHandler implements Lifecycle {
private static final int TICK_DURATION = 100;
private static final String THREAD_NAME_REFIX = "job-task-timer-wheel-";
private static HashedWheelTimer timer = null; private static HashedWheelTimer timer = null;
private static Cache<String, Timeout> cache; private static Cache<String, Timeout> cache;
@Override @Override
public void start() { public void start() {
// TODO 支持可配置
// tickDuration timeUnit 一格的时间长度
// ticksPerWheel 一圈有多少格
timer = new HashedWheelTimer( timer = new HashedWheelTimer(
new CustomizableThreadFactory("job-task-timer-wheel-"), 1000, new CustomizableThreadFactory(THREAD_NAME_REFIX), TICK_DURATION,
TimeUnit.MILLISECONDS, 1024); TimeUnit.MILLISECONDS);
timer.start(); timer.start();
@ -51,13 +52,6 @@ public class JobTimerWheelHandler implements Lifecycle {
delay = 0; delay = 0;
} }
// TODO 支持可配置
if (delay > 60 * 1000) {
LogUtils.warn(log, "距离下次执行时间过久, 不满足进入时间轮的条件. groupName:[{}] uniqueId:[{}] delay:[{}ms]",
groupName, taskBatchId, delay);
return;
}
Timeout timeout = getTimeout(groupName, taskBatchId); Timeout timeout = getTimeout(groupName, taskBatchId);
if (Objects.isNull(timeout)) { if (Objects.isNull(timeout)) {
try { try {