feat:2.4.0

1. 完成任务调度的表结构设计
This commit is contained in:
byteblogs168 2023-09-25 23:44:59 +08:00
parent 409d7e42e6
commit b8d157c3f5
12 changed files with 477 additions and 39 deletions

View File

@ -2,8 +2,10 @@ package com.aizuda.easy.retry.template.datasource.persistence.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
@ -77,6 +79,16 @@ public class Job implements Serializable {
*/
private String executorName;
/**
* 触发类型 1.CRON 表达式 2. 固定时间
*/
private Integer triggerType;
/**
* 间隔时长
*/
private String triggerInterval;
/**
* 阻塞策略 1丢弃 2覆盖 3并行
*/

View File

@ -15,7 +15,6 @@ public class ActorGenerator {
public static final String SCAN_CALLBACK_GROUP_ACTOR = "ScanCallbackGroupActor";
public static final String SCAN_RETRY_GROUP_ACTOR = "ScanGroupActor";
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor";
public static final String FINISH_ACTOR = "FinishActor";
public static final String FAILURE_ACTOR = "FailureActor";
@ -25,6 +24,10 @@ public class ActorGenerator {
public static final String LOG_ACTOR = "LogActor";
public static final String REQUEST_HANDLER_ACTOR = "RequestHandlerActor";
/*----------------------------------------分布式任务调度----------------------------------------*/
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
private ActorGenerator() {}
@ -127,6 +130,15 @@ public class ActorGenerator {
return getNettyActorSystem().actorOf(getSpringExtension().props(REQUEST_HANDLER_ACTOR));
}
/**
* Job调度准备阶段actor
*
* @return actor 引用
*/
public static ActorRef jobTaskPrepareActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_TASK_PREPARE_ACTOR));
}
public static SpringExtension getSpringExtension() {
return SpringContext.getBeanByType(SpringExtension.class);
}
@ -173,4 +185,15 @@ public class ActorGenerator {
public static ActorSystem getNettyActorSystem() {
return SpringContext.getBean("nettyActorSystem", ActorSystem.class);
}
/**
* 处理Job调度
*
* @return
*/
public static ActorSystem getJobActorSystem() {
return SpringContext.getBean("jobActorSystem", ActorSystem.class);
}
}

View File

@ -20,6 +20,7 @@ public class AkkaConfiguration {
private static final String DISPATCH_RESULT_ACTOR_SYSTEM = "DISPATCH_RESULT_ACTOR_SYSTEM";
private static final String LOG_ACTOR_SYSTEM = "LOG_ACTOR_SYSTEM";
private static final String NETTY_ACTOR_SYSTEM = "NETTY_ACTOR_SYSTEM";
private static final String JOB_ACTOR_SYSTEM = "JOB_ACTOR_SYSTEM";
@Autowired
private ApplicationContext applicationContext;
@ -86,4 +87,16 @@ public class AkkaConfiguration {
springExtension.initialize(applicationContext);
return system;
}
/**
* 处理job调度
*
* @return {@link ActorSystem} 顶级actor
*/
@Bean("jobActorSystem")
public ActorSystem jobActorSystem() {
ActorSystem system = ActorSystem.create(JOB_ACTOR_SYSTEM);
springExtension.initialize(applicationContext);
return system;
}
}

View File

@ -0,0 +1,32 @@
package com.aizuda.easy.retry.server.common.dto;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.List;
/**
* @author www.byteblogs.com
* @date 2023-09-25 22:48:36
* @since 2.4.0
*/
@Data
public class FilterStrategyContext {
private Long id;
private RegisterNodeInfo registerNodeInfo;
private String groupName;
private LocalDateTime nextTriggerAt;
private String uniqueId;
private List<String> sceneBlacklist;
private String sceneName;
}

View File

@ -0,0 +1,23 @@
package com.aizuda.easy.retry.server.job.task;
import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies;
import java.time.LocalDateTime;
/**
* 等待策略退避策略
*
* @author: www.byteblogs.com
* @date : 2021-11-29 18:18
*/
public interface WaitStrategy {
/**
* 计算下次重试触发时间
*
* @param context {@link WaitStrategies.WaitStrategyContext} 重试上下文
* @return 下次触发时间
*/
LocalDateTime computeRetryTime(WaitStrategies.WaitStrategyContext context);
}

View File

