feat:2.4.0

1. 完成任务调度的表结构设计
This commit is contained in:
byteblogs168 2023-09-24 23:11:21 +08:00
parent cefd97edb3
commit 304118216b
17 changed files with 633 additions and 0 deletions

View File

@ -134,6 +134,7 @@ CREATE TABLE `scene_config`
`back_off` tinyint(4) NOT NULL DEFAULT '1' COMMENT '1、默认等级 2、固定间隔时间 3、CRON 表达式',
`trigger_interval` varchar(16) NOT NULL DEFAULT '' COMMENT '间隔时长',
`deadline_request` bigint(20) unsigned NOT NULL DEFAULT '60000' COMMENT 'Deadline Request 调用链超时 单位毫秒',
`bucket_index` int(11) DEFAULT NULL COMMENT 'bucket',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
@ -212,3 +213,66 @@ CREATE TABLE `sequence_alloc`
PRIMARY KEY (`id`),
UNIQUE KEY `uk_group_name` (`group_name`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='号段模式序号ID分配表';
-- 分布式调度DDL
CREATE TABLE `job` (
`id` BIGINT ( 20 ) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` VARCHAR ( 64 ) NOT NULL COMMENT '组名称',
`job_name` VARCHAR ( 64 ) NOT NULL COMMENT '名称',
`args_str` TEXT NOT NULL COMMENT '执行方法参数',
`args_type` VARCHAR ( 16 ) NOT NULL DEFAULT '' COMMENT '参数类型 text/json',
`ext_attrs` TEXT NOT NULL COMMENT '扩展字段',
`next_trigger_at` DATETIME NOT NULL COMMENT '下次触发时间',
`job_status` TINYINT ( 4 ) NOT NULL DEFAULT '1' COMMENT '重试状态 0、关闭、1、开启',
`route_key` VARCHAR ( 50 ) DEFAULT NULL COMMENT '执行器路由策略',
`executor_type` TINYINT ( 4 ) NOT NULL DEFAULT '1' COMMENT '执行器类型 1、Java',
`executor_name` VARCHAR ( 255 ) DEFAULT NULL COMMENT '执行器名称',
`block_strategy` VARCHAR ( 50 ) DEFAULT NULL COMMENT '阻塞策略 1、丢弃 2、覆盖 3、并行',
`executor_timeout` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`max_retry_times` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '最大重试次数',
`retry_interval` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '重试间隔(s)',
`bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` TINYINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
PRIMARY KEY ( `id` ),
KEY `idx_group_name` ( `group_name` )
) ENGINE = INNODB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT = '任务信息';
CREATE TABLE `job_task` (
`id` BIGINT ( 20 ) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` VARCHAR ( 64 ) NOT NULL COMMENT '组名称',
`job_id` BIGINT ( 20 ) NOT NULL COMMENT '任务id',
`retry_count` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '重试次数',
`task_status` TINYINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '任务状态 0、失败 1、成功',
`create_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` TINYINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
PRIMARY KEY ( `id` )
) ENGINE = INNODB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT = '调度任务';
CREATE TABLE `job_task_instance` (
`id` BIGINT ( 20 ) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` VARCHAR ( 64 ) NOT NULL COMMENT '组名称',
`job_id` BIGINT ( 20 ) NOT NULL COMMENT '任务信息id',
`task_id` BIGINT ( 20 ) NOT NULL COMMENT '调度任务id',
`parent_id` BIGINT ( 20 ) NOT NULL DEFAULT '0' COMMENT '父执行器id',
`execute_status` TINYINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '执行的状态 0、失败 1、成功',
`result_message` TEXT NOT NULL COMMENT '执行结果',
`create_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY ( `id` )
) ENGINE = INNODB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT = '任务实例';
CREATE TABLE `job_log_message` (
`id` BIGINT ( 20 ) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` VARCHAR ( 64 ) NOT NULL COMMENT '组名称',
`job_id` BIGINT ( 20 ) NOT NULL COMMENT '任务信息id',
`task_id` BIGINT ( 20 ) NOT NULL COMMENT '任务实例id',
`task_instance_id` BIGINT ( 20 ) NOT NULL COMMENT '调度任务id',
`create_dt` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`message` TEXT NOT NULL COMMENT '调度信息',
PRIMARY KEY ( `id` )
) ENGINE = INNODB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT = '调度日志';

View File

@ -0,0 +1,18 @@
package com.aizuda.easy.retry.template.datasource.persistence.mapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* 调度日志 Mapper 接口
* </p>
*
* @author www.byteblogs.com
* @since 2023-09-24
*/
@Mapper
public interface JobLogMessageMapper extends BaseMapper<JobLogMessage> {
}

View File

@ -0,0 +1,18 @@
package com.aizuda.easy.retry.template.datasource.persistence.mapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* 任务信息 Mapper 接口
* </p>
*
* @author www.byteblogs.com
* @since 2023-09-24
*/
@Mapper
public interface JobMapper extends BaseMapper<Job> {
}

View File

@ -0,0 +1,18 @@
package com.aizuda.easy.retry.template.datasource.persistence.mapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskInstance;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* 任务实例 Mapper 接口
* </p>
*
* @author www.byteblogs.com
* @since 2023-09-24
*/
@Mapper
public interface JobTaskInstanceMapper extends BaseMapper<JobTaskInstance> {
}

View File

@ -0,0 +1,18 @@
package com.aizuda.easy.retry.template.datasource.persistence.mapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* 调度任务 Mapper 接口
* </p>
*
* @author www.byteblogs.com
* @since 2023-09-24
*/
@Mapper
public interface JobTaskMapper extends BaseMapper<JobTask> {
}

View File

@ -0,0 +1,126 @@
package com.aizuda.easy.retry.template.datasource.persistence.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
/**
* <p>
* 任务信息
* </p>
*
* @author www.byteblogs.com
* @since 2023-09-24
*/
@Getter
@Setter
public class Job implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 组名称
*/
private String groupName;
/**
* 名称
*/
private String jobName;
/**
* 执行方法参数
*/
private String argsStr;
/**
* 参数类型 text/json
*/
private String argsType;
/**
* 扩展字段
*/
private String extAttrs;
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
/**
* 重试状态 0关闭1开启
*/
private Integer jobStatus;
/**
* 执行器路由策略
*/
private String routeKey;
/**
* 执行器类型 1Java
*/
private Integer executorType;
/**
* 执行器名称
*/
private String executorName;
/**
* 阻塞策略 1丢弃 2覆盖 3并行
*/
private Integer blockStrategy;
/**
* 任务执行超时时间单位秒
*/
private Integer executorTimeout;
/**
* 最大重试次数
*/
private Integer maxRetryTimes;
/**
* 重试间隔(s)
*/
private Integer retryInterval;
/**
* bucket
*/
private Integer bucketIndex;
/**
* 描述
*/
private String description;
/**
* 创建时间
*/
private LocalDateTime createDt;
/**
* 修改时间
*/
private LocalDateTime updateDt;
/**
* 逻辑删除 1删除
*/
private Integer deleted;
}

View File

@ -0,0 +1,63 @@
package com.aizuda.easy.retry.template.datasource.persistence.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
/**
* <p>
* 调度日志
* </p>
*
* @author www.byteblogs.com
* @since 2023-09-24
*/
@Getter
@Setter
@TableName("job_log_message")
public class JobLogMessage implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 组名称
*/
private String groupName;
/**
* 任务信息id
*/
private Long jobId;
/**
* 任务实例id
*/
private Long taskId;
/**
* 调度任务id
*/
private Long taskInstanceId;
/**
* 创建时间
*/
private LocalDateTime createDt;
/**
* 调度信息
*/
private String message;
}

View File

@ -0,0 +1,70 @@
package com.aizuda.easy.retry.template.datasource.persistence.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
/**
* <p>
* 调度任务
* </p>
*
* @author www.byteblogs.com
* @since 2023-09-24
*/
@Getter
@Setter
@TableName("job_task")
public class JobTask implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 组名称
*/
private String groupName;
/**
* 任务id
*/
private Long jobId;
/**
* 重试次数
*/
private Integer retryCount;
/**
* 任务状态 0失败 1成功
*/
private Integer taskStatus;
/**
* 创建时间
*/
private LocalDateTime createDt;
/**
* 修改时间
*/
private LocalDateTime updateDt;
/**
* 逻辑删除 1删除
*/
private Integer deleted;
}

View File

@ -0,0 +1,73 @@
package com.aizuda.easy.retry.template.datasource.persistence.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
/**
* <p>
* 任务实例
* </p>
*
* @author www.byteblogs.com
* @since 2023-09-24
*/
@Getter
@Setter
@TableName("job_task_instance")
public class JobTaskInstance implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 组名称
*/
private String groupName;
/**
* 任务信息id
*/
private Long jobId;
/**
* 调度任务id
*/
private Long taskId;
/**
* 父执行器id
*/
private Long parentId;
/**
* 执行的状态 0失败 1成功
*/
private Integer executeStatus;
/**
* 执行结果
*/
private String resultMessage;
/**
* 创建时间
*/
private LocalDateTime createDt;
/**
* 修改时间
*/
private LocalDateTime updateDt;
}

View File

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="task_id" property="taskId" />
<result column="task_instance_id" property="taskInstanceId" />
<result column="create_dt" property="createDt" />
<result column="message" property="message" />
</resultMap>
</mapper>

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.Job">
<id column="id" property="id"/>
<result column="group_name" property="groupName"/>
<result column="job_name" property="jobName"/>
<result column="args_str" property="argsStr"/>
<result column="args_type" property="argsType"/>
<result column="ext_attrs" property="extAttrs"/>
<result column="next_trigger_at" property="nextTriggerAt"/>
<result column="job_status" property="jobStatus"/>
<result column="route_key" property="routeKey"/>
<result column="executor_type" property="executorType"/>
<result column="executor_name" property="executorName"/>
<result column="block_strategy" property="blockStrategy"/>
<result column="executor_timeout" property="executorTimeout"/>
<result column="max_retry_times" property="maxRetryTimes"/>
<result column="retry_interval" property="retryInterval"/>
<result column="bucket_index" property="bucketIndex"/>
<result column="description" property="description"/>
<result column="create_dt" property="createDt"/>
<result column="update_dt" property="updateDt"/>
<result column="deleted" property="deleted"/>
</resultMap>
</mapper>

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskInstanceMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskInstance">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="task_id" property="taskId" />
<result column="parent_id" property="parentId" />
<result column="execute_status" property="executeStatus" />
<result column="result_message" property="resultMessage" />
<result column="create_dt" property="createDt" />
<result column="update_dt" property="updateDt" />
</resultMap>
</mapper>

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTask">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="retry_count" property="retryCount" />
<result column="task_status" property="taskStatus" />
<result column="create_dt" property="createDt" />
<result column="update_dt" property="updateDt" />
<result column="deleted" property="deleted" />
</resultMap>
</mapper>

View File

@ -0,0 +1,9 @@
package com.aizuda.easy.retry.server.job.task.enums;
/**
* @author www.byteblogs.com
* @date 2023-09-24 12:01:31
* @since 2.4.0
*/
public enum BlockStrategyEnum {
}

View File

@ -0,0 +1,9 @@
package com.aizuda.easy.retry.server.job.task.executor;
/**
* @author www.byteblogs.com
* @date 2023-09-24 11:40:21
* @since 2.4.0
*/
public class JobExecutor {
}

View File

@ -1,13 +1,29 @@
package com.aizuda.easy.retry.server.job.task.scan;
import akka.actor.AbstractActor;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* JOB任务扫描
@ -21,6 +37,13 @@ import org.springframework.stereotype.Component;
@Slf4j
public class ScanJobTaskActor extends AbstractActor {
@Autowired
private JobMapper jobMapper;
@Autowired
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
private static final AtomicLong lastId = new AtomicLong(0L);
@Override
public Receive createReceive() {
return receiveBuilder().match(ScanTask.class, config -> {
@ -37,5 +60,46 @@ public class ScanJobTaskActor extends AbstractActor {
private void doScan(final ScanTask scanTask) {
List<Job> jobs = listAvailableJobs(lastId.get());
if (CollectionUtils.isEmpty(jobs)) {
// 数据为空则休眠5s
try {
Thread.sleep((10 / 2) * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 重置最大id
lastId.set(0L);
return;
}
lastId.set(jobs.get(jobs.size() - 1).getId());
for (Job job : jobs) {
// 选择客户端节点
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(job.getGroupName());
// TODO 校验一下客户端是否存活
// 校验是否存在已经在执行的任务了
// boolean isExist = true;
// if (isExist) {
// // 选择丢弃策略
//// String blockStrategy = job.getBlockStrategy();
//
// }
}
}
private List<Job> listAvailableJobs(Long lastId) {
return jobMapper.selectPage(new PageDTO<Job>(0, 100),
new LambdaQueryWrapper<Job>()
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
// TODO 提前10秒把需要执行的任务拉取出来
.le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(10))
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
.gt(Job::getId, lastId)
).getRecords();
}
}

View File

@ -116,7 +116,10 @@ public abstract class AbstractScanGroup extends AbstractActor {
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getGroupName, groupName)
.eq(RetryTask::getTaskType, taskType)
// TODO 提前10秒把需要执行的任务拉取出来
.le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10))
.gt(RetryTask::getId, lastId)
// TODO 验证一下lastAt会不会改变
.gt(RetryTask::getCreateDt, lastAt)
.orderByAsc(RetryTask::getId)
.orderByAsc(RetryTask::getCreateDt))