From 60d52e8524fbb0fcad85ef08239774b32570874c Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sun, 22 Oct 2023 21:45:01 +0800 Subject: [PATCH] =?UTF-8?q?feat:2.4.0=201.=20=E6=94=AF=E6=8C=8110=E7=A7=92?= =?UTF-8?q?=E4=BB=A5=E5=86=85=E4=BB=BB=E5=8A=A1=E8=B0=83=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/easy_retry_mysql.sql | 1 + .../client/job/core/cache/FutureCache.java | 26 ++--- .../job/core/cache/ThreadPoolCache.java | 2 +- .../client/job/core/client/JobEndPoint.java | 4 +- .../core/executor/AbstractJobExecutor.java | 2 +- .../executor/JobExecutorFutureCallback.java | 12 +++ .../easy/retry/client/model/StopJobDTO.java | 4 +- .../datasource/persistence/po/Job.java | 5 + .../server/job/task/dto/JobPartitionTask.java | 4 + .../job/task/support/JobTaskConverter.java | 1 + .../task/support/cache/ResidentTaskCache.java | 42 ++++++++ .../dispatch/JobExecutorResultActor.java | 17 +++- .../support/dispatch/ScanJobTaskActor.java | 99 +++++++++++++------ .../support/handler/JobTaskBatchHandler.java | 4 + .../stop/AbstractJobTaskStopHandler.java | 13 ++- .../support/stop/ClusterTaskStopHandler.java | 3 +- .../task/support/stop/RealStopTaskActor.java | 2 +- .../task/support/stop/TaskStopJobContext.java | 14 ++- .../job/task/support/timer/JobTimerTask.java | 55 +++++++++-- .../support/timer/ResidentJobTimerTask.java | 59 +++++++++++ .../starter/dispatch/DispatchService.java | 3 +- frontend/src/views/job/JobList.vue | 14 +-- frontend/src/views/job/from/JobFrom.vue | 8 +- 23 files changed, 301 insertions(+), 93 deletions(-) create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/ResidentTaskCache.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index 416a6b05..ca8cdd59 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -235,6 +235,7 @@ CREATE TABLE `job` ( `parallel_num` int(11) NOT NULL DEFAULT '1' COMMENT '并行数', `retry_interval` int(11) NOT NULL DEFAULT '0' COMMENT '重试间隔(s)', `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket', + `resident` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否是常驻任务', `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述', `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/FutureCache.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/FutureCache.java index 43d18c4e..dbacda9b 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/FutureCache.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/FutureCache.java @@ -1,14 +1,14 @@ package com.aizuda.easy.retry.client.job.core.cache; -import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo; import com.aizuda.easy.retry.client.model.ExecuteResult; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; -import com.google.common.collect.Tables; import com.google.common.util.concurrent.ListenableFuture; +import java.util.Iterator; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; /** @@ -18,28 +18,18 @@ import java.util.concurrent.ConcurrentHashMap; */ public class FutureCache { - private static final Table> futureCache = HashBasedTable.create(); + private static final ConcurrentHashMap> futureCache = new ConcurrentHashMap<>(); - public static void addFuture(Long taskBatchId, Long taskId, ListenableFuture future) { - futureCache.put(taskBatchId, taskId, future); - } - - public static void remove(Long taskBatchId, Long taskId) { - ListenableFuture future = futureCache.get(taskBatchId, taskId); - future.cancel(true); - futureCache.remove(taskBatchId, taskId); + public static void addFuture(Long taskBatchId, ListenableFuture future) { + futureCache.put(taskBatchId, future); } public static void remove(Long taskBatchId) { - Map> futureMap = futureCache.row(taskBatchId); - if (Objects.isNull(futureMap)) { - return; - } - - futureMap.forEach((taskId, future) -> { + Optional.ofNullable(futureCache.get(taskBatchId)).ifPresent(future -> { future.cancel(true); - futureCache.remove(taskBatchId, taskId); + futureCache.remove(taskBatchId); }); + } } diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java index ac847028..6828a9ae 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java @@ -10,7 +10,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; /** - * TODO 任务执行完成了,该如何优雅的终止线程池????? * * @author: www.byteblogs.com * @date : 2023-09-27 17:12 @@ -46,6 +45,7 @@ public class ThreadPoolCache { } threadPoolExecutor.shutdownNow(); + CACHE_THREAD_POOL.remove(taskBatchId); } } diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java index cd202b35..b8bfd5be 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java @@ -72,12 +72,12 @@ public class JobEndPoint { @PostMapping("/stop/v1") public Result stopJob(@RequestBody @Validated StopJobDTO interruptJob) { - ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskId()); + ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskBatchId()); if (Objects.isNull(threadPool) || threadPool.isShutdown() || threadPool.isTerminated()) { return new Result<>(Boolean.TRUE); } - ThreadPoolCache.stopThreadPool(interruptJob.getTaskId()); + ThreadPoolCache.stopThreadPool(interruptJob.getTaskBatchId()); return new Result<>(threadPool.isShutdown() || threadPool.isTerminated()); } } diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java index efb58fd5..f666afc3 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java @@ -50,7 +50,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { return doJobExecute(jobArgs); }); - FutureCache.addFuture(jobContext.getTaskBatchId(), jobContext.getTaskId(), submit); + FutureCache.addFuture(jobContext.getTaskBatchId(), submit); Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext), decorator); } diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java index c04fafa4..913f4056 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java @@ -1,12 +1,14 @@ package com.aizuda.easy.retry.client.job.core.executor; import com.aizuda.easy.retry.client.common.proxy.RequestBuilder; +import com.aizuda.easy.retry.client.job.core.cache.ThreadPoolCache; import com.aizuda.easy.retry.client.job.core.client.JobNettyClient; import com.aizuda.easy.retry.client.job.core.dto.JobContext; import com.aizuda.easy.retry.client.model.ExecuteResult; import com.aizuda.easy.retry.client.model.request.DispatchJobResultRequest; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum; +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.model.NettyResult; import com.aizuda.easy.retry.common.core.util.JsonUtil; @@ -47,6 +49,14 @@ public class JobExecutorFutureCallback implements FutureCallback CLIENT.dispatchResult(buildDispatchJobResultRequest(result, taskStatus)); } catch (Exception e) { log.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e); + } finally { + stopThreadPool(); + } + } + + private void stopThreadPool() { + if (jobContext.getTaskType() == TaskTypeEnum.CLUSTER.getType()) { + ThreadPoolCache.stopThreadPool(jobContext.getTaskBatchId()); } } @@ -60,6 +70,8 @@ public class JobExecutorFutureCallback implements FutureCallback ); } catch (Exception e) { log.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e); + } finally { + stopThreadPool(); } } diff --git a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/StopJobDTO.java b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/StopJobDTO.java index ce7d0213..b511883b 100644 --- a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/StopJobDTO.java +++ b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/StopJobDTO.java @@ -15,8 +15,8 @@ public class StopJobDTO { @NotNull(message = "jobId 不能为空") private Long jobId; - @NotNull(message = "taskId 不能为空") - private Long taskId; + @NotNull(message = "taskBatchId 不能为空") + private Long taskBatchId; @NotBlank(message = "group 不能为空") private String groupName; diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java index a515470a..db97aa23 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java @@ -124,6 +124,11 @@ public class Job implements Serializable { */ private Integer bucketIndex; + /** + * 是否是常驻任务 + */ + private Integer resident; + /** * 描述 */ diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java index a4796b06..8450464d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java @@ -54,4 +54,8 @@ public class JobPartitionTask extends PartitionTask { */ private Integer taskType; + /** + * 是否是常驻任务 + */ + private Integer resident; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java index dabcedaf..24601856 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java @@ -49,6 +49,7 @@ public interface JobTaskConverter { BlockStrategies.BlockStrategyContext toBlockStrategyContext(JobTaskPrepareDTO prepareDTO); TaskStopJobContext toStopJobContext(BlockStrategies.BlockStrategyContext context); + TaskStopJobContext toStopJobContext(JobExecutorResultDTO context); JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/ResidentTaskCache.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/ResidentTaskCache.java new file mode 100644 index 00000000..a17ba0b7 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/ResidentTaskCache.java @@ -0,0 +1,42 @@ +package com.aizuda.easy.retry.server.job.task.support.cache; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.time.LocalDateTime; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +/** + * @author www.byteblogs.com + * @date 2023-10-21 23:35:42 + * @since 2.4.0 + */ +public class ResidentTaskCache { + + private static final Cache cache; + + static { + cache = CacheBuilder.newBuilder() + .concurrencyLevel(8) // 并发级别 + .expireAfterWrite(10, TimeUnit.SECONDS) // 写入后的过期时间 + .build(); + } + + public static void refresh(Long jobId, LocalDateTime nextTriggerTime) { + cache.put(jobId, nextTriggerTime); + } + + public static LocalDateTime getOrDefault(Long jobId, LocalDateTime nextTriggerTime) { + return Optional.ofNullable(cache.getIfPresent(jobId)).orElse(nextTriggerTime); + } + + public static LocalDateTime get(Long jobId) { + return getOrDefault(jobId, null); + } + + public static boolean isResident(Long jobId) { + return cache.asMap().containsKey(jobId); + } + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java index aa68df55..98c0e7c4 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -3,13 +3,17 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch; import akka.actor.AbstractActor; import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; import com.aizuda.easy.retry.server.job.task.support.handler.JobTaskBatchHandler; +import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; +import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; @@ -60,7 +64,18 @@ public class JobExecutorResultActor extends AbstractActor { ()-> new EasyRetryServerException("更新任务实例失败")); // 更新批次上的状态 - jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReasonEnum()); + boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReasonEnum()); + if (complete) { + // 尝试停止任务 + // 若是集群任务则客户端会主动关闭 + if (result.getTaskType() != TaskTypeEnum.CLUSTER.getType()) { + JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType()); + TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result); + stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE); + stopJobContext.setForceStop(Boolean.TRUE); + instanceInterrupt.stop(stopJobContext); + } + } } }); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java index 60ff70f5..10120e69 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java @@ -5,9 +5,9 @@ import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; +import com.aizuda.easy.retry.server.common.dto.PartitionTask; import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; @@ -15,6 +15,9 @@ import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; +import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask; +import com.aizuda.easy.retry.server.job.task.support.timer.ResidentJobTimerTask; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -27,6 +30,7 @@ import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.List; +import java.util.Objects; import static com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.*; @@ -62,46 +66,77 @@ public class ScanJobTaskActor extends AbstractActor { private void doScan(final ScanTask scanTask) { log.info("job scan start"); - long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), partitionTasks -> { - for (final JobPartitionTask partitionTask : (List) partitionTasks) { - CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName()); - JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask); - - // 更新下次触发时间 - WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType()); - WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); - waitStrategyContext.setTriggerType(partitionTask.getTriggerType()); - waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval()); - waitStrategyContext.setNextTriggerAt(jobTaskPrepare.getNextTriggerAt()); - - Job job = new Job(); - job.setId(partitionTask.getId()); - job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); - Assert.isTrue(1 == jobMapper.updateById(job), - () -> new EasyRetryServerException("更新job下次触发时间失败.jobId:[{}]", job.getId())); - - // 执行预处理阶段 - ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); - actorRef.tell(jobTaskPrepare, actorRef); - } - }, scanTask.getStartId()); + long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), this::processJobPartitionTasks, scanTask.getStartId()); log.info("job scan end. total:[{}]", total); } + private void processJobPartitionTasks(List partitionTasks) { + for (PartitionTask partitionTask : partitionTasks) { + processJob((JobPartitionTask) partitionTask); + } + } + + private void processJob(JobPartitionTask partitionTask) { + CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName()); + JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask); + + Job job = new Job(); + job.setId(partitionTask.getId()); + + boolean triggerTask = true; + LocalDateTime nextTriggerAt = ResidentTaskCache.get(partitionTask.getId()); + if (needCalculateNextTriggerTime(partitionTask, nextTriggerAt)) { + // 更新下次触发时间 + nextTriggerAt = calculateNextTriggerTime(partitionTask); + } else { + triggerTask = false; + } + + job.setNextTriggerAt(nextTriggerAt); + Assert.isTrue(1 == jobMapper.updateById(job), + () -> new EasyRetryServerException("更新job下次触发时间失败.jobId:[{}]", job.getId())); + + if (triggerTask) { + // 执行预处理阶段 + ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); + actorRef.tell(jobTaskPrepare, actorRef); + } + } + + /** + * 需要重新计算触发时间的条件 + * 1、不是常驻任务 + * 2、常驻任务缓存的触发任务为空 + * 3、常驻任务中的触发时间不是最新的 + */ + private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask, LocalDateTime nextTriggerAt) { + return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident()) + || Objects.isNull(nextTriggerAt) || partitionTask.getNextTriggerAt().isAfter(nextTriggerAt); + } + + private LocalDateTime calculateNextTriggerTime(JobPartitionTask partitionTask) { + // 更新下次触发时间 + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType()); + WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); + waitStrategyContext.setTriggerType(partitionTask.getTriggerType()); + waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt()); + + return waitStrategy.computeRetryTime(waitStrategyContext); + } + private List listAvailableJobs(Long startId, ScanTask scanTask) { List jobs = jobMapper.selectPage(new PageDTO(0, scanTask.getSize()), - new LambdaQueryWrapper() - .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) - .in(Job::getBucketIndex, scanTask.getBuckets()) - .le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)) - .eq(Job::getDeleted, StatusEnum.NO.getStatus()) - .ge(Job::getId, startId) + new LambdaQueryWrapper() + .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) + .in(Job::getBucketIndex, scanTask.getBuckets()) + .le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)) + .eq(Job::getDeleted, StatusEnum.NO.getStatus()) + .ge(Job::getId, startId) ).getRecords(); return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs); } - - } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java index 2f592ce1..7471bf9c 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java @@ -3,6 +3,10 @@ package com.aizuda.easy.retry.server.job.task.support.handler; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; +import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; +import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/AbstractJobTaskStopHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/AbstractJobTaskStopHandler.java index 13808967..4146ace2 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/AbstractJobTaskStopHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/AbstractJobTaskStopHandler.java @@ -28,11 +28,14 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler, @Override public void stop(TaskStopJobContext context) { - List jobTasks = jobTaskMapper.selectList( - new LambdaQueryWrapper() - .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) - .in(JobTask::getTaskStatus, JobTaskStatusEnum.NOT_COMPLETE) - ); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() + .eq(JobTask::getTaskBatchId, context.getTaskBatchId()); + + if (!context.isForceStop()) { + queryWrapper.in(JobTask::getTaskStatus, JobTaskStatusEnum.NOT_COMPLETE); + } + + List jobTasks = jobTaskMapper.selectList(queryWrapper); if (CollectionUtils.isEmpty(jobTasks)) { return; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/ClusterTaskStopHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/ClusterTaskStopHandler.java index b579f6ff..d817bb73 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/ClusterTaskStopHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/ClusterTaskStopHandler.java @@ -28,7 +28,7 @@ public class ClusterTaskStopHandler extends AbstractJobTaskStopHandler { } @Override - public void doStop(TaskStopJobContext context) { + public void doStop(TaskStopJobContext context) { List jobTasks = context.getJobTasks(); RealStopTaskInstanceDTO taskInstanceDTO = JobTaskConverter.INSTANCE.toRealStopTaskInstanceDTO(context); @@ -40,5 +40,4 @@ public class ClusterTaskStopHandler extends AbstractJobTaskStopHandler { } - } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/RealStopTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/RealStopTaskActor.java index ff982505..57051fca 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/RealStopTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/RealStopTaskActor.java @@ -57,7 +57,7 @@ public class RealStopTaskActor extends AbstractActor { .build(); StopJobDTO stopJobDTO = new StopJobDTO(); - stopJobDTO.setTaskId(realStopTaskInstanceDTO.getTaskBatchId()); + stopJobDTO.setTaskBatchId(realStopTaskInstanceDTO.getTaskBatchId()); stopJobDTO.setJobId(realStopTaskInstanceDTO.getJobId()); stopJobDTO.setGroupName(realStopTaskInstanceDTO.getGroupName()); return rpcClient.stop(stopJobDTO); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/TaskStopJobContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/TaskStopJobContext.java index 973e9e0b..e8c3d974 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/TaskStopJobContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/TaskStopJobContext.java @@ -35,11 +35,6 @@ public class TaskStopJobContext { */ private Integer taskType; - /** - * 下次触发时间 - */ - private LocalDateTime nextTriggerAt; - /** * 是否需要变更任务状态 */ @@ -49,4 +44,13 @@ public class TaskStopJobContext { private JobOperationReasonEnum jobOperationReasonEnum; + private boolean forceStop; + + protected List getJobTasks() { + return jobTasks; + } + + protected void setJobTasks(List jobTasks) { + this.jobTasks = jobTasks; + } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java index 5b8e80b5..6dfac7da 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java @@ -8,8 +8,13 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; +import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; @@ -20,6 +25,7 @@ import io.netty.util.TimerTask; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import java.time.Duration; import java.time.LocalDateTime; import java.util.Objects; import java.util.concurrent.LinkedBlockingQueue; @@ -37,8 +43,8 @@ public class JobTimerTask implements TimerTask { private JobTimerTaskDTO jobTimerTaskDTO; - private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 10, TimeUnit.SECONDS, - new LinkedBlockingQueue<>()); + private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, + new LinkedBlockingQueue<>()); @Override public void run(final Timeout timeout) throws Exception { @@ -46,15 +52,15 @@ public class JobTimerTask implements TimerTask { log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); executor.execute(() -> { - + Job job = null; try { // 清除时间轮的缓存 JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId()); JobMapper jobMapper = SpringContext.getBeanByType(JobMapper.class); - Job job = jobMapper.selectOne(new LambdaQueryWrapper() - .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) - .eq(Job::getId, jobTimerTaskDTO.getJobId()) + job = jobMapper.selectOne(new LambdaQueryWrapper() + .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) + .eq(Job::getId, jobTimerTaskDTO.getJobId()) ); int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus(); @@ -72,7 +78,7 @@ public class JobTimerTask implements TimerTask { jobTaskBatch.setTaskBatchStatus(taskStatus); jobTaskBatch.setOperationReason(operationReason); Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch), - () -> new EasyRetryServerException("更新任务失败")); + () -> new EasyRetryServerException("更新任务失败")); // 如果任务已经关闭则不需要执行 if (Objects.isNull(job)) { @@ -88,9 +94,42 @@ public class JobTimerTask implements TimerTask { } catch (Exception e) { log.error("任务调度执行失败", e); + } finally { + // 处理常驻任务 + doHandlerResidentTask(job); + } - }); + } + private void doHandlerResidentTask(Job job) { + if (Objects.nonNull(job)) { + // 是否是常驻任务 + if (Objects.equals(StatusEnum.YES.getStatus(), job.getResident())) { + ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job); + WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); + + LocalDateTime preTriggerAt = ResidentTaskCache.get(jobTimerTaskDTO.getJobId()); + if (Objects.isNull(preTriggerAt) || preTriggerAt.isBefore(job.getNextTriggerAt())) { + preTriggerAt = job.getNextTriggerAt(); + } + + WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); + waitStrategyContext.setTriggerType(job.getTriggerType()); + waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(preTriggerAt); + LocalDateTime nextTriggerAt = waitStrategy.computeRetryTime(waitStrategyContext); + + // 获取时间差的毫秒数 + Duration duration = Duration.between(preTriggerAt, nextTriggerAt); + long milliseconds = duration.toMillis(); + + log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, System.currentTimeMillis() % 1000); + job.setNextTriggerAt(nextTriggerAt); + + JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - System.currentTimeMillis() % 1000, TimeUnit.MILLISECONDS); + ResidentTaskCache.refresh(jobTimerTaskDTO.getJobId(), nextTriggerAt); + } + } } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java new file mode 100644 index 00000000..6ef72e0e --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java @@ -0,0 +1,59 @@ +package com.aizuda.easy.retry.server.job.task.support.timer; + +import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; +import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.time.LocalDateTime; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @author www.byteblogs.com + * @date 2023-10-20 23:09:13 + * @since 2.4.0 + */ +@Slf4j +@AllArgsConstructor +public class ResidentJobTimerTask implements TimerTask { + + private JobTimerTaskDTO jobTimerTaskDTO; + private Job job; + + @Override + public void run(Timeout timeout) throws Exception { + try { + // 清除时间轮的缓存 + JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId()); + JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); + // 执行预处理阶段 + ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); + actorRef.tell(jobTaskPrepare, actorRef); + } catch (Exception e) { + log.error("任务调度执行失败", e); + } + } +} diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java index 9708852b..d1fc0da0 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java @@ -52,13 +52,12 @@ public class DispatchService implements Lifecycle { try { // 当正在rebalance时延迟10s,尽量等待所有节点都完成rebalance - if ( DistributeInstance.RE_BALANCE_ING.get()) { + if (DistributeInstance.RE_BALANCE_ING.get()) { LogUtils.info(log, "正在rebalance中...."); TimeUnit.SECONDS.sleep(INITIAL_DELAY); } Set currentConsumerBuckets = getConsumerBucket(); -// LogUtils.info(log, "当前节点分配的桶:[{}]", currentConsumerBuckets); if (!CollectionUtils.isEmpty(currentConsumerBuckets)) { ConsumerBucket scanTaskDTO = new ConsumerBucket(); scanTaskDTO.setBuckets(currentConsumerBuckets); diff --git a/frontend/src/views/job/JobList.vue b/frontend/src/views/job/JobList.vue index c0d8c03f..c9c265cc 100644 --- a/frontend/src/views/job/JobList.vue +++ b/frontend/src/views/job/JobList.vue @@ -14,15 +14,11 @@ - - - - - - - - - + + + + + diff --git a/frontend/src/views/job/from/JobFrom.vue b/frontend/src/views/job/from/JobFrom.vue index 005f8aab..d622461b 100644 --- a/frontend/src/views/job/from/JobFrom.vue +++ b/frontend/src/views/job/from/JobFrom.vue @@ -311,7 +311,6 @@ import CronModal from '@/views/job/from/CronModal' const enums = require('@/utils/enum') -let id = 0 export default { name: 'JobFrom', components: { CronModal }, @@ -341,7 +340,8 @@ export default { executorType: enums.executorType, routeKey: enums.routeKey, loading: false, - visible: false + visible: false, + count: 0 } }, beforeCreate () { @@ -395,7 +395,7 @@ export default { // can use data-binding to get const keys = dynamicForm.getFieldValue('keys') console.log(keys) - const nextKeys = keys.concat(id++) + const nextKeys = keys.concat(this.count++) // can use data-binding to set // important! notify form to detect changes dynamicForm.setFieldsValue({ @@ -419,7 +419,7 @@ export default { const restoredArray = keyValuePairs.map(pair => { const [index, value] = pair.split('=') console.log(value) - id++ + this.count++ return Number.parseInt(index) })