Compare commits


1 Commit

Author SHA1 Message Date
srzou
a4f1c8193c feat(1.5.0-beta2): optimize script executor working directory initialization 2025-04-30 10:01:42 +08:00
61 changed files with 43 additions and 780 deletions

View File

@@ -1,22 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<trees>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java" title="Workflow execution"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobTaskPrepareActor.java" title="Scheduled-job preparation phase"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java" title="Handles the node types in a workflow"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java" title="Callback node"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java" title="Decision node"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java" title="Job task node"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/WorkflowWorkflowExecutor.java" title="Workflow node"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow" title="Workflow nodes"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java" title="Cluster task"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/BroadcastTaskJobExecutor.java" title="Broadcast task"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapJobExecutor.java" title="Map task"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapReduceJobExecutor.java" title="MapReduce task"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/RequestClientActor.java" title="Invokes the client to execute the task"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ShardingJobExecutor.java" title="Sharding task"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job" title="Task execution"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java" title="Workflow task timeout check"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java" title="Callback when the client finishes a task"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClusterClientCallbackHandler.java" title="Cluster task execution result callback"/>
-    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java" title="Task execution result handling"/>
-</trees>

View File

@@ -311,8 +311,7 @@ CREATE TABLE `sj_job`
     `trigger_type` tinyint(4) NOT NULL COMMENT 'Trigger type: 1. CRON expression 2. Fixed time',
     `trigger_interval` varchar(255) NOT NULL COMMENT 'Interval duration',
     `block_strategy` tinyint(4) NOT NULL DEFAULT 1 COMMENT 'Block strategy: 1. Discard 2. Overwrite 3. Parallel 4. Recover',
-    `executor_timeout_front` int(11) NOT NULL DEFAULT 60 COMMENT '(front) Task execution timeout, in seconds',
-    `executor_timeout` int(11) NOT NULL DEFAULT 60 COMMENT 'Task execution timeout, in seconds',
+    `executor_timeout` int(11) NOT NULL DEFAULT 0 COMMENT 'Task execution timeout, in seconds',
     `max_retry_times` int(11) NOT NULL DEFAULT 0 COMMENT 'Maximum retry count',
     `parallel_num` int(11) NOT NULL DEFAULT 1 COMMENT 'Parallelism',
     `retry_interval` int(11) NOT NULL DEFAULT 0 COMMENT 'Retry interval (s)',
@@ -532,37 +531,3 @@ CREATE TABLE `sj_workflow_task_batch`
 ) ENGINE = InnoDB
   AUTO_INCREMENT = 0
   DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow batch';
-
-create table sj_workflow_history
-(
-    id               bigint       not null comment 'Primary key',
-    workflow_name    varchar(64)  not null comment 'Workflow name',
-    namespace_id     varchar(64)  default '764d604ec6fc45f68cd92514c40e9e1a' not null comment 'Namespace id',
-    group_name       varchar(64)  not null comment 'Group name',
-    workflow_status  tinyint      default 1 not null comment 'Workflow status: 0 closed, 1 open',
-    trigger_type     tinyint      not null comment 'Trigger type: 1. CRON expression 2. Fixed time',
-    trigger_interval varchar(255) not null comment 'Interval duration',
-    next_trigger_at  bigint       not null comment 'Next trigger time',
-    block_strategy   tinyint      default 1 not null comment 'Block strategy: 1. Discard 2. Overwrite 3. Parallel',
-    executor_timeout int          default 0 not null comment 'Task execution timeout, in seconds',
-    description      varchar(256) default '' not null comment 'Description',
-    flow_info        text         null comment 'Flow information',
-    wf_context       text         null comment 'Context',
-    notify_ids       varchar(128) default '' not null comment 'List of notification/alarm scene configuration ids',
-    bucket_index     int          default 0 not null comment 'bucket',
-    version          int          not null comment 'Version number',
-    ext_attrs        varchar(256) default '' null comment 'Extended attributes',
-    deleted          tinyint      default 0 not null comment 'Logical delete: 1 deleted',
-    create_dt        datetime     default CURRENT_TIMESTAMP not null comment 'Creation time',
-    update_dt        datetime     default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment 'Modification time',
-    primary key (id, version)
-) ENGINE = InnoDB
-  AUTO_INCREMENT = 0
-  DEFAULT CHARSET = utf8mb4
-  comment 'Workflow history table';
-
-create index idx_create_dt
-    on sj_workflow_history (create_dt);
-
-create index idx_namespace_id_group_name
-    on sj_workflow_history (namespace_id, group_name);

View File

@@ -9,6 +9,7 @@ import com.aizuda.snailjob.common.core.util.SnailJobFileUtil;
 import com.aizuda.snailjob.common.core.util.SnailJobSystemUtil;
 import com.aizuda.snailjob.common.log.SnailJobLog;
 import lombok.Data;
+import org.springframework.beans.factory.InitializingBean;

 import java.io.*;
 import java.nio.charset.Charset;
@@ -17,11 +18,11 @@ import java.nio.file.Files;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;

-public abstract class AbstractScriptExecutor {
+public abstract class AbstractScriptExecutor implements InitializingBean {

     protected static final String SH_SHELL = "/bin/sh";

-    private static final String WORKER_DIR = SnailFileUtils.workspace() + "/script_processor/";
+    private static String WORKER_DIR;

     // Script download mode
     private static final String SCRIPT_DOWNLOAD_METHOD = "DOWNLOAD";
@@ -290,4 +291,9 @@ public abstract class AbstractScriptExecutor {
         private String scriptParams;
         private String charset;
     }
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        WORKER_DIR = SnailFileUtils.workspace() + "/script_processor/";
+    }
 }
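This hunk is the core of the commit: the working directory is no longer computed in a static field initializer but in Spring's afterPropertiesSet() callback, so the workspace path is resolved only once the bean is wired rather than at class-load time. A minimal sketch of the pattern, where resolveWorkspace() is a hypothetical stand-in for SnailFileUtils.workspace():

    import org.springframework.beans.factory.InitializingBean;

    // Sketch of the deferred-initialization pattern this hunk adopts.
    // Assumption: resolveWorkspace() stands in for SnailFileUtils.workspace().
    public abstract class ScriptExecutorSketch implements InitializingBean {

        // was: private static final String WORKER_DIR = resolveWorkspace() + "/script_processor/";
        private static String workerDir;

        @Override
        public void afterPropertiesSet() {
            // Runs after Spring has populated the bean, so late-bound
            // configuration can influence the resolved workspace.
            workerDir = resolveWorkspace() + "/script_processor/";
        }

        private static String resolveWorkspace() {
            // Hypothetical default; the real helper resolves the snail-job workspace.
            return System.getProperty("user.home");
        }
    }

One side effect worth noting: since each concrete executor is a separate bean, afterPropertiesSet() runs once per subclass and re-assigns the same static field, which is harmless here because every assignment produces the same value.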

View File

@@ -225,11 +225,6 @@ public interface SystemConstants {
      */
     Long CALLBACK_JOB_ID = -2000L;

-    /**
-     * System built-in workflow task ID
-     */
-    Long WORKFLOW_JOB_ID = -3000L;
-
     /**
      * Single non-JSON values returned by the client, e.g. "aa", 123
      */

View File

@@ -18,89 +18,26 @@ import java.util.List;
 @Getter
 public enum JobOperationReasonEnum {

-    /*
-     * Task execution status enum
-     */
-    /**
-     * Task execution status
-     */
     NONE(0, StrUtil.EMPTY),
-    /**
-     * Task execution status: execution timed out
-     */
     TASK_EXECUTION_TIMEOUT(1, "Task execution timeout"),
-    /**
-     * Task execution status: no client nodes
-     */
     NOT_CLIENT(2, "No client nodes"),
-    /**
-     * Task execution status: job closed
-     */
     JOB_CLOSED(3, "JOB closed"),
-    /**
-     * Task execution status: task discarded
-     */
     JOB_DISCARD(4, "Task discarded"),
-    /**
-     * Task execution status: task overridden
-     */
     JOB_OVERLAY(5, "Task overridden"),
-    /**
-     * Task execution status: no executable task items
-     */
     NOT_EXECUTION_TASK(6, "No executable task items"),
-    /**
-     * Task execution status: unexpected exception during execution
-     */
     TASK_EXECUTION_ERROR(7, "Unexpected exception occurred during task execution"),
-    /**
-     * Task execution status: manual stop
-     */
     MANNER_STOP(8, "Manual stop"),
-    /**
-     * Workflow decision node execution exception
-     */
     WORKFLOW_CONDITION_NODE_EXECUTION_ERROR(9, "Condition node execution exception"),
-    /**
-     * Task interrupted
-     */
     JOB_TASK_INTERRUPTED(10, "Task interrupted"),
-    /**
-     * Workflow callback node execution exception
-     */
     WORKFLOW_CALLBACK_NODE_EXECUTION_ERROR(11, "Callback node execution exception"),
-    /**
-     * Workflow node requires no action
-     */
     WORKFLOW_NODE_NO_REQUIRED(12, "No action required"),
-    /**
-     * Workflow node closed, execution skipped
-     */
     WORKFLOW_NODE_CLOSED_SKIP_EXECUTION(13, "Node closed, skipped execution"),
-    /**
-     * Workflow decision not passed
-     */
     WORKFLOW_DECISION_FAILED(14, "Judgment not passed"),
-    /**
-     * Manual trigger
-     */
-    MANUAL_TRIGGER(15, "Manual call"),
-    /**
-     * Called from within a workflow
-     */
-    WORKFLOW_CALLED(16, "Called by workflow"),
     ;

-    /**
-     * Reason
-     */
     private final int reason;

-    /**
-     * Description
-     */
     private final String desc;

     /**

View File

@@ -50,9 +50,6 @@ public enum JobTaskBatchStatusEnum {
     public static final List<Integer> NOT_COMPLETE = Arrays.asList(WAITING.status, RUNNING.status);

-    /**
-     * Completed task statuses: SUCCESS 3, FAIL 4, STOP 5, CANCEL 6
-     */
     public static final List<Integer> COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status, CANCEL.status);

     public static final List<Integer> NOT_SUCCESS = Arrays.asList(FAIL.status, STOP.status, CANCEL.status);

View File

@@ -8,37 +8,15 @@ import lombok.Getter;
  * @author opensnail
  * @date 2023-10-02 10:39:22
  * @since 2.4.0
- *
- * Task types:
- * 1. Cluster task: ClusterJobExecutor
- * 2. Broadcast task: BroadcastTaskJobExecutor
- * 3. Static sharding task: ShardingJobExecutor
- * 4. Map task: MapJobExecutor
- * 5. MapReduce task: MapReduceJobExecutor
  */
 @AllArgsConstructor
 @Getter
 public enum JobTaskTypeEnum {

-    /**
-     * Cluster task
-     */
     CLUSTER(1),
-    /**
-     * Broadcast task
-     */
     BROADCAST(2),
-    /**
-     * Static sharding task
-     */
     SHARDING(3),
-    /**
-     * Map task
-     */
     MAP(4),
-    /**
-     * MapReduce task
-     */
     MAP_REDUCE(5),
     ;

View File

@@ -13,13 +13,7 @@ import lombok.Getter;
 @Getter
 public enum StatusEnum {

-    /**
-     * 0: closed, 1: open
-     */
     NO(0),
-    /**
-     * 0: closed, 1: open
-     */
     YES(1);

     private final Integer status;

View File

@@ -4,7 +4,7 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;

 /**
- * 1: job task node 2: decision node 3: callback node 4: workflow node
+ * 1: job task node 2: decision node 3: callback node
  *
  * @author xiaowoniu
  * @date 2023-12-24 08:13:43
@@ -13,22 +13,9 @@ import lombok.Getter;
 @AllArgsConstructor
 @Getter
 public enum WorkflowNodeTypeEnum {

-    /**
-     * Job task node
-     */
     JOB_TASK(1, "JOB task"),
-    /**
-     * Decision node
-     */
     DECISION(2, "Decision node"),
-    /**
-     * Callback node
-     */
     CALLBACK(3, "Callback node"),
-    /**
-     * Workflow node
-     */
-    WORKFLOW(4, "Workflow node"),
     ;

     private final int type;

View File

@@ -1,15 +0,0 @@
-package com.aizuda.snailjob.template.datasource.persistence.mapper;
-
-import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowHistory;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import org.apache.ibatis.annotations.Mapper;
-
-/**
- * @author zjw
- * @description: TODO
- * @date 2025/05/19
- */
-@Mapper
-public interface WorkflowHistoryMapper extends BaseMapper<WorkflowHistory> {
-}

View File

@@ -102,11 +102,6 @@ public class Job extends CreateUpdateDt {
      */
     private Integer executorTimeout;

-    /**
-     * Task execution timeout set by the front end, in seconds
-     */
-    private Integer executorTimeoutFront;
-
     /**
      * Maximum retry count
      */

View File

@@ -1,112 +0,0 @@
-package com.aizuda.snailjob.template.datasource.persistence.po;
-
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-/**
- * Workflow history table
- * </p>
- *
- * @author : xiaowoniu
- * @date : 2023-12-12
- * @since : 2.6.0
- */
-@Data
-@TableName("sj_workflow_history")
-@EqualsAndHashCode(callSuper = true)
-public class WorkflowHistory extends CreateUpdateDt {
-    /**
-     * Primary key
-     */
-    @TableId(value = "id")
-    private Long id;
-
-    /**
-     * Workflow name
-     */
-    private String workflowName;
-
-    /**
-     * Namespace id
-     */
-    private String namespaceId;
-
-    /**
-     * Group name
-     */
-    private String groupName;
-
-    /**
-     * Trigger type
-     */
-    private Integer triggerType;
-
-    /**
-     * Block strategy
-     */
-    private Integer blockStrategy;
-
-    /**
-     * Trigger interval
-     */
-    private String triggerInterval;
-
-    /**
-     * Execution timeout
-     */
-    private Integer executorTimeout;
-
-    /**
-     * Workflow status: 0 closed, 1 open
-     */
-    private Integer workflowStatus;
-
-    /**
-     * Task execution time
-     */
-    private Long nextTriggerAt;
-
-    /**
-     * Flow information
-     */
-    private String flowInfo;
-
-    /**
-     * bucket
-     */
-    private Integer bucketIndex;
-
-    /**
-     * Description
-     */
-    private String description;
-
-    /**
-     * Workflow context
-     */
-    private String wfContext;
-
-    /**
-     * Version number
-     */
-    private Integer version;
-
-    /**
-     * Extended attributes
-     */
-    private String extAttrs;
-
-    /**
-     * Logical delete: 1 deleted
-     */
-    private Integer deleted;
-
-    /**
-     * List of notification/alarm scene configuration ids
-     */
-    private String notifyIds;
-}

View File

@@ -7,7 +7,6 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
 import com.aizuda.snailjob.server.common.dto.CallbackConfig;
 import com.aizuda.snailjob.server.common.dto.DecisionConfig;
 import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
-import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
 import com.aizuda.snailjob.server.common.util.DateUtils;
 import com.aizuda.snailjob.server.common.vo.WorkflowBatchResponseVO;
 import com.aizuda.snailjob.server.common.vo.WorkflowResponseVO;
@@ -55,8 +54,7 @@ public interface WorkflowConverter {
     @Mappings({
             @Mapping(target = "decision", expression = "java(WorkflowConverter.parseDecisionConfig(workflowNode))"),
             @Mapping(target = "callback", expression = "java(WorkflowConverter.parseCallbackConfig(workflowNode))"),
-            @Mapping(target = "jobTask", expression = "java(WorkflowConverter.parseJobTaskConfig(workflowNode))"),
-            @Mapping(target = "subWorkflow", expression = "java(WorkflowConverter.parseWorkflowConfig(workflowNode))")
+            @Mapping(target = "jobTask", expression = "java(WorkflowConverter.parseJobTaskConfig(workflowNode))")
     })
     WorkflowDetailResponseVO.NodeInfo convert(WorkflowNode workflowNode);
@@ -112,13 +110,6 @@ public interface WorkflowConverter {
         return null;
     }

-    static WorkflowConfig parseWorkflowConfig(WorkflowNode workflowNode) {
-        if (WorkflowNodeTypeEnum.WORKFLOW.getType() == workflowNode.getNodeType()) {
-            return JsonUtil.parseObject(workflowNode.getNodeInfo(), WorkflowConfig.class);
-        }
-        return null;
-    }
-
     static Set<Long> toNotifyIds(String notifyIds) {
         if (StrUtil.isBlank(notifyIds)) {
             return new HashSet<>();

View File

@@ -1,20 +0,0 @@
-package com.aizuda.snailjob.server.common.dto;
-
-import lombok.Data;
-
-/**
- * Workflow configuration
- */
-@Data
-public class WorkflowConfig {
-
-    /**
-     * ID
-     */
-    private Long id;
-
-    /**
-     * Name
-     */
-    private String name;
-}

View File

@@ -14,26 +14,10 @@ import lombok.Getter;
 @Getter
 @AllArgsConstructor
 public enum JobTaskExecutorSceneEnum {

-    /**
-     * Automatically triggered job
-     */
     AUTO_JOB(1, SyetemTaskTypeEnum.JOB),
-    /**
-     * Manually triggered job
-     */
     MANUAL_JOB(2, SyetemTaskTypeEnum.JOB),
-    /**
-     * Automatically triggered workflow
-     */
     AUTO_WORKFLOW(3, SyetemTaskTypeEnum.WORKFLOW),
-    /**
-     * Manually triggered workflow
-     */
     MANUAL_WORKFLOW(4, SyetemTaskTypeEnum.WORKFLOW),
-    /**
-     * Workflow triggered by a workflow
-     */
-    WORKFLOW_WORKFLOW(5, SyetemTaskTypeEnum.WORKFLOW),
     ;

     private final Integer type;

View File

@@ -8,7 +8,6 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
 import com.aizuda.snailjob.server.common.dto.CallbackConfig;
 import com.aizuda.snailjob.server.common.dto.DecisionConfig;
 import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
-import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
 import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
 import com.aizuda.snailjob.server.common.vo.request.WorkflowRequestVO;
 import com.aizuda.snailjob.server.common.vo.WorkflowDetailResponseVO;
@@ -162,7 +161,9 @@ public class WorkflowHandler {
             Assert.notNull(decision.getDefaultDecision(), () -> new SnailJobServerException("Default decision for [{}] cannot be empty", nodeInfo.getNodeName()));
             Assert.notNull(decision.getExpressionType(), () -> new SnailJobServerException("Expression type for [{}] cannot be empty", nodeInfo.getNodeName()));
             workflowNode.setNodeInfo(JsonUtil.toJsonString(decision));
-        } else if (WorkflowNodeTypeEnum.CALLBACK.getType() == nodeConfig.getNodeType()) {
+        }
+
+        if (WorkflowNodeTypeEnum.CALLBACK.getType() == nodeConfig.getNodeType()) {
             workflowNode.setJobId(SystemConstants.CALLBACK_JOB_ID);
             CallbackConfig callback = nodeInfo.getCallback();
             Assert.notNull(callback, () -> new SnailJobServerException("Configuration information for [{}] cannot be empty", nodeInfo.getNodeName()));
@@ -170,18 +171,13 @@ public class WorkflowHandler {
             Assert.notNull(callback.getContentType(), () -> new SnailJobServerException("Request type for [{}] cannot be empty", nodeInfo.getNodeName()));
             Assert.notBlank(callback.getSecret(), () -> new SnailJobServerException("Secret key for [{}] cannot be empty", nodeInfo.getNodeName()));
             workflowNode.setNodeInfo(JsonUtil.toJsonString(callback));
-        } else if (WorkflowNodeTypeEnum.JOB_TASK.getType() == nodeConfig.getNodeType()) {
+        }
+
+        if (WorkflowNodeTypeEnum.JOB_TASK.getType() == nodeConfig.getNodeType()) {
             JobTaskConfig jobTask = nodeInfo.getJobTask();
             Assert.notNull(jobTask, () -> new SnailJobServerException("Configuration information for [{}] cannot be empty", nodeInfo.getNodeName()));
             Assert.notNull(jobTask.getJobId(), () -> new SnailJobServerException("Associated task for [{}] cannot be empty", nodeInfo.getNodeName()));
             workflowNode.setJobId(jobTask.getJobId());
-        } else if (WorkflowNodeTypeEnum.WORKFLOW.getType() == nodeConfig.getNodeType()) {
-            // TODO workflow node
-            WorkflowConfig workflow = nodeInfo.getSubWorkflow();
-            workflowNode.setJobId(SystemConstants.WORKFLOW_JOB_ID);
-            workflowNode.setNodeInfo(JsonUtil.toJsonString(workflow));
-        } else {
-            throw new SnailJobServerException("Unsupported node type [{}]", nodeConfig.getNodeType());
         }

         Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode),
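For readability, here is a self-contained sketch of the control-flow change in this hunk: independent `if` blocks replace the former `else if` chain, and the trailing `else` that threw "Unsupported node type" is removed along with the WORKFLOW branch, so an unrecognized type now falls through without error. The types and method shape below are simplified stand-ins, not the real WorkflowHandler signature.

    // Simplified stand-in for the restructured WorkflowHandler dispatch; the
    // real method validates DecisionConfig/CallbackConfig/JobTaskConfig and
    // writes the result into a WorkflowNode.
    public class NodeDispatchSketch {

        static final int JOB_TASK = 1, DECISION = 2, CALLBACK = 3;

        static void configure(int nodeType) {
            if (nodeType == DECISION) {
                System.out.println("validate decision config");
            }
            if (nodeType == CALLBACK) {
                System.out.println("bind CALLBACK_JOB_ID, validate callback config");
            }
            if (nodeType == JOB_TASK) {
                System.out.println("bind the referenced job id");
            }
            // The former `else { throw ... }` fallback is gone, so an unknown
            // node type is silently ignored here.
        }

        public static void main(String[] args) {
            configure(JOB_TASK);
        }
    }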

View File

@@ -230,7 +230,6 @@ public class ActorGenerator {
     /**
      * Job scheduling preparation phase actor
      *
-     * @see com.aizuda.snailjob.server.job.task.support.dispatch.JobTaskPrepareActor
      * @return actor reference
      */
     public static ActorRef jobTaskPrepareActor() {
@@ -261,7 +260,7 @@ public class ActorGenerator {
     /**
      * Job task execution phase actor
-     * @see com.aizuda.snailjob.server.job.task.support.dispatch.JobExecutorActor
+     *
      * @return actor reference
      */
     public static ActorRef jobTaskExecutorActor() {
@@ -275,7 +274,6 @@ public class ActorGenerator {
     /**
      * Job task execution phase actor
      *
-     * @see com.aizuda.snailjob.server.job.task.support.dispatch.WorkflowExecutorActor
      * @return actor reference
      */
     public static ActorRef workflowTaskExecutorActor() {
@@ -288,7 +286,7 @@ public class ActorGenerator {
     /**
      * Job task execution result actor
-     * @see com.aizuda.snailjob.server.job.task.support.dispatch.JobExecutorResultActor
+     *
      * @return actor reference
      */
     public static ActorRef jobTaskExecutorResultActor() {
@@ -300,8 +298,6 @@ public class ActorGenerator {
     /**
      * Actor for the phase where a job task sends requests to the client
      *
-     * @see com.aizuda.snailjob.server.job.task.support.executor.job.RequestClientActor
-     *
      * @return actor reference
      */
     public static ActorRef jobRealTaskExecutorActor() {

View File

@@ -3,7 +3,6 @@ package com.aizuda.snailjob.server.common.vo;
 import com.aizuda.snailjob.server.common.dto.CallbackConfig;
 import com.aizuda.snailjob.server.common.dto.DecisionConfig;
 import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
-import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
 import lombok.Data;

 import java.util.List;
@@ -145,11 +144,6 @@ public class WorkflowDetailResponseVO {
      */
     private JobTaskConfig jobTask;

-    /**
-     * Workflow configuration
-     */
-    private WorkflowConfig subWorkflow;
-
     /**
      * Scheduled task batch information
      */

View File

@@ -3,7 +3,6 @@ package com.aizuda.snailjob.server.common.vo.request;
 import com.aizuda.snailjob.server.common.dto.CallbackConfig;
 import com.aizuda.snailjob.server.common.dto.DecisionConfig;
 import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
-import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
 import jakarta.validation.constraints.NotBlank;
 import jakarta.validation.constraints.NotEmpty;
 import jakarta.validation.constraints.NotNull;
@@ -108,7 +107,7 @@ public class WorkflowRequestVO {
     private Integer priorityLevel;

     /**
-     * Child node: the next node
+     * Child node
      */
     private NodeConfig childNode;
@@ -131,11 +130,6 @@ public class WorkflowRequestVO {
      * Callback configuration
      */
     private CallbackConfig callback;
-
-    /**
-     * Workflow configuration
-     */
-    private WorkflowConfig subWorkflow;
 }

 /**

View File

@@ -19,9 +19,6 @@ public class WorkflowNodeTaskExecuteDTO {
      */
     private Integer taskExecutorScene;

-    /**
-     * Id of the previous node
-     */
     private Long parentId;

     /**

View File

@@ -1,6 +1,5 @@
 package com.aizuda.snailjob.server.job.task.dto;
-import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
 import lombok.Data;

 /**
@@ -16,7 +15,7 @@ public class WorkflowTaskPrepareDTO {
     private Long workflowId;

     /**
-     * Execution strategy: 1 auto, 2 manual, 3 auto_workflow, 4 manual_workflow
+     * Execution strategy: 1 auto, 2 manual, 3 workflow
      */
     private Integer taskExecutorScene;
@@ -65,11 +64,6 @@ public class WorkflowTaskPrepareDTO {
      */
     private long nextTriggerAt;

-    /**
-     * Operation reason
-     */
-    private Integer operationReason;
-
     /**
      * Task execution time
      */
@@ -84,9 +78,4 @@ public class WorkflowTaskPrepareDTO {
      * Workflow context
      */
     private String wfContext;
-
-    /**
-     * Parent workflow context
-     */
-    private WorkflowExecutorContext parentWfContext;
 }

View File

@@ -28,8 +28,6 @@ import java.time.Duration;
 import java.util.Objects;

 /**
- * Handles the callback when the client finishes executing a task
- *
  * @author opensnail
  * @date 2023-10-03 23:12:33
  * @since 2.4.0
@@ -130,9 +128,6 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHandler {
     }

-    /**
-     * Determines whether a retry is needed
-     */
     private boolean isNeedRetry(ClientCallbackContext context) {
         JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());

View File

@@ -18,9 +18,6 @@ public class ClientCallbackFactory {
         CACHE.put(taskInstanceType, callbackHandler);
     }

-    /**
-     * Gets the callback handler for the given task type
-     */
     public static ClientCallbackHandler getClientCallback(Integer type) {
         return CACHE.get(JobTaskTypeEnum.valueOf(type));
     }

View File

@@ -15,8 +15,6 @@ import org.springframework.stereotype.Component;
 import java.util.Objects;

 /**
- * Handles the cluster task execution result callback
- *
  * @author opensnail
 * @date 2023-10-03 23:12:12
 * @since 2.4.0
@@ -46,10 +44,6 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler {
         return ClientInfoUtils.generate(serverNode);
     }

-    /**
-     * Invokes the job task execution result actor
-     * @see com.aizuda.snailjob.server.job.task.support.dispatch.JobExecutorResultActor
-     */
     @Override
     protected void doCallback(ClientCallbackContext context) {

View File

@@ -50,7 +50,6 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
 import org.springframework.transaction.support.TransactionSynchronization;
 import org.springframework.transaction.support.TransactionSynchronizationManager;
 import org.springframework.transaction.support.TransactionTemplate;
-import scala.Int;

 import java.text.MessageFormat;
 import java.time.Duration;
@@ -110,7 +109,6 @@ public class JobExecutorActor extends AbstractActor {
         Job job = jobMapper.selectOne(queryWrapper.eq(Job::getId, taskExecute.getJobId()));
         int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
         try {
-            // Operation reason
             int operationReason = JobOperationReasonEnum.NONE.getReason();
             if (Objects.isNull(job)) {
                 taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();

View File

@@ -22,8 +22,6 @@ import org.springframework.stereotype.Component;
 import java.util.Objects;

 /**
- * Handles task execution results
- *
  * @author opensnail
  * @date 2023-10-05 17:16:35
  * @since 2.4.0
@@ -76,9 +74,6 @@ public class JobExecutorResultActor extends AbstractActor {
     }

-    /**
-     * Attempts to complete the task
-     */
     private void tryCompleteAndStop(JobExecutorResultDTO jobExecutorResultDTO) {
         CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(jobExecutorResultDTO);
         jobTaskBatchHandler.handleResult(completeJobBatchDTO);

View File

@@ -53,7 +53,7 @@ public class JobTaskPrepareActor extends AbstractActor {
     private void doPrepare(JobTaskPrepareDTO prepare) {
         LambdaQueryWrapper<JobTaskBatch> queryWrapper = new LambdaQueryWrapper<JobTaskBatch>()
                 .eq(JobTaskBatch::getJobId, prepare.getJobId())
-                .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE); // incomplete statuses
+                .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE);

         JobTaskExecutorSceneEnum jobTaskExecutorSceneEnum = JobTaskExecutorSceneEnum.get(
                 prepare.getTaskExecutorScene());

View File

@@ -47,8 +47,6 @@ import java.util.stream.Collectors;
 import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;

 /**
- * Workflow executor
- *
  * @author: xiaowoniu
 * @date : 2023-12-22 10:34
 * @since : 2.6.0
@@ -93,7 +91,7 @@ public class WorkflowExecutorActor extends AbstractActor {
     private void doExecutor(WorkflowNodeTaskExecuteDTO taskExecute) {
         WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId());
-        Assert.notNull(workflowTaskBatch, () -> new SnailJobServerException("Task does not exist, WorkflowTaskBatchId:" + taskExecute.getWorkflowTaskBatchId()));
+        Assert.notNull(workflowTaskBatch, () -> new SnailJobServerException("Task does not exist"));

         if (SystemConstants.ROOT.equals(taskExecute.getParentId())
                 && JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) {
@@ -222,16 +220,6 @@ public class WorkflowExecutorActor extends AbstractActor {
     }

-    /**
-     * Fills in the operation reason of the parent workflow node.
-     * The method decides whether subsequent nodes should continue executing
-     * by checking for operation reasons that must not be skipped.
-     *
-     * @param allJobTaskBatchList list of all job task batches
-     * @param parentJobTaskBatchList list of job task batches of the parent workflow node
-     * @param parentWorkflowNode the parent workflow node
-     * @param context the workflow execution context
-     */
     private static void fillParentOperationReason(final List<JobTaskBatch> allJobTaskBatchList,
             final List<JobTaskBatch> parentJobTaskBatchList, final WorkflowNode parentWorkflowNode,
             final WorkflowExecutorContext context) {
@@ -257,9 +245,6 @@ public class WorkflowExecutorActor extends AbstractActor {
         }
     }

-    /**
-     * Whether the predecessor tasks have completed
-     */
     private boolean arePredecessorsComplete(final WorkflowNodeTaskExecuteDTO taskExecute, Set<Long> predecessors,
             Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode,
             Map<Long, WorkflowNode> workflowNodeMap) {
@@ -308,12 +293,6 @@ public class WorkflowExecutorActor extends AbstractActor {
         return Boolean.TRUE;
     }

-    /**
-     * Updates the workflow task batch
-     *
-     * @param taskStatus the task batch status
-     * @param operationReason the operation reason
-     */
     private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
         WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();

View File

@@ -10,15 +10,6 @@ import org.springframework.transaction.annotation.Transactional;
  * @author opensnail
  * @date 2023-10-03 22:13:04
  * @since 2.4.0
- *
- * Task types:
- * 1. Cluster task: ClusterJobExecutor
- * @see com.aizuda.snailjob.server.job.task.support.executor.job.ClusterJobExecutor
- * 2. Broadcast task: BroadcastTaskJobExecutor
- * 3. Static sharding task: ShardingJobExecutor
- * 4. Map task: MapJobExecutor
- * 5. MapReduce task: MapReduceJobExecutor
- *
  */
 public abstract class AbstractJobExecutor implements JobExecutor, InitializingBean {

View File

@@ -17,8 +17,6 @@ import java.util.List;
  * @author opensnail
  * @date 2023-10-06 10:27:26
  * @since 2.4.0
- *
- * Broadcast task executor
  */
 @Component
 @Slf4j

View File

@@ -18,7 +18,6 @@ import java.util.List;
  * @author opensnail
  * @date 2023-10-03 22:12:40
  * @since 2.4.0
- * Cluster task executor
  */
 @Component
 public class ClusterJobExecutor extends AbstractJobExecutor {

View File

@@ -6,8 +6,6 @@ import org.springframework.stereotype.Component;
 /**
  * @author: shuguang.zhang
  * @date : 2024-06-19
- *
- * Map task
  */
 @Component
 public class MapJobExecutor extends MapReduceJobExecutor {

View File

@@ -16,8 +16,6 @@ import java.util.List;
  * @author: opensnail
  * @date : 2024-06-12
  * @since : sj_1.1.0
- *
- * MapReduce task executor
  */
 @Component
 public class MapReduceJobExecutor extends AbstractJobExecutor {

View File

@@ -40,8 +40,6 @@ import java.util.Objects;
  * @author opensnail
  * @date 2023-10-06 16:42:08
  * @since 2.4.0
- *
- * Invokes the client to execute the task
  */
 @Component(ActorGenerator.REAL_JOB_EXECUTOR_ACTOR)
 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)

View File

@@ -17,8 +17,6 @@ import java.util.List;
  * @author opensnail
  * @date 2023-10-06 17:33:51
  * @since 2.4.0
- *
- * Sharding task executor
  */
 @Component
 @Slf4j

View File

@@ -37,12 +37,6 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
  * @author xiaowoniu
  * @date 2023-12-24 08:15:19
  * @since 2.6.0
- *
- * Abstract workflow executor
- * <br/>
- * 1. Job task node: JobTaskWorkflowExecutor
- * 2. Decision node: @see com.aizuda.snailjob.server.job.task.support.executor.workflow.DecisionWorkflowExecutor
- * 3. Callback notification node: @see com.aizuda.snailjob.server.job.task.support.executor.workflow.CallbackWorkflowExecutor
  */
 @Slf4j
 public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean {

View File

@@ -34,8 +34,6 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
  * @author xiaowoniu
  * @date 2023-12-24 08:18:06
  * @since 2.6.0
- *
- * Callback notification node
  */
 @Component
 @RequiredArgsConstructor

View File

@@ -34,8 +34,6 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
  * @author xiaowoniu
  * @date 2023-12-24 08:17:11
  * @since 2.6.0
- *
- * Decision node executor
  */
 @Component
 @RequiredArgsConstructor

View File

@@ -22,8 +22,6 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
  * @author xiaowoniu
  * @date 2023-12-24 08:09:14
  * @since 2.6.0
- *
- * Job task node executor
  */
 @Component
 @RequiredArgsConstructor
@@ -75,9 +73,6 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
     }

-    /**
-     * @see com.aizuda.snailjob.server.job.task.support.dispatch.JobTaskPrepareActor
-     */
     private static void invokeJobTask(final WorkflowExecutorContext context) {
         // Generate the task batch
         JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob(), context);

View File

@@ -11,9 +11,6 @@ import lombok.Data;
 @Data
 public class WorkflowExecutorContext {

-    /**
-     * Namespace id
-     */
     private String namespaceId;

     /**

View File

@@ -12,9 +12,6 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class WorkflowExecutorFactory {

-    /**
-     * Map<node type, node executor class>
-     */
     private static final ConcurrentHashMap<WorkflowNodeTypeEnum, WorkflowExecutor> CACHE = new ConcurrentHashMap<>();

     protected static void registerJobExecutor(WorkflowNodeTypeEnum workflowNodeTypeEnum, WorkflowExecutor executor) {

View File

@@ -1,104 +0,0 @@
-package com.aizuda.snailjob.server.job.task.support.executor.workflow;
-
-import cn.hutool.core.lang.Assert;
-import com.aizuda.snailjob.common.core.constant.SystemConstants;
-import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
-import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
-import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
-import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
-import com.aizuda.snailjob.common.core.util.JsonUtil;
-import com.aizuda.snailjob.common.log.SnailJobLog;
-import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
-import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
-import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
-import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
-import com.aizuda.snailjob.server.common.util.DateUtils;
-import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
-import com.aizuda.snailjob.server.job.task.support.WorkflowPrePareHandler;
-import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
-import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
-import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
-import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
-import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
-import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
-import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
-import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
-import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
-import lombok.RequiredArgsConstructor;
-import org.springframework.stereotype.Component;
-
-import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
-
-@Component
-@RequiredArgsConstructor
-public class WorkflowWorkflowExecutor extends AbstractWorkflowExecutor {
-
-    private final JobTaskBatchMapper jobTaskBatchMapper;
-    private final WorkflowBatchHandler workflowBatchHandler;
-    private final JobTaskBatchHandler jobTaskBatchHandler;
-    private final JobMapper jobMapper;
-    private final WorkflowMapper workflowMapper;
-    private final WorkflowPrePareHandler terminalWorkflowPrepareHandler;
-
-    @Override
-    public WorkflowNodeTypeEnum getWorkflowNodeType() {
-        return WorkflowNodeTypeEnum.WORKFLOW;
-    }
-
-    @Override
-    protected boolean doPreValidate(WorkflowExecutorContext context) {
-        return true;
-    }
-
-    @Override
-    protected void afterExecute(WorkflowExecutorContext context) {
-    }
-
-    @Override
-    protected void beforeExecute(WorkflowExecutorContext context) {
-        context.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
-        context.setOperationReason(JobOperationReasonEnum.NONE.getReason());
-        context.setJobTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus());
-        JobTaskBatch jobTaskBatch = generateJobTaskBatch(context);
-        JobTask jobTask = generateJobTask(context, jobTaskBatch);
-        JobLogMetaDTO jobLogMetaDTO = new JobLogMetaDTO();
-        jobLogMetaDTO.setNamespaceId(context.getNamespaceId());
-        jobLogMetaDTO.setGroupName(context.getGroupName());
-        jobLogMetaDTO.setTaskBatchId(jobTaskBatch.getId());
-        jobLogMetaDTO.setJobId(SystemConstants.WORKFLOW_JOB_ID);
-        jobLogMetaDTO.setTaskId(jobTask.getId());
-        if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) {
-            SnailJobLog.REMOTE.info("Node [{}] workflow success.\nworkflow params: {} \nworkflow result: [{}] <|>{}<|>", context.getWorkflowNodeId(), context.getWfContext(), context.getEvaluationResult(), jobLogMetaDTO);
-        } else if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.CANCEL.getStatus()) {
-            if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(context.getParentOperationReason())) {
-                SnailJobLog.REMOTE.warn("Node [{}] cancels workflow. Cancellation reason: Current task does not require processing <|>{}<|>", context.getWorkflowNodeId(), jobLogMetaDTO);
-            } else {
-                SnailJobLog.REMOTE.warn("Node [{}] cancels workflow. Cancellation reason: Task status is closed <|>{}<|>", context.getWorkflowNodeId(), jobLogMetaDTO);
-            }
-        } else {
-            SnailJobLog.REMOTE.error("Node [{}] fail to workflow.\nReason: {} <|>{}<|>", context.getWorkflowNodeId(), context.getLogMessage(), jobLogMetaDTO);
-        }
-    }
-
-    @Override
-    protected void doExecute(WorkflowExecutorContext context) {
-        WorkflowConfig nodeInfo = JsonUtil.parseObject(context.getNodeInfo(), WorkflowConfig.class);
-        Workflow workflow = workflowMapper.selectById(nodeInfo.getId());
-        Assert.notNull(workflow, () -> new SnailJobServerException("workflow can not be null."));
-        WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow);
-        // Set the next trigger time to now, i.e. execute immediately
-        prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
-        // Execution strategy: workflow triggered by a workflow
-        prepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.WORKFLOW_WORKFLOW.getType());
-        // Operation reason: called by a workflow
-        prepareDTO.setOperationReason(JobOperationReasonEnum.WORKFLOW_CALLED.getReason());
-        prepareDTO.setParentWfContext(context);
-        terminalWorkflowPrepareHandler.handler(prepareDTO);
-    }
-}

View File

@@ -2,7 +2,6 @@ package com.aizuda.snailjob.server.job.task.support.generator.batch;
 import cn.hutool.core.lang.Assert;
 import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
-import com.aizuda.snailjob.common.core.util.JsonUtil;
 import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
 import com.aizuda.snailjob.server.common.util.DateUtils;
 import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO;
@@ -29,10 +28,6 @@ import java.util.Optional;
 public class WorkflowBatchGenerator {
     private final WorkflowTaskBatchMapper workflowTaskBatchMapper;

-    /**
-     * Generates the workflow batch, inserts it into the database,
-     * and starts executing the workflow
-     */
     public void generateJobTaskBatch(WorkflowTaskBatchGeneratorContext context) {
         // Generate the task batch
@@ -40,7 +35,6 @@ public class WorkflowBatchGenerator {
         workflowTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus()));
         workflowTaskBatch.setOperationReason(context.getOperationReason());
         workflowTaskBatch.setWfContext(context.getWfContext());
-        workflowTaskBatch.setExtAttrs(JsonUtil.toJsonString(context.getParentWorkflowContext()));
         Assert.isTrue(1 == workflowTaskBatchMapper.insert(workflowTaskBatch), () -> new SnailJobServerException("Adding new scheduling task failed. [{}]", context.getWorkflowId()));

View File

@@ -1,6 +1,5 @@
 package com.aizuda.snailjob.server.job.task.support.generator.batch;
-import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
 import lombok.Data;

 /**
@@ -39,7 +38,7 @@ public class WorkflowTaskBatchGeneratorContext {
     private Integer taskBatchStatus;

     /**
-     * Execution strategy: 1 auto, 2 manual, 3 auto_workflow, 4 manual_workflow
+     * Execution strategy: 1 auto, 2 manual, 3 workflow
      */
     private Integer taskExecutorScene;
@@ -53,9 +52,5 @@ public class WorkflowTaskBatchGeneratorContext {
      */
     private String wfContext;
-
-    /**
-     * Parent workflow execution context
-     */
-    private WorkflowExecutorContext parentWorkflowContext;
 }

View File

@@ -54,9 +54,6 @@ public class JobTaskBatchHandler {
     private final GroupConfigMapper groupConfigMapper;
     private final List<JobExecutorResultHandler> resultHandlerList;

-    /**
-     * Handles the task batch result
-     */
     @Transactional
     public boolean handleResult(CompleteJobBatchDTO completeJobBatchDTO) {
         Assert.notNull(completeJobBatchDTO.getTaskType(), () -> new SnailJobServerException("taskType can not be null"));

View File

@@ -99,16 +99,10 @@ public class WorkflowBatchHandler {
         return isNeedProcess;
     }

-    /**
-     * Workflow execution complete
-     */
     public boolean complete(Long workflowTaskBatchId) {
         return complete(workflowTaskBatchId, null);
     }

-    /**
-     * Workflow execution complete
-     */
     public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) {
         workflowTaskBatch = Optional.ofNullable(workflowTaskBatch)
                 .orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
@@ -181,15 +175,12 @@ public class WorkflowBatchHandler {
     }

-    /**
-     * Updates the workflow task batch status
-     */
     private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) {
         WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
         jobTaskBatch.setId(workflowTaskBatchId);
         jobTaskBatch.setTaskBatchStatus(taskStatus);
-        // jobTaskBatch.setOperationReason(operationReason);
+        jobTaskBatch.setOperationReason(operationReason);
         workflowTaskBatchMapper.updateById(jobTaskBatch);
     }
@@ -321,9 +312,6 @@ public class WorkflowBatchHandler {
         }
     }

-    /**
-     * Opens the next workflow node
-     */
     public void openNextNode(WorkflowNodeTaskExecuteDTO taskExecuteDTO) {
         if (Objects.isNull(taskExecuteDTO.getParentId()) || Objects.isNull(taskExecuteDTO.getWorkflowTaskBatchId()) || Long.valueOf(0).equals(taskExecuteDTO.getWorkflowTaskBatchId())) {
             return;
@@ -342,16 +330,12 @@ public class WorkflowBatchHandler {
         }
     }

-    /**
-     * Notifies the workflow executor
-     * @see com.aizuda.snailjob.server.job.task.support.dispatch.WorkflowExecutorActor
-     */
     private void tellWorkflowTaskExecutor(WorkflowNodeTaskExecuteDTO taskExecuteDTO) {
         try {
             ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
             actorRef.tell(taskExecuteDTO, actorRef);
         } catch (Exception e) {
-            SnailJobLog.LOCAL.error("Task scheduling execution failed", e); // task scheduling failed
+            SnailJobLog.LOCAL.error("Task scheduling execution failed", e);
         }
     }

View File

@@ -15,7 +15,7 @@ import java.util.Objects;
 import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;

 /**
- * Handles tasks in the completed {@link JobTaskBatchStatusEnum::COMPLETED} completed state
+ * Handles tasks in the completed {@link JobTaskBatchStatusEnum::COMPLETED} state
  *
  * @author opensnail
  * @date 2023-10-02 10:16:28
@@ -29,9 +29,6 @@ public class TerminalJobPrepareHandler extends AbstractJobPrepareHandler {
     @Autowired
     private JobTaskBatchGenerator jobTaskBatchGenerator;

-    /**
-     * Task completed state
-     */
     @Override
     public boolean matches(Integer status) {
         return COMPLETED.contains(status);

View File

@@ -13,7 +13,7 @@ import java.text.MessageFormat;
 import java.time.Duration;

 /**
- * Handles tasks in the {@link JobTaskBatchStatusEnum::WAIT} pending state
+ * Handles tasks in the {@link JobTaskBatchStatusEnum::WAIT} state
  *
  * @author opensnail
  * @date 2023-10-05 18:29:22

View File

@@ -32,9 +32,6 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
     private final WorkflowBatchHandler workflowBatchHandler;

-    /**
-     * Handles running tasks
-     */
     @Override
     public boolean matches(Integer status) {
         return Objects.nonNull(status) && JobTaskBatchStatusEnum.RUNNING.getStatus() == status;

View File

@@ -14,7 +14,7 @@ import java.time.Duration;
 import java.util.Objects;

 /**
- * Handles tasks in the {@link JobTaskBatchStatusEnum::WAITING} pending state
+ * Handles tasks in the {@link JobTaskBatchStatusEnum::WAIT} state
  *
  * @author xiaowoniu
  * @date 2023-10-05 18:29:22

View File

@@ -23,8 +23,6 @@ import java.util.Optional;
 import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.REPORT_JOB_DISPATCH_RESULT;

 /**
- * Reports the result after the client finishes execution
- *
  * @author opensnail
 * @date 2023-09-30 23:01:58
 * @since 2.4.0

View File

@@ -84,7 +84,7 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorResultHandler {
             doHandleSuccess(context);
         }

-        // Open the next workflow node (if needed)
+        // Open the next workflow node
         openNextWorkflowNode(context);

         boolean res = updateStatus(context, taskBatchStatus);
@@ -104,9 +104,6 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorResultHandler {
         workflowBatchHandler.openNextNode(taskExecuteDTO);
     }

-    /**
-     * Updates the task batch status
-     */
     protected boolean updateStatus(final JobExecutorResultContext context, final Integer taskBatchStatus) {
         JobTaskBatch jobTaskBatch = new JobTaskBatch();
         jobTaskBatch.setId(context.getTaskBatchId());
@@ -138,9 +135,6 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorResultHandler {
         instanceInterrupt.stop(stopJobContext);
     }

-    /**
-     * On success: does nothing except for MapReduce tasks
-     */
     protected abstract void doHandleSuccess(final JobExecutorResultContext context);

     protected abstract void doHandleStop(final JobExecutorResultContext context);

View File

@@ -18,7 +18,6 @@ import java.text.MessageFormat;
 import java.util.Objects;

 /**
- * Workflow task timeout check task
  * @author opensnail
 * @date 2024-05-20 22:25:12
 * @since sj_1.0.0

View File

@@ -33,7 +33,7 @@ public class WorkflowTimerTask implements TimerTask<String> {
         WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
         taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId());
-        taskExecuteDTO.setTaskExecutorScene(workflowTimerTaskDTO.getTaskExecutorScene()); // execution strategy
+        taskExecuteDTO.setTaskExecutorScene(workflowTimerTaskDTO.getTaskExecutorScene());
         taskExecuteDTO.setParentId(SystemConstants.ROOT);
         ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
         actorRef.tell(taskExecuteDTO, actorRef);
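
The dispatch above is a fire-and-forget actor message. Below is a self-contained sketch of the same tell pattern using the Akka classic API; the actor and message types are illustrative stand-ins, not SnailJob's ActorGenerator wiring.

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

// Illustrative fire-and-forget messaging with Akka classic actors.
public class TellSketch {

    // message type standing in for WorkflowNodeTaskExecuteDTO
    record ExecuteMsg(long workflowTaskBatchId) {}

    static class ExecutorActor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(ExecuteMsg.class,
                            msg -> System.out.println("executing batch " + msg.workflowTaskBatchId()))
                    .build();
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("sketch");
        ActorRef executor = system.actorOf(Props.create(ExecutorActor.class));
        // tell() returns immediately; the actor processes the message asynchronously
        executor.tell(new ExecuteMsg(42L), ActorRef.noSender());
        system.terminate();
    }
}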

View File

@@ -4,18 +4,6 @@ server:
     context-path: /snail-job
 spring:
-#  mail:
-#    host: smtp.qq.com
-#    port: 465
-#    username: nianqing_16@qq.com
-#    password: yojeelyaxrvacihg
-#    protocol: smtp
-#    ssl:
-#      enable: true
-#    properties:
-#      mail.smtp.auth: true
-#      mail.smtp.starttls.enable: true
   main:
     banner-mode: off
   profiles:
@@ -94,19 +82,3 @@ snail-job:
   server-port: 17888 # server port
   log-storage: 7 # log retention (unit: days)
   rpc-type: grpc
-  mail:
-    enabled: true
-    host: smtp.qq.com
-    port: 465
-    user: nianqing_16@qq.com
-    pass: yojeelyaxrvacihg
-    from: nianqing_16@qq.com
-    sslEnable: true
-    starttlsEnable: false
-    timeout: 5000
-    connectionTimeout: 5000
-    properties:
-      mail.smtp.auth: true
-    auth: true
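
The two blocks removed above carried SMTP mail settings. For reference, a rough programmatic equivalent using Spring's JavaMailSender is sketched below; the host and credentials are placeholders, not the values deleted from the config.

import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSenderImpl;
import java.util.Properties;

// Rough programmatic equivalent of the removed SMTP YAML block.
// Host, addresses, and password are placeholders.
public class MailSketch {
    public static void main(String[] args) {
        JavaMailSenderImpl sender = new JavaMailSenderImpl();
        sender.setHost("smtp.example.com");
        sender.setPort(465);
        sender.setUsername("alerts@example.com");
        sender.setPassword("app-password");

        Properties props = sender.getJavaMailProperties();
        props.put("mail.smtp.auth", "true");       // matches mail.smtp.auth: true above
        props.put("mail.smtp.ssl.enable", "true"); // SSL on port 465

        SimpleMailMessage msg = new SimpleMailMessage();
        msg.setFrom("alerts@example.com");
        msg.setTo("ops@example.com");
        msg.setSubject("snail-job alarm");
        msg.setText("test notification");
        sender.send(msg);
    }
}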

View File

@@ -12,7 +12,6 @@ import com.aizuda.snailjob.server.common.vo.WorkflowResponseVO;
 import com.aizuda.snailjob.server.web.service.WorkflowService;
 import com.aizuda.snailjob.server.web.util.ExportUtils;
 import com.aizuda.snailjob.server.web.util.ImportUtils;
-import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowHistory;
 import jakarta.validation.constraints.NotEmpty;
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.MediaType;
@@ -49,36 +48,6 @@ public class WorkflowController {
         return workflowService.listPage(queryVO);
     }
-    /***
-     * Get the historical versions for the given id
-     * @param id
-     * @return
-     */
-    @GetMapping("/history/{id}")
-    @LoginRequired(role = RoleEnum.USER)
-    public List<WorkflowHistory> historyVos(@PathVariable("id") Long id) {
-        return workflowService.getWorkflowHistory(id);
-    }
-    /***
-     * Get the detail of a historical version for the given id
-     * @param id
-     * @param version
-     * @return
-     */
-    @GetMapping("/historyDetail/{id}")
-    @LoginRequired(role = RoleEnum.USER)
-    public WorkflowDetailResponseVO historyDetail(@PathVariable("id") Long id, @RequestParam("version") String version) {
-        return workflowService.getWorkflowHistoryDetail(id, version);
-    }
-    @GetMapping("/history/del/{id}")
-    @LoginRequired(role = RoleEnum.USER)
-    public Boolean deleteById(@PathVariable("id") Long id, @RequestParam("version") String version) {
-        return workflowService.deleteHistoryById(id, version);
-    }
     @PutMapping
     @LoginRequired(role = RoleEnum.USER)
     public Boolean updateWorkflow(@RequestBody @Validated WorkflowRequestVO workflowRequestVO) {

View File

@@ -85,11 +85,6 @@ public class JobResponseVO {
      */
     private Integer executorTimeout;
-    /**
-     * Front-end executor timeout (unit: seconds)
-     */
-    private Integer executorTimeoutFront;
     /**
      * Maximum retry count
      */

View File

@@ -6,7 +6,6 @@ import com.aizuda.snailjob.server.web.model.base.PageResult;
 import com.aizuda.snailjob.server.web.model.request.*;
 import com.aizuda.snailjob.server.common.vo.WorkflowDetailResponseVO;
 import com.aizuda.snailjob.server.common.vo.WorkflowResponseVO;
-import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowHistory;
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotEmpty;
@@ -43,10 +42,4 @@ public interface WorkflowService {
     );
     Boolean deleteByIds(Set<Long> ids);
-    List<WorkflowHistory> getWorkflowHistory(Long id);
-    WorkflowDetailResponseVO getWorkflowHistoryDetail(Long id, String version);
-    Boolean deleteHistoryById(Long id, String version);
 }

View File

@@ -24,24 +24,18 @@ public interface JobConverter {
     JobConverter INSTANCE = Mappers.getMapper(JobConverter.class);
-    @Mappings({
-            @Mapping(source = "executorTimeoutFront", target = "executorTimeout")
-    })
     List<JobRequestVO> convertList(List<Job> jobs);
     @Mappings({
-            @Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIdsStr(jobRequestVO.getNotifyIds()))"),
-            @Mapping(source = "executorTimeout", target = "executorTimeoutFront"),
-            @Mapping(target = "executorTimeout", expression = "java( (jobRequestVO.getExecutorTimeout() != null&&jobRequestVO.getExecutorTimeout() <= 0) ? Integer.MAX_VALUE : jobRequestVO.getExecutorTimeout())")
-    })
-    Job convert(JobRequestVO jobRequestVO);
-    @Mappings({
-            @Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))"),
-            @Mapping(source = "executorTimeoutFront", target = "executorTimeout")
+            @Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))")
     })
     JobRequestVO convert(Job job);
+    @Mappings({
+            @Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIdsStr(jobRequestVO.getNotifyIds()))")
+    })
+    Job convert(JobRequestVO jobRequestVO);
     static Set<Long> toNotifyIds(String notifyIds) {
         if (StrUtil.isBlank(notifyIds)) {
             return new HashSet<>();
@@ -57,5 +51,4 @@ public interface JobConverter {
         return JsonUtil.toJsonString(notifyIds);
     }
 }
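
For readers unfamiliar with MapStruct, the expression = "java(...)" mappings above inline a Java snippet into the generated mapper code. Below is a self-contained sketch of the same technique, using hypothetical NotifyEntity/NotifyVO types instead of the project's Job classes.

import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.factory.Mappers;
import java.util.HashSet;
import java.util.Set;

// Illustrative MapStruct mapper converting a comma-separated id string
// into a Set<Long> through an expression mapping.
@Mapper
public interface NotifySketchConverter {

    NotifySketchConverter INSTANCE = Mappers.getMapper(NotifySketchConverter.class);

    @Mapping(target = "notifyIds",
            expression = "java(NotifySketchConverter.parseIds(entity.getNotifyIds()))")
    NotifyVO toVo(NotifyEntity entity);

    static Set<Long> parseIds(String raw) {
        Set<Long> ids = new HashSet<>();
        if (raw == null || raw.isBlank()) {
            return ids;
        }
        for (String part : raw.split(",")) {
            ids.add(Long.parseLong(part.trim()));
        }
        return ids;
    }

    // hypothetical source type with the raw string field
    class NotifyEntity {
        private String notifyIds;
        public String getNotifyIds() { return notifyIds; }
        public void setNotifyIds(String notifyIds) { this.notifyIds = notifyIds; }
    }

    // hypothetical target type exposing the parsed set
    class NotifyVO {
        private Set<Long> notifyIds;
        public Set<Long> getNotifyIds() { return notifyIds; }
        public void setNotifyIds(Set<Long> notifyIds) { this.notifyIds = notifyIds; }
    }
}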

View File

@@ -23,15 +23,13 @@ public interface JobResponseVOConverter {
     JobResponseVOConverter INSTANCE = Mappers.getMapper(JobResponseVOConverter.class);
     @Mappings({
-            @Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))"),
-            @Mapping(source = "executorTimeoutFront", target = "executorTimeout")
+            @Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))")
     })
     List<JobResponseVO> convertList(List<Job> jobs);
     @Mappings({
             @Mapping(target = "nextTriggerAt", expression = "java(JobResponseVOConverter.toLocalDateTime(job.getNextTriggerAt()))"),
-            @Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))"),
-            @Mapping(source = "executorTimeoutFront", target = "executorTimeout")
+            @Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))")
     })
     JobResponseVO convert(Job job);

View File

@@ -6,7 +6,6 @@ import cn.hutool.core.lang.Pair;
 import cn.hutool.core.util.HashUtil;
 import cn.hutool.core.util.StrUtil;
 import com.aizuda.snailjob.common.core.constant.SystemConstants;
-import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
 import com.aizuda.snailjob.common.core.enums.StatusEnum;
 import com.aizuda.snailjob.common.core.expression.ExpressionEngine;
 import com.aizuda.snailjob.common.core.expression.ExpressionFactory;
@@ -43,7 +42,10 @@ import com.aizuda.snailjob.server.web.service.handler.GroupHandler;
 import com.aizuda.snailjob.server.common.handler.WorkflowHandler;
 import com.aizuda.snailjob.server.web.util.UserSessionUtils;
 import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
-import com.aizuda.snailjob.template.datasource.persistence.mapper.*;
+import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
+import com.aizuda.snailjob.template.datasource.persistence.mapper.JobSummaryMapper;
+import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
+import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowNodeMapper;
 import com.aizuda.snailjob.template.datasource.persistence.po.*;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@@ -56,12 +58,10 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.jetbrains.annotations.NotNull;
-import org.springframework.beans.BeanUtils;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.validation.annotation.Validated;
-import java.time.LocalDateTime;
 import java.util.*;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.stream.Collectors;
@@ -78,7 +78,6 @@ import java.util.stream.Collectors;
 public class WorkflowServiceImpl implements WorkflowService {
     private final WorkflowMapper workflowMapper;
-    private final WorkflowHistoryMapper workflowHistoryMapper;
     private final WorkflowNodeMapper workflowNodeMapper;
     private final SystemProperties systemProperties;
     private final WorkflowHandler workflowHandler;
@@ -148,14 +147,6 @@ public class WorkflowServiceImpl implements WorkflowService {
         workflow.setVersion(null);
         workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
         Assert.isTrue(1 == workflowMapper.updateById(workflow), () -> new SnailJobServerException("Failed to save workflow graph"));
-        // Prepare data for the history table (added 2025-05-19)
-        WorkflowHistory history = new WorkflowHistory();
-        Workflow workflow1 = workflowMapper.selectById(workflow.getId());
-        BeanUtils.copyProperties(workflow1, history);
-        Assert.isTrue(1 == workflowHistoryMapper.insert(history), () -> new SnailJobServerException("Failed to save workflowHistory graph"));
         return true;
     }
@@ -244,14 +235,6 @@ public class WorkflowServiceImpl implements WorkflowService {
                 .eq(Workflow::getVersion, version)) > 0,
                 () -> new SnailJobServerException("Update failed"));
-        // After the workflow table is updated, insert into the history table
-        // Prepare data for the history table (added 2025-05-20)
-        WorkflowHistory history = new WorkflowHistory();
-        Workflow workflow1 = workflowMapper.selectById(workflow.getId());
-        BeanUtils.copyProperties(workflow1, history);
-        history.setCreateDt(LocalDateTime.now());
-        Assert.isTrue(1 == workflowHistoryMapper.insert(history), () -> new SnailJobServerException("Failed to save workflowHistory graph"));
         return Boolean.TRUE;
     }
@@ -272,9 +255,6 @@ public class WorkflowServiceImpl implements WorkflowService {
         return 1 == workflowMapper.updateById(workflow);
     }
-    /**
-     * Manually trigger a workflow
-     */
     @Override
     public Boolean trigger(WorkflowTriggerVO triggerVO) {
         Workflow workflow = workflowMapper.selectById(triggerVO.getWorkflowId());
@@ -294,8 +274,6 @@ public class WorkflowServiceImpl implements WorkflowService {
         // Set to now, meaning execute immediately
         prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
         prepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType());
-        // Operation reason: manual trigger
-        prepareDTO.setOperationReason(JobOperationReasonEnum.MANUAL_TRIGGER.getReason());
         String tmpWfContext = triggerVO.getTmpWfContext();
         if (StrUtil.isNotBlank(tmpWfContext) && !JsonUtil.isEmptyJson(tmpWfContext)) {
             prepareDTO.setWfContext(tmpWfContext);
@@ -399,35 +377,6 @@ public class WorkflowServiceImpl implements WorkflowService {
         return Boolean.TRUE;
     }
-    @Override
-    public List<WorkflowHistory> getWorkflowHistory(Long id) {
-        return workflowHistoryMapper.selectList(new LambdaQueryWrapper<WorkflowHistory>().eq(WorkflowHistory::getId, id).orderByDesc(WorkflowHistory::getCreateDt));
-    }
-    @Override
-    public WorkflowDetailResponseVO getWorkflowHistoryDetail(Long id, String version) {
-        WorkflowHistory workflowHistory = workflowHistoryMapper.selectOne(
-                new LambdaQueryWrapper<WorkflowHistory>()
-                        .eq(WorkflowHistory::getId, id)
-                        .eq(WorkflowHistory::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())
-                        .eq(WorkflowHistory::getVersion, version)
-        );
-        if (Objects.isNull(workflowHistory)) {
-            return null;
-        }
-        Workflow workflow = new Workflow();
-        BeanUtils.copyProperties(workflowHistory, workflow);
-        return doGetWorkflowDetail(workflow);
-    }
-    @Override
-    public Boolean deleteHistoryById(Long id, String version) {
-        return workflowHistoryMapper.delete(new LambdaQueryWrapper<WorkflowHistory>()
-                .eq(WorkflowHistory::getId, id)
-                .eq(WorkflowHistory::getVersion, version)) > 0;
-    }
     private void batchSaveWorkflowTask(final List<WorkflowRequestVO> workflowRequestVOList, final String namespaceId) {
         Set<String> groupNameSet = StreamUtils.toSet(workflowRequestVOList, WorkflowRequestVO::getGroupName);
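
The three methods removed above are plain MyBatis-Plus wrapper queries. A minimal sketch of that query style is shown below; the entity and mapper are stand-ins, since the WorkflowHistory PO itself is deleted by this commit.

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import java.time.LocalDateTime;
import java.util.List;

// Illustrative MyBatis-Plus query in the style of the removed history lookups.
public class HistoryQuerySketch {

    // stand-in entity; only the fields needed for the query are modeled
    public static class HistoryRow {
        private Long id;
        private LocalDateTime createDt;
        public Long getId() { return id; }
        public LocalDateTime getCreateDt() { return createDt; }
    }

    // newest-first list of all history rows for one workflow id
    public List<HistoryRow> latestFirst(BaseMapper<HistoryRow> mapper, Long id) {
        return mapper.selectList(new LambdaQueryWrapper<HistoryRow>()
                .eq(HistoryRow::getId, id)
                .orderByDesc(HistoryRow::getCreateDt));
    }
}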