diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/FutureCache.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/FutureCache.java index 5c42014c..43d18c4e 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/FutureCache.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/FutureCache.java @@ -8,6 +8,7 @@ import com.google.common.collect.Tables; import com.google.common.util.concurrent.ListenableFuture; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** @@ -31,6 +32,10 @@ public class FutureCache { public static void remove(Long taskBatchId) { Map> futureMap = futureCache.row(taskBatchId); + if (Objects.isNull(futureMap)) { + return; + } + futureMap.forEach((taskId, future) -> { future.cancel(true); futureCache.remove(taskBatchId, taskId); diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java index fa539110..ac847028 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.client.job.core.cache; import org.springframework.stereotype.Component; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -40,6 +41,10 @@ public class ThreadPoolCache { public static void stopThreadPool(Long taskBatchId) { FutureCache.remove(taskBatchId); ThreadPoolExecutor threadPoolExecutor = CACHE_THREAD_POOL.get(taskBatchId); + if (Objects.isNull(threadPoolExecutor)) { + return; + } + threadPoolExecutor.shutdownNow(); } diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java index dd5f650d..cd202b35 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java @@ -39,7 +39,7 @@ public class JobEndPoint { JobContext jobContext = buildJobContext(dispatchJob); JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo()); if (Objects.isNull(jobExecutorInfo)) { - return new Result<>(Boolean.FALSE); + return new Result<>("执行器配置有误", Boolean.FALSE); } // 选择执行器 diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java index ef580a45..60ff70f5 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java @@ -5,6 +5,7 @@ import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; import com.aizuda.easy.retry.server.common.dto.ScanTask; @@ -71,7 +72,7 @@ public class ScanJobTaskActor extends AbstractActor { WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); waitStrategyContext.setTriggerType(partitionTask.getTriggerType()); waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval()); - waitStrategyContext.setNextTriggerAt(LocalDateTime.now()); + waitStrategyContext.setNextTriggerAt(jobTaskPrepare.getNextTriggerAt()); Job job = new Job(); job.setId(partitionTask.getId()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ShardingTaskGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ShardingTaskGenerator.java index da17c49a..125f62ff 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ShardingTaskGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ShardingTaskGenerator.java @@ -67,6 +67,7 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator { jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); + jobTasks.add(jobTask); }); return jobTasks; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java index 83f48e87..4be25dba 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java @@ -142,7 +142,7 @@ public class WaitStrategies { throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e); } - return LocalDateTime.ofEpochSecond( nextValidTime.getTime() / 1000,0, ZoneOffset.ofHours(8)); + return LocalDateTime.ofEpochSecond( nextValidTime.getTime() / 1000,0, ZoneOffset.ofHours(8)); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheelHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheelHandler.java index 6eb2e54a..b6da4dc6 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheelHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheelHandler.java @@ -5,6 +5,8 @@ import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.Lifecycle; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.hash.BloomFilter; +import com.google.common.hash.Funnels; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; @@ -23,19 +25,18 @@ import java.util.concurrent.TimeUnit; @Slf4j public class JobTimerWheelHandler implements Lifecycle { + private static final int TICK_DURATION = 100; + private static final String THREAD_NAME_REFIX = "job-task-timer-wheel-"; + private static HashedWheelTimer timer = null; private static Cache cache; @Override public void start() { - - // TODO 支持可配置 - // tickDuration 和 timeUnit 一格的时间长度 - // ticksPerWheel 一圈有多少格 timer = new HashedWheelTimer( - new CustomizableThreadFactory("job-task-timer-wheel-"), 1000, - TimeUnit.MILLISECONDS, 1024); + new CustomizableThreadFactory(THREAD_NAME_REFIX), TICK_DURATION, + TimeUnit.MILLISECONDS); timer.start(); @@ -51,13 +52,6 @@ public class JobTimerWheelHandler implements Lifecycle { delay = 0; } - // TODO 支持可配置 - if (delay > 60 * 1000) { - LogUtils.warn(log, "距离下次执行时间过久, 不满足进入时间轮的条件. groupName:[{}] uniqueId:[{}] delay:[{}ms]", - groupName, taskBatchId, delay); - return; - } - Timeout timeout = getTimeout(groupName, taskBatchId); if (Objects.isNull(timeout)) { try {