Merge remote-tracking branch 'upstream/1.2.0-beta2' into client-jdk8

# Conflicts:
#	pom.xml
dhb52 committed 2024-10-26 23:21:13 +08:00
commit b7dbae6a7d
204 changed files with 7697 additions and 4792 deletions

doc/script/snail-job.bat Normal file
View File

@ -0,0 +1,93 @@
@echo off
REM snail-job.bat start|stop|restart|status
set AppName=snail-job-server-exec.jar
REM JVM options
set JVM_OPTS=-Dname=%AppName% -Duser.timezone=Asia/Shanghai -XX:+HeapDumpOnOutOfMemoryError -XX:+UseZGC
set APP_HOME=%cd%
REM Check if an action is passed, default to "start" if empty
if "%1"=="" (
echo No action provided, default to: start
set ACTION=start
) else (
set ACTION=%1
)
if "%AppName%"=="" (
echo No application name provided
exit /b 1
)
REM Check the action before executing
if /i "%ACTION%"=="start" (
call :Start
) else if /i "%ACTION%"=="stop" (
call :Stop
) else if /i "%ACTION%"=="restart" (
call :Restart
) else if /i "%ACTION%"=="status" (
call :Status
) else (
echo Invalid action. Valid actions are: {start|stop|restart|status}
exit /b 1
)
goto :eof
:Start
REM Check if the program is already running using jps
set "PID="
for /f "tokens=1" %%i in ('jps -l ^| findstr %AppName%') do set PID=%%i
if defined PID (
echo %AppName% is already running with PID %PID%...
) else (
start "" /b javaw %JVM_OPTS% -jar %AppName%
echo Start %AppName% success...
)
goto :eof
:Stop
set "PID="
REM Find the process using jps and stop it
for /f "tokens=1" %%i in ('jps -l ^| findstr %AppName%') do (
set "PID=%%i"
)
REM Removing any extra spaces from PID (just in case)
for /f "tokens=* delims= " %%i in ("%PID%") do set "PID=%%i"
REM Verify if PID is defined
if defined PID (
REM Using TASKKILL to kill the process
taskkill /PID %PID% /T /F
REM Check if taskkill succeeded ("if errorlevel 1" is used because %errorlevel% expands before taskkill runs inside this parenthesized block)
if errorlevel 1 (
echo Failed to stop %AppName% with PID %PID%.
) else (
echo %AppName% with PID %PID% has been stopped.
)
) else (
echo %AppName% is already stopped or not running.
)
goto :eof
:Restart
call :Stop
timeout /t 2 >nul
call :Start
goto :eof
:Status
REM Check if the program is running using jps
set "PID="
for /f "tokens=1" %%i in ('jps -l ^| findstr %AppName%') do set PID=%%i
if defined PID (
echo %AppName% is running with PID %PID%...
) else (
echo %AppName% is not running...
)
goto :eof

doc/script/snail-job.sh Normal file
View File

@ -0,0 +1,82 @@
#!/bin/bash
# Usage: ./snail-job.sh start (default when empty) | stop | restart | status
AppName=snail-job-server-exec.jar
# JVM options
JVM_OPTS="-Dname=$AppName -Duser.timezone=Asia/Shanghai -XX:+HeapDumpOnOutOfMemoryError -XX:+UseZGC"
APP_HOME=`pwd`
# Check the action argument; default to "start" when empty
if [ "$1" = "" ]; then
echo -e "\033[0;34m 未输入操作名,默认操作为: start \033[0m"
ACTION="start"
else
ACTION=$1
fi
if [ "$AppName" = "" ]; then
echo -e "\033[0;31m 未输入应用名 \033[0m"
exit 1
fi
function start() {
PID=`ps -ef |grep java|grep $AppName|grep -v grep|awk '{print $2}'`
if [ x"$PID" != x"" ]; then
echo "$AppName is running..."
else
nohup java $JVM_OPTS -jar $AppName > /dev/null 2>&1 &
echo "Start $AppName success..."
fi
}
function stop() {
echo "Stop $AppName"
PID=""
query(){
PID=`ps -ef |grep java|grep $AppName|grep -v grep|awk '{print $2}'`
}
query
if [ x"$PID" != x"" ]; then
kill -TERM $PID
echo "$AppName (pid:$PID) exiting..."
while [ x"$PID" != x"" ]
do
sleep 1
query
done
echo "$AppName exited."
else
echo "$AppName already stopped."
fi
}
function restart() {
stop
sleep 2
start
}
function status() {
PID=`ps -ef |grep java|grep $AppName|grep -v grep|wc -l`
if [ $PID != 0 ];then
echo "$AppName is running..."
else
echo "$AppName is not running..."
fi
}
case $ACTION in
start)
start;;
stop)
stop;;
restart)
restart;;
status)
status;;
*)
echo -e "\033[0;31m 无效的操作名 \033[0m \033[0;34m {start|stop|restart|status} \033[0m";;
esac

pom.xml
View File

@ -21,7 +21,7 @@
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<revision>1.2.0-jdk8-beta1</revision>
<revision>1.2.0-jdk8-beta2</revision>
<netty-all.version>4.1.94.Final</netty-all.version>
<hutool-all.version>5.8.25</hutool-all.version>
<mybatis-plus.version>3.5.7</mybatis-plus.version>
@ -232,9 +232,9 @@
<developers>
<developer>
<name>byteblogs</name>
<email>byteblogs@aliyun.com</email>
<url>https://github.com/byteblogs168</url>
<name>opensnail</name>
<email>snailjob@opensnail.com</email>
<url>https://gitee.com/opensnail</url>
</developer>
</developers>
@ -298,6 +298,13 @@
<goals>
<goal>sign</goal>
</goals>
<configuration>
<!-- Prevent `gpg` from using pinentry programs -->
<gpgArguments>
<arg>--pinentry-mode</arg>
<arg>loopback</arg>
</gpgArguments>
</configuration>
</execution>
</executions>
</plugin>

View File

@ -18,6 +18,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hibernate.version>8.0.1.Final</hibernate.version>
</properties>
<dependencies>
<dependency>

View File

@ -1,6 +1,5 @@
package com.aizuda.snailjob.client.common.rpc.client;
import com.aizuda.snailjob.client.common.NettyClient;
import com.aizuda.snailjob.client.common.event.SnailChannelReconnectEvent;
import com.aizuda.snailjob.client.common.handler.ClientRegister;
import com.aizuda.snailjob.common.core.constant.SystemConstants.BEAT;

View File

@ -18,6 +18,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hibernate.version>8.0.1.Final</hibernate.version>
</properties>
<dependencies>
<dependency>
@ -50,6 +51,15 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>${hibernate.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -25,27 +25,31 @@ public class ThreadPoolCache {
public static ThreadPoolExecutor createThreadPool(Long taskBatchId, int parallelNum) {
if (CACHE_THREAD_POOL.containsKey(taskBatchId)) {
ThreadPoolExecutor cacheThreadPool = CACHE_THREAD_POOL.get(taskBatchId);
if (cacheThreadPool.getCorePoolSize() == parallelNum) {
// A core pool size greater than 1 means the thread count was already updated; to keep a later parallelism change from affecting batches already in flight, do not update again
if (cacheThreadPool.getCorePoolSize() > 1) {
return cacheThreadPool;
}
// Reaching this point means only sharded tasks (static sharding, MAP, MapReduce) need multi-thread support
cacheThreadPool.setCorePoolSize(parallelNum);
cacheThreadPool.setMaximumPoolSize(parallelNum);
cacheThreadPool.setCorePoolSize(Math.min(parallelNum, cacheThreadPool.getMaximumPoolSize()));
return cacheThreadPool;
}
Supplier<ThreadPoolExecutor> supplier = () -> {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
// By default start with a single thread; only sharded tasks (static sharding, MAP, MapReduce) need multi-thread support
1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
1, parallelNum, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000),
new CustomizableThreadFactory(MessageFormat.format("snail-job-job-{0}-", taskBatchId)));
threadPoolExecutor.allowCoreThreadTimeOut(true);
return threadPoolExecutor;
};
ThreadPoolExecutor threadPoolExecutor = supplier.get();
CACHE_THREAD_POOL.putIfAbsent(taskBatchId, threadPoolExecutor);
ThreadPoolExecutor cacheThreadPoolExecutor = CACHE_THREAD_POOL.putIfAbsent(taskBatchId, threadPoolExecutor);
if (Objects.nonNull(cacheThreadPoolExecutor) && cacheThreadPoolExecutor != threadPoolExecutor) {
cacheThreadPoolExecutor.setCorePoolSize(Math.min(parallelNum, cacheThreadPoolExecutor.getMaximumPoolSize()));
return cacheThreadPoolExecutor;
}
return threadPoolExecutor;
}
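
For reference, a minimal usage sketch of the reworked cache (the batch id and calling context are illustrative assumptions, not part of this diff). The first caller gets a pool with one core thread and maximumPoolSize = parallelNum; a later call for the same batch may raise the core size once, after which parallelism changes are ignored for that batch.

Long taskBatchId = 1024L; // hypothetical batch id
ThreadPoolExecutor pool = ThreadPoolCache.createThreadPool(taskBatchId, 4);
pool.execute(() -> System.out.println("shard work"));
// The same batch id returns the same pool; once corePoolSize > 1 it is no longer resized
ThreadPoolExecutor again = ThreadPoolCache.createThreadPool(taskBatchId, 8);
assert again == pool;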

View File

@ -114,6 +114,8 @@ public class JobEndPoint {
private static JobContext buildJobContext(DispatchJobRequest dispatchJob) {
JobContext jobContext = new JobContext();
jobContext.setJobId(dispatchJob.getJobId());
jobContext.setShardingTotal(dispatchJob.getShardingTotal());
jobContext.setShardingIndex(dispatchJob.getShardingIndex());
jobContext.setNamespaceId(dispatchJob.getNamespaceId());
jobContext.setTaskId(dispatchJob.getTaskId());
jobContext.setTaskBatchId(dispatchJob.getTaskBatchId());
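
With shardingTotal and shardingIndex now reaching the JobContext, a sharded executor can partition its work by shard. A hedged sketch (ShardingJobArgs and its accessor names are assumed from the fields above; its definition is not part of this diff):

@Component
public class OrderShardJob {
    @JobExecutor(name = "orderShardJob")
    public ExecuteResult jobExecute(ShardingJobArgs args) {
        int index = args.getShardingIndex(); // assumed accessor
        int total = args.getShardingTotal(); // assumed accessor
        // e.g. process only the rows where id % total == index
        return ExecuteResult.success("shard " + index + "/" + total + " done");
    }
}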

View File

@ -0,0 +1,117 @@
package com.aizuda.snailjob.client.job.core.dto;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author opensnail
* @date 2023-10-11 22:30:00
* @since 2.4.0
*/
@Data
public class JobResponseVO {
private Long id;
/**
* Group name
*/
private String groupName;
/**
* Job name
*/
private String jobName;
/**
* Executor method arguments
*/
private String argsStr;
/**
* Extended attributes
*/
private String extAttrs;
/**
* Next trigger time
*/
private LocalDateTime nextTriggerAt;
/**
* Job status: 0 disabled, 1 enabled
*/
private Integer jobStatus;
/**
* Executor routing strategy
*/
private Integer routeKey;
/**
* Executor type: 1 Java
*/
private Integer executorType;
/**
* Executor name
*/
private String executorInfo;
/**
* Trigger type: 2 fixed time, 3 CRON expression
*/
private Integer triggerType;
/**
* Trigger interval
*/
private String triggerInterval;
/**
* Block strategy: 1 discard, 2 overwrite, 3 parallel
*/
private Integer blockStrategy;
/**
* Task execution timeout, in seconds
*/
private Integer executorTimeout;
/**
* Maximum retry count
*/
private Integer maxRetryTimes;
/**
* Retry interval (s)
*/
private Integer retryInterval;
/**
* Task type
*/
private Integer taskType;
/**
* Parallelism
*/
private Integer parallelNum;
/**
* Description
*/
private String description;
/**
* Creation time
*/
private LocalDateTime createDt;
/**
* Update time
*/
private LocalDateTime updateDt;
}

View File

@ -0,0 +1,114 @@
package com.aizuda.snailjob.client.job.core.dto;
import com.aizuda.snailjob.client.job.core.handler.add.Add;
import com.aizuda.snailjob.client.job.core.handler.update.Update;
import com.aizuda.snailjob.client.job.core.handler.update.UpdateHandler;
import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
@Data
public class RequestAddOrUpdateJobDTO {
@NotNull(message = "id 不能为空", groups = Update.class)
private Long id;
/**
* 名称
*/
@NotBlank(message = "jobName 不能为空", groups = Add.class)
private String jobName;
/**
* 重试状态 0关闭1开启
* {@link StatusEnum}
*/
@NotNull(message = "jobStatus 不能为空", groups = Add.class)
private Integer jobStatus;
/**
* 执行方法参数
*/
private String argsStr;
/**
* 参数类型 text/json
*/
private Integer argsType;
/**
* 执行器路由策略
*/
@NotNull(message = "routeKey 不能为空", groups = Add.class)
private Integer routeKey;
/**
* 执行器类型
* {@link ExecutorTypeEnum}
*/
@NotNull(message = "executorType 不能为空", groups = Add.class)
private Integer executorType;
/**
* 执行器名称
*/
@NotBlank(message = "executorInfo 不能为空", groups = Add.class)
private String executorInfo;
/**
* 触发类型 2. 固定时间 3.CRON 表达式 99.工作流
*/
@NotNull(message = "triggerType 不能为空", groups = Add.class)
private Integer triggerType;
/**
* 间隔时长
*/
@NotNull(message = "triggerInterval 不能为空", groups = Add.class)
private String triggerInterval;
/**
* 阻塞策略 1丢弃 2覆盖 3并行
*/
@NotNull(message = "blockStrategy 不能为空", groups = Add.class)
private Integer blockStrategy;
/**
* 任务执行超时时间单位秒
*/
@NotNull(message = "executorTimeout 不能为空", groups = Add.class)
private Integer executorTimeout;
/**
* 最大重试次数
*/
@NotNull(message = "maxRetryTimes 不能为空", groups = Add.class)
private Integer maxRetryTimes;
/**
* 重试间隔(s)
*/
@NotNull(message = "retryInterval 不能为空", groups = Add.class)
private Integer retryInterval;
/**
* 任务类型
* {@link JobTaskTypeEnum}
*/
@NotNull(message = "taskType 不能为空", groups = Add.class)
private Integer taskType;
/**
* 并行数
*/
@NotNull(message = "parallelNum 不能为空", groups = Add.class)
private Integer parallelNum;
/**
* 描述
*/
private String description;
}

View File

@ -0,0 +1,20 @@
package com.aizuda.snailjob.client.job.core.dto;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* @author opensnail
* @date 2023-10-15 16:06:20
* @since 2.4.0
*/
@Data
public class RequestUpdateStatusDTO {
@NotNull(message = "id 不能为空")
private Long id;
@NotNull(message = "jobStatus 不能为空")
private Integer jobStatus;
}

View File

@ -0,0 +1,24 @@
package com.aizuda.snailjob.client.job.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum AllocationAlgorithmEnum {
// Consistent hash
CONSISTENT_HASH(1),
// Random
RANDOM(2),
// LRU
LRU(3),
// Round-robin
ROUND(4),
// Match the first client
FIRST(5),
// Match the last client
LAST(6);
private final int type;
}

View File

@ -0,0 +1,14 @@
package com.aizuda.snailjob.client.job.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum TriggerTypeEnum {
SCHEDULED_TIME(2),
CRON(3),
WORK_FLOW(99);
private final int type;
}

View File

@ -11,7 +11,9 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
import lombok.Data;
import org.springframework.util.StringUtils;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
public abstract class AbstractHttpExecutor {
@ -93,7 +95,7 @@ public abstract class AbstractHttpExecutor {
private void setDefaultTimeout(HttpParams httpParams) {
// Timeout is in milliseconds
httpParams.setTimeout(httpParams.getTimeout() == null ? DEFAULT_TIMEOUT * 1000 : httpParams.getTimeout() * 1000);
httpParams.setTimeout(Objects.isNull(httpParams.getTimeout()) ? DEFAULT_TIMEOUT * 1000 : httpParams.getTimeout() * 1000);
}
@ -114,11 +116,18 @@ public abstract class AbstractHttpExecutor {
break;
}
if (httpParams.getHeaders() != null) {
if (Objects.nonNull(httpParams.getHeaders())) {
httpParams.getHeaders().forEach(request::header);
}
// When a workflow context is present, pass it through on the request, so workflows can propagate context
if (Objects.nonNull(httpParams.getWfContext())) {
httpParams.getWfContext().forEach((key, value) -> {
String headerValue = (value instanceof String) ? (String) value : JsonUtil.toJsonString(value);
request.header(key, headerValue);
});
}
if (httpParams.getBody() != null && (httpParams.getMethod().equals(POST_REQUEST_METHOD)
if (Objects.nonNull(httpParams.getBody()) && (httpParams.getMethod().equals(POST_REQUEST_METHOD)
|| httpParams.getMethod().equals(PUT_REQUEST_METHOD)
|| httpParams.getMethod().equals(DELETE_REQUEST_METHOD))) {
request.body(httpParams.getBody(), httpParams.getMediaType());
@ -137,6 +146,7 @@ public abstract class AbstractHttpExecutor {
private String body;
private Map<String, String> headers;
private Integer timeout;
private Map<String, Object> wfContext;
}
// Logging methods
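
What the new pass-through amounts to, as a hedged sketch (HttpParams is built directly here for illustration; in practice it is parsed from the job's JSON parameters and wfContext is injected by SnailJobHttpExecutor):

HttpParams params = new HttpParams();
params.setMethod("POST"); // upper-cased by the executor before dispatch
Map<String, Object> ctx = new HashMap<>();
ctx.put("traceId", "abc-123"); // String values pass through unchanged
Map<String, String> upstream = new HashMap<>();
upstream.put("node", "A");
ctx.put("upstream", upstream); // non-String values are JSON-serialized
params.setWfContext(ctx);
// During the request each entry becomes a header: traceId=abc-123, upstream={"node":"A"}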

View File

@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -43,7 +44,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
public void jobExecute(JobContext jobContext) {
// Create the executable task
ThreadPoolExecutor threadPool = ThreadPoolCache.createThreadPool(jobContext.getTaskBatchId(), jobContext.getParallelNum());
Integer parallelNum = Optional.ofNullable(jobContext.getParallelNum()).orElse(1);
ThreadPoolExecutor threadPool = ThreadPoolCache.createThreadPool(jobContext.getTaskBatchId(), Math.max(1, parallelNum));
ListeningExecutorService decorator = MoreExecutors.listeningDecorator(threadPool);
// Add the task to the time wheel so it is stopped when it expires

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.client.job.core.executor;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
@ -18,6 +19,7 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor {
@Override
public ExecuteResult doJobExecute(final JobArgs jobArgs) {
JobContext jobContext = JobContextManager.getJobContext();
Assert.notNull(jobContext.getMrStage(), "Please confirm that the [task type] of this job on the server is MapReduce");
if (jobContext.getMrStage().equals(MapReduceStageEnum.MAP.getStage())) {
return super.doJobExecute(jobArgs);
} else if (jobContext.getMrStage().equals(MapReduceStageEnum.REDUCE.getStage())) {

View File

@ -50,7 +50,7 @@ public abstract class AbstractScriptExecutor {
setScriptPermissions(scriptPath);
}
return executeScript(scriptPath);
return executeScript(scriptPath, scriptParams);
}
private String prepareScriptFile(Long jobId, ScriptParams scriptParams) {
@ -76,7 +76,7 @@ public abstract class AbstractScriptExecutor {
case SCRIPT_SCRIPT_CODE_METHOD:
// Write the script code directly
try {
writeScriptContent(script, scriptParams.getScriptParams());
writeScriptContent(script, scriptParams);
} catch (IOException e) {
throw new SnailJobInnerExecutorException("[snail-job] Failed to write script", e);
}
@ -121,13 +121,25 @@ public abstract class AbstractScriptExecutor {
}
}
private void writeScriptContent(File script, String processorInfo) throws IOException {
try (BufferedWriter writer = Files.newBufferedWriter(script.toPath(), getCharset())) {
writer.write(processorInfo);
private void writeScriptContent(File script, ScriptParams scriptParams) throws IOException {
try (BufferedWriter writer = Files.newBufferedWriter(script.toPath(), getScriptCharset(scriptParams))) {
writer.write(scriptParams.getScriptParams());
logInfo("Script content written successfully to: {}", script.getAbsolutePath());
}
}
private Charset getScriptCharset(ScriptParams scriptParams) {
String charsetName = scriptParams.getCharset();
if (StrUtil.isNotBlank(charsetName)) {
try {
return Charset.forName(charsetName);
} catch (Exception e) {
logWarn("[snail-job] Invalid charset:{} . Using default charset.", charsetName);
}
}
return getCharset();
}
private void setScriptPermissions(String scriptPath) {
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
try {
@ -138,14 +150,14 @@ public abstract class AbstractScriptExecutor {
logInfo("chmod 755 authorization complete, ready to start execution~");
}
private ExecuteResult executeScript(String scriptPath) {
private ExecuteResult executeScript(String scriptPath, ScriptParams scriptParams) {
ProcessBuilder pb = getScriptProcessBuilder(scriptPath);
Process process = null;
ExecuteResult executeResult;
try {
process = pb.start();
executeResult = captureOutput(process);
executeResult = captureOutput(process, scriptParams);
} catch (IOException | InterruptedException e) {
throw new SnailJobInnerExecutorException("[snail-job] Script execution failed", e);
} finally {
@ -170,21 +182,22 @@ public abstract class AbstractScriptExecutor {
}
private ExecuteResult captureOutput(Process process) throws InterruptedException {
private ExecuteResult captureOutput(Process process, ScriptParams scriptParams) throws InterruptedException {
StringBuilder inputBuilder = new StringBuilder();
StringBuilder errorBuilder = new StringBuilder();
// Use a CountDownLatch to ensure both stdout and stderr are fully captured before evaluating the result
CountDownLatch latch = new CountDownLatch(2);
Charset scriptCharset = getScriptCharset(scriptParams);
// Start a thread to capture standard output
new Thread(() -> {
captureStream(process.getInputStream(), inputBuilder);
captureStream(process.getInputStream(), inputBuilder, scriptCharset);
latch.countDown();
}).start();
// Start a thread to capture error output
new Thread(() -> {
captureStream(process.getErrorStream(), errorBuilder);
captureStream(process.getErrorStream(), errorBuilder, scriptCharset);
latch.countDown();
}).start();
@ -200,8 +213,8 @@ public abstract class AbstractScriptExecutor {
return success ? ExecuteResult.success("Script executed successfully.") : ExecuteResult.failure("Script execution failed.");
}
private void captureStream(InputStream is, StringBuilder sb) {
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, getCharset()))) {
private void captureStream(InputStream is, StringBuilder sb, Charset charset) {
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) {
String line;
while ((line = br.readLine()) != null) {
sb.append(line).append(System.lineSeparator());
@ -275,5 +288,6 @@ public abstract class AbstractScriptExecutor {
*/
private String method;
private String scriptParams;
private String charset;
}
}
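
The net effect is that a script job's parameters can now carry a charset used both when writing the script file and when decoding its output. A hedged sketch of the parameter shape (the JSON layout is inferred from the ScriptParams fields above; the value of "method" is elided because its constants are defined elsewhere):

String jobParamsJson = "{"
        + "\"scriptParams\": \"echo hello\"," // script body, written using the charset below
        + "\"charset\": \"GBK\"" // blank or invalid values fall back to getCharset()
        + "}";
ScriptParams scriptParams = JsonUtil.parseObject(jobParamsJson, ScriptParams.class); // assumed parse path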

View File

@ -103,6 +103,10 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
@Override
public void onFailure(final Throwable t) {
if (t instanceof CancellationException) {
SnailJobLog.LOCAL.debug("任务已经被取消,不做状态回传");
return;
}
ExecuteResult failure = ExecuteResult.failure();
try {
// Initialize the dispatch info for log reporting (LogUtil)
@ -110,12 +114,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
// Report the execution failure
SnailJobLog.REMOTE.error("Task execution failed taskBatchId:[{}]", jobContext.getTaskBatchId(), t);
if (t instanceof CancellationException) {
failure.setMessage("Task was cancelled");
} else {
failure.setMessage(t.getMessage());
}
failure.setMessage(t.getMessage());
CLIENT.dispatchResult(
buildDispatchJobResultRequest(failure, JobTaskStatusEnum.FAIL.getStatus())

View File

@ -84,7 +84,7 @@ public final class MapInvokeHandler implements InvocationHandler {
// 2. Send the request synchronously
Result<Boolean> result = CLIENT.batchReportMapTask(mapTaskRequest);
if (StatusEnum.NO.getStatus() == result.getStatus() || result.getData()) {
if (StatusEnum.YES.getStatus() == result.getStatus() || result.getData()) {
SnailJobLog.LOCAL.info("Map task create successfully!. taskName:[{}] TaskId:[{}] ", nextTaskName, jobContext.getTaskId());
} else {
throw new SnailJobMapReduceException("map failed for task: " + nextTaskName);

View File

@ -10,6 +10,7 @@ import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@Component
@JobExecutor(name = "snailJobHttpExecutor")
@ -18,12 +19,15 @@ public class SnailJobHttpExecutor extends AbstractHttpExecutor {
public ExecuteResult jobExecute(JobArgs jobArgs) {
Object jobParams = jobArgs.getJobParams();
HttpParams httpParams = JsonUtil.parseObject((String) jobParams, HttpParams.class);
if (Objects.nonNull(jobArgs.getWfContext())) {
httpParams.setWfContext(jobArgs.getWfContext());
}
httpParams.setMethod(httpParams.getMethod().toUpperCase());
Map<String, String> hashMap = new HashMap<>(3);
hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP, snailJobProperties.getGroup());
hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_GROUP_TOKEN, snailJobProperties.getToken());
hashMap.put(SystemConstants.SNAIL_JOB_CLIENT_NAMESPACE, snailJobProperties.getNamespace());
Map<String, String> headers = (httpParams.getHeaders() == null || httpParams.getHeaders().isEmpty()) ? new HashMap<>() : httpParams.getHeaders();
Map<String, String> headers = (Objects.isNull(httpParams.getHeaders()) || httpParams.getHeaders().isEmpty()) ? new HashMap<>() : httpParams.getHeaders();
headers.putAll(hashMap);
httpParams.setHeaders(headers);
return process(httpParams);

View File

@ -0,0 +1,254 @@
package com.aizuda.snailjob.client.job.core.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.dto.RequestAddOrUpdateJobDTO;
import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
import com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum;
import com.aizuda.snailjob.common.core.enums.*;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import lombok.Getter;
import lombok.Setter;
import java.util.HashMap;
import java.util.Map;
import static com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum.*;
/**
* @author opensnail
* @date 2024-10-19 22:34:38
* @since sj_1.2.0
*/
public abstract class AbstractParamsHandler<H, R> extends AbstractRequestHandler<R> {
protected static final String SHARD_NUM = "shardNum";
@Getter
private final RequestAddOrUpdateJobDTO reqDTO;
@Setter
private H r;
public AbstractParamsHandler(JobTaskTypeEnum taskType) {
this.reqDTO = new RequestAddOrUpdateJobDTO();
// Enabled by default on creation
reqDTO.setJobStatus(StatusEnum.YES.getStatus());
// Set the task type
reqDTO.setTaskType(taskType.getType());
// Java executor by default
reqDTO.setExecutorType(ExecutorTypeEnum.JAVA.getType());
}
protected H setId(Long id) {
reqDTO.setId(id);
return r;
}
/**
* Set the job arguments; an update directly overwrites the previous arguments
*
* @param argsStr argument map
* @return r
*/
private H setArgsStr(Map<String, Object> argsStr) {
Map<String, Object> args = new HashMap<>();
if (StrUtil.isNotBlank(reqDTO.getArgsStr())) {
args = JsonUtil.parseHashMap(reqDTO.getArgsStr());
}
args.putAll(argsStr);
reqDTO.setArgsStr(JsonUtil.toJsonString(args));
reqDTO.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
return r;
}
/**
* Set the number of Reduce shards
* Only MAP_REDUCE jobs may set this
*
* @param shardNum shard count
* @return r
*/
protected H setShardNum(Integer shardNum) {
// Set the shard count
if (shardNum != null) {
Map<String, Object> map = new HashMap<>(1);
map.put(SHARD_NUM, shardNum);
setArgsStr(map);
}
return r;
}
/**
* Set the job name
*
* @param jobName job name
* @return r
*/
public H setJobName(String jobName) {
reqDTO.setJobName(jobName);
return r;
}
/**
* Add an argument; can be called multiple times
* Not available for static sharding jobs
*
* @param argsKey argument name
* @param argsValue argument value
* @return r
*/
protected H addArgsStr(String argsKey, Object argsValue) {
Map<String, Object> map = new HashMap<>();
if (StrUtil.isNotBlank(reqDTO.getArgsStr())) {
map = JsonUtil.parseHashMap(reqDTO.getArgsStr());
}
map.put(argsKey, argsValue);
reqDTO.setArgsStr(JsonUtil.toJsonString(map));
reqDTO.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
return r;
}
/**
* Add static sharding arguments
*
* @param shardingValue sharding arguments
* @return r
*/
protected H addShardingArgs(String... shardingValue) {
reqDTO.setArgsStr(JsonUtil.toJsonString(shardingValue));
reqDTO.setArgsType(JobArgsTypeEnum.TEXT.getArgsType());
return r;
}
/**
* Set the routing strategy
*
* @param algorithmEnum routing algorithm
* @return r
*/
protected H setRouteKey(AllocationAlgorithmEnum algorithmEnum) {
reqDTO.setRouteKey(algorithmEnum.getType());
return r;
}
/**
* Set the executor info
*
* @param executorInfo executor info
* @return r
*/
public H setExecutorInfo(String executorInfo) {
reqDTO.setExecutorInfo(executorInfo);
return r;
}
/**
* Set the trigger type
*
* @param triggerType trigger type
* @return r
*/
public H setTriggerType(TriggerTypeEnum triggerType) {
reqDTO.setTriggerType(triggerType.getType());
return r;
}
/**
* Set the trigger interval
* Unit: seconds
* Note: requires triggerType == SCHEDULED_TIME
*
* @param triggerInterval trigger interval
* @return r
*/
public H setTriggerInterval(Integer triggerInterval) {
Assert.isTrue(reqDTO.getTriggerType() == SCHEDULED_TIME.getType(),
() -> new SnailJobClientException("此方法只限制固定时间使用"));
setTriggerInterval(String.valueOf(triggerInterval));
return r;
}
/**
* Set the trigger interval
* Unit: seconds
* Workflows do not need this
*
* @param triggerInterval trigger interval
* @return r
*/
public H setTriggerInterval(String triggerInterval) {
// Workflows have no schedule time
Assert.isFalse(reqDTO.getTriggerType() == WORK_FLOW.getType(),
() -> new SnailJobClientException("Workflows do not need a trigger interval"));
reqDTO.setTriggerInterval(triggerInterval);
return r;
}
/**
* Set the block strategy
*
* @param blockStrategy block strategy
* @return r
*/
public H setBlockStrategy(BlockStrategyEnum blockStrategy) {
reqDTO.setBlockStrategy(blockStrategy.getBlockStrategy());
return r;
}
/**
* Set the executor timeout
*
* @param executorTimeout timeout (unit: seconds)
* @return r
*/
public H setExecutorTimeout(Integer executorTimeout) {
reqDTO.setExecutorTimeout(executorTimeout);
return r;
}
/**
* Set the maximum retry count for the job
*
* @param maxRetryTimes maximum retry count
* @return r
*/
public H setMaxRetryTimes(Integer maxRetryTimes) {
reqDTO.setMaxRetryTimes(maxRetryTimes);
return r;
}
/**
* Set the retry interval
*
* @param retryInterval retry interval
* @return r
*/
public H setRetryInterval(Integer retryInterval) {
reqDTO.setRetryInterval(retryInterval);
return r;
}
/**
* Set the parallelism
*
* @param parallelNum degree of parallelism
* @return r
*/
protected H setParallelNum(Integer parallelNum) {
reqDTO.setParallelNum(parallelNum);
return r;
}
/**
* Set the job description
*
* @param description job description
* @return r
*/
public H setDescription(String description) {
reqDTO.setDescription(description);
return r;
}
}
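
Note that addArgsStr and setShardNum funnel into the same JSON args map, so repeated calls accumulate instead of overwriting. A quick sketch (the handler type is chosen only for illustration):

MapAddHandler handler = SnailJobOpenApi.addMapJob();
handler.addArgsStr("city", "shanghai");
handler.addArgsStr("limit", 100);
// reqDTO.argsStr is now {"city":"shanghai","limit":100} and argsType is JSON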

View File

@ -0,0 +1,38 @@
package com.aizuda.snailjob.client.job.core.handler;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
/**
* @author opensnail
* @date 2024-09-29 20:40:10
* @since sj_1.1.0
*/
public abstract class AbstractRequestHandler<R> implements RequestHandler<R> {
protected static final String SHARD_NUM = "shardNum";
/**
* Run the request: validate first, then execute between the before/after hooks
* @return the execution result
*/
@Override
public R execute() {
Pair<Boolean, String> checked = checkRequest();
if (checked.getKey()) {
beforeExecute();
R r = doExecute();
afterExecute(r);
return r;
} else {
throw new SnailJobClientException("snail job openapi check error. [{}]", checked.getValue());
}
}
protected abstract void afterExecute(R r);
protected abstract void beforeExecute();
protected abstract R doExecute();
protected abstract Pair<Boolean, String> checkRequest();
}

View File

@ -0,0 +1,16 @@
package com.aizuda.snailjob.client.job.core.handler;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.client.job.core.openapi.OpenApiClient;
import com.aizuda.snailjob.common.core.model.NettyResult;
public interface RequestHandler<R> {
OpenApiClient client = RequestBuilder.<OpenApiClient, NettyResult>newBuilder()
.client(OpenApiClient.class)
.async(false)
.build();
R execute();
}

View File

@ -0,0 +1,9 @@
package com.aizuda.snailjob.client.job.core.handler.add;
/**
* @author opensnail
* @date 2024-10-20 11:59:02
* @since sj_1.2.0
*/
public interface Add {
}

View File

@ -0,0 +1,48 @@
package com.aizuda.snailjob.client.job.core.handler.add;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.handler.AbstractParamsHandler;
import com.aizuda.snailjob.client.job.core.util.ValidatorUtils;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import static com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum.WORK_FLOW;
public abstract class AddHandler<H> extends AbstractParamsHandler<H, Long> {
public AddHandler(JobTaskTypeEnum taskType) {
super(taskType);
}
@Override
protected Long doExecute() {
Result<Object> result = client.addJob(getReqDTO());
Assert.isTrue(StatusEnum.YES.getStatus() == result.getStatus(),
() -> new SnailJobClientException(result.getMessage()));
String data = JsonUtil.toJsonString(result.getData());
return Long.valueOf(data);
}
@Override
protected void beforeExecute() {
// Fallback override: workflows have no schedule time
if (getReqDTO().getTriggerType() == WORK_FLOW.getType()) {
setTriggerInterval("*");
}
}
@Override
protected void afterExecute(Long id) {
}
@Override
protected Pair<Boolean, String> checkRequest() {
return ValidatorUtils.validateEntity(Add.class, getReqDTO());
}
}

View File

@ -0,0 +1,30 @@
package com.aizuda.snailjob.client.job.core.handler.add;
import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
/**
* @author opensnail
* @date 2024-10-19 12:25:49
* @since sj_1.1.0
*/
public class BroadcastAddHandler extends AddHandler<BroadcastAddHandler> {
public BroadcastAddHandler() {
this(JobTaskTypeEnum.BROADCAST);
}
public BroadcastAddHandler(JobTaskTypeEnum taskType) {
super(taskType);
// Broadcast mode only allows a parallelism of 1
setParallelNum(1);
// Broadcast mode uses round-robin routing
setRouteKey(AllocationAlgorithmEnum.ROUND);
setR(this);
}
@Override
public BroadcastAddHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
}

View File

@ -0,0 +1,34 @@
package com.aizuda.snailjob.client.job.core.handler.add;
import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
/**
* @author opensnail
* @date 2024-10-19 12:25:49
* @since sj_1.2.0
*/
public class ClusterAddHandler extends AddHandler<ClusterAddHandler> {
public ClusterAddHandler() {
this(JobTaskTypeEnum.CLUSTER);
}
public ClusterAddHandler(JobTaskTypeEnum taskType) {
super(taskType);
// Cluster mode only allows a parallelism of 1
setParallelNum(1);
setR(this);
}
@Override
public ClusterAddHandler setRouteKey(AllocationAlgorithmEnum algorithmEnum) {
super.setRouteKey(algorithmEnum);
return this;
}
@Override
public ClusterAddHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
}

View File

@ -0,0 +1,32 @@
package com.aizuda.snailjob.client.job.core.handler.add;
import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
/**
* @author opensnail
* @date 2024-10-19 12:25:49
* @since sj_1.2.0
*/
public class MapAddHandler extends AddHandler<MapAddHandler> {
public MapAddHandler() {
this(JobTaskTypeEnum.MAP);
}
public MapAddHandler(JobTaskTypeEnum taskType) {
super(taskType);
setRouteKey(AllocationAlgorithmEnum.ROUND);
setR(this);
}
@Override
public MapAddHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
@Override
public MapAddHandler setParallelNum(Integer parallelNum) {
return super.setParallelNum(parallelNum);
}
}

View File

@ -0,0 +1,38 @@
package com.aizuda.snailjob.client.job.core.handler.add;
import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
import com.aizuda.snailjob.client.job.core.handler.update.MapReduceUpdateHandler;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
/**
* @author opensnail
* @date 2024-10-19 12:25:49
* @since sj_1.2.0
*/
public class MapReduceAddHandler extends AddHandler<MapReduceAddHandler> {
public MapReduceAddHandler() {
this(JobTaskTypeEnum.MAP_REDUCE);
}
public MapReduceAddHandler(JobTaskTypeEnum taskType) {
super(taskType);
setRouteKey(AllocationAlgorithmEnum.ROUND);
setR(this);
}
@Override
public MapReduceAddHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
@Override
public MapReduceAddHandler setParallelNum(Integer parallelNum) {
return super.setParallelNum(parallelNum);
}
@Override
public MapReduceAddHandler setShardNum(Integer shardNum) {
return super.setShardNum(shardNum);
}
}

View File

@ -0,0 +1,32 @@
package com.aizuda.snailjob.client.job.core.handler.add;
import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
/**
* @author opensnail
* @date 2024-10-19 12:25:49
* @since sj_1.2.0
*/
public class ShardingAddHandler extends AddHandler<ShardingAddHandler> {
public ShardingAddHandler() {
this(JobTaskTypeEnum.SHARDING);
}
public ShardingAddHandler(JobTaskTypeEnum taskType) {
super(taskType);
setRouteKey(AllocationAlgorithmEnum.ROUND);
setR(this);
}
@Override
public ShardingAddHandler addShardingArgs(String... shardingValue) {
return super.addShardingArgs(shardingValue);
}
@Override
public ShardingAddHandler setParallelNum(Integer parallelNum) {
return super.setParallelNum(parallelNum);
}
}

View File

@ -0,0 +1,47 @@
package com.aizuda.snailjob.client.job.core.handler.query;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.dto.JobResponseVO;
import com.aizuda.snailjob.client.job.core.handler.AbstractRequestHandler;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import java.util.Objects;
public class RequestQueryHandler extends AbstractRequestHandler<JobResponseVO> {
private final Long queryJobId;
public RequestQueryHandler(Long queryJobId) {
this.queryJobId = queryJobId;
}
@Override
protected void afterExecute(JobResponseVO jobResponseVO) {
}
@Override
protected void beforeExecute() {
}
@Override
protected JobResponseVO doExecute() {
Result<Object> result = client.getJobDetail(queryJobId);
Assert.isTrue(StatusEnum.YES.getStatus() == result.getStatus(),
() -> new SnailJobClientException(result.getMessage()));
Object data = result.getData();
Assert.isTrue(Objects.nonNull(data), () -> new SnailJobClientException("Failed to get job detail for [{}]", queryJobId));
return JsonUtil.parseObject(JsonUtil.toJsonString(data), JobResponseVO.class);
}
@Override
protected Pair<Boolean, String> checkRequest() {
return Pair.of(queryJobId != null && !Long.valueOf(0).equals(queryJobId), "queryJobId must not be null and must be greater than 0");
}
}

View File

@ -0,0 +1,40 @@
package com.aizuda.snailjob.client.job.core.handler.trigger;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.handler.AbstractRequestHandler;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
public class TriggerJobHandler extends AbstractRequestHandler<Boolean> {
private final Long triggerJobId;
public TriggerJobHandler(Long triggerJobId) {
this.triggerJobId = triggerJobId;
}
@Override
protected void afterExecute(Boolean aBoolean) {
}
@Override
protected void beforeExecute() {
}
@Override
protected Boolean doExecute() {
Result<Object> result = client.triggerJob(triggerJobId);
Assert.isTrue(StatusEnum.YES.getStatus() == result.getStatus(),
() -> new SnailJobClientException(result.getMessage()));
return (Boolean)result.getData();
}
@Override
protected Pair<Boolean, String> checkRequest() {
return Pair.of(triggerJobId != null && !Long.valueOf(0).equals(triggerJobId), "triggerJobId must not be null and must be greater than 0");
}
}

View File

@ -0,0 +1,39 @@
package com.aizuda.snailjob.client.job.core.handler.trigger;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.handler.AbstractRequestHandler;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
public class TriggerWorkflowHandler extends AbstractRequestHandler<Boolean> {
private final Long triggerJobId;
public TriggerWorkflowHandler(Long triggerJobId) {
this.triggerJobId = triggerJobId;
}
@Override
protected void afterExecute(Boolean aBoolean) {
}
@Override
protected void beforeExecute() {
}
@Override
protected Boolean doExecute() {
Result<Object> result = client.triggerWorkFlow(triggerJobId);
Assert.isTrue(StatusEnum.YES.getStatus() == result.getStatus(),
() -> new SnailJobClientException(result.getMessage()));
return (Boolean) result.getData();
}
@Override
protected Pair<Boolean, String> checkRequest() {
return Pair.of(triggerJobId != null && !Long.valueOf(0).equals(triggerJobId), "triggerJobId must not be null and must be greater than 0");
}
}

View File

@ -0,0 +1,16 @@
package com.aizuda.snailjob.client.job.core.handler.update;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
public class BroadcastUpdateHandler extends UpdateHandler<BroadcastUpdateHandler>{
public BroadcastUpdateHandler(Long jobId) {
super(JobTaskTypeEnum.BROADCAST, jobId);
setR(this);
}
@Override
public BroadcastUpdateHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
}

View File

@ -0,0 +1,22 @@
package com.aizuda.snailjob.client.job.core.handler.update;
import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
public class ClusterUpdateHandler extends UpdateHandler<ClusterUpdateHandler> {
public ClusterUpdateHandler(Long jobId) {
super(JobTaskTypeEnum.CLUSTER, jobId);
setR(this);
}
@Override
public ClusterUpdateHandler setRouteKey(AllocationAlgorithmEnum algorithmEnum) {
return super.setRouteKey(algorithmEnum);
}
@Override
public ClusterUpdateHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
}

View File

@ -0,0 +1,26 @@
package com.aizuda.snailjob.client.job.core.handler.update;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
public class MapReduceUpdateHandler extends UpdateHandler<MapReduceUpdateHandler> {
public MapReduceUpdateHandler(Long jobId) {
super(JobTaskTypeEnum.MAP_REDUCE, jobId);
setR(this);
}
@Override
public MapReduceUpdateHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
@Override
public MapReduceUpdateHandler setParallelNum(Integer parallelNum) {
return super.setParallelNum(parallelNum);
}
@Override
public MapReduceUpdateHandler setShardNum(Integer shardNum) {
return super.setShardNum(shardNum);
}
}

View File

@ -0,0 +1,22 @@
package com.aizuda.snailjob.client.job.core.handler.update;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
public class MapUpdateHandler extends UpdateHandler<MapUpdateHandler> {
public MapUpdateHandler(Long jobId) {
super(JobTaskTypeEnum.MAP, jobId);
setR(this);
}
@Override
public MapUpdateHandler addArgsStr(String argsKey, Object argsValue) {
return super.addArgsStr(argsKey, argsValue);
}
@Override
public MapUpdateHandler setParallelNum(Integer parallelNum) {
return super.setParallelNum(parallelNum);
}
}

View File

@ -0,0 +1,21 @@
package com.aizuda.snailjob.client.job.core.handler.update;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
public class ShardingUpdateHandler extends UpdateHandler<ShardingUpdateHandler>{
public ShardingUpdateHandler(Long jobId) {
super(JobTaskTypeEnum.SHARDING, jobId);
setR(this);
}
@Override
public ShardingUpdateHandler addShardingArgs(String... shardingValue) {
return super.addShardingArgs(shardingValue);
}
@Override
public ShardingUpdateHandler setParallelNum(Integer parallelNum) {
return super.setParallelNum(parallelNum);
}
}

View File

@ -0,0 +1,9 @@
package com.aizuda.snailjob.client.job.core.handler.update;
/**
* @author opensnail
* @date 2024-10-20 11:59:02
* @since sj_1.2.0
*/
public interface Update {
}

View File

@ -0,0 +1,49 @@
package com.aizuda.snailjob.client.job.core.handler.update;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum;
import com.aizuda.snailjob.client.job.core.handler.AbstractParamsHandler;
import com.aizuda.snailjob.client.job.core.util.ValidatorUtils;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
public abstract class UpdateHandler<H> extends AbstractParamsHandler<H, Boolean> {
public UpdateHandler(JobTaskTypeEnum typeEnum, Long jobId) {
super(typeEnum);
// An update requires an id
setId(jobId);
}
@Override
protected void afterExecute(Boolean aBoolean) {
}
@Override
protected void beforeExecute() {
if (getReqDTO().getTriggerType() == TriggerTypeEnum.WORK_FLOW.getType()) {
// Workflows have no schedule time
setTriggerInterval("*");
}
}
@Override
protected Boolean doExecute() {
Result<Object> result = client.updateJob(getReqDTO());
Assert.isTrue(StatusEnum.YES.getStatus() == result.getStatus(),
() -> new SnailJobClientException(result.getMessage()));
return (Boolean) result.getData();
}
@Override
protected Pair<Boolean, String> checkRequest() {
return ValidatorUtils.validateEntity(Update.class, getReqDTO());
}
}

View File

@ -0,0 +1,65 @@
package com.aizuda.snailjob.client.job.core.handler.update;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.dto.RequestUpdateStatusDTO;
import com.aizuda.snailjob.client.job.core.handler.AbstractRequestHandler;
import com.aizuda.snailjob.client.job.core.util.ValidatorUtils;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
public class UpdateJobStatusHandler extends AbstractRequestHandler<Boolean> {
private final RequestUpdateStatusDTO statusDTO;
public UpdateJobStatusHandler(Long id) {
this.statusDTO = new RequestUpdateStatusDTO();
setId(id);
}
@Override
protected void afterExecute(Boolean aBoolean) {
}
@Override
protected void beforeExecute() {
}
@Override
protected Boolean doExecute() {
Result<Object> result = client.updateJobStatus(statusDTO);
Assert.isTrue(StatusEnum.YES.getStatus() == result.getStatus(),
() -> new SnailJobClientException(result.getMessage()));
return (Boolean) result.getData();
}
@Override
protected Pair<Boolean, String> checkRequest() {
return ValidatorUtils.validateEntity(statusDTO);
}
/**
* Set the job/workflow ID
*
* @param id job or workflow ID
* @return this handler
*/
private UpdateJobStatusHandler setId(Long id) {
this.statusDTO.setId(id);
return this;
}
/**
* Set the status
*
* @param status target status
* @return this handler
*/
public UpdateJobStatusHandler setStatus(StatusEnum status) {
this.statusDTO.setJobStatus(status.getStatus());
return this;
}
}

View File

@ -0,0 +1,65 @@
package com.aizuda.snailjob.client.job.core.handler.update;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.job.core.dto.RequestUpdateStatusDTO;
import com.aizuda.snailjob.client.job.core.handler.AbstractRequestHandler;
import com.aizuda.snailjob.client.job.core.util.ValidatorUtils;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.Result;
public class UpdateWorkflowStatusHandler extends AbstractRequestHandler<Boolean> {
private final RequestUpdateStatusDTO statusDTO;
public UpdateWorkflowStatusHandler(Long id) {
this.statusDTO = new RequestUpdateStatusDTO();
setId(id);
}
@Override
protected void afterExecute(Boolean aBoolean) {
}
@Override
protected void beforeExecute() {
}
@Override
protected Boolean doExecute() {
Result<Object> result = client.updateWorkFlowStatus(statusDTO);
Assert.isTrue(StatusEnum.YES.getStatus() == result.getStatus(),
() -> new SnailJobClientException(result.getMessage()));
return (Boolean) result.getData();
}
@Override
protected Pair<Boolean, String> checkRequest() {
return ValidatorUtils.validateEntity(statusDTO);
}
/**
* Set the job/workflow ID
*
* @param id job or workflow ID
* @return this handler
*/
private UpdateWorkflowStatusHandler setId(Long id) {
this.statusDTO.setId(id);
return this;
}
/**
* Set the status
*
* @param status target status
* @return this handler
*/
public UpdateWorkflowStatusHandler setStatus(StatusEnum status) {
this.statusDTO.setJobStatus(status.getStatus());
return this;
}
}

View File

@ -0,0 +1,30 @@
package com.aizuda.snailjob.client.job.core.openapi;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.client.job.core.dto.RequestAddOrUpdateJobDTO;
import com.aizuda.snailjob.client.job.core.dto.RequestUpdateStatusDTO;
import com.aizuda.snailjob.common.core.model.Result;
public interface OpenApiClient {
@Mapping(method = RequestMethod.POST, path = "/api/job/add")
Result<Object> addJob(RequestAddOrUpdateJobDTO requestAddOrUpdateJobDTO);
@Mapping(method = RequestMethod.POST, path = "/api/job/update")
Result<Object> updateJob(RequestAddOrUpdateJobDTO requestUpdateJobDTO);
@Mapping(method = RequestMethod.POST, path = "/api/job/getJobDetail")
Result<Object> getJobDetail(Long jobId);
@Mapping(method = RequestMethod.POST, path = "/api/job/triggerJob")
Result<Object> triggerJob(Long triggerId);
@Mapping(method = RequestMethod.POST, path = "/api/job/triggerWorkFlow")
Result<Object> triggerWorkFlow(Long triggerId);
@Mapping(method = RequestMethod.POST, path = "/api/job/updateJobStatus")
Result<Object> updateJobStatus(RequestUpdateStatusDTO statusDTO);
@Mapping(method = RequestMethod.POST, path = "/api/job/updateWorkFlowStatus")
Result<Object> updateWorkFlowStatus(RequestUpdateStatusDTO statusDTO);
}

View File

@ -0,0 +1,162 @@
package com.aizuda.snailjob.client.job.core.openapi;
import com.aizuda.snailjob.client.job.core.handler.add.*;
import com.aizuda.snailjob.client.job.core.handler.query.RequestQueryHandler;
import com.aizuda.snailjob.client.job.core.handler.trigger.TriggerJobHandler;
import com.aizuda.snailjob.client.job.core.handler.trigger.TriggerWorkflowHandler;
import com.aizuda.snailjob.client.job.core.handler.update.*;
/**
* @author opensnail
* @date 2024-09-21 21:35:34
* @since sj_1.1.0
*/
public final class SnailJobOpenApi {
private SnailJobOpenApi() {
}
/**
* Add a cluster job
*
* @return {@link ClusterAddHandler}
*/
public static ClusterAddHandler addClusterJob() {
return new ClusterAddHandler();
}
/**
* Add a broadcast job
*
* @return {@link BroadcastAddHandler}
*/
public static BroadcastAddHandler addBroadcastJob() {
return new BroadcastAddHandler();
}
/**
* Add a static-sharding job
*
* @return {@link ShardingAddHandler}
*/
public static ShardingAddHandler addShardingJob() {
return new ShardingAddHandler();
}
/**
* Add a Map job
*
* @return {@link MapAddHandler}
*/
public static MapAddHandler addMapJob() {
return new MapAddHandler();
}
/**
* Add a MapReduce job
*
* @return {@link MapReduceAddHandler}
*/
public static MapReduceAddHandler addMapReduceJob() {
return new MapReduceAddHandler();
}
/**
* Update a broadcast job
*
* @param jobId job ID
* @return {@link BroadcastUpdateHandler}
*/
public static BroadcastUpdateHandler updateBroadcastJob(Long jobId) {
return new BroadcastUpdateHandler(jobId);
}
/**
* Update a cluster job
*
* @param jobId job ID
* @return {@link ClusterUpdateHandler}
*/
public static ClusterUpdateHandler updateClusterJob(Long jobId) {
return new ClusterUpdateHandler(jobId);
}
/**
* Update a MapReduce job
*
* @param jobId job ID
* @return {@link MapReduceUpdateHandler}
*/
public static MapReduceUpdateHandler updateMapReduceJob(Long jobId) {
return new MapReduceUpdateHandler(jobId);
}
/**
* Update a Map job
*
* @param jobId job ID
* @return {@link MapUpdateHandler}
*/
public static MapUpdateHandler updateMapJob(Long jobId) {
return new MapUpdateHandler(jobId);
}
/**
* Update a static-sharding job
*
* @param jobId job ID
* @return {@link ShardingUpdateHandler}
*/
public static ShardingUpdateHandler updateShardingJob(Long jobId) {
return new ShardingUpdateHandler(jobId);
}
/**
* Get job detail
*
* @param jobId job ID
* @return {@link RequestQueryHandler}
*/
public static RequestQueryHandler getJobDetail(Long jobId) {
return new RequestQueryHandler(jobId);
}
/**
* Manually trigger a job
*
* @param jobId job ID
* @return {@link TriggerJobHandler}
*/
public static TriggerJobHandler triggerJob(Long jobId) {
return new TriggerJobHandler(jobId);
}
/**
* Manually trigger a workflow
*
* @param id workflow ID
* @return {@link TriggerWorkflowHandler}
*/
public static TriggerWorkflowHandler triggerWorkFlow(Long id) {
return new TriggerWorkflowHandler(id);
}
/**
* Update a job's status
*
* @param jobId job ID
* @return {@link UpdateJobStatusHandler}
*/
public static UpdateJobStatusHandler updateJobStatus(Long jobId) {
return new UpdateJobStatusHandler(jobId);
}
/**
* Update a workflow's status
*
* @param workFlowId workflow ID
* @return {@link UpdateWorkflowStatusHandler}
*/
public static UpdateWorkflowStatusHandler updateWorkFlowStatus(Long workFlowId) {
return new UpdateWorkflowStatusHandler(workFlowId);
}
}
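
An end-to-end sketch of the facade, under stated assumptions: the executor name is a placeholder, the BlockStrategyEnum constant name is assumed (this diff only describes its values as discard/overwrite/parallel), and StatusEnum.NO is assumed to mean disabled.

// Create a fixed-interval cluster job; execute() validates against the Add group,
// posts to /api/job/add over the client RPC channel, and returns the new job id.
Long jobId = SnailJobOpenApi.addClusterJob()
        .setJobName("demo-cluster-job")
        .setExecutorInfo("com.example.DemoExecutor") // placeholder executor name
        .setRouteKey(AllocationAlgorithmEnum.ROUND)
        .setTriggerType(TriggerTypeEnum.SCHEDULED_TIME)
        .setTriggerInterval(60) // seconds; only valid for SCHEDULED_TIME
        .setBlockStrategy(BlockStrategyEnum.DISCARD) // assumed constant name
        .setExecutorTimeout(30)
        .setMaxRetryTimes(3)
        .setRetryInterval(5)
        .execute();

// Disable it later through the status handler.
Boolean disabled = SnailJobOpenApi.updateJobStatus(jobId)
        .setStatus(StatusEnum.NO) // assumed to mean "disabled"
        .execute();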

View File

@ -8,12 +8,9 @@ import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor;
import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor;
import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.*;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
@ -25,11 +22,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
/**
* @author: opensnail
@ -56,8 +49,8 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
Map<Method, JobExecutor> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
(MethodIntrospector.MetadataLookup<JobExecutor>) method -> AnnotatedElementUtils
.findMergedAnnotation(method, JobExecutor.class));
(MethodIntrospector.MetadataLookup<JobExecutor>) method -> AnnotatedElementUtils
.findMergedAnnotation(method, JobExecutor.class));
} catch (Throwable ex) {
SnailJobLog.LOCAL.error("{} JobExecutor加载异常{}", beanDefinitionName, ex);
}
@ -69,8 +62,8 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
if (IJobExecutor.class.isAssignableFrom(bean.getClass())) {
if (!JobExecutorInfoCache.isExisted(executorClassName)) {
jobExecutorInfoList.add(new JobExecutorInfo(executorClassName,
ReflectionUtils.findMethod(bean.getClass(), "jobExecute"),
null,null, null, bean));
ReflectionUtils.findMethod(bean.getClass(), "jobExecute"),
null, null, null, bean));
}
}
@ -79,7 +72,15 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
if (Objects.nonNull(jobExecutor)) {
String executorName = jobExecutor.name();
if (!JobExecutorInfoCache.isExisted(executorName)) {
Method method = ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method(), JobArgs.class);
List<Class<? extends JobArgs>> classes = Lists.newArrayList(ShardingJobArgs.class, JobArgs.class);
Method method = null;
for (Class<? extends JobArgs> clazz : classes) {
method = ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method(), clazz);
if (Objects.nonNull(method)) {
break;
}
}
if (method == null) {
method = ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method());
}
@ -93,36 +94,36 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
Class<?>[] parameterTypes = method1.getParameterTypes();
MapExecutor mapExecutor = method1.getAnnotation(MapExecutor.class);
if (Objects.nonNull(mapExecutor)
&& parameterTypes.length >0
&& parameterTypes[0].isAssignableFrom(MapArgs.class)) {
&& parameterTypes.length > 0
&& parameterTypes[0].isAssignableFrom(MapArgs.class)) {
mapExecutorMethodMap.put(mapExecutor.taskName(), method1);
}
ReduceExecutor reduceExecutorAnno = method1.getAnnotation(ReduceExecutor.class);
if (Objects.nonNull(reduceExecutorAnno)
&& parameterTypes.length >0
&& parameterTypes[0].isAssignableFrom(ReduceArgs.class)) {
&& parameterTypes.length > 0
&& parameterTypes[0].isAssignableFrom(ReduceArgs.class)) {
reduceExecutor = method1;
continue;
}
MergeReduceExecutor mergeReduceExecutorAnno = method1.getAnnotation(MergeReduceExecutor.class);
if (Objects.nonNull(mergeReduceExecutorAnno)
&& parameterTypes.length >0
&& parameterTypes[0].isAssignableFrom(MergeReduceArgs.class)) {
&& parameterTypes.length > 0
&& parameterTypes[0].isAssignableFrom(MergeReduceArgs.class)) {
mergeReduceExecutor = method1;
}
}
JobExecutorInfo jobExecutorInfo =
new JobExecutorInfo(
executorName,
method,
mapExecutorMethodMap,
reduceExecutor,
mergeReduceExecutor,
bean
);
new JobExecutorInfo(
executorName,
method,
mapExecutorMethodMap,
reduceExecutor,
mergeReduceExecutor,
bean
);
jobExecutorInfoList.add(jobExecutorInfo);
}
@ -141,12 +142,12 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
}
JobExecutorInfo jobExecutorInfo =
new JobExecutorInfo(
jobExecutor.name(),
executeMethod,
null,null, null,
bean
);
new JobExecutorInfo(
jobExecutor.name(),
executeMethod,
null, null, null,
bean
);
jobExecutorInfoList.add(jobExecutorInfo);
}
}

View File

@ -0,0 +1,45 @@
package com.aizuda.snailjob.client.job.core.util;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validation;
import jakarta.validation.Validator;
import java.util.Set;
public class ValidatorUtils {
private static final Validator validator;
static {
validator = Validation.buildDefaultValidatorFactory().getValidator();
}
public static Pair<Boolean, String> validateEntity(Object object) {
Set<ConstraintViolation<Object>> constraintViolations = validator.validate(object);
return validateEntity(constraintViolations, object);
}
public static Pair<Boolean, String> validateEntity(Class<?> group, Object object) {
Set<ConstraintViolation<Object>> constraintViolations = validator.validate(object, group);
return validateEntity(constraintViolations, object);
}
/**
* Validate an object
*
* @param object the object being validated
* @return a Pair of (valid, concatenated violation messages)
*/
public static Pair<Boolean, String> validateEntity(Set<ConstraintViolation<Object>> constraintViolations, Object object) {
if (!constraintViolations.isEmpty()) {
StringBuilder msg = new StringBuilder();
for (ConstraintViolation<Object> constraint : constraintViolations) {
msg.append(constraint.getMessage()).append("\n");
}
return Pair.of(Boolean.FALSE, msg.toString());
} else {
return Pair.of(Boolean.TRUE, null);
}
}
}
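A usage sketch (hypothetical caller; getKey/getValue are Hutool's Pair accessors):
Pair<Boolean, String> result = ValidatorUtils.validateEntity(requestVO);
if (!result.getKey()) {
    // the String side carries the accumulated constraint messages
    throw new SnailJobClientException(result.getValue()); // assumed constructor
}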

@ -125,6 +125,20 @@ public interface SystemConstants {
*/
String RETRY_GENERATE_IDEM_ID = "/retry/generate/idempotent-id/v1";
String OPENAPI_ADD_JOB = "/api/job/add";
String OPENAPI_UPDATE_JOB = "/api/job/update";
String OPENAPI_GET_JOB_DETAIL = "/api/job/getJobDetail";
String OPENAPI_TRIGGER_JOB = "/api/job/triggerJob";
String OPENAPI_TRIGGER_WORKFLOW = "/api/job/triggerWorkFlow";
String OPENAPI_UPDATE_JOB_STATUS = "/api/job/updateJobStatus";
String OPENAPI_UPDATE_WORKFLOW_STATUS = "/api/job/updateWorkFlowStatus";
}
String LOGO =

@ -1,6 +1,6 @@
package com.aizuda.snailjob.common.core.enums;
import com.aizuda.snailjob.common.core.exception.SnailJobCommonException;
import lombok.AllArgsConstructor;
import lombok.Getter;
@ -43,7 +43,7 @@ public enum BlockStrategyEnum {
}
}
throw new SnailJobCommonException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy);
}
}

@ -1,10 +1,7 @@
package com.aizuda.snailjob.common.log;
import com.aizuda.snailjob.common.log.factory.GlobalLogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* @author: xiaowoniu
@ -14,10 +11,4 @@ import org.springframework.core.env.Environment;
@Configuration
@ComponentScan("com.aizuda.snailjob.common.log.*")
public class CommonLogConfigure {
@Autowired
public void setEnvironment(Environment environment) {
GlobalLogFactory.setEnvironment(environment);
}
}

@ -17,13 +17,6 @@ public class GlobalLogFactory {
private static volatile LogFactory currentLogFactory;
private static final Object lock = new Object();
private static Environment environment;
public static void setEnvironment(final Environment environment) {
GlobalLogFactory.environment = environment;
}
/**
* Get the singleton log factory, creating it if it does not exist
*
@ -72,17 +65,4 @@ public class GlobalLogFactory {
return currentLogFactory;
}
/**
* Get the global log switch
*
* @return
*/
public static Boolean logSwitch() {
if (Objects.nonNull(environment)) {
return environment.getProperty("snail-job.log.status", Boolean.class, Boolean.TRUE);
}
return Boolean.TRUE;
}
}

@ -30,10 +30,6 @@ public abstract class AbstractLog {
* @param arguments the arguments bound to the format placeholders
*/
public void trace(String format, Object... arguments) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
trace(LogFactory.get(LogCaller.getCallerCaller()), format, arguments);
}
@ -45,10 +41,6 @@ public abstract class AbstractLog {
* @param arguments the arguments bound to the format placeholders
*/
public void trace(Log log, String format, Object... arguments) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
log.trace(isRemote, FQCN, format, arguments);
}
@ -61,10 +53,6 @@ public abstract class AbstractLog {
* @param arguments the arguments bound to the format placeholders
*/
public void debug(String format, Object... arguments) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
debug(LogFactory.get(LogCaller.getCallerCaller()), format, arguments);
}
@ -76,10 +64,6 @@ public abstract class AbstractLog {
* @param arguments the arguments bound to the format placeholders
*/
public void debug(Log log, String format, Object... arguments) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
log.debug(isRemote, FQCN, format, arguments);
}
@ -92,10 +76,6 @@ public abstract class AbstractLog {
* @param arguments the arguments bound to the format placeholders
*/
public void info(String format, Object... arguments) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
info(LogFactory.get(LogCaller.getCallerCaller()), format, arguments);
}
@ -107,10 +87,6 @@ public abstract class AbstractLog {
* @param arguments the arguments bound to the format placeholders
*/
public void info(Log log, String format, Object... arguments) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
log.info(isRemote, FQCN, format, arguments);
}
@ -123,10 +99,6 @@ public abstract class AbstractLog {
* @param arguments the arguments bound to the format placeholders
*/
public void warn(String format, Object... arguments) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
warn(LogFactory.get(LogCaller.getCallerCaller()), format, arguments);
}
@ -138,10 +110,6 @@ public abstract class AbstractLog {
* @param arguments the arguments bound to the format placeholders
*/
public void warn(Log log, String format, Object... arguments) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
log.warn(isRemote, FQCN, format, arguments);
}
@ -153,10 +121,6 @@ public abstract class AbstractLog {
* @param e the exception whose stack trace is printed in the log
*/
public void error(Throwable e) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
error(LogFactory.get(LogCaller.getCallerCaller()), e);
}
@ -167,10 +131,6 @@ public abstract class AbstractLog {
* @param arguments the arguments bound to the format placeholders
*/
public void error(String format, Object... arguments) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
error(LogFactory.get(LogCaller.getCallerCaller()), format, arguments);
}
@ -181,10 +141,6 @@ public abstract class AbstractLog {
* @param e the exception whose stack trace is printed in the log
*/
public void error(Log log, Throwable e) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
log.error(false, e);
}
@ -196,10 +152,6 @@ public abstract class AbstractLog {
* @param arguments the arguments bound to the format placeholders
*/
public void error(Log log, String format, Object... arguments) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
log.error(isRemote, FQCN, format, arguments);
}
@ -213,10 +165,6 @@ public abstract class AbstractLog {
* @param arguments the arguments bound to the format placeholders
*/
public void log(Level level, Boolean remote, String format, Object... arguments) {
if (!GlobalLogFactory.logSwitch()) {
return;
}
LogFactory.get(LogCaller.getCallerCaller()).log(level, remote, FQCN, format, arguments);
}
}

@ -30,7 +30,7 @@
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper">
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO sj_job_log_message (namespace_id, group_name, job_id, task_batch_id, task_id,
log_num, message, create_dt, real_time)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.namespaceId},
#{item.groupName},
#{item.jobId},
#{item.taskBatchId},
#{item.taskId},
#{item.logNum},
#{item.message},
#{item.createDt},
#{item.realTime}
)
</foreach>
</insert>
</mapper>
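The Java side of this statement is not part of the diff; a plausible mapper-interface sketch (the @Param name "list" must match the foreach collection above):
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;

public interface JobLogMessageMapper extends BaseMapper<JobLogMessage> {
    // Batch insert backed by the XML above; returns the number of inserted rows
    int insertBatch(@Param("list") List<JobLogMessage> list);
}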

@ -2,6 +2,28 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.JobSummaryMapper">
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO sj_job_summary (namespace_id, group_name, business_id, trigger_at, system_task_type, success_num,
fail_num, fail_reason, stop_num, stop_reason, cancel_num, cancel_reason)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.namespaceId},
#{item.groupName},
#{item.businessId},
#{item.triggerAt},
#{item.systemTaskType},
#{item.successNum},
#{item.failNum},
#{item.failReason},
#{item.stopNum},
#{item.stopReason},
#{item.cancelNum},
#{item.cancelReason}
)
</foreach>
</insert>
<update id="updateBatch" parameterType="java.util.List">
UPDATE sj_job_summary AS rt
SET success_num = tt.success_num,

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper">
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO sj_job_task (namespace_id, group_name, job_id, task_batch_id, parent_id, task_status,
retry_count, mr_stage, leaf, task_name, client_info, wf_context, args_str,
result_message, args_type, ext_attrs, create_dt, update_dt)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.namespaceId},
#{item.groupName},
#{item.jobId},
#{item.taskBatchId},
#{item.parentId},
#{item.taskStatus},
#{item.retryCount},
#{item.mrStage},
#{item.leaf},
#{item.taskName},
#{item.clientInfo},
#{item.wfContext},
#{item.argsStr},
#{item.resultMessage},
#{item.argsType},
#{item.extAttrs},
#{item.createDt},
#{item.updateDt}
)
</foreach>
</insert>
</mapper>

@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.RetryDeadLetterMapper">
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO sj_retry_dead_letter (namespace_id, unique_id, group_name, scene_name, idempotent_id, biz_no,
executor_name, args_str, ext_attrs, create_dt)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.namespaceId,jdbcType=VARCHAR},
#{item.uniqueId,jdbcType=VARCHAR},
#{item.groupName,jdbcType=VARCHAR},
#{item.sceneName,jdbcType=VARCHAR},
#{item.idempotentId,jdbcType=VARCHAR},
#{item.bizNo,jdbcType=VARCHAR},
#{item.executorName,jdbcType=VARCHAR},
#{item.argsStr,jdbcType=VARCHAR},
#{item.extAttrs,jdbcType=VARCHAR},
#{item.createDt,jdbcType=TIMESTAMP}
)
</foreach>
</insert>
</mapper>

@ -2,6 +2,24 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.RetrySummaryMapper">
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO sj_retry_summary (namespace_id, group_name, scene_name, trigger_at,
running_num, finish_num, max_count_num, suspend_num)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.namespaceId},
#{item.groupName},
#{item.sceneName},
#{item.triggerAt},
#{item.runningNum},
#{item.finishNum},
#{item.maxCountNum},
#{item.suspendNum}
)
</foreach>
</insert>
<update id="updateBatch" parameterType="java.util.List">
UPDATE sj_retry_summary AS rt
SET running_num = tt.running_num,

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper">
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO sj_retry_task_log (unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name,
args_str, ext_attrs, task_type, create_dt, namespace_id)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.uniqueId},
#{item.groupName},
#{item.sceneName},
#{item.idempotentId},
#{item.bizNo},
#{item.executorName},
#{item.argsStr},
#{item.extAttrs},
#{item.taskType},
#{item.createDt},
#{item.namespaceId}
)
</foreach>
</insert>
</mapper>

@ -2,6 +2,23 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper">
<insert id="insertBatch" parameterType="java.util.List">
INSERT INTO sj_retry_task_log_message (namespace_id, group_name, unique_id, log_num, message,
create_dt, real_time)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.namespaceId},
#{item.groupName},
#{item.uniqueId},
#{item.logNum},
#{item.message},
#{item.createDt},
#{item.realTime}
)
</foreach>
</insert>
<update id="updateBatch" parameterType="java.util.List">
UPDATE sj_retry_task_log_message rt,
(

@ -2,6 +2,21 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper">
<!-- Batch-insert SQL mapping -->
<insert id="insertBatch" parameterType="java.util.List">
INSERT INTO sj_retry_task (namespace_id, unique_id, group_name, scene_name,
idempotent_id, biz_no, executor_name, args_str, ext_attrs,
next_trigger_at, task_type, retry_status, create_dt)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.namespaceId}, #{item.uniqueId}, #{item.groupName},
#{item.sceneName}, #{item.idempotentId}, #{item.bizNo}, #{item.executorName}, #{item.argsStr},
#{item.extAttrs}, #{item.nextTriggerAt}, #{item.taskType}, #{item.retryStatus}, #{item.createDt}
)
</foreach>
</insert>
<update id="updateBatchNextTriggerAtById" parameterType="java.util.List">
UPDATE sj_retry_task_${partition} AS rt
SET next_trigger_at = tt.next_trigger_at,
@ -15,5 +30,4 @@
) AS tt
WHERE rt.id = tt.id
</update>
</mapper>
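Because the UPDATE above interpolates ${partition} into the table name, the corresponding mapper method presumably passes the partition alongside the id list; a hedged signature sketch (parameter types assumed):
// ${partition} is string-substituted rather than a prepared-statement parameter,
// so it must only ever come from trusted server-side code
int updateBatchNextTriggerAtById(@Param("list") List<RetryTask> list,
                                 @Param("partition") Integer partition);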

@ -2,6 +2,25 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper">
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO sj_server_node (namespace_id, group_name, host_id, host_ip, host_port,
expire_at, node_type, ext_attrs, create_dt)
VALUES
<foreach collection="list" item="item" index="index" separator=",">
(
#{item.namespaceId,jdbcType=VARCHAR},
#{item.groupName,jdbcType=VARCHAR},
#{item.hostId,jdbcType=VARCHAR},
#{item.hostIp,jdbcType=VARCHAR},
#{item.hostPort,jdbcType=INTEGER},
#{item.expireAt,jdbcType=TIMESTAMP},
#{item.nodeType,jdbcType=TINYINT},
#{item.extAttrs,jdbcType=VARCHAR},
#{item.createDt,jdbcType=TIMESTAMP}
)
</foreach>
</insert>
<update id="updateBatchExpireAt" parameterType="java.util.List">
UPDATE sj_server_node AS rt
SET expire_at = tt.expire_at,

@ -27,6 +27,7 @@
<java.version>17</java.version>
<org.mapstruct.version>1.5.3.Final</org.mapstruct.version>
<akka.version>2.6.21</akka.version>
<scala.version>2.13.9</scala.version>
<java-jwt.version>4.4.0</java-jwt.version>
<perf4j.version>0.9.16</perf4j.version>
<guava.version>33.2.0-jre</guava.version>
@ -63,13 +64,30 @@
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor-typed_2.13</artifactId>
<version>${akka.version}</version>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor-testkit-typed_2.13</artifactId>
<version>${akka.version}</version>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>

@ -92,6 +92,10 @@
<groupId>com.aizuda</groupId>
<artifactId>snail-job-common-log</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>
<build>
<plugins>

@ -0,0 +1,24 @@
package com.aizuda.snailjob.server.common.convert;
import com.aizuda.snailjob.server.common.vo.JobRequestVO;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
* @author: opensnail
* @date : 2023-10-12 09:40
* @since : 2.4.0
*/
@Mapper
public interface JobConverter {
JobConverter INSTANCE = Mappers.getMapper(JobConverter.class);
Job convert(JobRequestVO jobRequestVO);
List<JobRequestVO> convertList(List<Job> jobs);
}

@ -0,0 +1,42 @@
package com.aizuda.snailjob.server.common.convert;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.vo.JobResponseVO;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
/**
* @author opensnail
* @date 2023-10-11 22:50:40
* @since 2.4.0
*/
@Mapper
public interface JobResponseVOConverter {
JobResponseVOConverter INSTANCE = Mappers.getMapper(JobResponseVOConverter.class);
// @Mappings({
// @Mapping(source = "nextTriggerAt", target = "nextTriggerAt", expression = "java(DateUtils.toLocalDateTime())")
// })
List<JobResponseVO> convertList(List<Job> jobs);
@Mappings({
@Mapping(target = "nextTriggerAt", expression = "java(JobResponseVOConverter.toLocalDateTime(job.getNextTriggerAt()))")
})
JobResponseVO convert(Job job);
static LocalDateTime toLocalDateTime(Long nextTriggerAt) {
if (Objects.isNull(nextTriggerAt) || nextTriggerAt == 0) {
return null;
}
return DateUtils.toLocalDateTime(nextTriggerAt);
}
}
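Usage is a one-liner; the expression mapping above routes nextTriggerAt through the static helper, so a null or zero epoch value surfaces as a null LocalDateTime:
JobResponseVO vo = JobResponseVOConverter.INSTANCE.convert(job); // job.getNextTriggerAt() == 0 -> vo.getNextTriggerAt() == null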

@ -52,7 +52,7 @@ public class NettyChannel {
* @param body the request body
* @throws InterruptedException
*/
public static synchronized void send(String hostId, String hostIp, Integer port, HttpMethod method, String url, String body, HttpHeaders requestHeaders) throws InterruptedException {
Channel channel = CHANNEL_MAP.get(Pair.of(hostId, hostIp));
if (Objects.isNull(channel) || !channel.isActive()) {
@ -100,7 +100,8 @@ public class NettyChannel {
if (notTimeout) {
// 连接成功
if (channel != null && channel.isActive()) {
SnailJobLog.LOCAL.info("netty client started {} connect to server id:[{}] ip:[{}] channel:[{}]",
channel.localAddress(), hostId, ip, channel);
NettyChannel.setChannel(hostId, ip, channel);
return channel;
}

@ -1,6 +1,8 @@
package com.aizuda.snailjob.server.common.util;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.util.CronExpression;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import java.text.ParseException;
import java.time.Duration;
@ -38,6 +40,7 @@ public class CronUtils {
public static long getExecuteInterval(String cron) {
List<String> executeTimeByCron = getExecuteTimeByCron(cron, 2);
Assert.isTrue(!executeTimeByCron.isEmpty(), () -> new SnailJobServerException("[{}]表达式解析有误", cron));
LocalDateTime first = LocalDateTime.parse(executeTimeByCron.get(0), DateUtils.NORM_DATETIME_PATTERN);
LocalDateTime second = LocalDateTime.parse(executeTimeByCron.get(1), DateUtils.NORM_DATETIME_PATTERN);
Duration duration = Duration.between(first, second);
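A worked example of the interval computation (a sketch; assumes a Quartz-style six-field expression, which the CronExpression import above suggests):
// Consecutive fire times are 5 seconds apart -> 5000 ms, which is below the
// 10 * 1000 resident-job threshold used by the OpenAPI handlers further down
long intervalMillis = CronUtils.getExecuteInterval("0/5 * * * * ?");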

@ -0,0 +1,124 @@
package com.aizuda.snailjob.server.common.vo;
import com.aizuda.snailjob.common.core.enums.ExecutorTypeEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.Data;
/**
* @author opensnail
* @date 2023-10-11 22:37:55
* @since 2.4.0
*/
@Data
public class JobRequestVO {
private Long id;
/**
* Group name
*/
@NotBlank(message = "groupName 不能为空")
@Pattern(regexp = "^[A-Za-z0-9_-]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母、下划线和短横线")
private String groupName;
/**
* Job name
*/
@NotBlank(message = "jobName 不能为空")
private String jobName;
/**
* Job status: 0 disabled, 1 enabled
* {@link StatusEnum}
*/
@NotNull(message = "jobStatus 不能为空")
private Integer jobStatus;
/**
* Executor method arguments
*/
private String argsStr;
/**
* Argument type: text/json
*/
private Integer argsType;
/**
* Executor routing strategy
*/
@NotNull(message = "routeKey 不能为空")
private Integer routeKey;
/**
* Executor type
* {@link ExecutorTypeEnum}
*/
@NotNull(message = "executorType 不能为空")
private Integer executorType;
/**
* Executor name
*/
@NotBlank(message = "executorInfo 不能为空")
private String executorInfo;
/**
* Trigger type: 2 fixed interval, 3 CRON expression, 99 workflow
*/
@NotNull(message = "triggerType 不能为空")
private Integer triggerType;
/**
* Trigger interval
*/
@NotNull(message = "triggerInterval 不能为空")
private String triggerInterval;
/**
* Block strategy: 1 discard, 2 overwrite, 3 parallel
*/
@NotNull(message = "blockStrategy 不能为空")
private Integer blockStrategy;
/**
* Task execution timeout, in seconds
*/
@NotNull(message = "executorTimeout 不能为空")
private Integer executorTimeout;
/**
* Maximum retry count
*/
@NotNull(message = "maxRetryTimes 不能为空")
private Integer maxRetryTimes;
/**
* Retry interval (s)
*/
@NotNull(message = "retryInterval 不能为空")
private Integer retryInterval;
/**
* Task type
* {@link JobTaskTypeEnum}
*/
@NotNull(message = "taskType 不能为空")
private Integer taskType;
/**
* Parallelism
*/
@NotNull(message = "parallelNum 不能为空")
private Integer parallelNum;
/**
* Description
*/
private String description;
}
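A minimal payload satisfying these constraints — a sketch; the numeric codes follow the field comments above (triggerType 2 = fixed interval) and the JobResponseVO below (executorType 1 = Java), while the routeKey/taskType values are assumptions:
JobRequestVO vo = new JobRequestVO();
vo.setGroupName("example_group");
vo.setJobName("demoJob");
vo.setJobStatus(1);             // 1 = enabled
vo.setRouteKey(1);              // routing strategy code (assumed)
vo.setExecutorType(1);          // 1 = Java
vo.setExecutorInfo("demoJobExecutor");
vo.setTriggerType(2);           // 2 = fixed interval
vo.setTriggerInterval("30");    // seconds; values below 10 mark the job resident
vo.setBlockStrategy(1);         // 1 = discard
vo.setExecutorTimeout(60);
vo.setMaxRetryTimes(3);
vo.setRetryInterval(5);
vo.setTaskType(1);              // task type code (assumed)
vo.setParallelNum(1);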

@ -0,0 +1,132 @@
package com.aizuda.snailjob.server.common.vo;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author opensnail
* @date 2023-10-11 22:30:00
* @since 2.4.0
*/
@Data
public class JobResponseVO {
private Long id;
/**
* Group name
*/
private String groupName;
/**
* Job name
*/
private String jobName;
/**
* Executor method arguments
*/
private String argsStr;
/**
* Argument type: text/json
*/
private String argsType;
/**
* Extended attributes
*/
private String extAttrs;
/**
* Next trigger time
*/
private LocalDateTime nextTriggerAt;
/**
* Job status: 0 disabled, 1 enabled
*/
private Integer jobStatus;
/**
* Executor routing strategy
*/
private Integer routeKey;
/**
* Executor type: 1 Java
*/
private Integer executorType;
/**
* Executor name
*/
private String executorInfo;
/**
* Trigger type: 1 CRON expression, 2 fixed interval
*/
private Integer triggerType;
/**
* Trigger interval
*/
private String triggerInterval;
/**
* Block strategy: 1 discard, 2 overwrite, 3 parallel
*/
private Integer blockStrategy;
/**
* Task execution timeout, in seconds
*/
private Integer executorTimeout;
/**
* Maximum retry count
*/
private Integer maxRetryTimes;
/**
* Retry interval (s)
*/
private Integer retryInterval;
/**
* Task type
*/
private Integer taskType;
/**
* Parallelism
*/
private Integer parallelNum;
/**
* bucket
*/
private Integer bucketIndex;
/**
* Description
*/
private String description;
/**
* Creation time
*/
private LocalDateTime createDt;
/**
* Update time
*/
private LocalDateTime updateDt;
/**
* Logical delete: 1 deleted
*/
private Integer deleted;
}

@ -0,0 +1,20 @@
package com.aizuda.snailjob.server.common.vo;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* @author opensnail
* @date 2023-10-15 16:06:20
* @since 2.4.0
*/
@Data
public class JobStatusUpdateRequestVO {
@NotNull(message = "id 不能为空")
private Long id;
@NotNull(message = "jobStatus 不能为空")
private Integer jobStatus;
}

@ -22,4 +22,15 @@ public class TaskExecuteDTO {
*/
private Integer taskExecutorScene;
public TaskExecuteDTO() {
}
public TaskExecuteDTO(Long jobId, Long taskBatchId, Long workflowTaskBatchId, Long workflowNodeId, Integer taskExecutorScene) {
this.jobId = jobId;
this.taskBatchId = taskBatchId;
this.workflowTaskBatchId = workflowTaskBatchId;
this.workflowNodeId = workflowNodeId;
this.taskExecutorScene = taskExecutorScene;
}
}
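The new constructor is used like so (a sketch; jobId and taskBatchId assumed in scope, scene code as in the OpenAPI trigger handler below):
TaskExecuteDTO dto = new TaskExecuteDTO(jobId, taskBatchId, null, null,
        JobTaskExecutorSceneEnum.MANUAL_JOB.getType()); // plain manual run: no workflow batch or node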

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.job;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import org.springframework.beans.factory.InitializingBean;

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.job;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;

@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.block.job;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.job;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import java.util.concurrent.ConcurrentHashMap;

@ -1,7 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.block.job;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGenerator;

@ -5,12 +5,11 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
@ -20,12 +19,10 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Stream;
/**
* Re-trigger tasks that failed to execute
@ -71,7 +68,7 @@ public class RecoveryBlockStrategy extends AbstracJobBlockStrategy {
jobExecutor.execute(buildJobExecutorContext(context, job,
StreamUtils.filter(jobTasks,
(jobTask) -> JobTaskStatusEnum.NOT_SUCCESS.contains(jobTask.getTaskStatus())
)));
}
@Override

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext;
import org.springframework.beans.factory.InitializingBean;

@ -1,7 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
@ -9,8 +8,6 @@ import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: shuguang.zhang
* @date : 2023-12-26

@ -2,8 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
@ -11,8 +10,6 @@ import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: xiaowoniu
* @date : 2023-12-26

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;

@ -1,7 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.block.workflow;
import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import java.util.concurrent.ConcurrentHashMap;

@ -9,6 +9,8 @@ import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum;
import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.RetryJobTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
@ -22,6 +24,7 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.util.Objects;
/**
@ -45,28 +48,32 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
// Determine whether a retry is needed
boolean needRetry = isNeedRetry(context);
if (needRetry && updateRetryCount(context)) {
Job job = context.getJob();
JobTask jobTask = context.getJobTask();
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(
JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(context.getClientInfo()));
realJobExecutor.setWorkflowNodeId(context.getWorkflowNodeId());
realJobExecutor.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1);
realJobExecutor.setRetry(Boolean.TRUE);
realJobExecutor.setRetryScene(context.getRetryScene());
realJobExecutor.setTaskName(jobTask.getTaskName());
// Propagate the workflow context through this single entry point
if (StrUtil.isBlank(realJobExecutor.getWfContext())) {
realJobExecutor.setWfContext(getWfContext(realJobExecutor.getWorkflowTaskBatchId()));
}
if (JobRetrySceneEnum.MANUAL.getRetryScene().equals(context.getRetryScene())) {
// Manual retry: execute immediately
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
return;
} else {
// Register the retry task on the timer wheel with the retry interval
JobTimerWheel.registerWithJob(() -> new RetryJobTimerTask(realJobExecutor), Duration.ofSeconds(job.getRetryInterval()));
}
return;
}
// No retry needed: run the callback

@ -0,0 +1,105 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.convert.JobConverter;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.CronUtils;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.HttpHeaderUtil;
import com.aizuda.snailjob.server.common.vo.JobRequestVO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* OPENAPI
* Add a scheduled job
*/
@Component
@RequiredArgsConstructor
public class OpenApiAddJobRequestHandler extends PostHttpRequestHandler {
private final SystemProperties systemProperties;
private final JobMapper jobMapper;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_ADD_JOB.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Add job content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
JobRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobRequestVO.class);
if(StrUtil.isBlank(jobRequestVO.getGroupName())){
jobRequestVO.setGroupName(HttpHeaderUtil.getGroupName(headers));
}
// Determine whether this is a resident job
Job job = JobConverter.INSTANCE.convert(jobRequestVO);
job.setResident(isResident(jobRequestVO));
job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName())
% systemProperties.getBucketTotal());
job.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli()));
job.setNamespaceId(HttpHeaderUtil.getNamespace(headers));
job.setId(null);
Assert.isTrue(1 == jobMapper.insert(job), ()-> new SnailJobServerException("新增任务失败"));
return JsonUtil.toJsonString(new NettyResult(job.getId(), retryRequest.getReqId()));
}
private Integer isResident(JobRequestVO jobRequestVO) {
if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) {
return StatusEnum.NO.getStatus();
}
if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.FIXED.getType()) {
if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) {
return StatusEnum.YES.getStatus();
}
} else if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.CRON.getType()) {
if (CronUtils.getExecuteInterval(jobRequestVO.getTriggerInterval()) < 10 * 1000) {
return StatusEnum.YES.getStatus();
}
} else {
throw new SnailJobServerException("未知触发类型");
}
return StatusEnum.NO.getStatus();
}
private static Long calculateNextTriggerAt(final JobRequestVO jobRequestVO, Long time) {
if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) {
return 0L;
}
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(time);
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
}
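On the wire this handler expects a SnailJobRequest envelope whose first args element is the JobRequestVO; a hypothetical body (the JSON field names mirror the getters used above, the reqId value is client-generated, and group/namespace travel in headers read by HttpHeaderUtil — the header names are not shown in this diff):
String body = """
    {
      "reqId": 1,
      "args": [{
        "jobName": "demoJob",
        "jobStatus": 1,
        "routeKey": 1,
        "executorType": 1,
        "executorInfo": "demoJobExecutor",
        "triggerType": 2,
        "triggerInterval": "30",
        "blockStrategy": 1,
        "executorTimeout": 60,
        "maxRetryTimes": 3,
        "retryInterval": 5,
        "taskType": 1,
        "parallelNum": 1
      }]
    }
    """; // POST to HTTP_PATH.OPENAPI_ADD_JOB ("/api/job/add")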

@ -0,0 +1,53 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.convert.JobResponseVOConverter;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.vo.JobResponseVO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* OPENAPI
* Get scheduled job details
*/
@Component
@RequiredArgsConstructor
public class OpenApiGetJobDetailRequestHandler extends PostHttpRequestHandler {
private final JobMapper jobMapper;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_GET_JOB_DETAIL.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Get job detail content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
Long jobId = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), Long.class);
Assert.notNull(jobId, () -> new SnailJobServerException("id 不能为空"));
Job job = jobMapper.selectById(jobId);
JobResponseVO convert = JobResponseVOConverter.INSTANCE.convert(job);
return JsonUtil.toJsonString(new NettyResult(convert, retryRequest.getReqId()));
}
}

@ -0,0 +1,80 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.JobPrepareHandler;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* OPENAPI
* Trigger a scheduled job
*/
@Component
@RequiredArgsConstructor
public class OpenApiTriggerJobRequestHandler extends PostHttpRequestHandler {
private final JobMapper jobMapper;
private final AccessTemplate accessTemplate;
private final JobPrepareHandler terminalJobPrepareHandler;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_TRIGGER_JOB.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Trigger job content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
Long jobId = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), Long.class);
Job job = jobMapper.selectById(jobId);
if (Objects.isNull(job)){
SnailJobLog.LOCAL.warn("job can not be null.");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, job.getGroupName())
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
);
if (count <= 0){
SnailJobLog.LOCAL.warn("组:[{}]已经关闭,不支持手动执行.", job.getGroupName());
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
// Set to "now" to trigger immediately
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType());
// Create the batch
terminalJobPrepareHandler.handle(jobTaskPrepare);
return JsonUtil.toJsonString(new NettyResult(true, retryRequest.getReqId()));
}
}

@ -0,0 +1,97 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.net.url.UrlQuery;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.WorkflowPrePareHandler;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
/**
* OPENAPI
* Trigger a workflow task
*/
@Component
@RequiredArgsConstructor
public class OpenApiTriggerWorkFlowRequestHandler extends PostHttpRequestHandler {
private final WorkflowMapper workflowMapper;
private final AccessTemplate accessTemplate;
private final WorkflowPrePareHandler terminalWorkflowPrepareHandler;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_TRIGGER_WORKFLOW.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Trigger workflow content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
Long id = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), Long.class);
Workflow workflow = workflowMapper.selectById(id);
if (Objects.isNull(workflow)){
SnailJobLog.LOCAL.warn("workflow can not be null.");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
// Split the comma-separated group names into a Set
if (StrUtil.isNotBlank(workflow.getGroupName())) {
Set<String> namesSet = new HashSet<>(Arrays.asList(workflow.getGroupName().split(", ")));
// If any group referenced by the workflow nodes is disabled, stop the workflow execution
if (CollectionUtil.isNotEmpty(namesSet)) {
for (String groupName : namesSet) {
long count = accessTemplate.getGroupConfigAccess().count(
new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, groupName)
.eq(GroupConfig::getNamespaceId, workflow.getNamespaceId())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
);
if (count <= 0){
SnailJobLog.LOCAL.warn("组:[{}]已经关闭,不支持手动执行.", workflow.getGroupName());
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
}
}
}
WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow);
// Set to "now" to trigger immediately
prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
prepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType());
terminalWorkflowPrepareHandler.handler(prepareDTO);
return JsonUtil.toJsonString(new NettyResult(true, retryRequest.getReqId()));
}
}

@ -0,0 +1,131 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.convert.JobConverter;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.CronUtils;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.HttpHeaderUtil;
import com.aizuda.snailjob.server.common.vo.JobRequestVO;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.Optional;
/**
* OPENAPI
* Update a scheduled job
*/
@Component
@RequiredArgsConstructor
public class OpenApiUpdateJobRequestHandler extends PostHttpRequestHandler {
private final JobMapper jobMapper;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_UPDATE_JOB.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Update job content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
String namespace = HttpHeaderUtil.getNamespace(headers);
JobRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobRequestVO.class);
if (Objects.isNull(jobRequestVO.getId())){
SnailJobLog.LOCAL.warn("id不能为空更新失败");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
Job job = jobMapper.selectById(jobRequestVO.getId());
if (Objects.isNull(job)){
SnailJobLog.LOCAL.warn("job为空更新失败");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
// Determine whether this is a resident job
Job updateJob = JobConverter.INSTANCE.convert(jobRequestVO);
updateJob.setResident(isResident(jobRequestVO));
updateJob.setNamespaceId(namespace);
// Workflow job
if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) {
job.setNextTriggerAt(0L);
// non-resident -> non-resident
} else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(
updateJob.getResident(),
StatusEnum.NO.getStatus())) {
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli()));
} else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(
updateJob.getResident(), StatusEnum.NO.getStatus())) {
// Trigger time cached for the resident job
long time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId()))
.orElse(DateUtils.toNowMilli());
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, time));
// old job non-resident, new job resident: compute the next trigger time from the current time
} else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(
updateJob.getResident(), StatusEnum.YES.getStatus())) {
updateJob.setNextTriggerAt(DateUtils.toNowMilli());
}
// Updating the group is not allowed
updateJob.setGroupName(null);
boolean updated = 1 == jobMapper.updateById(updateJob);
return JsonUtil.toJsonString(new NettyResult(updated, retryRequest.getReqId()));
}
private Integer isResident(JobRequestVO jobRequestVO) {
if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) {
return StatusEnum.NO.getStatus();
}
if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.FIXED.getType()) {
if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) {
return StatusEnum.YES.getStatus();
}
} else if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.CRON.getType()) {
if (CronUtils.getExecuteInterval(jobRequestVO.getTriggerInterval()) < 10 * 1000) {
return StatusEnum.YES.getStatus();
}
} else {
throw new SnailJobServerException("未知触发类型");
}
return StatusEnum.NO.getStatus();
}
private static Long calculateNextTriggerAt(final JobRequestVO jobRequestVO, Long time) {
if (Objects.equals(jobRequestVO.getTriggerType(), SystemConstants.WORKFLOW_TRIGGER_TYPE)) {
return 0L;
}
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(time);
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
}
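All four branches above funnel into calculateNextTriggerAt; isolated, the computation looks like this (a sketch; the setters are exactly the ones used in the method above, the FIXED semantics are assumed):
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(2); // 2 = fixed interval
WaitStrategies.WaitStrategyContext ctx = new WaitStrategies.WaitStrategyContext();
ctx.setTriggerInterval("30");                  // seconds
ctx.setNextTriggerAt(DateUtils.toNowMilli());  // base time in epoch millis
Long nextTriggerAt = waitStrategy.computeTriggerTime(ctx); // roughly now + 30s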

@ -0,0 +1,55 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.vo.JobStatusUpdateRequestVO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* OPENAPI
* Update scheduled job status
*/
@Component
@RequiredArgsConstructor
public class OpenApiUpdateJobStatusRequestHandler extends PostHttpRequestHandler {
private final JobMapper jobMapper;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_UPDATE_JOB_STATUS.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
JobStatusUpdateRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobStatusUpdateRequestVO.class);
Long count = jobMapper.selectCount(new LambdaQueryWrapper<Job>().eq(Job::getId, jobRequestVO.getId()));
if (1 != count){
SnailJobLog.LOCAL.warn("更新任务失败");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
Job job = new Job();
job.setId(jobRequestVO.getId());
job.setJobStatus(jobRequestVO.getJobStatus());
boolean update = 1 == jobMapper.updateById(job);
return JsonUtil.toJsonString(new NettyResult(update, retryRequest.getReqId()));
}
}

@ -0,0 +1,60 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.vo.JobStatusUpdateRequestVO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* OPENAPI
* Update workflow status
*/
@Component
@RequiredArgsConstructor
public class OpenApiUpdateWorkFlowStatusRequestHandler extends PostHttpRequestHandler {
private final WorkflowMapper workflowMapper;
@Override
public boolean supports(String path) {
return HTTP_PATH.OPENAPI_UPDATE_WORKFLOW_STATUS.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
JobStatusUpdateRequestVO jobRequestVO = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), JobStatusUpdateRequestVO.class);
Workflow workflow = workflowMapper.selectOne(
new LambdaQueryWrapper<Workflow>()
.select(Workflow::getId)
.eq(Workflow::getId, jobRequestVO.getId()));
if (Objects.isNull(workflow)){
SnailJobLog.LOCAL.warn("工作流不存在");
return JsonUtil.toJsonString(new NettyResult(false, retryRequest.getReqId()));
}
workflow.setWorkflowStatus(jobRequestVO.getJobStatus());
boolean update = 1 == workflowMapper.updateById(workflow);
return JsonUtil.toJsonString(new NettyResult(update, retryRequest.getReqId()));
}
}

View File

@ -301,7 +301,7 @@ public class WorkflowBatchHandler {
}
public void openNextNode(WorkflowNodeTaskExecuteDTO taskExecuteDTO) {
-if (Objects.isNull(taskExecuteDTO.getParentId()) || Objects.isNull(taskExecuteDTO.getWorkflowTaskBatchId())) {
+if (Objects.isNull(taskExecuteDTO.getParentId()) || Objects.isNull(taskExecuteDTO.getWorkflowTaskBatchId()) || Long.valueOf(0).equals(taskExecuteDTO.getWorkflowTaskBatchId())) {
return;
}

View File

@ -7,7 +7,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
-import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
+import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;

View File

@ -6,7 +6,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
-import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
+import com.aizuda.snailjob.common.core.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.BlockStrategy;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;

View File

@ -123,4 +123,12 @@ public class MapReduceJobExecutorHandler extends AbstractJobExecutorResultHandle
return false;
}
@Override
protected void openNextWorkflowNode(JobExecutorResultContext context) {
if (context.isCreateReduceTask()){
// The task is not finished yet; no need to open subsequent nodes or update the context
return;
}
super.openNextWorkflowNode(context);
}
}

View File

@ -123,7 +123,7 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle {
if (CollectionUtils.isEmpty(ids)) {
return;
}
-Lists.partition(ids, 500).forEach(partIds -> jobTaskBatchMapper.deleteByIds(ids));
+Lists.partition(ids, 500).forEach(jobTaskBatchMapper::deleteByIds);
// Waiting for deletion JobTaskList
List<JobTask> jobTaskList = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>().in(JobTask::getTaskBatchId, ids));
@ -131,7 +131,7 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle {
return;
}
List<Long> jobTaskListIds = StreamUtils.toList(jobTaskList, JobTask::getId);
-Lists.partition(jobTaskListIds, 500).forEach(partIds -> jobTaskMapper.deleteByIds(partIds));
+Lists.partition(jobTaskListIds, 500).forEach(jobTaskMapper::deleteByIds);
// Waiting for deletion JobLogMessageList
List<JobLogMessage> jobLogMessageList = jobLogMessageMapper.selectList(new LambdaQueryWrapper<JobLogMessage>().in(JobLogMessage::getTaskBatchId, ids));
@ -139,7 +139,7 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle {
return;
}
List<Long> jobLogMessageListIds = StreamUtils.toList(jobLogMessageList, JobLogMessage::getId);
-Lists.partition(jobLogMessageListIds, 500).forEach(partIds -> jobTaskMapper.deleteByIds(jobLogMessageListIds));
+Lists.partition(jobLogMessageListIds, 500).forEach(jobLogMessageMapper::deleteByIds);
}
});
}

View File

@ -68,6 +68,8 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
jobExecutorResultDTO.setMessage("任务停止成功");
jobExecutorResultDTO.setJobOperationReason(context.getJobOperationReason());
jobExecutorResultDTO.setTaskType(getTaskType().getType());
jobExecutorResultDTO.setWorkflowNodeId(context.getWorkflowNodeId());
jobExecutorResultDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
actorRef.tell(jobExecutorResultDTO, actorRef);
}

View File

@ -49,6 +49,13 @@ public class TaskStopJobContext {
private boolean forceStop;
/**
 * Workflow task batch id
 */
private Long workflowTaskBatchId;
private Long workflowNodeId;
protected List<JobTask> getJobTasks() {
return jobTasks;
}

View File

@ -36,6 +36,7 @@ public class JobTimeoutCheckTask implements TimerTask<String> {
@Override
public void run(Timeout timeout) throws Exception {
JobTimerWheel.clearCache(idempotentKey());
JobTaskBatchMapper jobTaskBatchMapper = SnailSpringContext.getBean(JobTaskBatchMapper.class);
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId);
if (Objects.isNull(jobTaskBatch)) {
@ -62,6 +63,8 @@ public class JobTimeoutCheckTask implements TimerTask<String> {
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
stopJobContext.setForceStop(Boolean.TRUE);
stopJobContext.setTaskBatchId(taskBatchId);
stopJobContext.setWorkflowNodeId(jobTaskBatch.getWorkflowNodeId());
stopJobContext.setWorkflowTaskBatchId(jobTaskBatch.getWorkflowTaskBatchId());
instanceInterrupt.stop(stopJobContext);
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskBatchId));

View File

@ -1,13 +1,13 @@
package com.aizuda.snailjob.server.job.task.support.timer;
import akka.actor.ActorRef;
+import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import io.netty.util.Timeout;
import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.time.LocalDateTime;
@ -18,7 +18,6 @@ import java.time.LocalDateTime;
* @since 2.4.0
*/
@AllArgsConstructor
-@Slf4j
public class JobTimerTask implements TimerTask<String> {
public static final String IDEMPOTENT_KEY_PREFIX = "job_{0}";
private JobTimerTaskDTO jobTimerTaskDTO;
@ -26,7 +25,7 @@ public class JobTimerTask implements TimerTask<String> {
@Override
public void run(final Timeout timeout) throws Exception {
// Execute the task dispatch
-log.debug("Start task dispatch. Current time: [{}] taskId: [{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId());
+SnailJobLog.LOCAL.debug("Start task dispatch. Current time: [{}] taskId: [{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId());
try {
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
@ -39,7 +38,7 @@ public class JobTimerTask implements TimerTask<String> {
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
SnailJobLog.LOCAL.error("任务调度执行失败", e);
}
}

Some files were not shown because too many files have changed in this diff.