From 409d7e42e66be727d35be711403087500e2cdeb3 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Mon, 25 Sep 2023 18:39:05 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.4.0=201.=20=E9=98=BB=E5=A1=9E?= =?UTF-8?q?=E6=9D=A1=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../retry/server/job/task/BlockStrategy.java | 12 ++ .../server/job/task/scan/JobContext.java | 15 +++ .../job/task/scan/JobExecutorActor.java | 36 ++++++ .../server/job/task/scan/JobTimerTask.java | 17 +++ .../job/task/scan/JobTimerWheelHandler.java | 106 ++++++++++++++++++ .../job/task/scan/ScanJobTaskActor.java | 41 ++++++- .../job/task/strategy/BlockStrategies.java | 90 +++++++++++++++ 7 files changed, 314 insertions(+), 3 deletions(-) create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/BlockStrategy.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobContext.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobExecutorActor.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerTask.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerWheelHandler.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/BlockStrategy.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/BlockStrategy.java new file mode 100644 index 00000000..cce133c4 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/BlockStrategy.java @@ -0,0 +1,12 @@ +package com.aizuda.easy.retry.server.job.task; + +import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyContext; + +/** + * @author: www.byteblogs.com + * @date : 2023-09-25 17:53 + */ +public interface BlockStrategy { + + boolean block(BlockStrategyContext context); +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobContext.java new file mode 100644 index 00000000..8e32793a --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobContext.java @@ -0,0 +1,15 @@ +package com.aizuda.easy.retry.server.job.task.scan; + +import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import lombok.Data; + +/** + * @author: www.byteblogs.com + * @date : 2023-09-25 17:35 + */ +@Data +public class JobContext { + + private RegisterNodeInfo registerNodeInfo; + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobExecutorActor.java new file mode 100644 index 00000000..fa76d52a --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobExecutorActor.java @@ -0,0 +1,36 @@ +package com.aizuda.easy.retry.server.job.task.scan; + +import akka.actor.AbstractActor; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +/** + * @author: www.byteblogs.com + * @date : 2023-09-25 17:41 + */ +@Component(ActorGenerator.SCAN_JOB_ACTOR) +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +public class JobExecutorActor extends AbstractActor { + + @Override + public Receive createReceive() { + return receiveBuilder().match(JobContext.class, jobContext -> { + try { + doExecute(jobContext); + } catch (Exception e) { + LogUtils.error(log, "job executor exception. [{}]", jobContext, e); + } + }).build(); + } + + private void doExecute(final JobContext jobContext) { + + // 调度客户端 + + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerTask.java new file mode 100644 index 00000000..1ff6cf8a --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerTask.java @@ -0,0 +1,17 @@ +package com.aizuda.easy.retry.server.job.task.scan; + +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + +/** + * @author: www.byteblogs.com + * @date : 2023-09-25 17:28 + */ +public class JobTimerTask implements TimerTask { + + @Override + public void run(final Timeout timeout) throws Exception { + // 执行任务调度 + + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerWheelHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerWheelHandler.java new file mode 100644 index 00000000..f43d7229 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/JobTimerWheelHandler.java @@ -0,0 +1,106 @@ +package com.aizuda.easy.retry.server.job.task.scan; + +import cn.hutool.core.util.StrUtil; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import org.springframework.stereotype.Component; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * @author: www.byteblogs.com + * @date : 2023-09-22 17:03 + */ +@Component +@Slf4j +public class JobTimerWheelHandler implements Lifecycle { + + private static HashedWheelTimer timer = null; + + private static Cache cache; + + @Override + public void start() { + + // TODO 支持可配置 + // tickDuration 和 timeUnit 一格的时间长度 + // ticksPerWheel 一圈有多少格 + timer = new HashedWheelTimer( + new CustomizableThreadFactory("jop-task-timer-wheel-"), 100, + TimeUnit.MILLISECONDS, 1024); + + timer.start(); + + cache = CacheBuilder.newBuilder() + // 设置并发级别为cpu核心数 + .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .build(); + } + + public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) { + + if (delay < 0) { + delay = 0; + } + + // TODO 支持可配置 + if (delay > 60 * 1000) { + LogUtils.warn(log, "距离下次执行时间过久, 不满足进入时间轮的条件. groupName:[{}] uniqueId:[{}] delay:[{}ms]", + groupName, uniqueId, delay); + return; + } + + Timeout timeout = getTimeout(groupName, uniqueId); + if (Objects.isNull(timeout)) { + try { + timeout = timer.newTimeout(task, delay, unit); + cache.put(getKey(groupName, uniqueId), timeout); + } catch (Exception e) { + LogUtils.error(log, "加入时间轮失败. groupName:[{}] uniqueId:[{}]", + groupName, uniqueId, e); + } + } + + } + + private static String getKey(String groupName, String uniqueId) { + return groupName.concat(StrUtil.UNDERLINE).concat(uniqueId); + } + + public static Timeout getTimeout(String groupName, String uniqueId) { + return cache.getIfPresent(getKey(groupName, uniqueId)); + } + + public static boolean isExisted(String groupName, String uniqueId) { + return Objects.nonNull(cache.getIfPresent(getKey(groupName, uniqueId))); + } + + public static boolean cancel(String groupName, String uniqueId) { + String key = getKey(groupName, uniqueId); + Timeout timeout = cache.getIfPresent(key); + if (Objects.isNull(timeout)) { + return false; + } + + cache.invalidate(key); + return timeout.cancel(); + } + + public static void clearCache(String groupName, String uniqueId) { + cache.invalidate(getKey(groupName, uniqueId)); + } + + @Override + public void close() { + timer.stop(); + cache.invalidateAll(); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/ScanJobTaskActor.java index 21f9d05f..561e8628 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/scan/ScanJobTaskActor.java @@ -7,11 +7,19 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.easy.retry.server.job.task.BlockStrategy; +import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies; +import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyContext; +import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyEnum; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -23,6 +31,7 @@ import java.time.LocalDateTime; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -40,6 +49,8 @@ public class ScanJobTaskActor extends AbstractActor { @Autowired private JobMapper jobMapper; @Autowired + private JobTaskMapper jobTaskMapper; + @Autowired protected ClientNodeAllocateHandler clientNodeAllocateHandler; private static final AtomicLong lastId = new AtomicLong(0L); @@ -77,15 +88,39 @@ public class ScanJobTaskActor extends AbstractActor { lastId.set(jobs.get(jobs.size() - 1).getId()); for (Job job : jobs) { + JobContext jobContext = new JobContext(); // 选择客户端节点 - RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(job.getGroupName()); // TODO 校验一下客户端是否存活 + jobContext.setRegisterNodeInfo(clientNodeAllocateHandler.getServerNode(job.getGroupName())); + Long count = jobTaskMapper.selectCount(new LambdaQueryWrapper().eq(JobTask::getTaskStatus, StatusEnum.YES.getStatus())); + if (count <= 0) { + // 生成可执行任务 + JobTask jobTask = new JobTask(); + jobTask.setJobId(job.getId()); + jobTask.setGroupName(job.getGroupName()); + jobTaskMapper.insert(jobTask); + + // 更新下次触发时间 + // ToDo 根据CRON表达式计算 + job.setNextTriggerAt(LocalDateTime.now().plusSeconds(50)); + jobMapper.updateById(job); + } else { + BlockStrategyContext blockStrategyContext = new BlockStrategyContext(); + + BlockStrategy blockStrategy = BlockStrategyEnum.getBlockStrategy(job.getBlockStrategy()); + blockStrategy.block(blockStrategyContext); + } + + + + // 进入时间轮 + JobTimerWheelHandler.register(job.getGroupName(), job.getId().toString(), new JobTimerTask(), 1, TimeUnit.MILLISECONDS); // 校验是否存在已经在执行的任务了 // boolean isExist = true; // if (isExist) { // // 选择丢弃策略 -//// String blockStrategy = job.getBlockStrategy(); +// String blockStrategy = job.getBlockStrategy(); // // } } @@ -93,7 +128,7 @@ public class ScanJobTaskActor extends AbstractActor { } private List listAvailableJobs(Long lastId) { - return jobMapper.selectPage(new PageDTO(0, 100), + return jobMapper.selectPage(new PageDTO(0, 1000), new LambdaQueryWrapper() .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) // TODO 提前10秒把需要执行的任务拉取出来 diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java new file mode 100644 index 00000000..61b6687b --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java @@ -0,0 +1,90 @@ +package com.aizuda.easy.retry.server.job.task.strategy; + +import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.server.job.task.BlockStrategy; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * @author: www.byteblogs.com + * @date : 2023-09-25 17:52 + */ +@Slf4j +public class BlockStrategies { + + @AllArgsConstructor + @Getter + public enum BlockStrategyEnum { + DISCARD(1, new DiscardBlockStrategy()), + OVERLAY(2, new OverlayBlockStrategy()), + CONCURRENCY(3, new ConcurrencyBlockStrategy()); + + private final int blockStrategy; + private final BlockStrategy strategy; + + public static BlockStrategy getBlockStrategy(int blockStrategy) { + for (final BlockStrategyEnum value : BlockStrategyEnum.values()) { + if (value.blockStrategy == blockStrategy) { + return value.getStrategy(); + } + } + + return null; + } + + + } + + @Data + public static class BlockStrategyContext { + + private Long jobId; + + private Job job; + } + + private static final class DiscardBlockStrategy implements BlockStrategy { + + @Override + public boolean block(final BlockStrategyContext context) { + log.warn("阻塞策略为丢弃此次执行. jobId:[{}]", context.getJobId()); + return false; + } + } + + private static final class OverlayBlockStrategy implements BlockStrategy { + + @Override + public boolean block(final BlockStrategyContext context) { + log.warn("阻塞策略为覆盖. jobId:[{}]", context.getJobId()); + // 向客户端发送中断执行指令 + + return true; + } + } + + private static final class ConcurrencyBlockStrategy implements BlockStrategy { + + @Override + public boolean block(final BlockStrategyContext context) { + log.warn("阻塞策略为并行执行. jobId:[{}]", context.getJobId()); + Job job = context.getJob(); + + JobTask jobTask = new JobTask(); + jobTask.setJobId(job.getId()); + jobTask.setGroupName(job.getGroupName()); + + JobTaskMapper jobTaskMapper = SpringContext.getBeanByType(JobTaskMapper.class); + jobTaskMapper.insert(jobTask); + + return false; + } + } + + +}