feat(sj_1.4.0-beta1): sdk手动触发调用时,支持传递临时参数

This commit is contained in:
srzou 2025-01-13 15:54:11 +08:00 committed by opensnail
parent 6fb6de922c
commit fe740edc81
15 changed files with 252 additions and 37 deletions

View File

@ -21,7 +21,7 @@
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<revision>1.3.0</revision>
<revision>1.4.0-beta1</revision>
<netty-all.version>4.1.114.Final</netty-all.version>
<hutool-all.version>5.8.32</hutool-all.version>
<mybatis-plus.version>3.5.9</mybatis-plus.version>

View File

@ -0,0 +1,17 @@
package com.aizuda.snailjob.client.job.core.dto;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
@Data
public class JobTriggerDTO {
@NotNull(message = "jobId 不能为空")
private Long jobId;
/**
* 临时任务参数
*/
private String tmpArgsStr;
}

View File

@ -0,0 +1,56 @@
package com.aizuda.snailjob.client.job.core.handler;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.job.core.dto.JobTriggerDTO;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import lombok.Getter;
import lombok.Setter;
import java.util.HashMap;
import java.util.Map;
public abstract class AbstractTriggerHandler<H, R> extends AbstractRequestHandler<R> {
@Getter
private final JobTriggerDTO reqDTO;
@Setter
private H r;
public AbstractTriggerHandler(Long jobId) {
this.reqDTO = new JobTriggerDTO();
// 设置调度id
reqDTO.setJobId(jobId);
}
/**
* 添加参数可支持多次添加
* 静态分片不可使用该方法
*
* @param argsKey 参数名
* @param argsValue 参数值
* @return
*/
protected H addArgsStr(String argsKey, Object argsValue) {
Map<String, Object> map = new HashMap<>();
if (StrUtil.isNotBlank(reqDTO.getTmpArgsStr())) {
map = JsonUtil.parseHashMap(reqDTO.getTmpArgsStr());
}
map.put(argsKey, argsValue);
reqDTO.setTmpArgsStr(JsonUtil.toJsonString(map));
return r;
}
/**
* 添加静态分片相关参数
*
* @param shardingValue 分片参数
* @return r
*/
protected H addShardingArgs(String... shardingValue) {
reqDTO.setTmpArgsStr(JsonUtil.toJsonString(shardingValue));
return r;
}
protected abstract void afterExecute(Boolean aBoolean);
}

View File

@ -0,0 +1,14 @@
package com.aizuda.snailjob.client.job.core.handler.trigger;
public class BroadcastTriggerHandler extends TriggerJobHandler<BroadcastTriggerHandler>{
public BroadcastTriggerHandler(Long triggerJobId) {
super(triggerJobId);
}
@Override
public BroadcastTriggerHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
}

View File

@ -0,0 +1,14 @@
package com.aizuda.snailjob.client.job.core.handler.trigger;
public class ClusterTriggerHandler extends TriggerJobHandler<ClusterTriggerHandler>{
public ClusterTriggerHandler(Long triggerJobId) {
super(triggerJobId);
}
@Override
public ClusterTriggerHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
}

View File

@ -0,0 +1,14 @@
package com.aizuda.snailjob.client.job.core.handler.trigger;
public class MapReduceTriggerHandler extends TriggerJobHandler<MapReduceTriggerHandler>{
public MapReduceTriggerHandler(Long triggerJobId) {
super(triggerJobId);
}
@Override
public MapReduceTriggerHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
}

View File

@ -0,0 +1,14 @@
package com.aizuda.snailjob.client.job.core.handler.trigger;
public class MapTriggerHandler extends TriggerJobHandler<MapTriggerHandler>{
public MapTriggerHandler(Long triggerJobId) {
super(triggerJobId);
}
@Override
public MapTriggerHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
}

View File

@ -0,0 +1,14 @@
package com.aizuda.snailjob.client.job.core.handler.trigger;
public class ShardingTriggerHandler extends TriggerJobHandler<ShardingTriggerHandler>{
public ShardingTriggerHandler(Long triggerJobId) {
super(triggerJobId);
}
@Override
public ShardingTriggerHandler addShardingArgs(String... shardingValue) {
return super.addShardingArgs(shardingValue);
}
}

View File

