feat: 2.4.0

1. 阻塞条件
This commit is contained in:
byteblogs168 2023-09-25 18:39:05 +08:00
parent 304118216b
commit 409d7e42e6
7 changed files with 314 additions and 3 deletions

View File

@ -0,0 +1,12 @@
package com.aizuda.easy.retry.server.job.task;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyContext;
/**
* @author: www.byteblogs.com
* @date : 2023-09-25 17:53
*/
public interface BlockStrategy {
boolean block(BlockStrategyContext context);
}

View File

@ -0,0 +1,15 @@
package com.aizuda.easy.retry.server.job.task.scan;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import lombok.Data;
/**
* @author: www.byteblogs.com
* @date : 2023-09-25 17:35
*/
@Data
public class JobContext {
private RegisterNodeInfo registerNodeInfo;
}

View File

@ -0,0 +1,36 @@
package com.aizuda.easy.retry.server.job.task.scan;
import akka.actor.AbstractActor;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
* @author: www.byteblogs.com
* @date : 2023-09-25 17:41
*/
@Component(ActorGenerator.SCAN_JOB_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class JobExecutorActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(JobContext.class, jobContext -> {
try {
doExecute(jobContext);
} catch (Exception e) {
LogUtils.error(log, "job executor exception. [{}]", jobContext, e);
}
}).build();
}
private void doExecute(final JobContext jobContext) {
// 调度客户端
}
}

View File

@ -0,0 +1,17 @@
package com.aizuda.easy.retry.server.job.task.scan;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
/**
* @author: www.byteblogs.com
* @date : 2023-09-25 17:28
*/
public class JobTimerTask implements TimerTask {
@Override
public void run(final Timeout timeout) throws Exception {
// 执行任务调度
}
}

View File

@ -0,0 +1,106 @@
package com.aizuda.easy.retry.server.job.task.scan;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* @author: www.byteblogs.com
* @date : 2023-09-22 17:03
*/
@Component
@Slf4j
public class JobTimerWheelHandler implements Lifecycle {
private static HashedWheelTimer timer = null;
private static Cache<String, Timeout> cache;
@Override
public void start() {
// TODO 支持可配置
// tickDuration timeUnit 一格的时间长度
// ticksPerWheel 一圈有多少格
timer = new HashedWheelTimer(
new CustomizableThreadFactory("jop-task-timer-wheel-"), 100,
TimeUnit.MILLISECONDS, 1024);
timer.start();
cache = CacheBuilder.newBuilder()
// 设置并发级别为cpu核心数
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
.build();
}
public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) {
if (delay < 0) {
delay = 0;
}
// TODO 支持可配置
if (delay > 60 * 1000) {
LogUtils.warn(log, "距离下次执行时间过久, 不满足进入时间轮的条件. groupName:[{}] uniqueId:[{}] delay:[{}ms]",
groupName, uniqueId, delay);
return;
}
Timeout timeout = getTimeout(groupName, uniqueId);
if (Objects.isNull(timeout)) {
try {
timeout = timer.newTimeout(task, delay, unit);
cache.put(getKey(groupName, uniqueId), timeout);
} catch (Exception e) {
LogUtils.error(log, "加入时间轮失败. groupName:[{}] uniqueId:[{}]",
groupName, uniqueId, e);
}
}
}
private static String getKey(String groupName, String uniqueId) {
return groupName.concat(StrUtil.UNDERLINE).concat(uniqueId);
}
public static Timeout getTimeout(String groupName, String uniqueId) {
return cache.getIfPresent(getKey(groupName, uniqueId));
}
public static boolean isExisted(String groupName, String uniqueId) {
return Objects.nonNull(cache.getIfPresent(getKey(groupName, uniqueId)));
}
public static boolean cancel(String groupName, String uniqueId) {
String key = getKey(groupName, uniqueId);
Timeout timeout = cache.getIfPresent(key);
if (Objects.isNull(timeout)) {
return false;
}
cache.invalidate(key);
return timeout.cancel();
}
public static void clearCache(String groupName, String uniqueId) {
cache.invalidate(getKey(groupName, uniqueId));
}
@Override
public void close() {
timer.stop();
cache.invalidateAll();
}
}

View File

