diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractHttpExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractHttpExecutor.java index 6aa337406..ca5309f6d 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractHttpExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractHttpExecutor.java @@ -11,7 +11,9 @@ import com.aizuda.snailjob.common.log.SnailJobLog; import lombok.Data; import org.springframework.util.StringUtils; +import java.util.Iterator; import java.util.Map; +import java.util.Objects; public abstract class AbstractHttpExecutor { @@ -93,7 +95,7 @@ public abstract class AbstractHttpExecutor { private void setDefaultTimeout(HttpParams httpParams) { // 使用milliseconds - httpParams.setTimeout(httpParams.getTimeout() == null ? DEFAULT_TIMEOUT * 1000 : httpParams.getTimeout() * 1000); + httpParams.setTimeout(Objects.isNull(httpParams.getTimeout()) ? DEFAULT_TIMEOUT * 1000 : httpParams.getTimeout() * 1000); } @@ -114,11 +116,18 @@ public abstract class AbstractHttpExecutor { break; } - if (httpParams.getHeaders() != null) { + if (Objects.nonNull(httpParams.getHeaders())) { httpParams.getHeaders().forEach(request::header); } + // 有上下文时,在请求中透传上下文;即工作流中支持上下文的传递 + if ( Objects.nonNull(httpParams.getWfContext())) { + httpParams.getWfContext().forEach((key, value) -> { + String headerValue = (value instanceof String) ? (String) value : JsonUtil.toJsonString(value); + request.header(key, headerValue); + }); + } - if (httpParams.getBody() != null && (httpParams.getMethod().equals(POST_REQUEST_METHOD) + if (Objects.nonNull(httpParams.getBody()) && (httpParams.getMethod().equals(POST_REQUEST_METHOD) || httpParams.getMethod().equals(PUT_REQUEST_METHOD) || httpParams.getMethod().equals(DELETE_REQUEST_METHOD))) { request.body(httpParams.getBody(), httpParams.getMediaType()); @@ -137,6 +146,7 @@ public abstract class AbstractHttpExecutor { private String body; private Map headers; private Integer timeout; + private Map wfContext; } // Logging methods diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractScriptExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractScriptExecutor.java index 4be5e59ea..02ec4c625 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractScriptExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractScriptExecutor.java @@ -50,7 +50,7 @@ public abstract class AbstractScriptExecutor { setScriptPermissions(scriptPath); } - return executeScript(scriptPath); + return executeScript(scriptPath, scriptParams); } private String prepareScriptFile(Long jobId, ScriptParams scriptParams) { @@ -76,7 +76,7 @@ public abstract class AbstractScriptExecutor { case SCRIPT_SCRIPT_CODE_METHOD: // 是否直接写入代码 try { - writeScriptContent(script, scriptParams.getScriptParams()); + writeScriptContent(script, scriptParams); } catch (IOException e) { throw new SnailJobInnerExecutorException("[snail-job] Failed to write script", e); } @@ -121,13 +121,25 @@ public abstract class AbstractScriptExecutor { } } - private void writeScriptContent(File script, String processorInfo) throws IOException { - try (BufferedWriter writer = Files.newBufferedWriter(script.toPath(), getCharset())) { - writer.write(processorInfo); + private void writeScriptContent(File script, ScriptParams scriptParams) throws IOException { + try (BufferedWriter writer = Files.newBufferedWriter(script.toPath(), getScriptChartset(scriptParams))) { + writer.write(scriptParams.getScriptParams()); logInfo("Script content written successfully to: {}", script.getAbsolutePath()); } } + private Charset getScriptChartset(ScriptParams scriptParams) { + String charsetName = scriptParams.getCharset(); + if (StrUtil.isNotBlank(charsetName)) { + try { + return Charset.forName(charsetName); + } catch (Exception e) { + logWarn("[snail-job] Invalid charset:{} . Using default charset.", charsetName); + } + } + return getCharset(); + } + private void setScriptPermissions(String scriptPath) { ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); try { @@ -138,14 +150,14 @@ public abstract class AbstractScriptExecutor { logInfo("chmod 755 authorization complete, ready to start execution~"); } - private ExecuteResult executeScript(String scriptPath) { + private ExecuteResult executeScript(String scriptPath, ScriptParams scriptParams) { ProcessBuilder pb = getScriptProcessBuilder(scriptPath); Process process = null; ExecuteResult executeResult; try { process = pb.start(); - executeResult = captureOutput(process); + executeResult = captureOutput(process, scriptParams); } catch (IOException | InterruptedException e) { throw new SnailJobInnerExecutorException("[snail-job] Script execution failed", e); } finally { @@ -170,21 +182,22 @@ public abstract class AbstractScriptExecutor { } - private ExecuteResult captureOutput(Process process) throws InterruptedException { + private ExecuteResult captureOutput(Process process, ScriptParams scriptParams) throws InterruptedException { StringBuilder inputBuilder = new StringBuilder(); StringBuilder errorBuilder = new StringBuilder(); // 使用CountDownLatch来确保输入流和错误流被捕获后再进行判断 CountDownLatch latch = new CountDownLatch(2); + Charset scriptChartset = getScriptChartset(scriptParams); // 启动线程捕获标准输出 new Thread(() -> { - captureStream(process.getInputStream(), inputBuilder); + captureStream(process.getInputStream(), inputBuilder, scriptChartset); latch.countDown(); }).start(); // 启动线程捕获错误输出 new Thread(() -> { - captureStream(process.getErrorStream(), errorBuilder); + captureStream(process.getErrorStream(), errorBuilder, scriptChartset); latch.countDown(); }).start(); @@ -200,8 +213,8 @@ public abstract class AbstractScriptExecutor { return success ? ExecuteResult.success("Script executed successfully.") : ExecuteResult.failure("Script execution failed."); } - private void captureStream(InputStream is, StringBuilder sb) { - try (BufferedReader br = new BufferedReader(new InputStreamReader(is, getCharset()))) { + private void captureStream(InputStream is, StringBuilder sb, Charset charset) { + try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) { String line; while ((line = br.readLine()) != null) { sb.append(line).append(System.lineSeparator()); @@ -275,5 +288,6 @@ public abstract class AbstractScriptExecutor { */ private String method; private String scriptParams; + private String charset; } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobHttpExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobHttpExecutor.java index 41db0bc05..8a0d44975 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobHttpExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/SnailJobHttpExecutor.java @@ -10,6 +10,7 @@ import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; +import java.util.Objects; @Component @JobExecutor(name = "snailJobHttpExecutor") @@ -18,12 +19,15 @@ public class SnailJobHttpExecutor extends AbstractHttpExecutor { public ExecuteResult jobExecute(JobArgs jobArgs) { Object jobParams = jobArgs.getJobParams(); HttpParams httpParams = JsonUtil.parseObject((String) jobParams, HttpParams.class); + if (Objects.nonNull(jobArgs.getWfContext())){ + httpParams.setWfContext(jobArgs.getWfContext()); + } httpParams.setMethod(httpParams.getMethod().toUpperCase()); Map hashMap = new HashMap<>(3); hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP, snailJobProperties.getGroup()); hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP_TOKEN, snailJobProperties.getToken()); hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_NAMESPACE, snailJobProperties.getNamespace()); - Map headers = (httpParams.getHeaders() == null || httpParams.getHeaders().isEmpty()) ? new HashMap<>() : httpParams.getHeaders(); + Map headers = (Objects.isNull(httpParams.getHeaders()) || httpParams.getHeaders().isEmpty()) ? new HashMap<>() : httpParams.getHeaders(); headers.putAll(hashMap); httpParams.setHeaders(headers); return process(httpParams);