feat:(1.2.0-beta2): 1、内置脚本执行器支持自定义编码;2、内置http执行器支持工作流任务在请求头中透传工作流上下文内容。

This commit is contained in:
srzou 2024-10-19 19:08:50 +08:00
parent bde37ee3e8
commit 5f355bb1d8
3 changed files with 44 additions and 16 deletions

View File

@ -11,7 +11,9 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
import lombok.Data; import lombok.Data;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Objects;
public abstract class AbstractHttpExecutor { public abstract class AbstractHttpExecutor {
@ -93,7 +95,7 @@ public abstract class AbstractHttpExecutor {
private void setDefaultTimeout(HttpParams httpParams) { private void setDefaultTimeout(HttpParams httpParams) {
// 使用milliseconds // 使用milliseconds
httpParams.setTimeout(httpParams.getTimeout() == null ? DEFAULT_TIMEOUT * 1000 : httpParams.getTimeout() * 1000); httpParams.setTimeout(Objects.isNull(httpParams.getTimeout()) ? DEFAULT_TIMEOUT * 1000 : httpParams.getTimeout() * 1000);
} }
@ -114,11 +116,18 @@ public abstract class AbstractHttpExecutor {
break; break;
} }
if (httpParams.getHeaders() != null) { if (Objects.nonNull(httpParams.getHeaders())) {
httpParams.getHeaders().forEach(request::header); httpParams.getHeaders().forEach(request::header);
} }
// 有上下文时在请求中透传上下文;即工作流中支持上下文的传递
if ( Objects.nonNull(httpParams.getWfContext())) {
httpParams.getWfContext().forEach((key, value) -> {
String headerValue = (value instanceof String) ? (String) value : JsonUtil.toJsonString(value);
request.header(key, headerValue);
});
}
if (httpParams.getBody() != null && (httpParams.getMethod().equals(POST_REQUEST_METHOD) if (Objects.nonNull(httpParams.getBody()) && (httpParams.getMethod().equals(POST_REQUEST_METHOD)
|| httpParams.getMethod().equals(PUT_REQUEST_METHOD) || httpParams.getMethod().equals(PUT_REQUEST_METHOD)
|| httpParams.getMethod().equals(DELETE_REQUEST_METHOD))) { || httpParams.getMethod().equals(DELETE_REQUEST_METHOD))) {
request.body(httpParams.getBody(), httpParams.getMediaType()); request.body(httpParams.getBody(), httpParams.getMediaType());
@ -137,6 +146,7 @@ public abstract class AbstractHttpExecutor {
private String body; private String body;
private Map<String, String> headers; private Map<String, String> headers;
private Integer timeout; private Integer timeout;
private Map<String, Object> wfContext;
} }
// Logging methods // Logging methods

View File

@ -50,7 +50,7 @@ public abstract class AbstractScriptExecutor {
setScriptPermissions(scriptPath); setScriptPermissions(scriptPath);
} }
return executeScript(scriptPath); return executeScript(scriptPath, scriptParams);
} }
private String prepareScriptFile(Long jobId, ScriptParams scriptParams) { private String prepareScriptFile(Long jobId, ScriptParams scriptParams) {
@ -76,7 +76,7 @@ public abstract class AbstractScriptExecutor {
case SCRIPT_SCRIPT_CODE_METHOD: case SCRIPT_SCRIPT_CODE_METHOD:
// 是否直接写入代码 // 是否直接写入代码
try { try {
writeScriptContent(script, scriptParams.getScriptParams()); writeScriptContent(script, scriptParams);
} catch (IOException e) { } catch (IOException e) {
throw new SnailJobInnerExecutorException("[snail-job] Failed to write script", e); throw new SnailJobInnerExecutorException("[snail-job] Failed to write script", e);
} }
@ -121,13 +121,25 @@ public abstract class AbstractScriptExecutor {
} }
} }
private void writeScriptContent(File script, String processorInfo) throws IOException { private void writeScriptContent(File script, ScriptParams scriptParams) throws IOException {
try (BufferedWriter writer = Files.newBufferedWriter(script.toPath(), getCharset())) { try (BufferedWriter writer = Files.newBufferedWriter(script.toPath(), getScriptChartset(scriptParams))) {
writer.write(processorInfo); writer.write(scriptParams.getScriptParams());
logInfo("Script content written successfully to: {}", script.getAbsolutePath()); logInfo("Script content written successfully to: {}", script.getAbsolutePath());
} }
} }
private Charset getScriptChartset(ScriptParams scriptParams) {
String charsetName = scriptParams.getCharset();
if (StrUtil.isNotBlank(charsetName)) {
try {
return Charset.forName(charsetName);
} catch (Exception e) {
logWarn("[snail-job] Invalid charset:{} . Using default charset.", charsetName);
}
}
return getCharset();
}
private void setScriptPermissions(String scriptPath) { private void setScriptPermissions(String scriptPath) {
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
try { try {
@ -138,14 +150,14 @@ public abstract class AbstractScriptExecutor {
logInfo("chmod 755 authorization complete, ready to start execution~"); logInfo("chmod 755 authorization complete, ready to start execution~");
} }
private ExecuteResult executeScript(String scriptPath) { private ExecuteResult executeScript(String scriptPath, ScriptParams scriptParams) {
ProcessBuilder pb = getScriptProcessBuilder(scriptPath); ProcessBuilder pb = getScriptProcessBuilder(scriptPath);
Process process = null; Process process = null;
ExecuteResult executeResult; ExecuteResult executeResult;
try { try {
process = pb.start(); process = pb.start();
executeResult = captureOutput(process); executeResult = captureOutput(process, scriptParams);
} catch (IOException | InterruptedException e) { } catch (IOException | InterruptedException e) {
throw new SnailJobInnerExecutorException("[snail-job] Script execution failed", e); throw new SnailJobInnerExecutorException("[snail-job] Script execution failed", e);
} finally { } finally {
@ -170,21 +182,22 @@ public abstract class AbstractScriptExecutor {
} }
private ExecuteResult captureOutput(Process process) throws InterruptedException { private ExecuteResult captureOutput(Process process, ScriptParams scriptParams) throws InterruptedException {
StringBuilder inputBuilder = new StringBuilder(); StringBuilder inputBuilder = new StringBuilder();
StringBuilder errorBuilder = new StringBuilder(); StringBuilder errorBuilder = new StringBuilder();
// 使用CountDownLatch来确保输入流和错误流被捕获后再进行判断 // 使用CountDownLatch来确保输入流和错误流被捕获后再进行判断
CountDownLatch latch = new CountDownLatch(2); CountDownLatch latch = new CountDownLatch(2);
Charset scriptChartset = getScriptChartset(scriptParams);
// 启动线程捕获标准输出 // 启动线程捕获标准输出
new Thread(() -> { new Thread(() -> {
captureStream(process.getInputStream(), inputBuilder); captureStream(process.getInputStream(), inputBuilder, scriptChartset);
latch.countDown(); latch.countDown();
}).start(); }).start();
// 启动线程捕获错误输出 // 启动线程捕获错误输出
new Thread(() -> { new Thread(() -> {
captureStream(process.getErrorStream(), errorBuilder); captureStream(process.getErrorStream(), errorBuilder, scriptChartset);
latch.countDown(); latch.countDown();
}).start(); }).start();
@ -200,8 +213,8 @@ public abstract class AbstractScriptExecutor {
return success ? ExecuteResult.success("Script executed successfully.") : ExecuteResult.failure("Script execution failed."); return success ? ExecuteResult.success("Script executed successfully.") : ExecuteResult.failure("Script execution failed.");
} }
private void captureStream(InputStream is, StringBuilder sb) { private void captureStream(InputStream is, StringBuilder sb, Charset charset) {
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, getCharset()))) { try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) {
String line; String line;
while ((line = br.readLine()) != null) { while ((line = br.readLine()) != null) {
sb.append(line).append(System.lineSeparator()); sb.append(line).append(System.lineSeparator());
@ -275,5 +288,6 @@ public abstract class AbstractScriptExecutor {
*/ */
private String method; private String method;
private String scriptParams; private String scriptParams;
private String charset;
} }
} }

View File

@ -10,6 +10,7 @@ import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
@Component @Component
@JobExecutor(name = "snailJobHttpExecutor") @JobExecutor(name = "snailJobHttpExecutor")
@ -18,12 +19,15 @@ public class SnailJobHttpExecutor extends AbstractHttpExecutor {
public ExecuteResult jobExecute(JobArgs jobArgs) { public ExecuteResult jobExecute(JobArgs jobArgs) {
Object jobParams = jobArgs.getJobParams(); Object jobParams = jobArgs.getJobParams();
HttpParams httpParams = JsonUtil.parseObject((String) jobParams, HttpParams.class); HttpParams httpParams = JsonUtil.parseObject((String) jobParams, HttpParams.class);
if (Objects.nonNull(jobArgs.getWfContext())){
httpParams.setWfContext(jobArgs.getWfContext());
}
httpParams.setMethod(httpParams.getMethod().toUpperCase()); httpParams.setMethod(httpParams.getMethod().toUpperCase());
Map<String, String> hashMap = new HashMap<>(3); Map<String, String> hashMap = new HashMap<>(3);
hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP, snailJobProperties.getGroup()); hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP, snailJobProperties.getGroup());
hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP_TOKEN, snailJobProperties.getToken()); hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP_TOKEN, snailJobProperties.getToken());
hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_NAMESPACE, snailJobProperties.getNamespace()); hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_NAMESPACE, snailJobProperties.getNamespace());
Map<String, String> headers = (httpParams.getHeaders() == null || httpParams.getHeaders().isEmpty()) ? new HashMap<>() : httpParams.getHeaders(); Map<String, String> headers = (Objects.isNull(httpParams.getHeaders()) || httpParams.getHeaders().isEmpty()) ? new HashMap<>() : httpParams.getHeaders();
headers.putAll(hashMap); headers.putAll(hashMap);
httpParams.setHeaders(headers); httpParams.setHeaders(headers);
return process(httpParams); return process(httpParams);