@ -7,11 +7,19 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.job.task.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyContext;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -23,6 +31,7 @@ import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -40,6 +49,8 @@ public class ScanJobTaskActor extends AbstractActor {
@Autowired
private JobMapper jobMapper;
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
private static final AtomicLong lastId = new AtomicLong(0L);
@ -77,15 +88,39 @@ public class ScanJobTaskActor extends AbstractActor {
lastId.set(jobs.get(jobs.size() - 1).getId());
for (Job job : jobs) {
JobContext jobContext = new JobContext();
// 选择客户端节点
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(job.getGroupName());
// TODO 校验一下客户端是否存活
jobContext.setRegisterNodeInfo(clientNodeAllocateHandler.getServerNode(job.getGroupName()));
Long count = jobTaskMapper.selectCount(new LambdaQueryWrapper<JobTask>().eq(JobTask::getTaskStatus, StatusEnum.YES.getStatus()));
if (count <= 0) {
// 生成可执行任务
JobTask jobTask = new JobTask();
jobTask.setJobId(job.getId());
jobTask.setGroupName(job.getGroupName());
jobTaskMapper.insert(jobTask);
// 更新下次触发时间
// ToDo 根据CRON表达式计算
job.setNextTriggerAt(LocalDateTime.now().plusSeconds(50));
jobMapper.updateById(job);
} else {
BlockStrategyContext blockStrategyContext = new BlockStrategyContext();
BlockStrategy blockStrategy = BlockStrategyEnum.getBlockStrategy(job.getBlockStrategy());
blockStrategy.block(blockStrategyContext);
}
// 进入时间轮
JobTimerWheelHandler.register(job.getGroupName(), job.getId().toString(), new JobTimerTask(), 1, TimeUnit.MILLISECONDS);
// 校验是否存在已经在执行的任务了
// boolean isExist = true;
// if (isExist) {
// // 选择丢弃策略
//// String blockStrategy = job.getBlockStrategy();
// String blockStrategy = job.getBlockStrategy();
//
// }
}
@ -93,7 +128,7 @@ public class ScanJobTaskActor extends AbstractActor {
}
private List<Job> listAvailableJobs(Long lastId) {
return jobMapper.selectPage(new PageDTO<Job>(0, 100),
return jobMapper.selectPage(new PageDTO<Job>(0, 1000),
new LambdaQueryWrapper<Job>()
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
// TODO 提前10秒把需要执行的任务拉取出来

View File

@ -0,0 +1,90 @@
package com.aizuda.easy.retry.server.job.task.strategy;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.server.job.task.BlockStrategy;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* @author: www.byteblogs.com
* @date : 2023-09-25 17:52
*/
@Slf4j
public class BlockStrategies {
@AllArgsConstructor
@Getter
public enum BlockStrategyEnum {
DISCARD(1, new DiscardBlockStrategy()),
OVERLAY(2, new OverlayBlockStrategy()),
CONCURRENCY(3, new ConcurrencyBlockStrategy());
private final int blockStrategy;
private final BlockStrategy strategy;
public static BlockStrategy getBlockStrategy(int blockStrategy) {
for (final BlockStrategyEnum value : BlockStrategyEnum.values()) {
if (value.blockStrategy == blockStrategy) {
return value.getStrategy();
}
}
return null;
}
}
@Data
public static class BlockStrategyContext {
private Long jobId;
private Job job;
}
private static final class DiscardBlockStrategy implements BlockStrategy {
@Override
public boolean block(final BlockStrategyContext context) {
log.warn("阻塞策略为丢弃此次执行. jobId:[{}]", context.getJobId());
return false;
}
}
private static final class OverlayBlockStrategy implements BlockStrategy {
@Override
public boolean block(final BlockStrategyContext context) {
log.warn("阻塞策略为覆盖. jobId:[{}]", context.getJobId());
// 向客户端发送中断执行指令
return true;
}
}
private static final class ConcurrencyBlockStrategy implements BlockStrategy {
@Override
public boolean block(final BlockStrategyContext context) {
log.warn("阻塞策略为并行执行. jobId:[{}]", context.getJobId());
Job job = context.getJob();
JobTask jobTask = new JobTask();
jobTask.setJobId(job.getId());
jobTask.setGroupName(job.getGroupName());
JobTaskMapper jobTaskMapper = SpringContext.getBeanByType(JobTaskMapper.class);
jobTaskMapper.insert(jobTask);
return false;
}
}
}