feat(1.2.0-beta2): add OpenAPI support

This commit is contained in:
srzou 2024-10-09 11:12:05 +08:00
parent 7271eb8ee2
commit 9e5abcb43c
46 changed files with 2199 additions and 27 deletions

View File

@ -18,6 +18,7 @@
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hibernate.version>8.0.1.Final</hibernate.version>
</properties>
<dependencies>
<dependency>
@ -85,6 +86,12 @@
<artifactId>log4j</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>${hibernate.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,132 @@
package com.aizuda.snailjob.client.job.core.dto;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author opensnail
* @date 2023-10-11 22:30:00
* @since 2.4.0
*/
@Data
public class JobResponseVO {
private Long id;
/**
* Group name
*/
private String groupName;
/**
* Job name
*/
private String jobName;
/**
* Executor method arguments
*/
private String argsStr;
/**
* Argument type: text/json
*/
private String argsType;
/**
* Extended attributes
*/
private String extAttrs;
/**
* Next trigger time
*/
private LocalDateTime nextTriggerAt;
/**
* Job status: 0 disabled, 1 enabled
*/
private Integer jobStatus;
/**
* Executor routing strategy
*/
private Integer routeKey;
/**
* Executor type: 1 Java
*/
private Integer executorType;
/**
* Executor name
*/
private String executorInfo;
/**
* Trigger type: 1 CRON expression, 2 fixed interval
*/
private Integer triggerType;
/**
* Trigger interval
*/
private String triggerInterval;
/**
* Block strategy: 1 discard, 2 overwrite, 3 parallel
*/
private Integer blockStrategy;
/**
* Job execution timeout, in seconds
*/
private Integer executorTimeout;
/**
* Maximum retry count
*/
private Integer maxRetryTimes;
/**
* Retry interval (s)
*/
private Integer retryInterval;
/**
* Task type
*/
private Integer taskType;
/**
* Parallelism
*/
private Integer parallelNum;
/**
* bucket
*/
private Integer bucketIndex;
/**
* Description
*/
private String description;
/**
* Creation time
*/
private LocalDateTime createDt;
/**
* Last update time
*/
private LocalDateTime updateDt;
/**
* Logical delete flag: 1 deleted
*/
private Integer deleted;
}

View File

@ -0,0 +1,107 @@
package com.aizuda.snailjob.client.job.core.dto;
import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
@Data
public class RequestAddJobDTO {
/**
* Job name
*/
@NotBlank(message = "jobName 不能为空")
private String jobName;
/**
* Job status: 0 disabled, 1 enabled
* {@link StatusEnum}
*/
@NotNull(message = "jobStatus 不能为空")
private Integer jobStatus;
/**
* Executor method arguments
*/
private String argsStr;
/**
* Argument type: text/json
*/
private Integer argsType;
/**
* Executor routing strategy
*/
@NotNull(message = "routeKey 不能为空")
private Integer routeKey;
/**
* Executor type
* {@link ExecutorTypeEnum}
*/
@NotNull(message = "executorType 不能为空")
private Integer executorType;
/**
* Executor name
*/
@NotBlank(message = "executorInfo 不能为空")
private String executorInfo;
/**
* Trigger type: 2 fixed interval, 3 CRON expression, 99 workflow
*/
@NotNull(message = "triggerType 不能为空")
private Integer triggerType;
/**
* Trigger interval
*/
@NotNull(message = "triggerInterval 不能为空")
private String triggerInterval;
/**
* Block strategy: 1 discard, 2 overwrite, 3 parallel
*/
@NotNull(message = "blockStrategy 不能为空")
private Integer blockStrategy;
/**
* Job execution timeout, in seconds
*/
@NotNull(message = "executorTimeout 不能为空")
private Integer executorTimeout;
/**
* Maximum retry count
*/
@NotNull(message = "maxRetryTimes 不能为空")
private Integer maxRetryTimes;
/**
* Retry interval (s)
*/
@NotNull(message = "retryInterval 不能为空")
private Integer retryInterval;
/**
* Task type
* {@link JobTaskTypeEnum}
*/
@NotNull(message = "taskType 不能为空")
private Integer taskType;
/**
* Parallelism
*/
@NotNull(message = "parallelNum 不能为空")
private Integer parallelNum;
/**
* Description
*/
private String description;
}

View File

@ -0,0 +1,102 @@
package com.aizuda.snailjob.client.job.core.dto;
import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* @author opensnail
* @date 2023-10-11 22:37:55
* @since 2.4.0
*/
@Data
public class RequestUpdateJobDTO {
@NotNull(message = "id 不能为空")
private Long id;
/**
* Job name
*/
private String jobName;
/**
* Job status: 0 disabled, 1 enabled
* {@link StatusEnum}
*/
private Integer jobStatus;
/**
* Executor method arguments
*/
private String argsStr;
/**
* Argument type: text/json
*/
private Integer argsType;
/**
* Executor routing strategy
*/
private Integer routeKey;
/**
* Executor type
* {@link ExecutorTypeEnum}
*/
private Integer executorType;
/**
* Executor name
*/
private String executorInfo;
/**
* Trigger type: 2 fixed interval, 3 CRON expression, 99 workflow
*/
private Integer triggerType;
/**
* Trigger interval
*/
private String triggerInterval;
/**
* Block strategy: 1 discard, 2 overwrite, 3 parallel
*/
private Integer blockStrategy;
/**
* Job execution timeout, in seconds
*/
private Integer executorTimeout;
/**
* Maximum retry count
*/
private Integer maxRetryTimes;
/**
* Retry interval (s)
*/
private Integer retryInterval;
/**
* Task type
* {@link JobTaskTypeEnum}
*/
private Integer taskType;
/**
* Parallelism
*/
private Integer parallelNum;
/**
* Description
*/
private String description;
}

View File

@ -0,0 +1,20 @@
package com.aizuda.snailjob.client.job.core.dto;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* @author opensnail
* @date 2023-10-15 16:06:20
* @since 2.4.0
*/
@Data
public class RequestUpdateStatusDTO {
@NotNull(message = "id 不能为空")
private Long id;
@NotNull(message = "jobStatus 不能为空")
private Integer jobStatus;
}

View File

@ -0,0 +1,24 @@
package com.aizuda.snailjob.client.job.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum AllocationAlgorithmEnum {
// Consistent hash
CONSISTENT_HASH(1),
// Random
RANDOM(2),
// LRU
LRU(3),
// Round robin
ROUND(4),
// First matched
FIRST(5),
// Last matched
LAST(6);
private final int type;
}

View File

@ -0,0 +1,14 @@
package com.aizuda.snailjob.client.job.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum TriggerTypeEnum {
SCHEDULED_TIME(2),
CRON(3),
WORK_FLOW(99);
private final int type;
}

View File

@ -0,0 +1,28 @@
package com.aizuda.snailjob.client.job.core.handler;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
/**
* @author opensnail
* @date 2024-09-29 20:40:10
* @since sj_1.1.0
*/
public abstract class AbstractRequestHandler<R> implements RequestHandler<R> {
/**
* Template method: validate the request, then execute it
* @return the execution result
*/
@Override
public R execute() {
if (checkRequest()) {
return doExecute();
} else {
throw new SnailJobClientException("snail job openapi check error");
}
}
protected abstract R doExecute();
protected abstract boolean checkRequest();
}
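Every OpenAPI call added in this commit follows the same template: checkRequest() validates the builder state and doExecute() performs the actual call through the shared OpenApiClient constant declared on the RequestHandler interface below. A minimal illustrative subclass (hypothetical, not part of this commit, assumed to live in the same package) might look like this sketch:

public class RequestExistsHandler extends AbstractRequestHandler<Boolean> {

    private final Long jobId;

    public RequestExistsHandler(Long jobId) {
        this.jobId = jobId;
    }

    @Override
    protected Boolean doExecute() {
        // Reuses the shared OpenApiClient constant declared on RequestHandler
        return client.getJobDetail(jobId).getData() != null;
    }

    @Override
    protected boolean checkRequest() {
        // Reject obviously invalid ids before any network call is made
        return jobId != null && jobId > 0;
    }
}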

View File

@ -0,0 +1,242 @@
package com.aizuda.snailjob.client.job.core.handler;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.dto.RequestAddJobDTO;
import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
import com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum;
import com.aizuda.snailjob.client.job.core.util.ValidatorUtils;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import java.util.HashMap;
import java.util.Map;
public class RequestAddHandler extends AbstractRequestHandler<Long> {
private RequestAddJobDTO requestAddJobDTO;
public RequestAddHandler(JobTaskTypeEnum taskType, Integer shardNum) {
this.requestAddJobDTO = new RequestAddJobDTO();
// Enabled by default on creation
requestAddJobDTO.setJobStatus(StatusEnum.YES.getStatus());
// Set the task type
requestAddJobDTO.setTaskType(taskType.getType());
// Default executor type: Java
requestAddJobDTO.setExecutorType(ExecutorTypeEnum.JAVA.getType());
// Set the shard number
if (shardNum != null){
Map<String, Object> map = new HashMap<>(1);
map.put("shardNum", shardNum);
requestAddJobDTO.setArgsStr(JsonUtil.toJsonString(map));
}
}
@Override
protected Long doExecute() {
String data = JsonUtil.toJsonString(client.addJob(requestAddJobDTO).getData());
return Long.valueOf(data);
}
@Override
protected boolean checkRequest() {
boolean validated = ValidatorUtils.validateEntity(requestAddJobDTO);
// If validation passes, apply type-specific adjustments
if (validated) {
if (requestAddJobDTO.getTaskType() == JobTaskTypeEnum.CLUSTER.getType()){
// Cluster mode only allows a parallelism of 1
setParallelNum(1);
}
if (requestAddJobDTO.getTriggerType() == TriggerTypeEnum.WORK_FLOW.getType()){
// Workflow jobs have no trigger interval
setTriggerInterval("*");
}
}
return validated;
}
/**
* Set the job name
* @param jobName job name
* @return this handler
*/
public RequestAddHandler setJobName(String jobName) {
requestAddJobDTO.setJobName(jobName);
return this;
}
/**
* Set the arguments
* @param argsStr argument map
* @return this handler
*/
private RequestAddHandler setArgsStr(Map<String, Object> argsStr) {
Map<String, Object> args = new HashMap<>();
if (StrUtil.isNotBlank(requestAddJobDTO.getArgsStr())){
args = JsonUtil.parseHashMap(requestAddJobDTO.getArgsStr());
}
args.putAll(argsStr);
requestAddJobDTO.setArgsStr(JsonUtil.toJsonString(args));
requestAddJobDTO.setArgsType(2);
return this;
}
/**
* Add an argument; can be called multiple times
* Not allowed for static sharding jobs
* @param argsKey argument name
* @param argsValue argument value
* @return this handler
*/
public RequestAddHandler addArgsStr(String argsKey, Object argsValue) {
if (requestAddJobDTO.getTaskType().equals(JobTaskTypeEnum.SHARDING.getType())){
SnailJobLog.LOCAL.warn("静态分片任务不可使用该方法添加相关任务参数请使用addShardingArgs");
return this;
}
Map<String, Object> map = new HashMap<>();
if (StrUtil.isNotBlank(requestAddJobDTO.getArgsStr())){
map = JsonUtil.parseHashMap(requestAddJobDTO.getArgsStr());
}
map.put(argsKey, argsValue);
requestAddJobDTO.setArgsStr(JsonUtil.toJsonString(map));
requestAddJobDTO.setArgsType(2);
return this;
}
/**
* Add static sharding arguments
* @param shardingValue sharding values
* @return this handler
*/
public RequestAddHandler addShardingArgs(String[] shardingValue){
if (!requestAddJobDTO.getTaskType().equals(JobTaskTypeEnum.SHARDING.getType())){
SnailJobLog.LOCAL.warn("非静态分片任务不可使用该方法添加相关任务参数请使用addArgsStr");
return this;
}
requestAddJobDTO.setArgsStr(JsonUtil.toJsonString(shardingValue));
requestAddJobDTO.setArgsType(1);
return this;
}
/**
* Set the routing strategy
* @param algorithmEnum routing algorithm
* @return this handler
*/
public RequestAddHandler setRouteKey(AllocationAlgorithmEnum algorithmEnum) {
// Non-cluster jobs only support round-robin routing
if (requestAddJobDTO.getTaskType() != JobTaskTypeEnum.CLUSTER.getType()){
// Assign the route key directly; calling setRouteKey(ROUND) here would recurse endlessly
requestAddJobDTO.setRouteKey(AllocationAlgorithmEnum.ROUND.getType());
SnailJobLog.LOCAL.warn("非集群模式 路由策略只能为轮询");
return this;
}
requestAddJobDTO.setRouteKey(algorithmEnum.getType());
return this;
}
/**
* Set the executor info
* @param executorInfo executor name
* @return this handler
*/
public RequestAddHandler setExecutorInfo(String executorInfo) {
requestAddJobDTO.setExecutorInfo(executorInfo);
return this;
}
/**
* Set the trigger type
* @param triggerType trigger type
* @return this handler
*/
public RequestAddHandler setTriggerType(TriggerTypeEnum triggerType) {
requestAddJobDTO.setTriggerType(triggerType.getType());
if (requestAddJobDTO.getTriggerType() == TriggerTypeEnum.WORK_FLOW.getType()){
// Workflow jobs have no trigger interval
setTriggerInterval("*");
}
return this;
}
/**
* Set the trigger interval
* For fixed-interval jobs this is in seconds; workflow jobs do not need it
* @param triggerInterval trigger interval
* @return this handler
*/
public RequestAddHandler setTriggerInterval(String triggerInterval) {
requestAddJobDTO.setTriggerInterval(triggerInterval);
return this;
}
/**
* Set the block strategy
* @param blockStrategy block strategy
* @return this handler
*/
public RequestAddHandler setBlockStrategy(BlockStrategyEnum blockStrategy) {
// Cluster mode cannot use the parallel block strategy
if (requestAddJobDTO.getTaskType() == JobTaskTypeEnum.CLUSTER.getType()
&& blockStrategy.getBlockStrategy() == BlockStrategyEnum.CONCURRENCY.getBlockStrategy()){
throw new SnailJobClientException("集群模式不能使用并行阻塞策略");
}
requestAddJobDTO.setBlockStrategy(blockStrategy.getBlockStrategy());
return this;
}
/**
* Set the executor timeout, in seconds
* @param executorTimeout timeout in seconds
* @return this handler
*/
public RequestAddHandler setExecutorTimeout(Integer executorTimeout) {
requestAddJobDTO.setExecutorTimeout(executorTimeout);
return this;
}
/**
* Set the maximum retry count
* @param maxRetryTimes maximum retry count
* @return this handler
*/
public RequestAddHandler setMaxRetryTimes(Integer maxRetryTimes) {
requestAddJobDTO.setMaxRetryTimes(maxRetryTimes);
return this;
}
/**
* Set the retry interval
* @param retryInterval retry interval in seconds
* @return this handler
*/
public RequestAddHandler setRetryInterval(Integer retryInterval) {
requestAddJobDTO.setRetryInterval(retryInterval);
return this;
}
/**
* Set the parallelism
* @param parallelNum degree of parallelism
* @return this handler
*/
public RequestAddHandler setParallelNum(Integer parallelNum) {
requestAddJobDTO.setParallelNum(parallelNum);
return this;
}
/**
* Set the job description
* @param description description
* @return this handler
*/
public RequestAddHandler setDescription(String description) {
requestAddJobDTO.setDescription(description);
return this;
}
}

View File

@ -0,0 +1,16 @@
package com.aizuda.snailjob.client.job.core.handler;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.client.job.core.openapi.OpenApiClient;
import com.aizuda.snailjob.common.core.model.NettyResult;
public interface RequestHandler<R> {
OpenApiClient client = RequestBuilder.<OpenApiClient, NettyResult>newBuilder()
.client(OpenApiClient.class)
.async(false)
.build();
R execute();
}

View File

@ -0,0 +1,30 @@
package com.aizuda.snailjob.client.job.core.handler;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.dto.JobResponseVO;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import java.util.Objects;
public class RequestQueryHandler extends AbstractRequestHandler<JobResponseVO> {
private Long queryJobId;
public RequestQueryHandler(Long queryJobId) {
this.queryJobId = queryJobId;
}
@Override
protected JobResponseVO doExecute() {
Object data = client.getJobDetail(queryJobId).getData();
Assert.isTrue(Objects.nonNull(data),()-> new SnailJobClientException("获取[{}]任务详情失败", queryJobId));
return JsonUtil.parseObject(JsonUtil.toJsonString(data), JobResponseVO.class);
}
@Override
protected boolean checkRequest() {
return queryJobId != null && ! Long.valueOf(0).equals(queryJobId);
}
}

View File

@ -0,0 +1,30 @@
package com.aizuda.snailjob.client.job.core.handler;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
public class RequestTriggerJobHandler extends AbstractRequestHandler<Boolean>{
private Long triggerJobId;
// 1: job; 2: workflow
private int triggerType;
public RequestTriggerJobHandler(Long triggerJobId, int triggerType) {
this.triggerJobId = triggerJobId;
this.triggerType = triggerType;
}
@Override
protected Boolean doExecute() {
if (triggerType == 1) {
return (Boolean) client.triggerJob(triggerJobId).getData();
}
if (triggerType == 2) {
return (Boolean) client.triggerWorkFlow(triggerJobId).getData();
}
throw new SnailJobClientException("snail job openapi check error");
}
@Override
protected boolean checkRequest() {
return triggerJobId != null && !Long.valueOf(0).equals(triggerJobId);
}
}

View File

@ -0,0 +1,249 @@
package com.aizuda.snailjob.client.job.core.handler;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.dto.RequestUpdateJobDTO;
import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
import com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum;
import com.aizuda.snailjob.client.job.core.util.ValidatorUtils;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import java.util.HashMap;
import java.util.Map;
public class RequestUpdateHandler extends AbstractRequestHandler<Boolean> {
private RequestUpdateJobDTO requestUpdateJobDTO;
public RequestUpdateHandler(Long jobId) {
this.requestUpdateJobDTO = new RequestUpdateJobDTO();
// An id is required for updates
requestUpdateJobDTO.setId(jobId);
// Default executor type: Java
requestUpdateJobDTO.setExecutorType(ExecutorTypeEnum.JAVA.getType());
}
@Override
protected Boolean doExecute() {
return (Boolean) client.updateJob(requestUpdateJobDTO).getData();
}
@Override
protected boolean checkRequest() {
boolean validated = ValidatorUtils.validateEntity(requestUpdateJobDTO);
// If validation passes, apply type-specific adjustments
if (validated) {
if (requestUpdateJobDTO.getTaskType() != null
&& requestUpdateJobDTO.getTaskType() == JobTaskTypeEnum.CLUSTER.getType()){
// Cluster mode only allows a parallelism of 1
setParallelNum(1);
}
// Non-cluster jobs only support round-robin routing
if (requestUpdateJobDTO.getTaskType() != null
&& requestUpdateJobDTO.getTaskType() != JobTaskTypeEnum.CLUSTER.getType()){
setRouteKey(AllocationAlgorithmEnum.ROUND);
}
if (requestUpdateJobDTO.getTriggerType() != null
&& requestUpdateJobDTO.getTriggerType() == TriggerTypeEnum.WORK_FLOW.getType()){
// Workflow jobs have no trigger interval
setTriggerInterval("*");
}
}
return validated;
}
/**
* Update the shard number for the reduce phase
* Only allowed for MAP_REDUCE jobs
* @param shardNum shard number
* @return this handler
*/
public RequestUpdateHandler setShardNum(Integer shardNum){
Integer taskType = requestUpdateJobDTO.getTaskType();
if (taskType != null && taskType.equals(JobTaskTypeEnum.MAP_REDUCE.getType())){
// Set the shard number
if (shardNum != null){
Map<String, Object> map = new HashMap<>(1);
map.put("shardNum", shardNum);
requestUpdateJobDTO.setArgsStr(JsonUtil.toJsonString(map));
}
}else {
throw new SnailJobClientException("非MapReduce模式不能设置分片数");
}
return this;
}
/**
* Update the job name
* @param jobName job name
* @return this handler
*/
public RequestUpdateHandler setJobName(String jobName) {
requestUpdateJobDTO.setJobName(jobName);
return this;
}
/**
* Update the arguments
* Overwrites any previously stored arguments
* @param argsStr argument map
* @return this handler
*/
private RequestUpdateHandler setArgsStr(Map<String, Object> argsStr) {
Map<String, Object> args = new HashMap<>();
if (StrUtil.isNotBlank(requestUpdateJobDTO.getArgsStr())){
args = JsonUtil.parseHashMap(requestUpdateJobDTO.getArgsStr());
}
args.putAll(argsStr);
requestUpdateJobDTO.setArgsStr(JsonUtil.toJsonString(args));
requestUpdateJobDTO.setArgsType(2);
return this;
}
/**
* Add an argument; can be called multiple times
* Overwrites any previously stored arguments on update
* Not allowed for static sharding jobs
* @param argsKey argument name
* @param argsValue argument value
* @return this handler
*/
public RequestUpdateHandler addArgsStr(String argsKey, Object argsValue) {
if (requestUpdateJobDTO.getTaskType() != null
&& requestUpdateJobDTO.getTaskType().equals(JobTaskTypeEnum.SHARDING.getType())){
SnailJobLog.LOCAL.warn("静态分片任务不可使用该方法添加相关任务参数请使用addShardingArgs");
return this;
}
Map<String, Object> map = new HashMap<>();
if (StrUtil.isNotBlank(requestUpdateJobDTO.getArgsStr())){
map = JsonUtil.parseHashMap(requestUpdateJobDTO.getArgsStr());
}
map.put(argsKey, argsValue);
requestUpdateJobDTO.setArgsStr(JsonUtil.toJsonString(map));
requestUpdateJobDTO.setArgsType(2);
return this;
}
/**
* Add static sharding arguments
* @param shardingValue sharding values
* @return this handler
*/
public RequestUpdateHandler addShardingArgs(String[] shardingValue){
if (requestUpdateJobDTO.getTaskType() != null
&& !requestUpdateJobDTO.getTaskType().equals(JobTaskTypeEnum.SHARDING.getType())){
SnailJobLog.LOCAL.warn("非静态分片任务不可使用该方法添加相关任务参数请使用addArgsStr");
return this;
}
requestUpdateJobDTO.setArgsStr(JsonUtil.toJsonString(shardingValue));
requestUpdateJobDTO.setArgsType(1);
return this;
}
/**
* Update the routing strategy
* @param algorithmEnum routing algorithm
* @return this handler
*/
public RequestUpdateHandler setRouteKey(AllocationAlgorithmEnum algorithmEnum) {
requestUpdateJobDTO.setRouteKey(algorithmEnum.getType());
return this;
}
/**
* Update the executor info
* @param executorInfo executor name
* @return this handler
*/
public RequestUpdateHandler setExecutorInfo(String executorInfo) {
requestUpdateJobDTO.setExecutorInfo(executorInfo);
return this;
}
/**
* Update the trigger type
* @param triggerType trigger type
* @return this handler
*/
public RequestUpdateHandler setTriggerType(TriggerTypeEnum triggerType) {
requestUpdateJobDTO.setTriggerType(triggerType.getType());
return this;
}
/**
* Update the trigger interval
* For fixed-interval jobs this is in seconds; workflow jobs do not need it
* @param triggerInterval trigger interval
* @return this handler
*/
public RequestUpdateHandler setTriggerInterval(String triggerInterval) {
requestUpdateJobDTO.setTriggerInterval(triggerInterval);
return this;
}
/**
* Update the block strategy
* @param blockStrategy block strategy
* @return this handler
*/
public RequestUpdateHandler setBlockStrategy(BlockStrategyEnum blockStrategy) {
requestUpdateJobDTO.setBlockStrategy(blockStrategy.getBlockStrategy());
return this;
}
/**
* Update the executor timeout, in seconds
* @param executorTimeout timeout in seconds
* @return this handler
*/
public RequestUpdateHandler setExecutorTimeout(Integer executorTimeout) {
requestUpdateJobDTO.setExecutorTimeout(executorTimeout);
return this;
}
/**
* Update the maximum retry count
* @param maxRetryTimes maximum retry count
* @return this handler
*/
public RequestUpdateHandler setMaxRetryTimes(Integer maxRetryTimes) {
requestUpdateJobDTO.setMaxRetryTimes(maxRetryTimes);
return this;
}
/**
* Update the retry interval
* @param retryInterval retry interval in seconds
* @return this handler
*/
public RequestUpdateHandler setRetryInterval(Integer retryInterval) {
requestUpdateJobDTO.setRetryInterval(retryInterval);
return this;
}
/**
* Update the parallelism
* @param parallelNum degree of parallelism
* @return this handler
*/
public RequestUpdateHandler setParallelNum(Integer parallelNum) {
requestUpdateJobDTO.setParallelNum(parallelNum);
return this;
}
/**
* Update the job description
* @param description description
* @return this handler
*/
public RequestUpdateHandler setDescription(String description) {
requestUpdateJobDTO.setDescription(description);
return this;
}
}

View File

@ -0,0 +1,55 @@
package com.aizuda.snailjob.client.job.core.handler;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.dto.RequestUpdateStatusDTO;
import com.aizuda.snailjob.client.job.core.util.ValidatorUtils;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
public class RequestUpdateStatusHandler extends AbstractRequestHandler<Boolean>{
private RequestUpdateStatusDTO statusDTO;
// 1: job; 2: workflow
private int type;
public RequestUpdateStatusHandler(Long id, int type) {
this.statusDTO = new RequestUpdateStatusDTO();
this.type = type;
setId(id);
}
@Override
protected Boolean doExecute() {
if (type == 1){
return (Boolean) client.updateJobStatus(statusDTO).getData();
}
if (type == 2){
return (Boolean) client.updateWorkFlowStatus(statusDTO).getData();
}
throw new SnailJobClientException("snail job openapi check error");
}
@Override
protected boolean checkRequest() {
return ValidatorUtils.validateEntity(statusDTO);
}
/**
* Set the job or workflow id
* @param id job or workflow id
* @return this handler
*/
private RequestUpdateStatusHandler setId(Long id){
this.statusDTO.setId(id);
return this;
}
/**
* Set the target status
* @param status status to apply
* @return this handler
*/
public RequestUpdateStatusHandler setStatus(StatusEnum status){
this.statusDTO.setJobStatus(status.getStatus());
return this;
}
}

View File

@ -0,0 +1,31 @@
package com.aizuda.snailjob.client.job.core.openapi;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.client.job.core.dto.RequestAddJobDTO;
import com.aizuda.snailjob.client.job.core.dto.RequestUpdateJobDTO;
import com.aizuda.snailjob.client.job.core.dto.RequestUpdateStatusDTO;
import com.aizuda.snailjob.common.core.model.Result;
public interface OpenApiClient {
@Mapping(method = RequestMethod.POST, path = "/api/job/add")
Result<Object> addJob(RequestAddJobDTO requestAddJobDTO);
@Mapping(method = RequestMethod.POST, path = "/api/job/update")
Result<Object> updateJob(RequestUpdateJobDTO requestUpdateJobDTO);
@Mapping(method = RequestMethod.POST, path = "/api/job/getJobDetail")
Result<Object> getJobDetail(Long jobId);
@Mapping(method = RequestMethod.POST, path = "/api/job/triggerJob")
Result<Object> triggerJob(Long triggerId);
@Mapping(method = RequestMethod.POST, path = "/api/job/triggerWorkFlow")
Result<Object> triggerWorkFlow(Long triggerId);
@Mapping(method = RequestMethod.POST, path = "/api/job/updateJobStatus")
Result<Object> updateJobStatus(RequestUpdateStatusDTO statusDTO);
@Mapping(method = RequestMethod.POST, path = "/api/job/updateWorkFlowStatus")
Result<Object> updateWorkFlowStatus(RequestUpdateStatusDTO statusDTO);
}

View File

@ -0,0 +1,120 @@
package com.aizuda.snailjob.client.job.core.openapi;
import com.aizuda.snailjob.client.job.core.handler.*;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
/**
* @author opensnail
* @date 2024-09-21 21:35:34
* @since sj_1.1.0
*/
public final class SnailJobOpenApi {
private SnailJobOpenApi() {
}
/**
* Add a cluster job
*
* @return add handler
*/
public static RequestAddHandler addClusterJob() {
return new RequestAddHandler(JobTaskTypeEnum.CLUSTER, null);
}
/**
* Add a broadcast job
*
* @return add handler
*/
public static RequestAddHandler addBroadcastJob() {
return new RequestAddHandler(JobTaskTypeEnum.BROADCAST, null);
}
/**
* Add a static sharding job
*
* @return add handler
*/
public static RequestAddHandler addShardingJob() {
return new RequestAddHandler(JobTaskTypeEnum.SHARDING, null);
}
/**
* Add a map job
*
* @return add handler
*/
public static RequestAddHandler addMapJob() {
return new RequestAddHandler(JobTaskTypeEnum.MAP, null);
}
/**
* Add a MapReduce job
*
* @param shardNum number of reduce shards
* @return add handler
*/
public static RequestAddHandler addMapReduceJob(Integer shardNum) {
return new RequestAddHandler(JobTaskTypeEnum.MAP_REDUCE, shardNum);
}
/**
* Update a job
*
* @param jobId job id
* @return update handler
*/
public static RequestUpdateHandler updateJob(Long jobId) {
return new RequestUpdateHandler(jobId);
}
/**
* Get job details
*
* @param jobId job id
* @return query handler
*/
public static RequestQueryHandler getJobDetail(Long jobId) {
return new RequestQueryHandler(jobId);
}
/**
* Trigger a job manually
*
* @param jobId job id
* @return trigger handler
*/
public static RequestTriggerJobHandler triggerJob(Long jobId) {
return new RequestTriggerJobHandler(jobId, 1);
}
/**
* Trigger a workflow manually
*
* @param id workflow id
* @return trigger handler
*/
public static RequestTriggerJobHandler triggerWorkFlow(Long id) {
return new RequestTriggerJobHandler(id, 2);
}
/**
* Update the status of a job
*
* @param jobId job id
* @return status update handler
*/
public static RequestUpdateStatusHandler updateJobStatus(Long jobId) {
return new RequestUpdateStatusHandler(jobId, 1);
}
/**
* Update the status of a workflow
*
* @param workFlowId workflow id
* @return status update handler
*/
public static RequestUpdateStatusHandler updateWorkFlowStatus(Long workFlowId) {
return new RequestUpdateStatusHandler(workFlowId, 2);
}
}
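A minimal usage sketch of the fluent entry point above, assuming the SnailJob client has already been bootstrapped so the underlying OpenApiClient can reach the server. The job name, executor class, CRON expression, description and the BlockStrategyEnum.DISCARD constant are illustrative placeholders rather than values taken from this commit.

import com.aizuda.snailjob.client.job.core.dto.JobResponseVO;
import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
import com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum;
import com.aizuda.snailjob.client.job.core.openapi.SnailJobOpenApi;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;

public class OpenApiUsageExample {
    public static void main(String[] args) {
        // Create a cluster job scheduled by a CRON expression (all values are placeholders)
        Long jobId = SnailJobOpenApi.addClusterJob()
                .setJobName("demo-job")
                .setExecutorInfo("com.example.DemoExecutor")
                .setRouteKey(AllocationAlgorithmEnum.CONSISTENT_HASH)
                .setTriggerType(TriggerTypeEnum.CRON)
                .setTriggerInterval("0 */5 * * * ?")
                .setBlockStrategy(BlockStrategyEnum.DISCARD)
                .setExecutorTimeout(60)
                .setMaxRetryTimes(3)
                .setRetryInterval(5)
                .setParallelNum(1)
                .setDescription("demo job created through the OpenAPI")
                .execute();

        // Query the job back, fire it once, then disable it
        JobResponseVO detail = SnailJobOpenApi.getJobDetail(jobId).execute();
        Boolean triggered = SnailJobOpenApi.triggerJob(jobId).execute();
        Boolean disabled = SnailJobOpenApi.updateJobStatus(jobId)
                .setStatus(StatusEnum.NO)
                .execute();
    }
}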

View File

@ -0,0 +1,37 @@
package com.aizuda.snailjob.client.job.core.util;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.common.log.SnailJobLog;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validation;
import jakarta.validation.Validator;
import java.util.Set;
public class ValidatorUtils {
private static Validator validator;
static {
validator = Validation.buildDefaultValidatorFactory().getValidator();
}
/**
* Validate an object's constraint annotations
* @param object object to validate
* @return true if validation passes, false otherwise (violations are logged)
*/
public static boolean validateEntity(Object object)
throws SnailJobClientException {
Set<ConstraintViolation<Object>> constraintViolations = validator.validate(object);
if (!constraintViolations.isEmpty()) {
StringBuilder msg = new StringBuilder();
for(ConstraintViolation<Object> constraint: constraintViolations){
msg.append(constraint.getMessage()).append("\n");
}
SnailJobLog.LOCAL.error(msg.toString());
return false;
}else {
return true;
}
}
}

View File

@ -125,6 +125,20 @@ public interface SystemConstants {
*/
String RETRY_GENERATE_IDEM_ID = "/retry/generate/idempotent-id/v1";
String OPENAPI_ADD_JOB = "/api/job/add";
String OPENAPI_UPDATE_JOB = "/api/job/update";
String OPENAPI_GET_JOB_DETAIL = "/api/job/getJobDetail";
String OPENAPI_TRIGGER_JOB = "/api/job/triggerJob";
String OPENAPI_TRIGGER_WORKFLOW = "/api/job/triggerWorkFlow";
String OPENAPI_UPDATE_JOB_STATUS = "/api/job/updateJobStatus";
String OPENAPI_UPDATE_WORKFLOW_STATUS = "/api/job/updateWorkFlowStatus";
}
String LOGO = """

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.enums;
package com.aizuda.snailjob.common.core.enums;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.common.core.exception.SnailJobCommonException;
import lombok.AllArgsConstructor;
import lombok.Getter;
@ -43,7 +43,7 @@ public enum BlockStrategyEnum {
}
}
throw new SnailJobServerException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy);
throw new SnailJobCommonException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy);
}
}

View File

@ -0,0 +1,24 @@
package com.aizuda.snailjob.server.common.convert;
import com.aizuda.snailjob.server.common.vo.JobRequestVO;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
* @author: opensnail
* @date : 2023-10-12 09:40
* @since : 2.4.0
*/
@Mapper
public interface JobConverter {
JobConverter INSTANCE = Mappers.getMapper(JobConverter.class);
Job convert(JobRequestVO jobRequestVO);
List<JobRequestVO> convertList(List<Job> jobs);
}

View File

@ -0,0 +1,42 @@
package com.aizuda.snailjob.server.common.convert;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.vo.JobResponseVO;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
/**
* @author opensnail
* @date 2023-10-11 22:50:40
* @since 2.4.0
*/
@Mapper
public interface JobResponseVOConverter {
JobResponseVOConverter INSTANCE = Mappers.getMapper(JobResponseVOConverter.class);
// @Mappings({
// @Mapping(source = "nextTriggerAt", target = "nextTriggerAt", expression = "java(DateUtils.toLocalDateTime())")
// })
List<JobResponseVO> convertList(List<Job> jobs);
@Mappings({
@Mapping(target = "nextTriggerAt", expression = "java(JobResponseVOConverter.toLocalDateTime(job.getNextTriggerAt()))")
})
JobResponseVO convert(Job job);
static LocalDateTime toLocalDateTime(Long nextTriggerAt) {
if (Objects.isNull(nextTriggerAt) || nextTriggerAt == 0) {
return null;
}
return DateUtils.toLocalDateTime(nextTriggerAt);
}
}

View File

@ -0,0 +1,124 @@
package com.aizuda.snailjob.server.common.vo;
import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.Data;
/**
* @author opensnail
* @date 2023-10-11 22:37:55
* @since 2.4.0
*/
@Data
public class JobRequestVO {
private Long id;
/**
* Group name
*/
@NotBlank(message = "groupName 不能为空")
@Pattern(regexp = "^[A-Za-z0-9_-]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母、下划线和短横线")
private String groupName;
/**
* Job name
*/
@NotBlank(message = "jobName 不能为空")
private String jobName;
/**
* Job status: 0 disabled, 1 enabled
* {@link StatusEnum}
*/
@NotNull(message = "jobStatus 不能为空")
private Integer jobStatus;
/**
* Executor method arguments
*/
private String argsStr;
/**
* Argument type: text/json
*/
private Integer argsType;
/**
* Executor routing strategy
*/
@NotNull(message = "routeKey 不能为空")
private Integer routeKey;
/**
* Executor type
* {@link ExecutorTypeEnum}
*/
@NotNull(message = "executorType 不能为空")
private Integer executorType;
/**
* Executor name
*/
@NotBlank(message = "executorInfo 不能为空")
private String executorInfo;
/**
* Trigger type: 2 fixed interval, 3 CRON expression, 99 workflow
*/
@NotNull(message = "triggerType 不能为空")
private Integer triggerType;
/**
* Trigger interval
*/
@NotNull(message = "triggerInterval 不能为空")
private String triggerInterval;
/**
* Block strategy: 1 discard, 2 overwrite, 3 parallel
*/
@NotNull(message = "blockStrategy 不能为空")
private Integer blockStrategy;
/**
* Job execution timeout, in seconds
*/
@NotNull(message = "executorTimeout 不能为空")
private Integer executorTimeout;
/**
* Maximum retry count
*/
@NotNull(message = "maxRetryTimes 不能为空")
private Integer maxRetryTimes;
/**
* Retry interval (s)
*/
@NotNull(message = "retryInterval 不能为空")
private Integer retryInterval;
/**
* Task type
* {@link JobTaskTypeEnum}
*/
@NotNull(message = "taskType 不能为空")
private Integer taskType;
/**
* Parallelism
*/
@NotNull(message = "parallelNum 不能为空")
private Integer parallelNum;
/**
* Description
*/
private String description;
}

View File

@ -0,0 +1,132 @@
package com.aizuda.snailjob.server.common.vo;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author opensnail
* @date 2023-10-11 22:30:00
* @since 2.4.0
*/
@Data
public class JobResponseVO {
private Long id;
/**
* Group name
*/
private String groupName;
/**
* Job name
*/
private String jobName;
/**
* Executor method arguments
*/
private String argsStr;
/**
* Argument type: text/json
*/
private String argsType;
/**
* Extended attributes
*/
private String extAttrs;
/**
* Next trigger time
*/
private LocalDateTime nextTriggerAt;
/**
* Job status: 0 disabled, 1 enabled
*/
private Integer jobStatus;
/**
* Executor routing strategy
*/
private Integer routeKey;
/**
* Executor type: 1 Java
*/
private Integer executorType;
/**
* Executor name
*/
private String executorInfo;
/**
* Trigger type: 1 CRON expression, 2 fixed interval
*/
private Integer triggerType;
/**
* Trigger interval
*/
private String triggerInterval;
/**
* Block strategy: 1 discard, 2 overwrite, 3 parallel
*/
private Integer blockStrategy;
/**
* Job execution timeout, in seconds
*/
private Integer executorTimeout;
/**
* Maximum retry count
*/
private Integer maxRetryTimes;
/**
* Retry interval (s)
*/
private Integer retryInterval;
/**
* Task type
*/
private Integer taskType;
/**
* Parallelism
*/
private Integer parallelNum;
/**
* bucket
*/
private Integer bucketIndex;
/**
* Description
*/
private String description;
/**
* Creation time
*/
private LocalDateTime createDt;
/**
* Last update time
*/
private LocalDateTime updateDt;
/**
* Logical delete flag: 1 deleted
*/
private Integer deleted;
}

View File

@ -0,0 +1,20 @@
package com.aizuda.snailjob.server.common.vo;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* @author opensnail
* @date 2023-10-15 16:06:20
* @since 2.4.0
*/
@Data
public class JobStatusUpdateRequestVO {
@NotNull(message = "id 不能为空")
private Long id;
@NotNull(message = "jobStatus 不能为空")
private Integer jobStatus;
}

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.job;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import org.springframework.beans.factory.InitializingBean;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.job;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;

View File

@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.block.job;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.job;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import java.util.concurrent.ConcurrentHashMap;

View File

@ -1,7 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.block.job;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGenerator;

View File

@ -5,12 +5,11 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
@ -20,12 +19,10 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Stream;
/**
* 重新触发执行失败的任务

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext;
import org.springframework.beans.factory.InitializingBean;

View File

@ -1,7 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
@ -9,8 +8,6 @@ import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: shuguang.zhang
* @date : 2023-12-26

View File

@ -2,8 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
@ -11,8 +10,6 @@ import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: xiaowoniu
* @date : 2023-12-26

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;

View File

@ -1,7 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import java.util.concurrent.ConcurrentHashMap;

View File

@ -0,0 +1,105 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.convert.JobConverter;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.CronUtils;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.HttpHeaderUtil;
import com.aizuda.snailjob.server.common.vo.JobRequestVO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* OpenAPI
* Add a scheduled job
*/
@Component
@RequiredArgsConstructor
public class OpenApiAddJobRequestHandler extends PostHttpRequestHandler {
private final SystemProperties systemProperties;
private final JobMapper jobMapper;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_ADD_JOB.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Add job content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
JobRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobRequestVO.class);
if(StrUtil.isBlank(jobRequestVO.getGroupName())){
jobRequestVO.setGroupName(HttpHeaderUtil.getGroupName(headers));
}
// Convert the request and determine whether this is a resident job
Job job = JobConverter.INSTANCE.convert(jobRequestVO);
job.setResident(isResident(jobRequestVO));
job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName())
% systemProperties.getBucketTotal());
job.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli()));
job.setNamespaceId(HttpHeaderUtil.getNamespace(headers));
job.setId(null);
Assert.isTrue(1 == jobMapper.insert(job), ()-> new SnailJobServerException("新增任务失败"));
return JsonUtil.toJsonString(new NettyResult(job.getId(), retryRequest.getReqId()));
}
private Integer isResident(JobRequestVO jobRequestVO) {
if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) {
return StatusEnum.NO.getStatus();
}
if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.FIXED.getType()) {
if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) {
return StatusEnum.YES.getStatus();
}
} else if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.CRON.getType()) {
if (CronUtils.getExecuteInterval(jobRequestVO.getTriggerInterval()) < 10 * 1000) {
return StatusEnum.YES.getStatus();
}
} else {
throw new SnailJobServerException("未知触发类型");
}
return StatusEnum.NO.getStatus();
}
private static Long calculateNextTriggerAt(final JobRequestVO jobRequestVO, Long time) {
if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) {
return 0L;
}
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(time);
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
}

View File

@ -0,0 +1,53 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.convert.JobResponseVOConverter;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.vo.JobResponseVO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* OpenAPI
* Get scheduled job details
*/
@Component
@RequiredArgsConstructor
public class OpenApiGetJobDetailRequestHandler extends PostHttpRequestHandler {
private final JobMapper jobMapper;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_GET_JOB_DETAIL.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Update job content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
Long jobId = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), Long.class);
Assert.notNull(jobId, () -> new SnailJobServerException("id 不能为空"));
Job job = jobMapper.selectById(jobId);
JobResponseVO convert = JobResponseVOConverter.INSTANCE.convert(job);
return JsonUtil.toJsonString(new NettyResult(convert, retryRequest.getReqId()));
}
}

View File

@ -0,0 +1,80 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.JobPrepareHandler;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* OpenAPI
* Trigger a scheduled job manually
*/
@Component
@RequiredArgsConstructor
public class OpenApiTriggerJobRequestHandler extends PostHttpRequestHandler {
private final JobMapper jobMapper;
private final AccessTemplate accessTemplate;
private final JobPrepareHandler terminalJobPrepareHandler;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_TRIGGER_JOB.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Trigger job content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
Long jobId = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), Long.class);
Job job = jobMapper.selectById(jobId);
if (Objects.isNull(job)){
SnailJobLog.LOCAL.warn("job can not be null.");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, job.getGroupName())
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
);
if (count <= 0){
SnailJobLog.LOCAL.warn("组:[{}]已经关闭,不支持手动执行.", job.getGroupName());
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
// Use the current time so the job executes immediately
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType());
// Create the task batch
terminalJobPrepareHandler.handle(jobTaskPrepare);
return JsonUtil.toJsonString(new NettyResult(true, retryRequest.getReqId()));
}
}

View File

@ -0,0 +1,97 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.net.url.UrlQuery;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.WorkflowPrePareHandler;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
/**
* OpenAPI
* Trigger a workflow task manually
*/
@Component
@RequiredArgsConstructor
public class OpenApiTriggerWorkFlowRequestHandler extends PostHttpRequestHandler {
private final WorkflowMapper workflowMapper;
private final AccessTemplate accessTemplate;
private final WorkflowPrePareHandler terminalWorkflowPrepareHandler;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_TRIGGER_WORKFLOW.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Trigger job content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
Long id = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), Long.class);
Workflow workflow = workflowMapper.selectById(id);
if (Objects.isNull(workflow)){
SnailJobLog.LOCAL.warn("workflow can not be null.");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
// Split the group name string into a Set
if (StrUtil.isNotBlank(workflow.getGroupName())) {
Set<String> namesSet = new HashSet<>(Arrays.asList(workflow.getGroupName().split(", ")));
// If any group referenced by the workflow nodes is disabled, abort the workflow trigger
if (CollectionUtil.isNotEmpty(namesSet)) {
for (String groupName : namesSet) {
long count = accessTemplate.getGroupConfigAccess().count(
new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, groupName)
.eq(GroupConfig::getNamespaceId, workflow.getNamespaceId())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
);
if (count <= 0){
SnailJobLog.LOCAL.warn("组:[{}]已经关闭,不支持手动执行.", workflow.getGroupName());
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
}
}
}
WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow);
// Use the current time so the workflow executes immediately
prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
prepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType());
terminalWorkflowPrepareHandler.handler(prepareDTO);
return JsonUtil.toJsonString(new NettyResult(true, retryRequest.getReqId()));
}
}

View File

@ -0,0 +1,131 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.convert.JobConverter;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.CronUtils;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.HttpHeaderUtil;
import com.aizuda.snailjob.server.common.vo.JobRequestVO;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.Optional;
/**
* OpenAPI
* Update a scheduled job
*/
@Component
@RequiredArgsConstructor
public class OpenApiUpdateJobRequestHandler extends PostHttpRequestHandler {
private final JobMapper jobMapper;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_UPDATE_JOB.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Update job content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
String namespace = HttpHeaderUtil.getNamespace(headers);
JobRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobRequestVO.class);
if (Objects.isNull(jobRequestVO.getId())){
SnailJobLog.LOCAL.warn("id不能为空更新失败");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
Job job = jobMapper.selectById(jobRequestVO.getId());
if (Objects.isNull(job)){
SnailJobLog.LOCAL.warn("job为空更新失败");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
// Determine whether the job is a resident task
Job updateJob = JobConverter.INSTANCE.convert(jobRequestVO);
updateJob.setResident(isResident(jobRequestVO));
updateJob.setNamespaceId(namespace);
// Workflow task: not scheduled independently, so zero the next trigger time on the persisted entity
if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) {
updateJob.setNextTriggerAt(0L);
// Non-resident task -> non-resident task
} else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(
updateJob.getResident(),
StatusEnum.NO.getStatus())) {
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli()));
} else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(
updateJob.getResident(), StatusEnum.NO.getStatus())) {
// Resident task -> non-resident task: base the calculation on the resident task's cached trigger time
long time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId()))
.orElse(DateUtils.toNowMilli());
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, time));
// Old task was non-resident, the new one is resident: use the current time as the next trigger time
} else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(
updateJob.getResident(), StatusEnum.YES.getStatus())) {
updateJob.setNextTriggerAt(DateUtils.toNowMilli());
}
// The group name must not be changed through this update
updateJob.setGroupName(null);
boolean updated = 1 == jobMapper.updateById(updateJob);
return JsonUtil.toJsonString(new NettyResult(updated, retryRequest.getReqId()));
}
private Integer isResident(JobRequestVO jobRequestVO) {
if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) {
return StatusEnum.NO.getStatus();
}
if (Objects.equals(jobRequestVO.getTriggerType(), WaitStrategies.WaitStrategyEnum.FIXED.getType())) {
if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) {
return StatusEnum.YES.getStatus();
}
} else if (Objects.equals(jobRequestVO.getTriggerType(), WaitStrategies.WaitStrategyEnum.CRON.getType())) {
if (CronUtils.getExecuteInterval(jobRequestVO.getTriggerInterval()) < 10 * 1000) {
return StatusEnum.YES.getStatus();
}
} else {
throw new SnailJobServerException("未知触发类型");
}
return StatusEnum.NO.getStatus();
}
private static Long calculateNextTriggerAt(final JobRequestVO jobRequestVO, Long time) {
if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) {
return 0L;
}
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(time);
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
}
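
The resident-task decision and the recomputed trigger time above come down to two rules: any job firing more often than every 10 seconds (a fixed interval below 10 s, or a CRON whose computed execution gap is under 10 000 ms) is treated as resident, and a fixed-interval job's next trigger time is simply the base time plus the interval. The following is a minimal, self-contained sketch of those two rules; the constants and the nextTriggerAt helper are illustrative stand-ins, not the WaitStrategies implementation.

// Illustrative sketch only: mirrors the 10-second resident threshold and the
// "base time + interval" idea behind calculateNextTriggerAt, without the real
// WaitStrategies classes. All names below are local to this example.
public class TriggerRuleSketch {

    // Any job that fires more often than every 10 seconds is kept resident in memory.
    static boolean isResident(long executeIntervalMillis) {
        return executeIntervalMillis < 10_000L;
    }

    // Simplified fixed-interval computation: triggerInterval is stored as a string of seconds.
    static long nextTriggerAt(long baseTimeMillis, String triggerIntervalSeconds) {
        return baseTimeMillis + Long.parseLong(triggerIntervalSeconds) * 1000L;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(isResident(5_000L));              // true  -> resident
        System.out.println(isResident(60_000L));             // false -> non-resident
        System.out.println(nextTriggerAt(now, "30") - now);  // 30000 ms until the next run
    }
}

The real handler additionally special-cases workflow-triggered jobs (next trigger time forced to 0) and the resident/non-resident transitions shown in the branches above.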

View File

@ -0,0 +1,55 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.vo.JobStatusUpdateRequestVO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* OPENAPI
* Update the status of a scheduled job
*/
@Component
@RequiredArgsConstructor
public class OpenApiUpdateJobStatusRequestHandler extends PostHttpRequestHandler {
private final JobMapper jobMapper;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_UPDATE_JOB_STATUS.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
JobStatusUpdateRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobStatusUpdateRequestVO.class);
Long count = jobMapper.selectCount(new LambdaQueryWrapper<Job>().eq(Job::getId, jobRequestVO.getId()));
if (1 != count){
SnailJobLog.LOCAL.warn("更新任务失败");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
Job job = new Job();
job.setId(jobRequestVO.getId());
job.setJobStatus(jobRequestVO.getJobStatus());
boolean update = 1 == jobMapper.updateById(job);
return JsonUtil.toJsonString(new NettyResult(update, retryRequest.getReqId()));
}
}
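
For context, a caller reaches this handler by POSTing a SnailJobRequest-shaped body whose first args element is the JobStatusUpdateRequestVO payload. The endpoint path and the JSON field names in the sketch below are assumptions inferred from the parsing code, not the published OPENAPI contract; a rough client sketch using the JDK HttpClient:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class UpdateJobStatusCallSketch {

    public static void main(String[] args) throws Exception {
        // Field names (reqId, args, id, jobStatus) and the endpoint path are assumptions
        // inferred from the handler's parsing logic, not the documented OPENAPI contract.
        String body = "{\"reqId\": 1, \"args\": [{\"id\": 100, \"jobStatus\": 1}]}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/open-api/job/status/update")) // assumed path
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // The handler answers with a NettyResult whose status mirrors the affected row count.
        System.out.println(response.body());
    }
}

If the job id does not resolve to exactly one row, the handler replies with a failed NettyResult rather than throwing.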

View File

@ -0,0 +1,60 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.vo.JobStatusUpdateRequestVO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* OPENAPI
* Update the status of a workflow
*/
@Component
@RequiredArgsConstructor
public class OpenApiUpdateWorkFlowStatusRequestHandler extends PostHttpRequestHandler {
private final WorkflowMapper workflowMapper;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_UPDATE_WORKFLOW_STATUS.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
JobStatusUpdateRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobStatusUpdateRequestVO.class);
Workflow workflow = workflowMapper.selectOne(
new LambdaQueryWrapper<Workflow>()
.select(Workflow::getId)
.eq(Workflow::getId, jobRequestVO.getId()));
if (Objects.isNull(workflow)){
SnailJobLog.LOCAL.warn("工作流不存在");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
workflow.setWorkflowStatus(jobRequestVO.getJobStatus());
boolean update = 1 == workflowMapper.updateById(workflow);
return JsonUtil.toJsonString(new NettyResult(update, retryRequest.getReqId()));
}
}
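
One design note on the handler above: because the lookup selects only Workflow::getId, the entity passed to updateById carries just the id plus the new status, and with MyBatis-Plus's default not-null field strategy only workflow_status is written. The same effect can be expressed as a single targeted update; the sketch below is an alternative illustration under that assumption, not the approach the project chose, and it assumes the usual BaseMapper-backed WorkflowMapper and a getWorkflowStatus accessor on Workflow.

import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;

// Sketch only: the handler above keeps the select-then-update flow; this shows the
// single-statement alternative.
class WorkflowStatusUpdateSketch {

    private final WorkflowMapper workflowMapper;

    WorkflowStatusUpdateSketch(WorkflowMapper workflowMapper) {
        this.workflowMapper = workflowMapper;
    }

    boolean updateStatus(Long workflowId, Integer newStatus) {
        // Only workflow_status is written; an affected-row count of 0 already signals
        // that the workflow does not exist, so no prior select is required.
        return 1 == workflowMapper.update(null,
                new LambdaUpdateWrapper<Workflow>()
                        .set(Workflow::getWorkflowStatus, newStatus)
                        .eq(Workflow::getId, workflowId));
    }
}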

View File

@ -7,7 +7,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;

View File

@ -6,7 +6,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;

View File

@ -3,7 +3,7 @@ package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;