feat: 2.4.0
1. 阻塞条件
This commit is contained in:
parent
c04be51225
commit
50030df0d1
@ -0,0 +1,12 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyContext;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-25 17:53
|
||||||
|
*/
|
||||||
|
public interface BlockStrategy {
|
||||||
|
|
||||||
|
boolean block(BlockStrategyContext context);
|
||||||
|
}
|
@ -0,0 +1,15 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.scan;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-25 17:35
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class JobContext {
|
||||||
|
|
||||||
|
private RegisterNodeInfo registerNodeInfo;
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,36 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.scan;
|
||||||
|
|
||||||
|
import akka.actor.AbstractActor;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
|
import org.springframework.context.annotation.Scope;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-25 17:41
|
||||||
|
*/
|
||||||
|
@Component(ActorGenerator.SCAN_JOB_ACTOR)
|
||||||
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
|
@Slf4j
|
||||||
|
public class JobExecutorActor extends AbstractActor {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Receive createReceive() {
|
||||||
|
return receiveBuilder().match(JobContext.class, jobContext -> {
|
||||||
|
try {
|
||||||
|
doExecute(jobContext);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LogUtils.error(log, "job executor exception. [{}]", jobContext, e);
|
||||||
|
}
|
||||||
|
}).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doExecute(final JobContext jobContext) {
|
||||||
|
|
||||||
|
// 调度客户端
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,17 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.scan;
|
||||||
|
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-25 17:28
|
||||||
|
*/
|
||||||
|
public class JobTimerTask implements TimerTask {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(final Timeout timeout) throws Exception {
|
||||||
|
// 执行任务调度
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,106 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.scan;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
|
import com.aizuda.easy.retry.server.common.Lifecycle;
|
||||||
|
import com.google.common.cache.Cache;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
|
import io.netty.util.HashedWheelTimer;
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-22 17:03
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class JobTimerWheelHandler implements Lifecycle {
|
||||||
|
|
||||||
|
private static HashedWheelTimer timer = null;
|
||||||
|
|
||||||
|
private static Cache<String, Timeout> cache;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
|
||||||
|
// TODO 支持可配置
|
||||||
|
// tickDuration 和 timeUnit 一格的时间长度
|
||||||
|
// ticksPerWheel 一圈有多少格
|
||||||
|
timer = new HashedWheelTimer(
|
||||||
|
new CustomizableThreadFactory("jop-task-timer-wheel-"), 100,
|
||||||
|
TimeUnit.MILLISECONDS, 1024);
|
||||||
|
|
||||||
|
timer.start();
|
||||||
|
|
||||||
|
cache = CacheBuilder.newBuilder()
|
||||||
|
// 设置并发级别为cpu核心数
|
||||||
|
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) {
|
||||||
|
|
||||||
|
if (delay < 0) {
|
||||||
|
delay = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO 支持可配置
|
||||||
|
if (delay > 60 * 1000) {
|
||||||
|
LogUtils.warn(log, "距离下次执行时间过久, 不满足进入时间轮的条件. groupName:[{}] uniqueId:[{}] delay:[{}ms]",
|
||||||
|
groupName, uniqueId, delay);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Timeout timeout = getTimeout(groupName, uniqueId);
|
||||||
|
if (Objects.isNull(timeout)) {
|
||||||
|
try {
|
||||||
|
timeout = timer.newTimeout(task, delay, unit);
|
||||||
|
cache.put(getKey(groupName, uniqueId), timeout);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LogUtils.error(log, "加入时间轮失败. groupName:[{}] uniqueId:[{}]",
|
||||||
|
groupName, uniqueId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getKey(String groupName, String uniqueId) {
|
||||||
|
return groupName.concat(StrUtil.UNDERLINE).concat(uniqueId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Timeout getTimeout(String groupName, String uniqueId) {
|
||||||
|
return cache.getIfPresent(getKey(groupName, uniqueId));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isExisted(String groupName, String uniqueId) {
|
||||||
|
return Objects.nonNull(cache.getIfPresent(getKey(groupName, uniqueId)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean cancel(String groupName, String uniqueId) {
|
||||||
|
String key = getKey(groupName, uniqueId);
|
||||||
|
Timeout timeout = cache.getIfPresent(key);
|
||||||
|
if (Objects.isNull(timeout)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
cache.invalidate(key);
|
||||||
|
return timeout.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void clearCache(String groupName, String uniqueId) {
|
||||||
|
cache.invalidate(getKey(groupName, uniqueId));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
timer.stop();
|
||||||
|
cache.invalidateAll();
|
||||||
|
}
|
||||||
|
}
|
@ -7,11 +7,19 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
|||||||
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
|
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
|
||||||
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
||||||
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.BlockStrategy;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyContext;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyEnum;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
@ -23,6 +31,7 @@ import java.time.LocalDateTime;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -40,6 +49,8 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private JobMapper jobMapper;
|
private JobMapper jobMapper;
|
||||||
@Autowired
|
@Autowired
|
||||||
|
private JobTaskMapper jobTaskMapper;
|
||||||
|
@Autowired
|
||||||
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
|
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
|
||||||
|
|
||||||
private static final AtomicLong lastId = new AtomicLong(0L);
|
private static final AtomicLong lastId = new AtomicLong(0L);
|
||||||
@ -77,15 +88,39 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
lastId.set(jobs.get(jobs.size() - 1).getId());
|
lastId.set(jobs.get(jobs.size() - 1).getId());
|
||||||
|
|
||||||
for (Job job : jobs) {
|
for (Job job : jobs) {
|
||||||
|
JobContext jobContext = new JobContext();
|
||||||
// 选择客户端节点
|
// 选择客户端节点
|
||||||
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(job.getGroupName());
|
|
||||||
// TODO 校验一下客户端是否存活
|
// TODO 校验一下客户端是否存活
|
||||||
|
jobContext.setRegisterNodeInfo(clientNodeAllocateHandler.getServerNode(job.getGroupName()));
|
||||||
|
|
||||||
|
Long count = jobTaskMapper.selectCount(new LambdaQueryWrapper<JobTask>().eq(JobTask::getTaskStatus, StatusEnum.YES.getStatus()));
|
||||||
|
if (count <= 0) {
|
||||||
|
// 生成可执行任务
|
||||||
|
JobTask jobTask = new JobTask();
|
||||||
|
jobTask.setJobId(job.getId());
|
||||||
|
jobTask.setGroupName(job.getGroupName());
|
||||||
|
jobTaskMapper.insert(jobTask);
|
||||||
|
|
||||||
|
// 更新下次触发时间
|
||||||
|
// ToDo 根据CRON表达式计算
|
||||||
|
job.setNextTriggerAt(LocalDateTime.now().plusSeconds(50));
|
||||||
|
jobMapper.updateById(job);
|
||||||
|
} else {
|
||||||
|
BlockStrategyContext blockStrategyContext = new BlockStrategyContext();
|
||||||
|
|
||||||
|
BlockStrategy blockStrategy = BlockStrategyEnum.getBlockStrategy(job.getBlockStrategy());
|
||||||
|
blockStrategy.block(blockStrategyContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// 进入时间轮
|
||||||
|
JobTimerWheelHandler.register(job.getGroupName(), job.getId().toString(), new JobTimerTask(), 1, TimeUnit.MILLISECONDS);
|
||||||
// 校验是否存在已经在执行的任务了
|
// 校验是否存在已经在执行的任务了
|
||||||
// boolean isExist = true;
|
// boolean isExist = true;
|
||||||
// if (isExist) {
|
// if (isExist) {
|
||||||
// // 选择丢弃策略
|
// // 选择丢弃策略
|
||||||
//// String blockStrategy = job.getBlockStrategy();
|
// String blockStrategy = job.getBlockStrategy();
|
||||||
//
|
//
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
@ -93,7 +128,7 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private List<Job> listAvailableJobs(Long lastId) {
|
private List<Job> listAvailableJobs(Long lastId) {
|
||||||
return jobMapper.selectPage(new PageDTO<Job>(0, 100),
|
return jobMapper.selectPage(new PageDTO<Job>(0, 1000),
|
||||||
new LambdaQueryWrapper<Job>()
|
new LambdaQueryWrapper<Job>()
|
||||||
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
|
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
|
||||||
// TODO 提前10秒把需要执行的任务拉取出来
|
// TODO 提前10秒把需要执行的任务拉取出来
|
||||||
|
@ -0,0 +1,90 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.strategy;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.BlockStrategy;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-25 17:52
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class BlockStrategies {
|
||||||
|
|
||||||
|
@AllArgsConstructor
|
||||||
|
@Getter
|
||||||
|
public enum BlockStrategyEnum {
|
||||||
|
DISCARD(1, new DiscardBlockStrategy()),
|
||||||
|
OVERLAY(2, new OverlayBlockStrategy()),
|
||||||
|
CONCURRENCY(3, new ConcurrencyBlockStrategy());
|
||||||
|
|
||||||
|
private final int blockStrategy;
|
||||||
|
private final BlockStrategy strategy;
|
||||||
|
|
||||||
|
public static BlockStrategy getBlockStrategy(int blockStrategy) {
|
||||||
|
for (final BlockStrategyEnum value : BlockStrategyEnum.values()) {
|
||||||
|
if (value.blockStrategy == blockStrategy) {
|
||||||
|
return value.getStrategy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public static class BlockStrategyContext {
|
||||||
|
|
||||||
|
private Long jobId;
|
||||||
|
|
||||||
|
private Job job;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class DiscardBlockStrategy implements BlockStrategy {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean block(final BlockStrategyContext context) {
|
||||||
|
log.warn("阻塞策略为丢弃此次执行. jobId:[{}]", context.getJobId());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class OverlayBlockStrategy implements BlockStrategy {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean block(final BlockStrategyContext context) {
|
||||||
|
log.warn("阻塞策略为覆盖. jobId:[{}]", context.getJobId());
|
||||||
|
// 向客户端发送中断执行指令
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class ConcurrencyBlockStrategy implements BlockStrategy {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean block(final BlockStrategyContext context) {
|
||||||
|
log.warn("阻塞策略为并行执行. jobId:[{}]", context.getJobId());
|
||||||
|
Job job = context.getJob();
|
||||||
|
|
||||||
|
JobTask jobTask = new JobTask();
|
||||||
|
jobTask.setJobId(job.getId());
|
||||||
|
jobTask.setGroupName(job.getGroupName());
|
||||||
|
|
||||||
|
JobTaskMapper jobTaskMapper = SpringContext.getBeanByType(JobTaskMapper.class);
|
||||||
|
jobTaskMapper.insert(jobTask);
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user