diff --git a/snail-job-client/snail-job-client-common/pom.xml b/snail-job-client/snail-job-client-common/pom.xml index 76d4a030..92ac9300 100644 --- a/snail-job-client/snail-job-client-common/pom.xml +++ b/snail-job-client/snail-job-client-common/pom.xml @@ -18,6 +18,7 @@ 17 17 UTF-8 + 8.0.1.Final @@ -85,6 +86,12 @@ log4j true + + + org.hibernate + hibernate-validator + ${hibernate.verion} + diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobResponseVO.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobResponseVO.java new file mode 100644 index 00000000..60fb45d5 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobResponseVO.java @@ -0,0 +1,132 @@ +package com.aizuda.snailjob.client.job.core.dto; + +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * @author opensnail + * @date 2023-10-11 22:30:00 + * @since 2.4.0 + */ +@Data +public class JobResponseVO { + + private Long id; + + /** + * 组名称 + */ + private String groupName; + + /** + * 名称 + */ + private String jobName; + + /** + * 执行方法参数 + */ + private String argsStr; + + /** + * 参数类型 text/json + */ + private String argsType; + + /** + * 扩展字段 + */ + private String extAttrs; + + /** + * 下次触发时间 + */ + private LocalDateTime nextTriggerAt; + + /** + * 重试状态 0、关闭、1、开启 + */ + private Integer jobStatus; + + /** + * 执行器路由策略 + */ + private Integer routeKey; + + /** + * 执行器类型 1、Java + */ + private Integer executorType; + + /** + * 执行器名称 + */ + private String executorInfo; + + /** + * 触发类型 1.CRON 表达式 2. 固定时间 + */ + private Integer triggerType; + + /** + * 间隔时长 + */ + private String triggerInterval; + + /** + * 阻塞策略 1、丢弃 2、覆盖 3、并行 + */ + private Integer blockStrategy; + + /** + * 任务执行超时时间,单位秒 + */ + private Integer executorTimeout; + + /** + * 最大重试次数 + */ + private Integer maxRetryTimes; + + /** + * 重试间隔(s) + */ + private Integer retryInterval; + + /** + * 任务类型 + */ + private Integer taskType; + + /** + * 并行数 + */ + private Integer parallelNum; + + /** + * bucket + */ + private Integer bucketIndex; + + /** + * 描述 + */ + private String description; + + /** + * 创建时间 + */ + private LocalDateTime createDt; + + /** + * 修改时间 + */ + private LocalDateTime updateDt; + + /** + * 逻辑删除 1、删除 + */ + private Integer deleted; + +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/RequestAddJobDTO.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/RequestAddJobDTO.java new file mode 100644 index 00000000..9406af30 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/RequestAddJobDTO.java @@ -0,0 +1,107 @@ +package com.aizuda.snailjob.client.job.core.dto; + +import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +@Data +public class RequestAddJobDTO{ + /** + * 名称 + */ + @NotBlank(message = "jobName 不能为空") + private String jobName; + + /** + * 重试状态 0、关闭、1、开启 + * {@link StatusEnum} + */ + @NotNull(message = "jobStatus 不能为空") + private Integer jobStatus; + + /** + * 执行方法参数 + */ + private String argsStr; + + /** + * 参数类型 text/json + */ + private Integer argsType; + + /** + * 执行器路由策略 + */ + @NotNull(message = "routeKey 不能为空") + private Integer routeKey; + + /** + * 执行器类型 + * {@link ExecutorTypeEnum} + */ + @NotNull(message = "executorType 不能为空") + private Integer executorType; + + /** + * 执行器名称 + */ + @NotBlank(message = "executorInfo 不能为空") + private String executorInfo; + + /** + * 触发类型 2. 固定时间 3.CRON 表达式 99.工作流 + */ + @NotNull(message = "triggerType 不能为空") + private Integer triggerType; + + /** + * 间隔时长 + */ + @NotNull(message = "triggerInterval 不能为空") + private String triggerInterval; + + /** + * 阻塞策略 1、丢弃 2、覆盖 3、并行 + */ + @NotNull(message = "blockStrategy 不能为空") + private Integer blockStrategy; + + /** + * 任务执行超时时间,单位秒 + */ + @NotNull(message = "executorTimeout 不能为空") + private Integer executorTimeout; + + /** + * 最大重试次数 + */ + @NotNull(message = "maxRetryTimes 不能为空") + private Integer maxRetryTimes; + + /** + * 重试间隔(s) + */ + @NotNull(message = "retryInterval 不能为空") + private Integer retryInterval; + + /** + * 任务类型 + * {@link JobTaskTypeEnum} + */ + @NotNull(message = "taskType 不能为空") + private Integer taskType; + + /** + * 并行数 + */ + @NotNull(message = "parallelNum 不能为空") + private Integer parallelNum; + + /** + * 描述 + */ + private String description; +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/RequestUpdateJobDTO.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/RequestUpdateJobDTO.java new file mode 100644 index 00000000..c6babc06 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/RequestUpdateJobDTO.java @@ -0,0 +1,102 @@ +package com.aizuda.snailjob.client.job.core.dto; + +import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +/** + * @author opensnail + * @date 2023-10-11 22:37:55 + * @since 2.4.0 + */ +@Data +public class RequestUpdateJobDTO { + @NotNull(message = "id 不能为空") + private Long id; + + /** + * 名称 + */ + private String jobName; + + /** + * 重试状态 0、关闭、1、开启 + * {@link StatusEnum} + */ + private Integer jobStatus; + + /** + * 执行方法参数 + */ + private String argsStr; + + /** + * 参数类型 text/json + */ + private Integer argsType; + + /** + * 执行器路由策略 + */ + private Integer routeKey; + + /** + * 执行器类型 + * {@link ExecutorTypeEnum} + */ + private Integer executorType; + + /** + * 执行器名称 + */ + private String executorInfo; + + /** + * 触发类型 2. 固定时间 3.CRON 表达式 99.工作流 + */ + private Integer triggerType; + + /** + * 间隔时长 + */ + private String triggerInterval; + + /** + * 阻塞策略 1、丢弃 2、覆盖 3、并行 + */ + private Integer blockStrategy; + + /** + * 任务执行超时时间,单位秒 + */ + private Integer executorTimeout; + + /** + * 最大重试次数 + */ + private Integer maxRetryTimes; + + /** + * 重试间隔(s) + */ + private Integer retryInterval; + + /** + * 任务类型 + * {@link JobTaskTypeEnum} + */ + private Integer taskType; + + /** + * 并行数 + */ + private Integer parallelNum; + + /** + * 描述 + */ + private String description; + +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/RequestUpdateStatusDTO.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/RequestUpdateStatusDTO.java new file mode 100644 index 00000000..eeda6f46 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/RequestUpdateStatusDTO.java @@ -0,0 +1,20 @@ +package com.aizuda.snailjob.client.job.core.dto; + +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +/** + * @author opensnail + * @date 2023-10-15 16:06:20 + * @since 2.4.0 + */ +@Data +public class RequestUpdateStatusDTO { + + @NotNull(message = "id 不能为空") + private Long id; + + @NotNull(message = "jobStatus 不能为空") + private Integer jobStatus; + +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/enums/AllocationAlgorithmEnum.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/enums/AllocationAlgorithmEnum.java new file mode 100644 index 00000000..5806b663 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/enums/AllocationAlgorithmEnum.java @@ -0,0 +1,24 @@ +package com.aizuda.snailjob.client.job.core.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@AllArgsConstructor +@Getter +public enum AllocationAlgorithmEnum { + // Hash + CONSISTENT_HASH(1), + // 随机 + RANDOM(2), + // LRU + LRU(3), + // 轮询 + ROUND(4), + // 匹配第一个 + FIRST(5), + // 匹配最后一个 + LAST(6); + + private final int type; + +} \ No newline at end of file diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/enums/TriggerTypeEnum.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/enums/TriggerTypeEnum.java new file mode 100644 index 00000000..1e4b7438 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/enums/TriggerTypeEnum.java @@ -0,0 +1,14 @@ +package com.aizuda.snailjob.client.job.core.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@AllArgsConstructor +@Getter +public enum TriggerTypeEnum { + SCHEDULED_TIME(2), + CRON(3), + WORK_FLOW(99); + + private final int type; +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/AbstractRequestHandler.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/AbstractRequestHandler.java new file mode 100644 index 00000000..9c87e636 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/AbstractRequestHandler.java @@ -0,0 +1,28 @@ +package com.aizuda.snailjob.client.job.core.handler; + +import com.aizuda.snailjob.client.common.exception.SnailJobClientException; + +/** + * @author opensnail + * @date 2024-09-29 20:40:10 + * @since sj_1.1.0 + */ +public abstract class AbstractRequestHandler implements RequestHandler { + + /** + * 具体调用 + * @return + */ + @Override + public R execute() { + if (checkRequest()) { + return doExecute(); + } else { + throw new SnailJobClientException("snail job openapi check error"); + } + } + + protected abstract R doExecute(); + + protected abstract boolean checkRequest(); +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestAddHandler.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestAddHandler.java new file mode 100644 index 00000000..e480d214 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestAddHandler.java @@ -0,0 +1,242 @@ +package com.aizuda.snailjob.client.job.core.handler; + +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.client.common.exception.SnailJobClientException; +import com.aizuda.snailjob.client.job.core.dto.RequestAddJobDTO; +import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum; +import com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum; +import com.aizuda.snailjob.client.job.core.util.ValidatorUtils; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; + +import java.util.HashMap; +import java.util.Map; + +public class RequestAddHandler extends AbstractRequestHandler { + private RequestAddJobDTO requestAddJobDTO; + + public RequestAddHandler(JobTaskTypeEnum taskType, Integer shardNum) { + this.requestAddJobDTO = new RequestAddJobDTO(); + // 默认创建就开启 + requestAddJobDTO.setJobStatus(StatusEnum.YES.getStatus()); + // 设置任务类型 + requestAddJobDTO.setTaskType(taskType.getType()); + // 默认java + requestAddJobDTO.setExecutorType(ExecutorTypeEnum.JAVA.getType()); + // 设置分片 + if (shardNum != null){ + Map map = new HashMap<>(1); + map.put("shardNum", shardNum); + requestAddJobDTO.setArgsStr(JsonUtil.toJsonString(map)); + } + } + + + @Override + protected Long doExecute() { + String data = JsonUtil.toJsonString(client.addJob(requestAddJobDTO).getData()); + return Long.valueOf(data); + } + + @Override + protected boolean checkRequest() { + boolean validated = ValidatorUtils.validateEntity(requestAddJobDTO); + // 如果校验正确,则正对进行相关筛选 + if (validated) { + if (requestAddJobDTO.getTaskType() == JobTaskTypeEnum.CLUSTER.getType()){ + // 集群模式只允许并发为 1 + setParallelNum(1); + } + if (requestAddJobDTO.getTriggerType() == TriggerTypeEnum.WORK_FLOW.getType()){ + // 工作流没有调度时间 + setTriggerInterval("*"); + } + } + return validated; + } + + /** + * 设置任务名 + * @param jobName 任务名 + * @return + */ + public RequestAddHandler setJobName(String jobName) { + requestAddJobDTO.setJobName(jobName); + return this; + } + + /** + * 设置参数 + * @param argsStr + * @return + */ + private RequestAddHandler setArgsStr(Map argsStr) { + Map args = new HashMap<>(); + if (StrUtil.isNotBlank(requestAddJobDTO.getArgsStr())){ + args = JsonUtil.parseHashMap(requestAddJobDTO.getArgsStr()); + } + args.putAll(argsStr); + requestAddJobDTO.setArgsStr(JsonUtil.toJsonString(args)); + requestAddJobDTO.setArgsType(2); + return this; + } + + /** + * 添加参数,可支持多次添加 + * 静态分片不可使用该方法 + * @param argsKey 参数名 + * @param argsValue 参数值 + * @return + */ + public RequestAddHandler addArgsStr(String argsKey, Object argsValue) { + if (requestAddJobDTO.getTaskType().equals(JobTaskTypeEnum.SHARDING.getType())){ + SnailJobLog.LOCAL.warn("静态分片任务,不可使用该方法添加相关任务参数,请使用addShardingArgs"); + return this; + } + Map map = new HashMap<>(); + if (StrUtil.isNotBlank(requestAddJobDTO.getArgsStr())){ + map = JsonUtil.parseHashMap(requestAddJobDTO.getArgsStr()); + } + map.put(argsKey, argsValue); + requestAddJobDTO.setArgsStr(JsonUtil.toJsonString(map)); + requestAddJobDTO.setArgsType(2); + return this; + } + + /** + * 添加静态分片相关参数 + * @param shardingValue + * @return + */ + public RequestAddHandler addShardingArgs(String[] shardingValue){ + if (!requestAddJobDTO.getTaskType().equals(JobTaskTypeEnum.SHARDING.getType())){ + SnailJobLog.LOCAL.warn("非静态分片任务,不可使用该方法添加相关任务参数,请使用addArgsStr"); + return this; + } + requestAddJobDTO.setArgsStr(JsonUtil.toJsonString(shardingValue)); + requestAddJobDTO.setArgsType(1); + return this; + } + + /** + * 设置路由 + * @param algorithmEnum 路由算法 + * @return + */ + public RequestAddHandler setRouteKey(AllocationAlgorithmEnum algorithmEnum) { + // 非集群模式 路由策略只能为轮询 + if (requestAddJobDTO.getTaskType() != JobTaskTypeEnum.CLUSTER.getType()){ + setRouteKey(AllocationAlgorithmEnum.ROUND); + SnailJobLog.LOCAL.warn("非集群模式 路由策略只能为轮询"); + return this; + } + requestAddJobDTO.setRouteKey(algorithmEnum.getType()); + return this; + } + + /** + * 设置执行器信息 + * @param executorInfo + * @return + */ + public RequestAddHandler setExecutorInfo(String executorInfo) { + requestAddJobDTO.setExecutorInfo(executorInfo); + return this; + } + + /** + * 设置调度类型 + * @param triggerType + * @return + */ + public RequestAddHandler setTriggerType(TriggerTypeEnum triggerType) { + requestAddJobDTO.setTriggerType(triggerType.getType()); + if (requestAddJobDTO.getTriggerType() == TriggerTypeEnum.WORK_FLOW.getType()){ + // 工作流没有调度时间 + setTriggerInterval("*"); + } + return this; + } + + /** + * 设置触发间隔; + * 单位:秒 + * 工作流无需配置 + * @param triggerInterval + * @return + */ + public RequestAddHandler setTriggerInterval(String triggerInterval) { + requestAddJobDTO.setTriggerInterval(triggerInterval); + return this; + } + + /** + * 设置阻塞策略 + * @param blockStrategy + * @return + */ + public RequestAddHandler setBlockStrategy(BlockStrategyEnum blockStrategy) { + // 非集群模式 路由策略只能为轮询 + if (requestAddJobDTO.getTaskType() == JobTaskTypeEnum.CLUSTER.getType() + && blockStrategy.getBlockStrategy() == BlockStrategyEnum.CONCURRENCY.getBlockStrategy()){ + throw new SnailJobClientException("集群模式不能使用并行阻塞策略"); + } + requestAddJobDTO.setBlockStrategy(blockStrategy.getBlockStrategy()); + return this; + } + + /** + * 设置执行器超时时间 + * @param executorTimeout + * @return + */ + public RequestAddHandler setExecutorTimeout(Integer executorTimeout) { + requestAddJobDTO.setExecutorTimeout(executorTimeout); + return this; + } + + /** + * 设置任务最大重试次数 + * @param maxRetryTimes + * @return + */ + public RequestAddHandler setMaxRetryTimes(Integer maxRetryTimes) { + requestAddJobDTO.setMaxRetryTimes(maxRetryTimes); + return this; + } + + /** + * 设置重试间隔 + * @param retryInterval + * @return + */ + public RequestAddHandler setRetryInterval(Integer retryInterval) { + requestAddJobDTO.setRetryInterval(retryInterval); + return this; + } + + /** + * 设置并发数量 + * @param parallelNum + * @return + */ + public RequestAddHandler setParallelNum(Integer parallelNum) { + requestAddJobDTO.setParallelNum(parallelNum); + return this; + } + + /** + * 设置定时任务描述 + * @param description + * @return + */ + public RequestAddHandler setDescription(String description) { + requestAddJobDTO.setDescription(description); + return this; + } + +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestHandler.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestHandler.java new file mode 100644 index 00000000..62df2d24 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestHandler.java @@ -0,0 +1,16 @@ +package com.aizuda.snailjob.client.job.core.handler; + +import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder; +import com.aizuda.snailjob.client.job.core.openapi.OpenApiClient; +import com.aizuda.snailjob.common.core.model.NettyResult; + +public interface RequestHandler { + + OpenApiClient client = RequestBuilder.newBuilder() + .client(OpenApiClient.class) + .async(false) + .build(); + + R execute(); + +} \ No newline at end of file diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestQueryHandler.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestQueryHandler.java new file mode 100644 index 00000000..84a7fb17 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestQueryHandler.java @@ -0,0 +1,30 @@ +package com.aizuda.snailjob.client.job.core.handler; + + +import cn.hutool.core.lang.Assert; +import com.aizuda.snailjob.client.common.exception.SnailJobClientException; +import com.aizuda.snailjob.client.job.core.dto.JobResponseVO; +import com.aizuda.snailjob.common.core.util.JsonUtil; + +import java.util.Objects; + +public class RequestQueryHandler extends AbstractRequestHandler { + private Long queryJobId; + + public RequestQueryHandler(Long queryJobId) { + this.queryJobId = queryJobId; + } + + @Override + protected JobResponseVO doExecute() { + Object data = client.getJobDetail(queryJobId).getData(); + Assert.isTrue(Objects.nonNull(data),()-> new SnailJobClientException("获取[{}]任务详情失败", queryJobId)); + return JsonUtil.parseObject(JsonUtil.toJsonString(data), JobResponseVO.class); + } + + @Override + protected boolean checkRequest() { + return queryJobId != null && ! Long.valueOf(0).equals(queryJobId); + } + +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestTriggerJobHandler.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestTriggerJobHandler.java new file mode 100644 index 00000000..924e5ca4 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestTriggerJobHandler.java @@ -0,0 +1,30 @@ +package com.aizuda.snailjob.client.job.core.handler; + +import com.aizuda.snailjob.client.common.exception.SnailJobClientException; + +public class RequestTriggerJobHandler extends AbstractRequestHandler{ + private Long triggerJobId; + // 1: job; 2: workflow + private int triggerType; + + public RequestTriggerJobHandler(Long tiggerJobId, int triggerType) { + this.triggerJobId = tiggerJobId; + this.triggerType = triggerType; + } + + @Override + protected Boolean doExecute() { + if (triggerType == 1) { + return (Boolean) client.triggerJob(triggerJobId).getData(); + } + if (triggerType == 2) { + return (Boolean) client.triggerWorkFlow(triggerJobId).getData(); + } + throw new SnailJobClientException("snail job openapi check error"); + } + + @Override + protected boolean checkRequest() { + return triggerJobId != null && !Long.valueOf(0).equals(triggerJobId); + } +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestUpdateHandler.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestUpdateHandler.java new file mode 100644 index 00000000..5981e0fa --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestUpdateHandler.java @@ -0,0 +1,249 @@ +package com.aizuda.snailjob.client.job.core.handler; + +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.client.common.exception.SnailJobClientException; +import com.aizuda.snailjob.client.job.core.dto.RequestUpdateJobDTO; +import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum; +import com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum; +import com.aizuda.snailjob.client.job.core.util.ValidatorUtils; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; + +import java.util.HashMap; +import java.util.Map; + +public class RequestUpdateHandler extends AbstractRequestHandler { + private RequestUpdateJobDTO requestUpdateJobDTO; + + public RequestUpdateHandler(Long jobId) { + this.requestUpdateJobDTO = new RequestUpdateJobDTO(); + // 更新必须要id + requestUpdateJobDTO.setId(jobId); + // 默认java + requestUpdateJobDTO.setExecutorType(ExecutorTypeEnum.JAVA.getType()); + } + + @Override + protected Boolean doExecute() { + return (Boolean) client.updateJob(requestUpdateJobDTO).getData(); + } + + @Override + protected boolean checkRequest() { + boolean validated = ValidatorUtils.validateEntity(requestUpdateJobDTO); + // 如果校验正确,则正对进行相关筛选 + if (validated) { + if (requestUpdateJobDTO.getTaskType() != null + && requestUpdateJobDTO.getTaskType() == JobTaskTypeEnum.CLUSTER.getType()){ + // 集群模式只允许并发为 1 + setParallelNum(1); + } + // 非集群模式 路由策略只能为轮询 + if (requestUpdateJobDTO.getTaskType() != null + && requestUpdateJobDTO.getTaskType() != JobTaskTypeEnum.CLUSTER.getType()){ + setRouteKey(AllocationAlgorithmEnum.ROUND); + } + if (requestUpdateJobDTO.getTriggerType() != null + && requestUpdateJobDTO.getTriggerType() == TriggerTypeEnum.WORK_FLOW.getType()){ + // 工作流没有调度时间 + setTriggerInterval("*"); + } + } + return validated; + } + + /** + * 修改Reduce的分片数 + * 只允许MAP_REDUCE设置 + * @param shardNum + * @return + */ + public RequestUpdateHandler setShardNum(Integer shardNum){ + Integer taskType = requestUpdateJobDTO.getTaskType(); + if (taskType != null && taskType.equals(JobTaskTypeEnum.MAP_REDUCE.getType())){ + // 设置分片 + if (shardNum != null){ + Map map = new HashMap<>(1); + map.put("shardNum", shardNum); + requestUpdateJobDTO.setArgsStr(JsonUtil.toJsonString(map)); + } + }else { + throw new SnailJobClientException("非MapReduce模式不能设置分片数"); + } + return this; + } + + /** + * 修改任务名称 + * @param jobName + * @return + */ + public RequestUpdateHandler setJobName(String jobName) { + requestUpdateJobDTO.setJobName(jobName); + return this; + } + + /** + * 修改时会直接覆盖之前的任务参数 + * 修改参数 + * @param argsStr + * @return + */ + private RequestUpdateHandler setArgsStr(Map argsStr) { + Map args = new HashMap<>(); + if (StrUtil.isNotBlank(requestUpdateJobDTO.getArgsStr())){ + args = JsonUtil.parseHashMap(requestUpdateJobDTO.getArgsStr()); + } + args.putAll(argsStr); + requestUpdateJobDTO.setArgsStr(JsonUtil.toJsonString(args)); + requestUpdateJobDTO.setArgsType(2); + return this; + } + + /** + * 修改时会直接覆盖之前的任务参数 + * 添加参数,可支持多次添加 + * 静态分片不可使用该方法 + * @param argsKey 参数名 + * @param argsValue 参数值 + * @return + */ + public RequestUpdateHandler addArgsStr(String argsKey, Object argsValue) { + if (requestUpdateJobDTO.getTaskType() != null + && requestUpdateJobDTO.getTaskType().equals(JobTaskTypeEnum.SHARDING.getType())){ + SnailJobLog.LOCAL.warn("静态分片任务,不可使用该方法添加相关任务参数,请使用addShardingArgs"); + return this; + } + Map map = new HashMap<>(); + if (StrUtil.isNotBlank(requestUpdateJobDTO.getArgsStr())){ + map = JsonUtil.parseHashMap(requestUpdateJobDTO.getArgsStr()); + } + map.put(argsKey, argsValue); + requestUpdateJobDTO.setArgsStr(JsonUtil.toJsonString(map)); + requestUpdateJobDTO.setArgsType(2); + return this; + } + + /** + * 添加静态分片相关参数 + * @param shardingValue + * @return + */ + public RequestUpdateHandler addShardingArgs(String[] shardingValue){ + if (requestUpdateJobDTO.getTaskType() != null + && !requestUpdateJobDTO.getTaskType().equals(JobTaskTypeEnum.SHARDING.getType())){ + SnailJobLog.LOCAL.warn("非静态分片任务,不可使用该方法添加相关任务参数,请使用addArgsStr"); + return this; + } + requestUpdateJobDTO.setArgsStr(JsonUtil.toJsonString(shardingValue)); + requestUpdateJobDTO.setArgsType(1); + return this; + } + + /** + * 修改路由 + * @param algorithmEnum + * @return + */ + public RequestUpdateHandler setRouteKey(AllocationAlgorithmEnum algorithmEnum) { + requestUpdateJobDTO.setRouteKey(algorithmEnum.getType()); + return this; + } + + /** + * 修改相关执行器 + * @param executorInfo + * @return + */ + public RequestUpdateHandler setExecutorInfo(String executorInfo) { + requestUpdateJobDTO.setExecutorInfo(executorInfo); + return this; + } + + /** + * 修改调度类型 + * @param triggerType + * @return + */ + public RequestUpdateHandler setTriggerType(TriggerTypeEnum triggerType) { + requestUpdateJobDTO.setTriggerType(triggerType.getType()); + return this; + } + + /** + * 修改调度时间 + * 单位:秒 + * 工作流无需配置 + * @param triggerInterval + * @return + */ + public RequestUpdateHandler setTriggerInterval(String triggerInterval) { + requestUpdateJobDTO.setTriggerInterval(triggerInterval); + return this; + } + + /** + * 修改阻塞策略 + * @param blockStrategy + * @return + */ + public RequestUpdateHandler setBlockStrategy(BlockStrategyEnum blockStrategy) { + requestUpdateJobDTO.setBlockStrategy(blockStrategy.getBlockStrategy()); + return this; + } + + /** + * 修改执行器超时时间 + * @param executorTimeout + * @return + */ + public RequestUpdateHandler setExecutorTimeout(Integer executorTimeout) { + requestUpdateJobDTO.setExecutorTimeout(executorTimeout); + return this; + } + + /** + * 修改任务最大重试次数 + * @param maxRetryTimes + * @return + */ + public RequestUpdateHandler setMaxRetryTimes(Integer maxRetryTimes) { + requestUpdateJobDTO.setMaxRetryTimes(maxRetryTimes); + return this; + } + + /** + * 修改重试间隔 + * @param retryInterval + * @return + */ + public RequestUpdateHandler setRetryInterval(Integer retryInterval) { + requestUpdateJobDTO.setRetryInterval(retryInterval); + return this; + } + + /** + * 修改并发数量 + * @param parallelNum + * @return + */ + public RequestUpdateHandler setParallelNum(Integer parallelNum) { + requestUpdateJobDTO.setParallelNum(parallelNum); + return this; + } + + /** + * 修改定时任务描述 + * @param description + * @return + */ + public RequestUpdateHandler setDescription(String description) { + requestUpdateJobDTO.setDescription(description); + return this; + } + + +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestUpdateStatusHandler.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestUpdateStatusHandler.java new file mode 100644 index 00000000..adbf3a41 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/handler/RequestUpdateStatusHandler.java @@ -0,0 +1,55 @@ +package com.aizuda.snailjob.client.job.core.handler; + +import com.aizuda.snailjob.client.common.exception.SnailJobClientException; +import com.aizuda.snailjob.client.job.core.dto.RequestUpdateStatusDTO; +import com.aizuda.snailjob.client.job.core.util.ValidatorUtils; +import com.aizuda.snailjob.common.core.enums.StatusEnum; + + +public class RequestUpdateStatusHandler extends AbstractRequestHandler{ + private RequestUpdateStatusDTO statusDTO; + // 1: job; 2: workflow + private int type; + + public RequestUpdateStatusHandler(Long id, int type) { + this.statusDTO = new RequestUpdateStatusDTO(); + this.type = type; + setId(id); + } + + @Override + protected Boolean doExecute() { + if (type == 1){ + return (Boolean) client.updateJobStatus(statusDTO).getData(); + } + if (type == 2){ + return (Boolean) client.updateWorkFlowStatus(statusDTO).getData(); + } + throw new SnailJobClientException("snail job openapi check error"); + } + + @Override + protected boolean checkRequest() { + return ValidatorUtils.validateEntity(statusDTO); + } + + /** + * 设置任务/工作流ID + * @param id + * @return + */ + private RequestUpdateStatusHandler setId(Long id){ + this.statusDTO.setId(id); + return this; + } + + /** + * 设置状态 + * @param status + * @return + */ + public RequestUpdateStatusHandler setStatus(StatusEnum status){ + this.statusDTO.setJobStatus(status.getStatus()); + return this; + } +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/openapi/OpenApiClient.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/openapi/OpenApiClient.java new file mode 100644 index 00000000..b386cdcd --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/openapi/OpenApiClient.java @@ -0,0 +1,31 @@ +package com.aizuda.snailjob.client.job.core.openapi; + +import com.aizuda.snailjob.client.common.annotation.Mapping; +import com.aizuda.snailjob.client.common.rpc.client.RequestMethod; +import com.aizuda.snailjob.client.job.core.dto.RequestAddJobDTO; +import com.aizuda.snailjob.client.job.core.dto.RequestUpdateJobDTO; +import com.aizuda.snailjob.client.job.core.dto.RequestUpdateStatusDTO; +import com.aizuda.snailjob.common.core.model.Result; + +public interface OpenApiClient { + @Mapping(method = RequestMethod.POST, path = "/api/job/add") + Result addJob(RequestAddJobDTO requestAddJobDTO); + + @Mapping(method = RequestMethod.POST, path = "/api/job/update") + Result updateJob(RequestUpdateJobDTO requestUpdateJobDTO); + + @Mapping(method = RequestMethod.POST, path = "/api/job/getJobDetail") + Result getJobDetail(Long jobId); + + @Mapping(method = RequestMethod.POST, path = "/api/job/triggerJob") + Result triggerJob(Long triggerId); + + @Mapping(method = RequestMethod.POST, path = "/api/job/triggerWorkFlow") + Result triggerWorkFlow(Long triggerId); + + @Mapping(method = RequestMethod.POST, path = "/api/job/updateJobStatus") + Result updateJobStatus(RequestUpdateStatusDTO statusDTO); + + @Mapping(method = RequestMethod.POST, path = "/api/job/updateWorkFlowStatus") + Result updateWorkFlowStatus(RequestUpdateStatusDTO statusDTO); +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/openapi/SnailJobOpenApi.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/openapi/SnailJobOpenApi.java new file mode 100644 index 00000000..1cdef329 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/openapi/SnailJobOpenApi.java @@ -0,0 +1,120 @@ +package com.aizuda.snailjob.client.job.core.openapi; + +import com.aizuda.snailjob.client.job.core.handler.*; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; + +/** + * @author opensnail + * @date 2024-09-21 21:35:34 + * @since sj_1.1.0 + */ +public final class SnailJobOpenApi { + private SnailJobOpenApi() { + } + + /** + * 添加集群定时任务 + * + * @return + */ + public static RequestAddHandler addClusterJob() { + return new RequestAddHandler(JobTaskTypeEnum.CLUSTER, null); + } + + /** + * 添加广播定时任务 + * + * @return + */ + public static RequestAddHandler addBroadcastJob() { + return new RequestAddHandler(JobTaskTypeEnum.BROADCAST, null); + } + + /** + * 添加固定分片定时任务 + * + * @return + */ + public static RequestAddHandler addShardingJob() { + return new RequestAddHandler(JobTaskTypeEnum.SHARDING, null); + } + + /** + * 添加Map定时任务 + * + * @return + */ + public static RequestAddHandler addMapJob() { + return new RequestAddHandler(JobTaskTypeEnum.MAP, null); + } + + /** + * 添加MapReduce定时任务 + * + * @param shardNum Reduce数量 + * @return + */ + public static RequestAddHandler addMapReduceJob(Integer shardNum) { + return new RequestAddHandler(JobTaskTypeEnum.MAP_REDUCE, shardNum); + } + + /** + * 更新定时任务 + * + * @param jobId 定时任务ID + * @return + */ + public static RequestUpdateHandler updateJob(Long jobId) { + return new RequestUpdateHandler(jobId); + } + + /** + * 获取定时任务详情 + * + * @param jobId 定时任务ID + * @return + */ + public static RequestQueryHandler getJobDetail(Long jobId) { + return new RequestQueryHandler(jobId); + } + + /** + * 手动触发定时任务 + * + * @param jobId 定时任务ID + * @return + */ + public static RequestTriggerJobHandler triggerJob(Long jobId) { + return new RequestTriggerJobHandler(jobId, 1); + } + + /** + * 手动触发工作流任务 + * + * @param id 工作流任务ID + * @return + */ + public static RequestTriggerJobHandler triggerWorkFlow(Long id) { + return new RequestTriggerJobHandler(id, 2); + } + + /** + * 更新定时任务状态 + * + * @param jobId 任务ID + * @return + */ + public static RequestUpdateStatusHandler updateJobStatus(Long jobId) { + return new RequestUpdateStatusHandler(jobId, 1); + } + + /** + * 更新工作流任务状态 + * + * @param workFlowId 工作流ID + * @return + */ + public static RequestUpdateStatusHandler updateWorkFlowStatus(Long workFlowId) { + return new RequestUpdateStatusHandler(workFlowId, 2); + } +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/util/ValidatorUtils.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/util/ValidatorUtils.java new file mode 100644 index 00000000..abdd994d --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/util/ValidatorUtils.java @@ -0,0 +1,37 @@ +package com.aizuda.snailjob.client.job.core.util; + +import com.aizuda.snailjob.client.common.exception.SnailJobClientException; +import com.aizuda.snailjob.common.log.SnailJobLog; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.Validation; +import jakarta.validation.Validator; + +import java.util.Set; + +public class ValidatorUtils { + private static Validator validator; + + static { + validator = Validation.buildDefaultValidatorFactory().getValidator(); + } + + /** + * 校验对象 + * @param object 待校验对象 + * @throws SnailJobClientException 校验不通过,则报SnailJobClientException异常 + */ + public static boolean validateEntity(Object object) + throws SnailJobClientException { + Set> constraintViolations = validator.validate(object); + if (!constraintViolations.isEmpty()) { + StringBuilder msg = new StringBuilder(); + for(ConstraintViolation constraint: constraintViolations){ + msg.append(constraint.getMessage()).append("\n"); + } + SnailJobLog.LOCAL.error(msg.toString()); + return false; + }else { + return true; + } + } +} \ No newline at end of file diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java index 5f004a3d..0b54bdd8 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java @@ -125,6 +125,20 @@ public interface SystemConstants { */ String RETRY_GENERATE_IDEM_ID = "/retry/generate/idempotent-id/v1"; + String OPENAPI_ADD_JOB = "/api/job/add"; + + String OPENAPI_UPDATE_JOB = "/api/job/update"; + + String OPENAPI_GET_JOB_DETAIL = "/api/job/getJobDetail"; + + String OPENAPI_TRIGGER_JOB = "/api/job/triggerJob"; + + String OPENAPI_TRIGGER_WORKFLOW = "/api/job/triggerWorkFlow"; + + String OPENAPI_UPDATE_JOB_STATUS = "/api/job/updateJobStatus"; + + String OPENAPI_UPDATE_WORKFLOW_STATUS = "/api/job/updateWorkFlowStatus"; + } String LOGO = """ diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/BlockStrategyEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/BlockStrategyEnum.java similarity index 84% rename from snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/BlockStrategyEnum.java rename to snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/BlockStrategyEnum.java index bf9eb654..f436e716 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/BlockStrategyEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/BlockStrategyEnum.java @@ -1,6 +1,6 @@ -package com.aizuda.snailjob.server.job.task.enums; +package com.aizuda.snailjob.common.core.enums; -import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.common.core.exception.SnailJobCommonException; import lombok.AllArgsConstructor; import lombok.Getter; @@ -43,7 +43,7 @@ public enum BlockStrategyEnum { } } - throw new SnailJobServerException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy); + throw new SnailJobCommonException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy); } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobConverter.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobConverter.java new file mode 100644 index 00000000..e1d2a9ff --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobConverter.java @@ -0,0 +1,24 @@ +package com.aizuda.snailjob.server.common.convert; + +import com.aizuda.snailjob.server.common.vo.JobRequestVO; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import org.mapstruct.Mapper; +import org.mapstruct.factory.Mappers; + +import java.util.List; + +/** + * @author: opensnail + * @date : 2023-10-12 09:40 + * @since : 2.4.0 + */ +@Mapper +public interface JobConverter { + + JobConverter INSTANCE = Mappers.getMapper(JobConverter.class); + + Job convert(JobRequestVO jobRequestVO); + + List convertList(List jobs); + +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobResponseVOConverter.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobResponseVOConverter.java new file mode 100644 index 00000000..16c2e1d0 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobResponseVOConverter.java @@ -0,0 +1,42 @@ +package com.aizuda.snailjob.server.common.convert; + +import com.aizuda.snailjob.server.common.util.DateUtils; +import com.aizuda.snailjob.server.common.vo.JobResponseVO; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Mappings; +import org.mapstruct.factory.Mappers; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Objects; + +/** + * @author opensnail + * @date 2023-10-11 22:50:40 + * @since 2.4.0 + */ +@Mapper +public interface JobResponseVOConverter { + + JobResponseVOConverter INSTANCE = Mappers.getMapper(JobResponseVOConverter.class); + + // @Mappings({ +// @Mapping(source = "nextTriggerAt", target = "nextTriggerAt", expression = "java(DateUtils.toLocalDateTime())") +// }) + List convertList(List jobs); + + @Mappings({ + @Mapping(target = "nextTriggerAt", expression = "java(JobResponseVOConverter.toLocalDateTime(job.getNextTriggerAt()))") + }) + JobResponseVO convert(Job job); + + static LocalDateTime toLocalDateTime(Long nextTriggerAt) { + if (Objects.isNull(nextTriggerAt) || nextTriggerAt == 0) { + return null; + } + + return DateUtils.toLocalDateTime(nextTriggerAt); + } +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobRequestVO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobRequestVO.java new file mode 100644 index 00000000..aac86135 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobRequestVO.java @@ -0,0 +1,124 @@ +package com.aizuda.snailjob.server.common.vo; + +import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Pattern; +import lombok.Data; + +/** + * @author opensnail + * @date 2023-10-11 22:37:55 + * @since 2.4.0 + */ +@Data +public class JobRequestVO { + + private Long id; + + /** + * 组名称 + */ + @NotBlank(message = "groupName 不能为空") + @Pattern(regexp = "^[A-Za-z0-9_-]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母、下划线和短横线") + private String groupName; + + /** + * 名称 + */ + @NotBlank(message = "jobName 不能为空") + private String jobName; + + /** + * 重试状态 0、关闭、1、开启 + * {@link StatusEnum} + */ + @NotNull(message = "jobStatus 不能为空") + private Integer jobStatus; + + /** + * 执行方法参数 + */ + private String argsStr; + + /** + * 参数类型 text/json + */ + private Integer argsType; + + /** + * 执行器路由策略 + */ + @NotNull(message = "routeKey 不能为空") + private Integer routeKey; + + /** + * 执行器类型 + * {@link ExecutorTypeEnum} + */ + @NotNull(message = "executorType 不能为空") + private Integer executorType; + + /** + * 执行器名称 + */ + @NotBlank(message = "executorInfo 不能为空") + private String executorInfo; + + /** + * 触发类型 2. 固定时间 3.CRON 表达式 99.工作流 + */ + @NotNull(message = "triggerType 不能为空") + private Integer triggerType; + + /** + * 间隔时长 + */ + @NotNull(message = "triggerInterval 不能为空") + private String triggerInterval; + + /** + * 阻塞策略 1、丢弃 2、覆盖 3、并行 + */ + @NotNull(message = "blockStrategy 不能为空") + private Integer blockStrategy; + + /** + * 任务执行超时时间,单位秒 + */ + @NotNull(message = "executorTimeout 不能为空") + private Integer executorTimeout; + + /** + * 最大重试次数 + */ + @NotNull(message = "maxRetryTimes 不能为空") + private Integer maxRetryTimes; + + /** + * 重试间隔(s) + */ + @NotNull(message = "retryInterval 不能为空") + private Integer retryInterval; + + /** + * 任务类型 + * {@link JobTaskTypeEnum} + */ + @NotNull(message = "taskType 不能为空") + private Integer taskType; + + /** + * 并行数 + */ + @NotNull(message = "parallelNum 不能为空") + private Integer parallelNum; + + /** + * 描述 + */ + private String description; + +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobResponseVO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobResponseVO.java new file mode 100644 index 00000000..368a277d --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobResponseVO.java @@ -0,0 +1,132 @@ +package com.aizuda.snailjob.server.common.vo; + +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * @author opensnail + * @date 2023-10-11 22:30:00 + * @since 2.4.0 + */ +@Data +public class JobResponseVO { + + private Long id; + + /** + * 组名称 + */ + private String groupName; + + /** + * 名称 + */ + private String jobName; + + /** + * 执行方法参数 + */ + private String argsStr; + + /** + * 参数类型 text/json + */ + private String argsType; + + /** + * 扩展字段 + */ + private String extAttrs; + + /** + * 下次触发时间 + */ + private LocalDateTime nextTriggerAt; + + /** + * 重试状态 0、关闭、1、开启 + */ + private Integer jobStatus; + + /** + * 执行器路由策略 + */ + private Integer routeKey; + + /** + * 执行器类型 1、Java + */ + private Integer executorType; + + /** + * 执行器名称 + */ + private String executorInfo; + + /** + * 触发类型 1.CRON 表达式 2. 固定时间 + */ + private Integer triggerType; + + /** + * 间隔时长 + */ + private String triggerInterval; + + /** + * 阻塞策略 1、丢弃 2、覆盖 3、并行 + */ + private Integer blockStrategy; + + /** + * 任务执行超时时间,单位秒 + */ + private Integer executorTimeout; + + /** + * 最大重试次数 + */ + private Integer maxRetryTimes; + + /** + * 重试间隔(s) + */ + private Integer retryInterval; + + /** + * 任务类型 + */ + private Integer taskType; + + /** + * 并行数 + */ + private Integer parallelNum; + + /** + * bucket + */ + private Integer bucketIndex; + + /** + * 描述 + */ + private String description; + + /** + * 创建时间 + */ + private LocalDateTime createDt; + + /** + * 修改时间 + */ + private LocalDateTime updateDt; + + /** + * 逻辑删除 1、删除 + */ + private Integer deleted; + +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobStatusUpdateRequestVO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobStatusUpdateRequestVO.java new file mode 100644 index 00000000..1e9cbc79 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobStatusUpdateRequestVO.java @@ -0,0 +1,20 @@ +package com.aizuda.snailjob.server.common.vo; + +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +/** + * @author opensnail + * @date 2023-10-15 16:06:20 + * @since 2.4.0 + */ +@Data +public class JobStatusUpdateRequestVO { + + @NotNull(message = "id 不能为空") + private Long id; + + @NotNull(message = "jobStatus 不能为空") + private Integer jobStatus; + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/AbstracJobBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/AbstracJobBlockStrategy.java index f2dbd3d9..71919c16 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/AbstracJobBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/AbstracJobBlockStrategy.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.block.job; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.BlockStrategy; import org.springframework.beans.factory.InitializingBean; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/ConcurrencyBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/ConcurrencyBlockStrategy.java index 8d8a15f4..e2bc5f99 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/ConcurrencyBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/ConcurrencyBlockStrategy.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.block.job; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGenerator; import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/DiscardBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/DiscardBlockStrategy.java index a6d8bed1..e412acb9 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/DiscardBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/DiscardBlockStrategy.java @@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.block.job; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGenerator; import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/JobBlockStrategyFactory.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/JobBlockStrategyFactory.java index a14b8977..02943eb6 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/JobBlockStrategyFactory.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/JobBlockStrategyFactory.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.block.job; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.BlockStrategy; import java.util.concurrent.ConcurrentHashMap; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/OverlayBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/OverlayBlockStrategy.java index 697cb2b3..2069bf56 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/OverlayBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/OverlayBlockStrategy.java @@ -1,7 +1,7 @@ package com.aizuda.snailjob.server.job.task.support.block.job; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGenerator; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/RecoveryBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/RecoveryBlockStrategy.java index 7516ebc6..baab24f0 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/RecoveryBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/job/RecoveryBlockStrategy.java @@ -5,12 +5,11 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.util.StreamUtils; -import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext; @@ -20,12 +19,10 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import java.util.List; -import java.util.stream.Stream; /** * 重新触发执行失败的任务 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/AbstractWorkflowBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/AbstractWorkflowBlockStrategy.java index 08c207d9..d5ea5abe 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/AbstractWorkflowBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/AbstractWorkflowBlockStrategy.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.block.workflow; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.BlockStrategy; import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext; import org.springframework.beans.factory.InitializingBean; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java index 04f12a45..b1766e83 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/ConcurrencyWorkflowBlockStrategy.java @@ -1,7 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.block.workflow; -import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter; import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowBatchGenerator; import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; @@ -9,8 +8,6 @@ import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; -import java.io.IOException; - /** * @author: shuguang.zhang * @date : 2023-12-26 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java index 6cc5c33e..d78052c5 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/DiscardWorkflowBlockStrategy.java @@ -2,8 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.block.workflow; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; -import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter; import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowBatchGenerator; import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; @@ -11,8 +10,6 @@ import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; -import java.io.IOException; - /** * @author: xiaowoniu * @date : 2023-12-26 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/OverlayWorkflowBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/OverlayWorkflowBlockStrategy.java index 4ad8d1dd..1b41f7a3 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/OverlayWorkflowBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/OverlayWorkflowBlockStrategy.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.block.workflow; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter; import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowBatchGenerator; import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/RecoveryWorkflowBlockStrategy.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/RecoveryWorkflowBlockStrategy.java index eed4835f..c66b53e0 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/RecoveryWorkflowBlockStrategy.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/RecoveryWorkflowBlockStrategy.java @@ -1,7 +1,7 @@ package com.aizuda.snailjob.server.job.task.support.block.workflow; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/WorkflowBlockStrategyFactory.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/WorkflowBlockStrategyFactory.java index c5152c8e..81f216f8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/WorkflowBlockStrategyFactory.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/block/workflow/WorkflowBlockStrategyFactory.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.block.workflow; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.BlockStrategy; import java.util.concurrent.ConcurrentHashMap; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiAddJobRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiAddJobRequestHandler.java new file mode 100644 index 00000000..1e0c0032 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiAddJobRequestHandler.java @@ -0,0 +1,105 @@ +package com.aizuda.snailjob.server.job.task.support.handler; + +import cn.hutool.core.lang.Assert; +import cn.hutool.core.net.url.UrlQuery; +import cn.hutool.core.util.HashUtil; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.WaitStrategy; +import com.aizuda.snailjob.server.common.config.SystemProperties; +import com.aizuda.snailjob.server.common.convert.JobConverter; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; +import com.aizuda.snailjob.server.common.strategy.WaitStrategies; +import com.aizuda.snailjob.server.common.util.CronUtils; +import com.aizuda.snailjob.server.common.util.DateUtils; +import com.aizuda.snailjob.server.common.util.HttpHeaderUtil; +import com.aizuda.snailjob.server.common.vo.JobRequestVO; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +/** + * OPENAPI + * 新增定时任务 + */ +@Component +@RequiredArgsConstructor +public class OpenApiAddJobRequestHandler extends PostHttpRequestHandler { + private final SystemProperties systemProperties; + private final JobMapper jobMapper; + + @Override + public boolean supports(String path) { + return HTTP_PATH.OPENAPI_ADD_JOB.equals(path); + } + + @Override + public HttpMethod method() { + return HttpMethod.POST; + } + + @Override + public String doHandler(String content, UrlQuery query, HttpHeaders headers) { + SnailJobLog.LOCAL.debug("Add job content:[{}]", content); + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + Object[] args = retryRequest.getArgs(); + JobRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobRequestVO.class); + if(StrUtil.isBlank(jobRequestVO.getGroupName())){ + jobRequestVO.setGroupName(HttpHeaderUtil.getGroupName(headers)); + } + // 判断常驻任务 + Job job = JobConverter.INSTANCE.convert(jobRequestVO); + job.setResident(isResident(jobRequestVO)); + job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) + % systemProperties.getBucketTotal()); + job.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli())); + job.setNamespaceId(HttpHeaderUtil.getNamespace(headers)); + job.setId(null); + Assert.isTrue(1 == jobMapper.insert(job), ()-> new SnailJobServerException("新增任务失败")); + return JsonUtil.toJsonString(new NettyResult(job.getId(), retryRequest.getReqId())); + } + + private Integer isResident(JobRequestVO jobRequestVO) { + if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) { + return StatusEnum.NO.getStatus(); + } + + if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.FIXED.getType()) { + if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) { + return StatusEnum.YES.getStatus(); + } + } else if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.CRON.getType()) { + if (CronUtils.getExecuteInterval(jobRequestVO.getTriggerInterval()) < 10 * 1000) { + return StatusEnum.YES.getStatus(); + } + } else { + throw new SnailJobServerException("未知触发类型"); + } + + return StatusEnum.NO.getStatus(); + } + + private static Long calculateNextTriggerAt(final JobRequestVO jobRequestVO, Long time) { + if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) { + return 0L; + } + + WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType()); + WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); + waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(time); + return waitStrategy.computeTriggerTime(waitStrategyContext); + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiGetJobDetailRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiGetJobDetailRequestHandler.java new file mode 100644 index 00000000..a41fa8c6 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiGetJobDetailRequestHandler.java @@ -0,0 +1,53 @@ +package com.aizuda.snailjob.server.job.task.support.handler; + +import cn.hutool.core.lang.Assert; +import cn.hutool.core.net.url.UrlQuery; +import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.convert.JobResponseVOConverter; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; +import com.aizuda.snailjob.server.common.vo.JobResponseVO; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +/** + * OPENAPI + * 获取定时任务详情 + */ +@Component +@RequiredArgsConstructor +public class OpenApiGetJobDetailRequestHandler extends PostHttpRequestHandler { + private final JobMapper jobMapper; + + @Override + public boolean supports(String path) { + return HTTP_PATH.OPENAPI_GET_JOB_DETAIL.equals(path); + } + + @Override + public HttpMethod method() { + return HttpMethod.POST; + } + + @Override + public String doHandler(String content, UrlQuery query, HttpHeaders headers) { + SnailJobLog.LOCAL.debug("Update job content:[{}]", content); + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + Object[] args = retryRequest.getArgs(); + Long jobId = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), Long.class); + Assert.notNull(jobId, () -> new SnailJobServerException("id 不能为空")); + + Job job = jobMapper.selectById(jobId); + JobResponseVO convert = JobResponseVOConverter.INSTANCE.convert(job); + return JsonUtil.toJsonString(new NettyResult(convert, retryRequest.getReqId())); + + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiTriggerJobRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiTriggerJobRequestHandler.java new file mode 100644 index 00000000..e59cf56d --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiTriggerJobRequestHandler.java @@ -0,0 +1,80 @@ +package com.aizuda.snailjob.server.job.task.support.handler; + +import cn.hutool.core.net.url.UrlQuery; +import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; +import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; +import com.aizuda.snailjob.server.common.util.DateUtils; +import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO; +import com.aizuda.snailjob.server.job.task.support.JobPrepareHandler; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +/** + * OPENAPI + * 调度定时任务 + */ +@Component +@RequiredArgsConstructor +public class OpenApiTriggerJobRequestHandler extends PostHttpRequestHandler { + private final JobMapper jobMapper; + private final AccessTemplate accessTemplate; + private final JobPrepareHandler terminalJobPrepareHandler; + + @Override + public boolean supports(String path) { + return HTTP_PATH.OPENAPI_TRIGGER_JOB.equals(path); + } + + @Override + public HttpMethod method() { + return HttpMethod.POST; + } + + @Override + public String doHandler(String content, UrlQuery query, HttpHeaders headers) { + SnailJobLog.LOCAL.debug("Trigger job content:[{}]", content); + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + Object[] args = retryRequest.getArgs(); + Long jobId = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), Long.class); + Job job = jobMapper.selectById(jobId); + if (Objects.isNull(job)){ + SnailJobLog.LOCAL.warn("job can not be null."); + return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId())); + } + + long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper() + .eq(GroupConfig::getGroupName, job.getGroupName()) + .eq(GroupConfig::getNamespaceId, job.getNamespaceId()) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + ); + + if (count <= 0){ + SnailJobLog.LOCAL.warn("组:[{}]已经关闭,不支持手动执行.", job.getGroupName()); + return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId())); + } + JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); + // 设置now表示立即执行 + jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); + jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType()); + // 创建批次 + terminalJobPrepareHandler.handle(jobTaskPrepare); + + return JsonUtil.toJsonString(new NettyResult(true, retryRequest.getReqId())); + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiTriggerWorkFlowRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiTriggerWorkFlowRequestHandler.java new file mode 100644 index 00000000..e9e8e22d --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiTriggerWorkFlowRequestHandler.java @@ -0,0 +1,97 @@ +package com.aizuda.snailjob.server.job.task.support.handler; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.net.url.UrlQuery; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; +import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; +import com.aizuda.snailjob.server.common.util.DateUtils; +import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.snailjob.server.job.task.support.WorkflowPrePareHandler; +import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter; +import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.Workflow; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +/** + * OPENAPI + * 新增工作流任务 + */ +@Component +@RequiredArgsConstructor +public class OpenApiTriggerWorkFlowRequestHandler extends PostHttpRequestHandler { + private final WorkflowMapper workflowMapper; + private final AccessTemplate accessTemplate; + private final WorkflowPrePareHandler terminalWorkflowPrepareHandler; + + @Override + public boolean supports(String path) { + return HTTP_PATH.OPENAPI_TRIGGER_WORKFLOW.equals(path); + } + + @Override + public HttpMethod method() { + return HttpMethod.POST; + } + + @Override + public String doHandler(String content, UrlQuery query, HttpHeaders headers) { + SnailJobLog.LOCAL.debug("Trigger job content:[{}]", content); + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + Object[] args = retryRequest.getArgs(); + Long id = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), Long.class); + Workflow workflow = workflowMapper.selectById(id); + if (Objects.isNull(workflow)){ + SnailJobLog.LOCAL.warn("workflow can not be null."); + return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId())); + } + // 将字符串反序列化为 Set + if (StrUtil.isNotBlank(workflow.getGroupName())) { + Set namesSet = new HashSet<>(Arrays.asList(workflow.getGroupName().split(", "))); + + // 判断任务节点相关组有无关闭,存在关闭组则停止执行工作流执行 + if (CollectionUtil.isNotEmpty(namesSet)) { + for (String groupName : namesSet) { + long count = accessTemplate.getGroupConfigAccess().count( + new LambdaQueryWrapper() + .eq(GroupConfig::getGroupName, groupName) + .eq(GroupConfig::getNamespaceId, workflow.getNamespaceId()) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + ); + + if (count <= 0){ + SnailJobLog.LOCAL.warn("组:[{}]已经关闭,不支持手动执行.", workflow.getGroupName()); + return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId())); + } + } + } + } + + WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow); + // 设置now表示立即执行 + prepareDTO.setNextTriggerAt(DateUtils.toNowMilli()); + prepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType()); + + terminalWorkflowPrepareHandler.handler(prepareDTO); + + return JsonUtil.toJsonString(new NettyResult(true, retryRequest.getReqId())); + + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiUpdateJobRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiUpdateJobRequestHandler.java new file mode 100644 index 00000000..60f53409 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiUpdateJobRequestHandler.java @@ -0,0 +1,131 @@ +package com.aizuda.snailjob.server.job.task.support.handler; + +import cn.hutool.core.net.url.UrlQuery; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.WaitStrategy; +import com.aizuda.snailjob.server.common.convert.JobConverter; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; +import com.aizuda.snailjob.server.common.strategy.WaitStrategies; +import com.aizuda.snailjob.server.common.util.CronUtils; +import com.aizuda.snailjob.server.common.util.DateUtils; +import com.aizuda.snailjob.server.common.util.HttpHeaderUtil; +import com.aizuda.snailjob.server.common.vo.JobRequestVO; +import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.Objects; +import java.util.Optional; + +/** + * OPENAPI + * 更新定时任务 + */ +@Component +@RequiredArgsConstructor +public class OpenApiUpdateJobRequestHandler extends PostHttpRequestHandler { + private final JobMapper jobMapper; + + @Override + public boolean supports(String path) { + return HTTP_PATH.OPENAPI_UPDATE_JOB.equals(path); + } + + @Override + public HttpMethod method() { + return HttpMethod.POST; + } + + @Override + public String doHandler(String content, UrlQuery query, HttpHeaders headers) { + SnailJobLog.LOCAL.debug("Update job content:[{}]", content); + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + Object[] args = retryRequest.getArgs(); + String namespace = HttpHeaderUtil.getNamespace(headers); + JobRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobRequestVO.class); + if (Objects.isNull(jobRequestVO.getId())){ + SnailJobLog.LOCAL.warn("id不能为空,更新失败"); + return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId())); + } + + Job job = jobMapper.selectById(jobRequestVO.getId()); + if (Objects.isNull(job)){ + SnailJobLog.LOCAL.warn("job为空,更新失败"); + return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId())); + } + + // 判断常驻任务 + Job updateJob = JobConverter.INSTANCE.convert(jobRequestVO); + updateJob.setResident(isResident(jobRequestVO)); + updateJob.setNamespaceId(namespace); + + // 工作流任务 + if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) { + job.setNextTriggerAt(0L); + // 非常驻任务 > 非常驻任务 + } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals( + updateJob.getResident(), + StatusEnum.NO.getStatus())) { + updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli())); + } else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals( + updateJob.getResident(), StatusEnum.NO.getStatus())) { + // 常驻任务的触发时间 + long time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId())) + .orElse(DateUtils.toNowMilli()); + updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, time)); + // 老的是不是常驻任务 新的是常驻任务 需要使用当前时间计算下次触发时间 + } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals( + updateJob.getResident(), StatusEnum.YES.getStatus())) { + updateJob.setNextTriggerAt(DateUtils.toNowMilli()); + } + + // 禁止更新组 + updateJob.setGroupName(null); + boolean insert = 1 == jobMapper.updateById(updateJob); + return JsonUtil.toJsonString(new NettyResult(insert, retryRequest.getReqId())); + + } + + private Integer isResident(JobRequestVO jobRequestVO) { + if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) { + return StatusEnum.NO.getStatus(); + } + + if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.FIXED.getType()) { + if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) { + return StatusEnum.YES.getStatus(); + } + } else if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.CRON.getType()) { + if (CronUtils.getExecuteInterval(jobRequestVO.getTriggerInterval()) < 10 * 1000) { + return StatusEnum.YES.getStatus(); + } + } else { + throw new SnailJobServerException("未知触发类型"); + } + + return StatusEnum.NO.getStatus(); + } + + private static Long calculateNextTriggerAt(final JobRequestVO jobRequestVO, Long time) { + if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) { + return 0L; + } + + WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType()); + WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); + waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(time); + return waitStrategy.computeTriggerTime(waitStrategyContext); + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiUpdateJobStatusRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiUpdateJobStatusRequestHandler.java new file mode 100644 index 00000000..77362393 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiUpdateJobStatusRequestHandler.java @@ -0,0 +1,55 @@ +package com.aizuda.snailjob.server.job.task.support.handler; + +import cn.hutool.core.net.url.UrlQuery; +import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; +import com.aizuda.snailjob.server.common.vo.JobStatusUpdateRequestVO; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +/** + * OPENAPI + * 更新定时任务状态 + */ +@Component +@RequiredArgsConstructor +public class OpenApiUpdateJobStatusRequestHandler extends PostHttpRequestHandler { + private final JobMapper jobMapper; + + @Override + public boolean supports(String path) { + return HTTP_PATH.OPENAPI_UPDATE_JOB_STATUS.equals(path); + } + + @Override + public HttpMethod method() { + return HttpMethod.POST; + } + + @Override + public String doHandler(String content, UrlQuery query, HttpHeaders headers) { + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + Object[] args = retryRequest.getArgs(); + JobStatusUpdateRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobStatusUpdateRequestVO.class); + Long count = jobMapper.selectCount(new LambdaQueryWrapper().eq(Job::getId, jobRequestVO.getId())); + if (1 != count){ + SnailJobLog.LOCAL.warn("更新任务失败"); + return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId())); + } + Job job = new Job(); + job.setId(jobRequestVO.getId()); + job.setJobStatus(jobRequestVO.getJobStatus()); + boolean update = 1 == jobMapper.updateById(job); + return JsonUtil.toJsonString(new NettyResult(update, retryRequest.getReqId())); + + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiUpdateWorkFlowStatusRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiUpdateWorkFlowStatusRequestHandler.java new file mode 100644 index 00000000..6ec4156c --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/OpenApiUpdateWorkFlowStatusRequestHandler.java @@ -0,0 +1,60 @@ +package com.aizuda.snailjob.server.job.task.support.handler; + +import cn.hutool.core.net.url.UrlQuery; +import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; +import com.aizuda.snailjob.server.common.vo.JobStatusUpdateRequestVO; +import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Workflow; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +/** + * OPENAPI + * 更新工作流状态 + */ +@Component +@RequiredArgsConstructor +public class OpenApiUpdateWorkFlowStatusRequestHandler extends PostHttpRequestHandler { + private final WorkflowMapper workflowMapper; + + @Override + public boolean supports(String path) { + return HTTP_PATH.OPENAPI_UPDATE_WORKFLOW_STATUS.equals(path); + } + + @Override + public HttpMethod method() { + return HttpMethod.POST; + } + + @Override + public String doHandler(String content, UrlQuery query, HttpHeaders headers) { + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + Object[] args = retryRequest.getArgs(); + JobStatusUpdateRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobStatusUpdateRequestVO.class); + Workflow workflow = workflowMapper.selectOne( + new LambdaQueryWrapper() + .select(Workflow::getId) + .eq(Workflow::getId, jobRequestVO.getId())); + + if (Objects.isNull(workflow)){ + SnailJobLog.LOCAL.warn("工作流不存在"); + return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId())); + } + workflow.setWorkflowStatus(jobRequestVO.getJobStatus()); + boolean update = 1 == workflowMapper.updateById(workflow); + + return JsonUtil.toJsonString(new NettyResult(update, retryRequest.getReqId())); + + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/RunningJobPrepareHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/RunningJobPrepareHandler.java index 97dcec2d..a9cd83d6 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/RunningJobPrepareHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/job/RunningJobPrepareHandler.java @@ -7,7 +7,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO; import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.BlockStrategy; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java index bd90bc00..745e56d8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java @@ -6,7 +6,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import com.aizuda.snailjob.server.job.task.support.BlockStrategy; import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter; import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobRequestVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobRequestVO.java index c8012018..4c636566 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobRequestVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobRequestVO.java @@ -3,7 +3,7 @@ package com.aizuda.snailjob.server.web.model.request; import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; -import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum; +import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Pattern;