diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java index a9261c5f5..c0c846e9a 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java @@ -27,6 +27,8 @@ public class JobArgs { private Long taskBatchId; + private Long jobId; + private Map wfContext; private Map changeWfContext; diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractHttpExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractHttpExecutor.java index 44bbb984d..6aa337406 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractHttpExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractHttpExecutor.java @@ -118,7 +118,9 @@ public abstract class AbstractHttpExecutor { httpParams.getHeaders().forEach(request::header); } - if (httpParams.getBody() != null && (httpParams.getMethod().equals(POST_REQUEST_METHOD) || httpParams.getMethod().equals(PUT_REQUEST_METHOD) || httpParams.getMethod().equals(DELETE_REQUEST_METHOD))) { + if (httpParams.getBody() != null && (httpParams.getMethod().equals(POST_REQUEST_METHOD) + || httpParams.getMethod().equals(PUT_REQUEST_METHOD) + || httpParams.getMethod().equals(DELETE_REQUEST_METHOD))) { request.body(httpParams.getBody(), httpParams.getMediaType()); } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index 03144958c..3f4d72b77 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -71,6 +71,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { jobArgs.setWfContext(jobContext.getWfContext()); jobArgs.setChangeWfContext(jobContext.getChangeWfContext()); + jobArgs.setJobId(jobContext.getJobId()); try { // 初始化调度信息(日志上报LogUtil) diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractScriptExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractScriptExecutor.java index 2aa50b47a..d8efdb692 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractScriptExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractScriptExecutor.java @@ -7,41 +7,38 @@ import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.core.exception.SnailJobInnerExecutorException; import com.aizuda.snailjob.common.log.SnailJobLog; -import com.google.common.collect.Sets; import io.micrometer.common.util.StringUtils; +import lombok.Data; import java.io.*; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; public abstract class AbstractScriptExecutor { - private static final Set DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); - // 将所有协议组合成一个正则表达式,使用 "|" 分隔表示或操作 - private static final String PROTOCOLS_PATTERN = String.join("|", DOWNLOAD_PROTOCOL); - // 编译正则表达式,创建 Pattern 对象,作为常量使用 - private static final Pattern PROTOCOL_PATTERN = Pattern.compile("^(" + PROTOCOLS_PATTERN + ")"); - protected static final String SH_SHELL = "/bin/sh"; - protected static final String CMD_SHELL = "cmd.exe"; - - private static final String READ_PATH = "READPATH:"; - private static final String WORKER_DIR = SnailFileUtils.workspace() + "/script_processor/"; - protected ExecuteResult process(Long taskBatchId, String scriptParams) { + // 下载脚本模式 + private static final String SCRIPT_DOWNLOAD_METHOD = "DOWNLOAD"; + + // 直接传入脚本代码 + private static final String SCRIPT_SCRIPT_CODE_METHOD = "SCRIPT_CODE"; + + // 读取本地现成的脚本代码 + private static final String SCRIPT_LOCAL_SCRIPT_METHOD = "LOCAL_SCRIPT"; + + protected ExecuteResult process(Long jobId, ScriptParams scriptParams) { logInfo("ScriptProcessor start to process, params: {}", scriptParams); if (scriptParams == null) { logWarn("ScriptParams is null, please check jobParam configuration."); return ExecuteResult.failure("ScriptParams is null."); } - String scriptPath = prepareScriptFile(taskBatchId, scriptParams); + String scriptPath = prepareScriptFile(jobId, scriptParams); logInfo("Generate executable file successfully, path: {}", scriptPath); if (SnailJobSystemUtil.isOsWindows() && SH_SHELL.equals(getRunCommand())) { @@ -56,46 +53,41 @@ public abstract class AbstractScriptExecutor { return executeScript(scriptPath); } - private String prepareScriptFile(Long taskBatchId, String processorInfo) { - String scriptPath = WORKER_DIR + getScriptName(taskBatchId); + private String prepareScriptFile(Long jobId, ScriptParams scriptParams) { + String scriptPath = WORKER_DIR + getScriptName(jobId); File script = new File(scriptPath); scriptPath = script.getAbsolutePath(); - if (script.exists()) { - return scriptPath; - } + // 创建脚本目录 ensureScriptDirectory(script); - // 是否是本地目录 - if (processorInfo.startsWith(READ_PATH)) { - return handleLocalScript(script, scriptPath, processorInfo); + switch (scriptParams.getMethod()) { + case SCRIPT_LOCAL_SCRIPT_METHOD: + // 是否是本地目录 + return handleLocalScript(script, scriptPath, scriptParams.getScriptParams()); + case SCRIPT_DOWNLOAD_METHOD: + // 是否为下载 + try { + SnailJobFileUtil.downloadFile(scriptParams.getScriptParams(), script, 5000, 300000); + } catch (IOException e) { + throw new SnailJobInnerExecutorException("[snail-job] Script download failed", e); + } + return scriptPath; + case SCRIPT_SCRIPT_CODE_METHOD: + // 是否直接写入代码 + try { + writeScriptContent(script, scriptParams.getScriptParams()); + } catch (IOException e) { + throw new SnailJobInnerExecutorException("[snail-job] Failed to write script", e); + } + return scriptPath; + default: + throw new SnailJobInnerExecutorException("[snail-job] Please correctly choose the script execution method."); } - - // 是否为下载 - // 如果是下载链接,则从网络获取 - Matcher matcher = PROTOCOL_PATTERN.matcher(processorInfo); - if (matcher.find()) { - try { - SnailJobFileUtil.downloadFile(processorInfo, script, 5000, 300000); - } catch (IOException e) { - throw new SnailJobInnerExecutorException("[snail-job] Script download failed", e); - } - return scriptPath; - } - - // 写入脚本 - try { - writeScriptContent(script, processorInfo); - } catch (IOException e) { - throw new SnailJobInnerExecutorException("[snail-job] Failed to write script", e); - } - return scriptPath; } private String handleLocalScript(File script, String scriptPath, String processorInfo) { - // 去掉 "READPATH:" 前缀 - String newProcessorInfo = processorInfo.substring(READ_PATH.length()).trim(); - File routhFile = new File(newProcessorInfo); + File routhFile = new File(processorInfo); // 判断文件是否存在 if (routhFile.exists()) { @@ -113,7 +105,7 @@ public abstract class AbstractScriptExecutor { } return scriptPath; } else { - throw new SnailJobInnerExecutorException("File not found: {" + newProcessorInfo + "}"); + throw new SnailJobInnerExecutorException("File not found: {" + processorInfo + "}"); } } @@ -147,7 +139,7 @@ public abstract class AbstractScriptExecutor { } private ExecuteResult executeScript(String scriptPath) { - ProcessBuilder pb = getProcessBuilder(scriptPath); + ProcessBuilder pb = getScriptProcessBuilder(scriptPath); Process process = null; ExecuteResult executeResult; @@ -178,19 +170,29 @@ public abstract class AbstractScriptExecutor { } - private ProcessBuilder getProcessBuilder(String scriptPath) { - return getRunCommand().equals(CMD_SHELL) ? - new ProcessBuilder(getRunCommand(), "/c", scriptPath) : - new ProcessBuilder(getRunCommand(), scriptPath); - } - private ExecuteResult captureOutput(Process process) throws InterruptedException { StringBuilder inputBuilder = new StringBuilder(); StringBuilder errorBuilder = new StringBuilder(); + // 使用CountDownLatch来确保输入流和错误流被捕获后再进行判断 + CountDownLatch latch = new CountDownLatch(2); + + // 启动线程捕获标准输出 + new Thread(() -> { + captureStream(process.getInputStream(), inputBuilder); + latch.countDown(); + }).start(); + + // 启动线程捕获错误输出 + new Thread(() -> { + captureStream(process.getErrorStream(), errorBuilder); + latch.countDown(); + }).start(); + + // 等待子进程完成 boolean success = process.waitFor() == 0; - captureStream(process.getInputStream(), inputBuilder); - captureStream(process.getErrorStream(), errorBuilder); + // 等待输入输出线程完成 + latch.await(); String result = formatResult(inputBuilder, errorBuilder); logInfo(result); @@ -233,6 +235,8 @@ public abstract class AbstractScriptExecutor { return StandardCharsets.UTF_8; } + protected abstract ProcessBuilder getScriptProcessBuilder(String scriptPath); + // Logging methods private void logInfo(String msg, Object... params) { SnailJobLog.REMOTE.info("[snail-job] " + msg, params); @@ -261,4 +265,15 @@ public abstract class AbstractScriptExecutor { return userHome; } } + + @Data + public static class ScriptParams { + /** + * 1.DOWNLOAD 需下载脚本 + * 2.SCRIPT_CODE 脚本代码 + * 3.LOCAL_SCRIPT 使用本地脚本 + */ + private String method; + private String scriptParams; + } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/CMDExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/CMDExecutor.java index 069eb0894..94fe75056 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/CMDExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/CMDExecutor.java @@ -6,8 +6,8 @@ import java.nio.charset.Charset; public class CMDExecutor extends AbstractScriptExecutor { @Override - protected String getScriptName(Long taskBatchId) { - return String.format("cmd_%d.bat", taskBatchId); + protected String getScriptName(Long jobId) { + return String.format("cmd_%d.bat", jobId); } @Override @@ -19,4 +19,9 @@ public class CMDExecutor extends AbstractScriptExecutor { protected Charset getCharset() { return Charset.defaultCharset(); } + + @Override + protected ProcessBuilder getScriptProcessBuilder(String scriptPath) { + return new ProcessBuilder(getRunCommand(), "/c", scriptPath); + } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/PowerShellExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/PowerShellExecutor.java index cc975f972..06b506b10 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/PowerShellExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/PowerShellExecutor.java @@ -6,8 +6,8 @@ import java.nio.charset.Charset; public class PowerShellExecutor extends AbstractScriptExecutor { @Override - protected String getScriptName(Long taskBatchId) { - return String.format("powershell_%d.ps1", taskBatchId); + protected String getScriptName(Long jobId) { + return String.format("powershell_%d.ps1", jobId); } @Override @@ -19,4 +19,9 @@ public class PowerShellExecutor extends AbstractScriptExecutor { protected Charset getCharset() { return Charset.defaultCharset(); } + + @Override + protected ProcessBuilder getScriptProcessBuilder(String scriptPath) { + return new ProcessBuilder(getRunCommand(), "-ExecutionPolicy", "Bypass", "-File", scriptPath); + } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/ShellExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/ShellExecutor.java index 131788f4a..9d196011c 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/ShellExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/ShellExecutor.java @@ -4,12 +4,17 @@ package com.aizuda.snailjob.client.job.core.executor; public class ShellExecutor extends AbstractScriptExecutor { @Override - protected String getScriptName(Long taskBatchId) { - return String.format("shell_%d.sh", taskBatchId); + protected String getScriptName(Long jobId) { + return String.format("shell_%d.sh", jobId); } @Override protected String getRunCommand() { return SH_SHELL; } + + @Override + protected ProcessBuilder getScriptProcessBuilder(String scriptPath) { + return new ProcessBuilder("sh", scriptPath); + } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobCMDJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobCMDJobExecutor.java index ba3ba3bec..02fa3bf85 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobCMDJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobCMDJobExecutor.java @@ -4,6 +4,7 @@ package com.aizuda.snailjob.client.job.core.executor; import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; import com.aizuda.snailjob.client.job.core.dto.JobArgs; import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.util.JsonUtil; import org.springframework.stereotype.Component; @Component @@ -11,8 +12,9 @@ import org.springframework.stereotype.Component; public class SnailJobCMDJobExecutor extends CMDExecutor { public ExecuteResult jobExecute(JobArgs jobArgs) { - String scriptParam = (String) jobArgs.getJobParams(); - return process(jobArgs.getTaskBatchId(), scriptParam); + Object jobParams = jobArgs.getJobParams(); + ScriptParams scriptParams = JsonUtil.parseObject((String) jobParams, ScriptParams.class); + return process(jobArgs.getJobId(), scriptParams); } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobHttpExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobHttpExecutor.java index a9c9fd23f..41db0bc05 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobHttpExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobHttpExecutor.java @@ -18,6 +18,7 @@ public class SnailJobHttpExecutor extends AbstractHttpExecutor { public ExecuteResult jobExecute(JobArgs jobArgs) { Object jobParams = jobArgs.getJobParams(); HttpParams httpParams = JsonUtil.parseObject((String) jobParams, HttpParams.class); + httpParams.setMethod(httpParams.getMethod().toUpperCase()); Map hashMap = new HashMap<>(3); hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP, snailJobProperties.getGroup()); hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP_TOKEN, snailJobProperties.getToken()); diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobPowerShellJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobPowerShellJobExecutor.java index c0a84bf87..89ac9cf8a 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobPowerShellJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobPowerShellJobExecutor.java @@ -4,6 +4,7 @@ package com.aizuda.snailjob.client.job.core.executor; import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; import com.aizuda.snailjob.client.job.core.dto.JobArgs; import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.util.JsonUtil; import org.springframework.stereotype.Component; @Component @@ -11,8 +12,9 @@ import org.springframework.stereotype.Component; public class SnailJobPowerShellJobExecutor extends PowerShellExecutor { public ExecuteResult jobExecute(JobArgs jobArgs) { - String scriptParam = (String) jobArgs.getJobParams(); - return process(jobArgs.getTaskBatchId(), scriptParam); + Object jobParams = jobArgs.getJobParams(); + ScriptParams scriptParams = JsonUtil.parseObject((String) jobParams, ScriptParams.class); + return process(jobArgs.getJobId(), scriptParams); } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobShellJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobShellJobExecutor.java index fbff6cd46..c826c76ee 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobShellJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobShellJobExecutor.java @@ -4,6 +4,7 @@ package com.aizuda.snailjob.client.job.core.executor; import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; import com.aizuda.snailjob.client.job.core.dto.JobArgs; import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.util.JsonUtil; import org.springframework.stereotype.Component; @Component @@ -11,8 +12,9 @@ import org.springframework.stereotype.Component; public class SnailJobShellJobExecutor extends ShellExecutor { public ExecuteResult jobExecute(JobArgs jobArgs) { - String scriptParam = (String) jobArgs.getJobParams(); - return process(jobArgs.getTaskBatchId(), scriptParam); + Object jobParams = jobArgs.getJobParams(); + ScriptParams scriptParams = JsonUtil.parseObject((String) jobParams, ScriptParams.class); + return process(jobArgs.getJobId(), scriptParams); } }