@ -0,0 +1,102 @@
package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author www.byteblogs.com
* @date 2023-09-25 22:42:21
* @since 2.4.0
*/
@Data
public class JobTaskPrepareDTO {
private Long jobId;
/**
* 组名称
*/
private String groupName;
/**
* 名称
*/
private String jobName;
/**
* 执行方法参数
*/
private String argsStr;
/**
* 参数类型 text/json
*/
private String argsType;
/**
* 扩展字段
*/
private String extAttrs;
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
/**
* 重试状态 0关闭1开启
*/
private Integer jobStatus;
/**
* 执行器路由策略
*/
private String routeKey;
/**
* 执行器类型 1Java
*/
private Integer executorType;
/**
* 执行器名称
*/
private String executorName;
/**
* 触发类型 1.CRON 表达式 2. 固定时间
*/
private Integer triggerType;
/**
* 间隔时长
*/
private String triggerInterval;
/**
* 阻塞策略 1丢弃 2覆盖 3并行
*/
private Integer blockStrategy;
/**
* 任务执行超时时间单位秒
*/
private Integer executorTimeout;
/**
* 最大重试次数
*/
private Integer maxRetryTimes;
/**
* 重试间隔(s)
*/
private Integer retryInterval;
/**
* bucket
*/
private Integer bucketIndex;
}

View File

@ -29,7 +29,6 @@ public class JobExecutorActor extends AbstractActor {
}
private void doExecute(final JobContext jobContext) {
// 调度客户端
}

View File

