fix(sj_1.2.0-beta1):优化CMD、PowerShell、Shell、Http相关执行器
This commit is contained in:
parent
18b7b8fc4e
commit
5e5d8b5fb1
@ -27,6 +27,8 @@ public class JobArgs {
|
||||
|
||||
private Long taskBatchId;
|
||||
|
||||
private Long jobId;
|
||||
|
||||
private Map<String, Object> wfContext;
|
||||
|
||||
private Map<String, Object> changeWfContext;
|
||||
|
@ -118,7 +118,9 @@ public abstract class AbstractHttpExecutor {
|
||||
httpParams.getHeaders().forEach(request::header);
|
||||
}
|
||||
|
||||
if (httpParams.getBody() != null && (httpParams.getMethod().equals(POST_REQUEST_METHOD) || httpParams.getMethod().equals(PUT_REQUEST_METHOD) || httpParams.getMethod().equals(DELETE_REQUEST_METHOD))) {
|
||||
if (httpParams.getBody() != null && (httpParams.getMethod().equals(POST_REQUEST_METHOD)
|
||||
|| httpParams.getMethod().equals(PUT_REQUEST_METHOD)
|
||||
|| httpParams.getMethod().equals(DELETE_REQUEST_METHOD))) {
|
||||
request.body(httpParams.getBody(), httpParams.getMediaType());
|
||||
}
|
||||
|
||||
|
@ -71,6 +71,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||
|
||||
jobArgs.setWfContext(jobContext.getWfContext());
|
||||
jobArgs.setChangeWfContext(jobContext.getChangeWfContext());
|
||||
jobArgs.setJobId(jobContext.getJobId());
|
||||
|
||||
try {
|
||||
// 初始化调度信息(日志上报LogUtil)
|
||||
|
@ -7,41 +7,38 @@ import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
|
||||
import com.aizuda.snailjob.common.core.exception.SnailJobInnerExecutorException;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.google.common.collect.Sets;
|
||||
import io.micrometer.common.util.StringUtils;
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public abstract class AbstractScriptExecutor {
|
||||
|
||||
private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
|
||||
// 将所有协议组合成一个正则表达式,使用 "|" 分隔表示或操作
|
||||
private static final String PROTOCOLS_PATTERN = String.join("|", DOWNLOAD_PROTOCOL);
|
||||
// 编译正则表达式,创建 Pattern 对象,作为常量使用
|
||||
private static final Pattern PROTOCOL_PATTERN = Pattern.compile("^(" + PROTOCOLS_PATTERN + ")");
|
||||
|
||||
protected static final String SH_SHELL = "/bin/sh";
|
||||
|
||||
protected static final String CMD_SHELL = "cmd.exe";
|
||||
|
||||
private static final String READ_PATH = "READPATH:";
|
||||
|
||||
private static final String WORKER_DIR = SnailFileUtils.workspace() + "/script_processor/";
|
||||
|
||||
protected ExecuteResult process(Long taskBatchId, String scriptParams) {
|
||||
// 下载脚本模式
|
||||
private static final String SCRIPT_DOWNLOAD_METHOD = "DOWNLOAD";
|
||||
|
||||
// 直接传入脚本代码
|
||||
private static final String SCRIPT_SCRIPT_CODE_METHOD = "SCRIPT_CODE";
|
||||
|
||||
// 读取本地现成的脚本代码
|
||||
private static final String SCRIPT_LOCAL_SCRIPT_METHOD = "LOCAL_SCRIPT";
|
||||
|
||||
protected ExecuteResult process(Long jobId, ScriptParams scriptParams) {
|
||||
logInfo("ScriptProcessor start to process, params: {}", scriptParams);
|
||||
if (scriptParams == null) {
|
||||
logWarn("ScriptParams is null, please check jobParam configuration.");
|
||||
return ExecuteResult.failure("ScriptParams is null.");
|
||||
}
|
||||
String scriptPath = prepareScriptFile(taskBatchId, scriptParams);
|
||||
String scriptPath = prepareScriptFile(jobId, scriptParams);
|
||||
logInfo("Generate executable file successfully, path: {}", scriptPath);
|
||||
|
||||
if (SnailJobSystemUtil.isOsWindows() && SH_SHELL.equals(getRunCommand())) {
|
||||
@ -56,46 +53,41 @@ public abstract class AbstractScriptExecutor {
|
||||
return executeScript(scriptPath);
|
||||
}
|
||||
|
||||
private String prepareScriptFile(Long taskBatchId, String processorInfo) {
|
||||
String scriptPath = WORKER_DIR + getScriptName(taskBatchId);
|
||||
private String prepareScriptFile(Long jobId, ScriptParams scriptParams) {
|
||||
String scriptPath = WORKER_DIR + getScriptName(jobId);
|
||||
File script = new File(scriptPath);
|
||||
scriptPath = script.getAbsolutePath();
|
||||
if (script.exists()) {
|
||||
return scriptPath;
|
||||
}
|
||||
|
||||
// 创建脚本目录
|
||||
ensureScriptDirectory(script);
|
||||
|
||||
switch (scriptParams.getMethod()) {
|
||||
case SCRIPT_LOCAL_SCRIPT_METHOD:
|
||||
// 是否是本地目录
|
||||
if (processorInfo.startsWith(READ_PATH)) {
|
||||
return handleLocalScript(script, scriptPath, processorInfo);
|
||||
}
|
||||
|
||||
return handleLocalScript(script, scriptPath, scriptParams.getScriptParams());
|
||||
case SCRIPT_DOWNLOAD_METHOD:
|
||||
// 是否为下载
|
||||
// 如果是下载链接,则从网络获取
|
||||
Matcher matcher = PROTOCOL_PATTERN.matcher(processorInfo);
|
||||
if (matcher.find()) {
|
||||
try {
|
||||
SnailJobFileUtil.downloadFile(processorInfo, script, 5000, 300000);
|
||||
SnailJobFileUtil.downloadFile(scriptParams.getScriptParams(), script, 5000, 300000);
|
||||
} catch (IOException e) {
|
||||
throw new SnailJobInnerExecutorException("[snail-job] Script download failed", e);
|
||||
}
|
||||
return scriptPath;
|
||||
}
|
||||
|
||||
// 写入脚本
|
||||
case SCRIPT_SCRIPT_CODE_METHOD:
|
||||
// 是否直接写入代码
|
||||
try {
|
||||
writeScriptContent(script, processorInfo);
|
||||
writeScriptContent(script, scriptParams.getScriptParams());
|
||||
} catch (IOException e) {
|
||||
throw new SnailJobInnerExecutorException("[snail-job] Failed to write script", e);
|
||||
}
|
||||
return scriptPath;
|
||||
default:
|
||||
throw new SnailJobInnerExecutorException("[snail-job] Please correctly choose the script execution method.");
|
||||
}
|
||||
}
|
||||
|
||||
private String handleLocalScript(File script, String scriptPath, String processorInfo) {
|
||||
// 去掉 "READPATH:" 前缀
|
||||
String newProcessorInfo = processorInfo.substring(READ_PATH.length()).trim();
|
||||
File routhFile = new File(newProcessorInfo);
|
||||
File routhFile = new File(processorInfo);
|
||||
|
||||
// 判断文件是否存在
|
||||
if (routhFile.exists()) {
|
||||
@ -113,7 +105,7 @@ public abstract class AbstractScriptExecutor {
|
||||
}
|
||||
return scriptPath;
|
||||
} else {
|
||||
throw new SnailJobInnerExecutorException("File not found: {" + newProcessorInfo + "}");
|
||||
throw new SnailJobInnerExecutorException("File not found: {" + processorInfo + "}");
|
||||
}
|
||||
}
|
||||
|
||||
@ -147,7 +139,7 @@ public abstract class AbstractScriptExecutor {
|
||||
}
|
||||
|
||||
private ExecuteResult executeScript(String scriptPath) {
|
||||
ProcessBuilder pb = getProcessBuilder(scriptPath);
|
||||
ProcessBuilder pb = getScriptProcessBuilder(scriptPath);
|
||||
|
||||
Process process = null;
|
||||
ExecuteResult executeResult;
|
||||
@ -178,19 +170,29 @@ public abstract class AbstractScriptExecutor {
|
||||
|
||||
}
|
||||
|
||||
private ProcessBuilder getProcessBuilder(String scriptPath) {
|
||||
return getRunCommand().equals(CMD_SHELL) ?
|
||||
new ProcessBuilder(getRunCommand(), "/c", scriptPath) :
|
||||
new ProcessBuilder(getRunCommand(), scriptPath);
|
||||
}
|
||||
|
||||
private ExecuteResult captureOutput(Process process) throws InterruptedException {
|
||||
StringBuilder inputBuilder = new StringBuilder();
|
||||
StringBuilder errorBuilder = new StringBuilder();
|
||||
// 使用CountDownLatch来确保输入流和错误流被捕获后再进行判断
|
||||
CountDownLatch latch = new CountDownLatch(2);
|
||||
|
||||
// 启动线程捕获标准输出
|
||||
new Thread(() -> {
|
||||
captureStream(process.getInputStream(), inputBuilder);
|
||||
latch.countDown();
|
||||
}).start();
|
||||
|
||||
// 启动线程捕获错误输出
|
||||
new Thread(() -> {
|
||||
captureStream(process.getErrorStream(), errorBuilder);
|
||||
latch.countDown();
|
||||
}).start();
|
||||
|
||||
// 等待子进程完成
|
||||
boolean success = process.waitFor() == 0;
|
||||
|
||||
captureStream(process.getInputStream(), inputBuilder);
|
||||
captureStream(process.getErrorStream(), errorBuilder);
|
||||
// 等待输入输出线程完成
|
||||
latch.await();
|
||||
|
||||
String result = formatResult(inputBuilder, errorBuilder);
|
||||
logInfo(result);
|
||||
@ -233,6 +235,8 @@ public abstract class AbstractScriptExecutor {
|
||||
return StandardCharsets.UTF_8;
|
||||
}
|
||||
|
||||
protected abstract ProcessBuilder getScriptProcessBuilder(String scriptPath);
|
||||
|
||||
// Logging methods
|
||||
private void logInfo(String msg, Object... params) {
|
||||
SnailJobLog.REMOTE.info("[snail-job] " + msg, params);
|
||||
@ -261,4 +265,15 @@ public abstract class AbstractScriptExecutor {
|
||||
return userHome;
|
||||
}
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class ScriptParams {
|
||||
/**
|
||||
* 1.DOWNLOAD 需下载脚本
|
||||
* 2.SCRIPT_CODE 脚本代码
|
||||
* 3.LOCAL_SCRIPT 使用本地脚本
|
||||
*/
|
||||
private String method;
|
||||
private String scriptParams;
|
||||
}
|
||||
}
|
||||
|
@ -6,8 +6,8 @@ import java.nio.charset.Charset;
|
||||
public class CMDExecutor extends AbstractScriptExecutor {
|
||||
|
||||
@Override
|
||||
protected String getScriptName(Long taskBatchId) {
|
||||
return String.format("cmd_%d.bat", taskBatchId);
|
||||
protected String getScriptName(Long jobId) {
|
||||
return String.format("cmd_%d.bat", jobId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -19,4 +19,9 @@ public class CMDExecutor extends AbstractScriptExecutor {
|
||||
protected Charset getCharset() {
|
||||
return Charset.defaultCharset();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ProcessBuilder getScriptProcessBuilder(String scriptPath) {
|
||||
return new ProcessBuilder(getRunCommand(), "/c", scriptPath);
|
||||
}
|
||||
}
|
||||
|
@ -6,8 +6,8 @@ import java.nio.charset.Charset;
|
||||
public class PowerShellExecutor extends AbstractScriptExecutor {
|
||||
|
||||
@Override
|
||||
protected String getScriptName(Long taskBatchId) {
|
||||
return String.format("powershell_%d.ps1", taskBatchId);
|
||||
protected String getScriptName(Long jobId) {
|
||||
return String.format("powershell_%d.ps1", jobId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -19,4 +19,9 @@ public class PowerShellExecutor extends AbstractScriptExecutor {
|
||||
protected Charset getCharset() {
|
||||
return Charset.defaultCharset();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ProcessBuilder getScriptProcessBuilder(String scriptPath) {
|
||||
return new ProcessBuilder(getRunCommand(), "-ExecutionPolicy", "Bypass", "-File", scriptPath);
|
||||
}
|
||||
}
|
||||
|
@ -4,12 +4,17 @@ package com.aizuda.snailjob.client.job.core.executor;
|
||||
public class ShellExecutor extends AbstractScriptExecutor {
|
||||
|
||||
@Override
|
||||
protected String getScriptName(Long taskBatchId) {
|
||||
return String.format("shell_%d.sh", taskBatchId);
|
||||
protected String getScriptName(Long jobId) {
|
||||
return String.format("shell_%d.sh", jobId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getRunCommand() {
|
||||
return SH_SHELL;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ProcessBuilder getScriptProcessBuilder(String scriptPath) {
|
||||
return new ProcessBuilder("sh", scriptPath);
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ package com.aizuda.snailjob.client.job.core.executor;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@ -11,8 +12,9 @@ import org.springframework.stereotype.Component;
|
||||
public class SnailJobCMDJobExecutor extends CMDExecutor {
|
||||
|
||||
public ExecuteResult jobExecute(JobArgs jobArgs) {
|
||||
String scriptParam = (String) jobArgs.getJobParams();
|
||||
return process(jobArgs.getTaskBatchId(), scriptParam);
|
||||
Object jobParams = jobArgs.getJobParams();
|
||||
ScriptParams scriptParams = JsonUtil.parseObject((String) jobParams, ScriptParams.class);
|
||||
return process(jobArgs.getJobId(), scriptParams);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ public class SnailJobHttpExecutor extends AbstractHttpExecutor {
|
||||
public ExecuteResult jobExecute(JobArgs jobArgs) {
|
||||
Object jobParams = jobArgs.getJobParams();
|
||||
HttpParams httpParams = JsonUtil.parseObject((String) jobParams, HttpParams.class);
|
||||
httpParams.setMethod(httpParams.getMethod().toUpperCase());
|
||||
Map<String, String> hashMap = new HashMap<>(3);
|
||||
hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP, snailJobProperties.getGroup());
|
||||
hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP_TOKEN, snailJobProperties.getToken());
|
||||
|
@ -4,6 +4,7 @@ package com.aizuda.snailjob.client.job.core.executor;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@ -11,8 +12,9 @@ import org.springframework.stereotype.Component;
|
||||
public class SnailJobPowerShellJobExecutor extends PowerShellExecutor {
|
||||
|
||||
public ExecuteResult jobExecute(JobArgs jobArgs) {
|
||||
String scriptParam = (String) jobArgs.getJobParams();
|
||||
return process(jobArgs.getTaskBatchId(), scriptParam);
|
||||
Object jobParams = jobArgs.getJobParams();
|
||||
ScriptParams scriptParams = JsonUtil.parseObject((String) jobParams, ScriptParams.class);
|
||||
return process(jobArgs.getJobId(), scriptParams);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ package com.aizuda.snailjob.client.job.core.executor;
|
||||
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@ -11,8 +12,9 @@ import org.springframework.stereotype.Component;
|
||||
public class SnailJobShellJobExecutor extends ShellExecutor {
|
||||
|
||||
public ExecuteResult jobExecute(JobArgs jobArgs) {
|
||||
String scriptParam = (String) jobArgs.getJobParams();
|
||||
return process(jobArgs.getTaskBatchId(), scriptParam);
|
||||
Object jobParams = jobArgs.getJobParams();
|
||||
ScriptParams scriptParams = JsonUtil.parseObject((String) jobParams, ScriptParams.class);
|
||||
return process(jobArgs.getJobId(), scriptParams);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user