@ -3,16 +3,14 @@ package com.aizuda.snailjob.client.job.core.handler.trigger;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.handler.AbstractRequestHandler;
import com.aizuda.snailjob.client.job.core.handler.AbstractTriggerHandler;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
public class TriggerJobHandler extends AbstractRequestHandler<Boolean> {
private final Long triggerJobId;
public abstract class TriggerJobHandler<H> extends AbstractTriggerHandler<H, Boolean> {
public TriggerJobHandler(Long triggerJobId) {
this.triggerJobId = triggerJobId;
super(triggerJobId);
}
@Override
@ -27,7 +25,7 @@ public class TriggerJobHandler extends AbstractRequestHandler<Boolean> {
@Override
protected Boolean doExecute() {
Result<Object> result = client.triggerJob(triggerJobId);
Result<Object> result = client.triggerJob(getReqDTO());
Assert.isTrue(StatusEnum.YES.getStatus() == result.getStatus(),
() -> new SnailJobClientException(result.getMessage()));
return (Boolean)result.getData();
@ -35,6 +33,6 @@ public class TriggerJobHandler extends AbstractRequestHandler<Boolean> {
@Override
protected Pair<Boolean, String> checkRequest() {
return Pair.of(triggerJobId != null && !Long.valueOf(0).equals(triggerJobId), "triggerJobId不能为null并且必须大于0");
return Pair.of(getReqDTO().getJobId() != null && !Long.valueOf(0).equals(getReqDTO().getJobId()), "triggerJobId不能为null并且必须大于0");
}
}

View File

@ -3,15 +3,16 @@ package com.aizuda.snailjob.client.job.core.handler.trigger;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.handler.AbstractRequestHandler;
import com.aizuda.snailjob.client.job.core.handler.AbstractTriggerHandler;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
public class TriggerWorkflowHandler extends AbstractRequestHandler<Boolean> {
private final Long triggerJobId;
public class TriggerWorkflowHandler extends AbstractTriggerHandler<TriggerWorkflowHandler,Boolean> {
public TriggerWorkflowHandler(Long triggerJobId) {
this.triggerJobId = triggerJobId;
super(triggerJobId);
setR(this);
}
@Override
@ -26,7 +27,7 @@ public class TriggerWorkflowHandler extends AbstractRequestHandler<Boolean> {
@Override
protected Boolean doExecute() {
Result<Object> result = client.triggerWorkFlow(triggerJobId);
Result<Object> result = client.triggerWorkFlow(getReqDTO());
Assert.isTrue(StatusEnum.YES.getStatus() == result.getStatus(),
() -> new SnailJobClientException(result.getMessage()));
return (Boolean) result.getData();
@ -34,6 +35,11 @@ public class TriggerWorkflowHandler extends AbstractRequestHandler<Boolean> {
@Override
protected Pair<Boolean, String> checkRequest() {
return Pair.of(triggerJobId != null && !Long.valueOf(0).equals(triggerJobId), "triggerJobId不能为null并且必须大于0");
return Pair.of(getReqDTO().getJobId() != null && !Long.valueOf(0).equals(getReqDTO().getJobId()), "triggerJobId不能为null并且必须大于0");
}
@Override
public TriggerWorkflowHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.client.job.core.openapi;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.client.job.core.dto.JobTriggerDTO;
import com.aizuda.snailjob.client.job.core.dto.RequestAddOrUpdateJobDTO;
import com.aizuda.snailjob.client.job.core.dto.RequestUpdateStatusDTO;
import com.aizuda.snailjob.common.core.model.Result;
@ -19,10 +20,10 @@ public interface OpenApiClient {
Result<Object> getJobDetail(Long jobId);
@Mapping(method = RequestMethod.POST, path = "/api/job/triggerJob")
Result<Object> triggerJob(Long triggerId);
Result<Object> triggerJob(JobTriggerDTO jobTriggerDTO);
@Mapping(method = RequestMethod.POST, path = "/api/job/triggerWorkFlow")
Result<Object> triggerWorkFlow(Long triggerId);
Result<Object> triggerWorkFlow(JobTriggerDTO jobTriggerDTO);
@Mapping(method = RequestMethod.POST, path = "/api/job/updateJobStatus")
Result<Object> updateJobStatus(RequestUpdateStatusDTO statusDTO);

View File

@ -4,8 +4,7 @@ import com.aizuda.snailjob.client.job.core.handler.add.*;
import com.aizuda.snailjob.client.job.core.handler.delete.DeleteJobHandler;
import com.aizuda.snailjob.client.job.core.handler.delete.DeleteWorkflowHandler;
import com.aizuda.snailjob.client.job.core.handler.query.RequestQueryHandler;
import com.aizuda.snailjob.client.job.core.handler.trigger.TriggerJobHandler;
import com.aizuda.snailjob.client.job.core.handler.trigger.TriggerWorkflowHandler;
import com.aizuda.snailjob.client.job.core.handler.trigger.*;
import com.aizuda.snailjob.client.job.core.handler.update.*;
import java.util.Set;
@ -125,15 +124,58 @@ public final class SnailJobOpenApi {
}
/**
* 手动触发定时任务
* 手动触发广播定时任务
*
* @param jobId 定时任务ID
* @return {@link TriggerJobHandler}
* @return {@link BroadcastTriggerHandler}
*/
public static TriggerJobHandler triggerJob(Long jobId) {
return new TriggerJobHandler(jobId);
public static BroadcastTriggerHandler triggerBroadcastJob(Long jobId) {
return new BroadcastTriggerHandler(jobId);
}
/**
* 手动触发集群定时任务
*
* @param jobId 定时任务ID
* @return {@link ClusterTriggerHandler}
*/
public static ClusterTriggerHandler triggerClusterJob(Long jobId) {
return new ClusterTriggerHandler(jobId);
}
/**
* 手动触发MapReduce定时任务
*
* @param jobId 定时任务ID
* @return {@link MapReduceTriggerHandler}
*/
public static MapReduceTriggerHandler triggerMapReduceJob(Long jobId) {
return new MapReduceTriggerHandler(jobId);
}
/**
* 手动触发Map定时任务
*
* @param jobId 定时任务ID
* @return {@link MapTriggerHandler}
*/
public static MapTriggerHandler triggerMapJob(Long jobId) {
return new MapTriggerHandler(jobId);
}
/**
* 手动触发静态分片定时任务
*
* @param jobId 定时任务ID
* @return {@link ShardingTriggerHandler}
*/
public static ShardingTriggerHandler triggerShardingJob(Long jobId) {
return new ShardingTriggerHandler(jobId);
}
/**
* 手动触发工作流任务
*

View File

@ -0,0 +1,17 @@
package com.aizuda.snailjob.server.common.dto;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
@Data
public class JobTriggerDTO {
@NotNull(message = "jobId 不能为空")
private Long jobId;
/**
* 临时任务参数
*/
private String tmpArgsStr;
}

View File

@ -1,13 +1,17 @@
package com.aizuda.snailjob.server.job.task.support.request;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.dto.JobTriggerDTO;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
@ -23,7 +27,6 @@ import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* OPENAPI
@ -51,12 +54,9 @@ public class OpenApiTriggerJobRequestHandler extends PostHttpRequestHandler {
SnailJobLog.LOCAL.debug("Trigger job content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
Long jobId = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), Long.class);
Job job = jobMapper.selectById(jobId);
if (Objects.isNull(job)){
SnailJobLog.LOCAL.warn("job can not be null.");
return new SnailJobRpcResult(false, retryRequest.getReqId());
}
JobTriggerDTO jobTriggerDTO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobTriggerDTO.class);
Job job = jobMapper.selectById(jobTriggerDTO.getJobId());
Assert.notNull(job, () -> new SnailJobServerException("job can not be null."));
long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, job.getGroupName())
@ -72,6 +72,10 @@ public class OpenApiTriggerJobRequestHandler extends PostHttpRequestHandler {
// 设置now表示立即执行
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType());
// 设置手动参数
if (StrUtil.isNotBlank(jobTriggerDTO.getTmpArgsStr())) {
jobTaskPrepare.setTmpArgsStr(jobTriggerDTO.getTmpArgsStr());
}
// 创建批次
terminalJobPrepareHandler.handle(jobTaskPrepare);

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.request;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
@ -9,7 +10,9 @@ import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.dto.JobTriggerDTO;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
@ -27,7 +30,6 @@ import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
/**
@ -56,12 +58,10 @@ public class OpenApiTriggerWorkFlowRequestHandler extends PostHttpRequestHandler
SnailJobLog.LOCAL.debug("Trigger job content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
Long id = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), Long.class);
Workflow workflow = workflowMapper.selectById(id);
if (Objects.isNull(workflow)){
SnailJobLog.LOCAL.warn("workflow can not be null.");
return new SnailJobRpcResult(false, retryRequest.getReqId());
}
JobTriggerDTO workflowDTO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobTriggerDTO.class);
Workflow workflow = workflowMapper.selectById(workflowDTO.getJobId());
Assert.notNull(workflow, () -> new SnailJobServerException("workflow can not be null."));
// 将字符串反序列化为 Set
if (StrUtil.isNotBlank(workflow.getGroupName())) {
Set<String> namesSet = new HashSet<>(Arrays.asList(workflow.getGroupName().split(", ")));
@ -88,7 +88,11 @@ public class OpenApiTriggerWorkFlowRequestHandler extends PostHttpRequestHandler
// 设置now表示立即执行
prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
prepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType());
// 设置工作流上下文
String tmpWfContext = workflowDTO.getTmpArgsStr();
if (StrUtil.isNotBlank(tmpWfContext) && !JsonUtil.isEmptyJson(tmpWfContext)){
prepareDTO.setWfContext(tmpWfContext);
}
terminalWorkflowPrepareHandler.handler(prepareDTO);
return new SnailJobRpcResult(true, retryRequest.getReqId());