@ -0,0 +1,73 @@
package com.aizuda.easy.retry.server.job.task.scan;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.job.task.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @author www.byteblogs.com
* @date 2023-09-25 22:20:53
* @since
*/
@Component(ActorGenerator.SCAN_JOB_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class JobTaskPrepareActor extends AbstractActor {
@Autowired
private JobMapper jobMapper;
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
@Override
public Receive createReceive() {
return receiveBuilder().match(JobTaskPrepareDTO.class, job -> {
try {
doPrepare(job);
} catch (Exception e) {
}
}).build();
}
private void doPrepare(JobTaskPrepareDTO prepare) {
Long count = jobTaskMapper.selectCount(new LambdaQueryWrapper<JobTask>().eq(JobTask::getTaskStatus, StatusEnum.YES.getStatus()));
if (count <= 0) {
// 生成可执行任务
JobTask jobTask = new JobTask();
jobTask.setJobId(prepare.getJobId());
jobTask.setGroupName(prepare.getGroupName());
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", prepare.getJobId()));
JobContext jobContext = new JobContext();
// 进入时间轮
JobTimerWheelHandler.register(prepare.getGroupName(), prepare.getJobId().toString(), new JobTimerTask(jobContext), 1, TimeUnit.MILLISECONDS);
} else {
BlockStrategies.BlockStrategyContext blockStrategyContext = new BlockStrategies.BlockStrategyContext();
BlockStrategy blockStrategy = BlockStrategies.BlockStrategyEnum.getBlockStrategy(prepare.getBlockStrategy());
blockStrategy.block(blockStrategyContext);
}
}
}

View File

@ -9,6 +9,12 @@ import io.netty.util.TimerTask;
*/
public class JobTimerTask implements TimerTask {
private JobContext jobContext;
public JobTimerTask(JobContext jobContext) {
this.jobContext = jobContext;
}
@Override
public void run(final Timeout timeout) throws Exception {
// 执行任务调度

View File

@ -1,16 +1,22 @@
package com.aizuda.easy.retry.server.job.task.scan;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.job.task.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyContext;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyEnum;
import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
@ -34,6 +40,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies.*;
/**
* JOB任务扫描
*
@ -88,41 +96,23 @@ public class ScanJobTaskActor extends AbstractActor {
lastId.set(jobs.get(jobs.size() - 1).getId());
for (Job job : jobs) {
JobContext jobContext = new JobContext();
// 选择客户端节点
// TODO 校验一下客户端是否存活
jobContext.setRegisterNodeInfo(clientNodeAllocateHandler.getServerNode(job.getGroupName()));
Long count = jobTaskMapper.selectCount(new LambdaQueryWrapper<JobTask>().eq(JobTask::getTaskStatus, StatusEnum.YES.getStatus()));
if (count <= 0) {
// 生成可执行任务
JobTask jobTask = new JobTask();
jobTask.setJobId(job.getId());
jobTask.setGroupName(job.getGroupName());
jobTaskMapper.insert(jobTask);
// 更新下次触发时间
// ToDo 根据CRON表达式计算
job.setNextTriggerAt(LocalDateTime.now().plusSeconds(50));
jobMapper.updateById(job);
} else {
BlockStrategyContext blockStrategyContext = new BlockStrategyContext();
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(job.getTriggerType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(job.getTriggerType());
waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(job.getNextTriggerAt());
job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext));
Assert.isTrue(1 == jobMapper.updateById(job), () -> new EasyRetryServerException("更新job下次触发时间失败.jobId:[{}]", job.getId()));
BlockStrategy blockStrategy = BlockStrategyEnum.getBlockStrategy(job.getBlockStrategy());
blockStrategy.block(blockStrategyContext);
}
// 进入时间轮
JobTimerWheelHandler.register(job.getGroupName(), job.getId().toString(), new JobTimerTask(), 1, TimeUnit.MILLISECONDS);
// 校验是否存在已经在执行的任务了
// boolean isExist = true;
// if (isExist) {
// // 选择丢弃策略
// String blockStrategy = job.getBlockStrategy();
//
// }
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
JobTaskPrepareDTO jobTaskPrepareDTO = new JobTaskPrepareDTO();
jobTaskPrepareDTO.setJobId(job.getId());
jobTaskPrepareDTO.setTriggerType(job.getTriggerType());
jobTaskPrepareDTO.setNextTriggerAt(job.getNextTriggerAt());
actorRef.tell(jobTaskPrepareDTO, actorRef);
}
}

View File

@ -1,7 +1,12 @@
package com.aizuda.easy.retry.server.job.task.strategy;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.scan.JobContext;
import com.aizuda.easy.retry.server.job.task.scan.JobTimerTask;
import com.aizuda.easy.retry.server.job.task.scan.JobTimerWheelHandler;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
@ -10,6 +15,8 @@ import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* @author: www.byteblogs.com
* @date : 2023-09-25 17:52
@ -78,9 +85,12 @@ public class BlockStrategies {
JobTask jobTask = new JobTask();
jobTask.setJobId(job.getId());
jobTask.setGroupName(job.getGroupName());
JobTaskMapper jobTaskMapper = SpringContext.getBeanByType(JobTaskMapper.class);
jobTaskMapper.insert(jobTask);
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", job.getId()));
JobContext jobContext = new JobContext();
// 进入时间轮
JobTimerWheelHandler.register(job.getGroupName(), job.getId().toString(), new JobTimerTask(jobContext), 1, TimeUnit.MILLISECONDS);
return false;
}

View File

@ -0,0 +1,155 @@
package com.aizuda.easy.retry.server.job.task.strategy;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.util.CronExpression;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.WaitStrategy;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;
/**
* 生成 {@link WaitStrategy} 实例.
*
* @author: www.byteblogs.com
* @date : 2021-11-29 18:19
*/
public class WaitStrategies {
private WaitStrategies() {
}
@Data
public static class WaitStrategyContext {
/**
* 触发类型 1.CRON 表达式 2. 固定时间
*/
private Integer triggerType;
/**
* 间隔时长
*/
private String triggerInterval;
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
}
@Getter
@AllArgsConstructor
public enum WaitStrategyEnum {
CRON(1, cronWait()),
FIXED(2, fixedWait()),
;
private final int triggerType;
private final WaitStrategy waitStrategy;
/**
* 获取退避策略
*
* @param triggerType 触发类型
* @return 退避策略
*/
public static WaitStrategy getWaitStrategy(int triggerType) {
WaitStrategyEnum waitStrategy = getWaitStrategyEnum(triggerType);
switch (waitStrategy) {
case CRON:
return cronWait();
case FIXED:
return fixedWait();
default:
return waitStrategy.waitStrategy;
}
}
/**
* 获取退避策略枚举对象
*
* @param backOff 退避策略
* @return 退避策略枚举对象
*/
public static WaitStrategyEnum getWaitStrategyEnum(int backOff) {
for (WaitStrategyEnum value : WaitStrategyEnum.values()) {
if (value.triggerType == backOff) {
return value;
}
}
// TODO 兜底为默认等级策略
return null;
}
}
/**
* 固定定时间等待策略
*
* @return {@link FixedWaitStrategy} 固定定时间等待策略
*/
public static WaitStrategy fixedWait() {
return new FixedWaitStrategy();
}
/**
* cron等待策略
*
* @return {@link CronWaitStrategy} cron等待策略
*/
public static WaitStrategy cronWait() {
return new CronWaitStrategy();
}
/**
* 固定定时间等待策略
*/
private static final class FixedWaitStrategy implements WaitStrategy {
@Override
public LocalDateTime computeRetryTime(WaitStrategyContext context) {
long triggerInterval = Long.parseLong(context.triggerInterval);
return context.getNextTriggerAt().plusSeconds(triggerInterval);
}
}
/**
* Cron等待策略
*/
private static final class CronWaitStrategy implements WaitStrategy {
@Override
public LocalDateTime computeRetryTime(WaitStrategyContext context) {
Date nextValidTime;
try {
ZonedDateTime zdt = context.getNextTriggerAt().atZone(ZoneOffset.ofHours(8));
nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant()));
} catch (ParseException e) {
throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e);
}
return LocalDateTime.ofEpochSecond( nextValidTime.getTime() / 1000,0, ZoneOffset.ofHours(8));
}
}
}