Compare commits
1 Commits
dev
...
1.5.0-beta
Author | SHA1 | Date | |
---|---|---|---|
![]() |
a4f1c8193c |
@ -1,22 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<trees>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java" title="工作流执行"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobTaskPrepareActor.java" title="调度任务准备阶段"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java" title="处理工作流中各种节点类型"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java" title="回调节点"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java" title="条件节点"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java" title="任务节点"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/WorkflowWorkflowExecutor.java" title="工作流节点"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow" title="工作流节点"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java" title="集群任务"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/BroadcastTaskJobExecutor.java" title="广播任务"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapJobExecutor.java" title="Map任务"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapReduceJobExecutor.java" title="MapReduce任务"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/RequestClientActor.java" title="调用客户端执行任务"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ShardingJobExecutor.java" title="分片任务"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job" title="任务执行"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java" title="工作流任务超时检查"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java" title="客户端执行任务完成回调"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClusterClientCallbackHandler.java" title="集群任务执行结果回调"/>
|
||||
<tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java" title="任务执行结果处理"/>
|
||||
</trees>
|
@ -311,8 +311,7 @@ CREATE TABLE `sj_job`
|
||||
`trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间',
|
||||
`trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
|
||||
`block_strategy` tinyint(4) NOT NULL DEFAULT 1 COMMENT '阻塞策略 1、丢弃 2、覆盖 3、并行 4、恢复',
|
||||
`executor_timeout_front` int(11) NOT NULL DEFAULT 60 COMMENT '(front)任务执行超时时间,单位秒',
|
||||
`executor_timeout` int(11) NOT NULL DEFAULT 60 COMMENT '任务执行超时时间,单位秒',
|
||||
`executor_timeout` int(11) NOT NULL DEFAULT 0 COMMENT '任务执行超时时间,单位秒',
|
||||
`max_retry_times` int(11) NOT NULL DEFAULT 0 COMMENT '最大重试次数',
|
||||
`parallel_num` int(11) NOT NULL DEFAULT 1 COMMENT '并行数',
|
||||
`retry_interval` int(11) NOT NULL DEFAULT 0 COMMENT '重试间隔(s)',
|
||||
@ -532,37 +531,3 @@ CREATE TABLE `sj_workflow_task_batch`
|
||||
) ENGINE = InnoDB
|
||||
AUTO_INCREMENT = 0
|
||||
DEFAULT CHARSET = utf8mb4 COMMENT ='工作流批次';
|
||||
|
||||
create table sj_workflow_history
|
||||
(
|
||||
id bigint not null comment '主键',
|
||||
workflow_name varchar(64) not null comment '工作流名称',
|
||||
namespace_id varchar(64) default '764d604ec6fc45f68cd92514c40e9e1a' not null comment '命名空间id',
|
||||
group_name varchar(64) not null comment '组名称',
|
||||
workflow_status tinyint default 1 not null comment '工作流状态 0、关闭、1、开启',
|
||||
trigger_type tinyint not null comment '触发类型 1.CRON 表达式 2. 固定时间',
|
||||
trigger_interval varchar(255) not null comment '间隔时长',
|
||||
next_trigger_at bigint not null comment '下次触发时间',
|
||||
block_strategy tinyint default 1 not null comment '阻塞策略 1、丢弃 2、覆盖 3、并行',
|
||||
executor_timeout int default 0 not null comment '任务执行超时时间,单位秒',
|
||||
description varchar(256) default '' not null comment '描述',
|
||||
flow_info text null comment '流程信息',
|
||||
wf_context text null comment '上下文',
|
||||
notify_ids varchar(128) default '' not null comment '通知告警场景配置id列表',
|
||||
bucket_index int default 0 not null comment 'bucket',
|
||||
version int not null comment '版本号',
|
||||
ext_attrs varchar(256) default '' null comment '扩展字段',
|
||||
deleted tinyint default 0 not null comment '逻辑删除 1、删除',
|
||||
create_dt datetime default CURRENT_TIMESTAMP not null comment '创建时间',
|
||||
update_dt datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间',
|
||||
primary key (id, version)
|
||||
)ENGINE = InnoDB
|
||||
AUTO_INCREMENT = 0
|
||||
DEFAULT CHARSET = utf8mb4
|
||||
comment '工作流履历表';
|
||||
|
||||
create index idx_create_dt
|
||||
on sj_workflow_history (create_dt);
|
||||
|
||||
create index idx_namespace_id_group_name
|
||||
on sj_workflow_history (namespace_id, group_name);
|
||||
|
@ -9,6 +9,7 @@ import com.aizuda.snailjob.common.core.util.SnailJobFileUtil;
|
||||
import com.aizuda.snailjob.common.core.util.SnailJobSystemUtil;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import lombok.Data;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.charset.Charset;
|
||||
@ -17,11 +18,11 @@ import java.nio.file.Files;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class AbstractScriptExecutor {
|
||||
public abstract class AbstractScriptExecutor implements InitializingBean {
|
||||
|
||||
protected static final String SH_SHELL = "/bin/sh";
|
||||
|
||||
private static final String WORKER_DIR = SnailFileUtils.workspace() + "/script_processor/";
|
||||
private static String WORKER_DIR;
|
||||
|
||||
// 下载脚本模式
|
||||
private static final String SCRIPT_DOWNLOAD_METHOD = "DOWNLOAD";
|
||||
@ -290,4 +291,9 @@ public abstract class AbstractScriptExecutor {
|
||||
private String scriptParams;
|
||||
private String charset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
WORKER_DIR = SnailFileUtils.workspace() + "/script_processor/";
|
||||
}
|
||||
}
|
||||
|
@ -225,11 +225,6 @@ public interface SystemConstants {
|
||||
*/
|
||||
Long CALLBACK_JOB_ID = -2000L;
|
||||
|
||||
/**
|
||||
* 系统内置的工作流任务ID
|
||||
*/
|
||||
Long WORKFLOW_JOB_ID = -3000L;
|
||||
|
||||
/**
|
||||
* 客户端返回的非json对象,单值比如 "aa", 123等
|
||||
*/
|
||||
|
@ -18,89 +18,26 @@ import java.util.List;
|
||||
@Getter
|
||||
public enum JobOperationReasonEnum {
|
||||
|
||||
/*
|
||||
* 定义任务执行状态枚举
|
||||
*/
|
||||
/**
|
||||
* 任务执行状态:无
|
||||
*/
|
||||
NONE(0, StrUtil.EMPTY),
|
||||
/**
|
||||
* 任务执行状态:任务执行超时
|
||||
*/
|
||||
TASK_EXECUTION_TIMEOUT(1, "Task execution timeout"),
|
||||
/**
|
||||
* 任务执行状态:没有客户端节点
|
||||
*/
|
||||
NOT_CLIENT(2, "No client nodes"),
|
||||
/**
|
||||
* 任务执行状态:任务已关闭
|
||||
*/
|
||||
JOB_CLOSED(3, "JOB closed"),
|
||||
/**
|
||||
* 任务执行状态:任务被丢弃
|
||||
*/
|
||||
JOB_DISCARD(4, "Task discarded"),
|
||||
/**
|
||||
* 任务执行状态:任务被覆盖
|
||||
*/
|
||||
JOB_OVERLAY(5, "Task overridden"),
|
||||
/**
|
||||
* 任务执行状态:没有可执行的任务项
|
||||
*/
|
||||
NOT_EXECUTION_TASK(6, "No executable task items"),
|
||||
/**
|
||||
* 任务执行状态:任务执行过程中发生意外异常
|
||||
*/
|
||||
TASK_EXECUTION_ERROR(7, "Unexpected exception occurred during task execution"),
|
||||
/**
|
||||
* 任务执行状态:手动停止
|
||||
*/
|
||||
MANNER_STOP(8, "Manual stop"),
|
||||
/**
|
||||
* 工作流条件节点执行异常
|
||||
*/
|
||||
WORKFLOW_CONDITION_NODE_EXECUTION_ERROR(9, "Condition node execution exception"),
|
||||
/**
|
||||
* 任务被中断
|
||||
*/
|
||||
JOB_TASK_INTERRUPTED(10, "Task interrupted"),
|
||||
/**
|
||||
* 工作流回调节点执行异常
|
||||
*/
|
||||
WORKFLOW_CALLBACK_NODE_EXECUTION_ERROR(11, "Callback node execution exception"),
|
||||
/**
|
||||
* 工作流节点无需操作
|
||||
*/
|
||||
WORKFLOW_NODE_NO_REQUIRED(12, "No action required"),
|
||||
/**
|
||||
* 工作流节点已关闭,跳过执行
|
||||
*/
|
||||
WORKFLOW_NODE_CLOSED_SKIP_EXECUTION(13, "Node closed, skipped execution"),
|
||||
/**
|
||||
* 工作流决策未通过
|
||||
*/
|
||||
WORKFLOW_DECISION_FAILED(14, "Judgment not passed"),
|
||||
/**
|
||||
* 手动调用
|
||||
*/
|
||||
MANUAL_TRIGGER(15, "Manual call"),
|
||||
/**
|
||||
* 由工作流中被调用
|
||||
*/
|
||||
WORKFLOW_CALLED(16, "Called by workflow"),
|
||||
|
||||
|
||||
;
|
||||
|
||||
/**
|
||||
* 原因
|
||||
*/
|
||||
private final int reason;
|
||||
|
||||
/**
|
||||
* 描述
|
||||
*/
|
||||
private final String desc;
|
||||
|
||||
/**
|
||||
|
@ -50,9 +50,6 @@ public enum JobTaskBatchStatusEnum {
|
||||
|
||||
public static final List<Integer> NOT_COMPLETE = Arrays.asList(WAITING.status, RUNNING.status);
|
||||
|
||||
/**
|
||||
* 任务完成 状态 包含 SUCCESS 3, FAIL 4, STOP 5, CANCEL 6
|
||||
*/
|
||||
public static final List<Integer> COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status, CANCEL.status);
|
||||
|
||||
public static final List<Integer> NOT_SUCCESS = Arrays.asList(FAIL.status, STOP.status, CANCEL.status);
|
||||
|
@ -8,37 +8,15 @@ import lombok.Getter;
|
||||
* @author opensnail
|
||||
* @date 2023-10-02 10:39:22
|
||||
* @since 2.4.0
|
||||
*
|
||||
* 任务类型
|
||||
* 1. 集群任务 ClusterJobExecutor
|
||||
* 2. 广播任务 BroadcastTaskJobExecutor
|
||||
* 3. 静态分片任务 ShardingJobExecutor
|
||||
* 4. Map 任务 MapJobExecutor
|
||||
* 5. MapReduce 任务 MapReduceJobExecutor
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
public enum JobTaskTypeEnum {
|
||||
|
||||
/**
|
||||
* 集群任务
|
||||
*/
|
||||
CLUSTER(1),
|
||||
/**
|
||||
* 广播任务
|
||||
*/
|
||||
BROADCAST(2),
|
||||
/**
|
||||
* 静态分片任务
|
||||
*/
|
||||
SHARDING(3),
|
||||
/**
|
||||
* Map 任务
|
||||
*/
|
||||
MAP(4),
|
||||
/**
|
||||
* MapReduce 任务
|
||||
*/
|
||||
MAP_REDUCE(5),
|
||||
;
|
||||
|
||||
|
@ -13,13 +13,7 @@ import lombok.Getter;
|
||||
@Getter
|
||||
public enum StatusEnum {
|
||||
|
||||
/**
|
||||
* 0、关闭、1、开启
|
||||
*/
|
||||
NO(0),
|
||||
/**
|
||||
* 0、关闭、1、开启
|
||||
*/
|
||||
YES(1);
|
||||
|
||||
private final Integer status;
|
||||
|
@ -4,7 +4,7 @@ import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 1、任务节点 2、条件节点 3、回调节点 4、工作流节点
|
||||
* 1、任务节点 2、条件节点 3、回调节点
|
||||
*
|
||||
* @author xiaowoniu
|
||||
* @date 2023-12-24 08:13:43
|
||||
@ -13,22 +13,9 @@ import lombok.Getter;
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
public enum WorkflowNodeTypeEnum {
|
||||
/**
|
||||
* 任务节点
|
||||
*/
|
||||
JOB_TASK(1, "JOB task"),
|
||||
/**
|
||||
* 条件节点
|
||||
*/
|
||||
DECISION(2, "Decision node"),
|
||||
/**
|
||||
* 回调节点
|
||||
*/
|
||||
CALLBACK(3, "Callback node"),
|
||||
/**
|
||||
* 工作流节点
|
||||
*/
|
||||
WORKFLOW(4, "Workflow node"),
|
||||
;
|
||||
|
||||
private final int type;
|
||||
|
@ -1,15 +0,0 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.mapper;
|
||||
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowHistory;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
|
||||
/**
|
||||
* @author zjw
|
||||
* @description: TODO
|
||||
* @date 2025/05/19
|
||||
*/
|
||||
@Mapper
|
||||
public interface WorkflowHistoryMapper extends BaseMapper<WorkflowHistory> {
|
||||
}
|
@ -102,11 +102,6 @@ public class Job extends CreateUpdateDt {
|
||||
*/
|
||||
private Integer executorTimeout;
|
||||
|
||||
/**
|
||||
* 前端设置的任务执行超时时间,单位秒
|
||||
*/
|
||||
private Integer executorTimeoutFront;
|
||||
|
||||
/**
|
||||
* 最大重试次数
|
||||
*/
|
||||
|
@ -1,112 +0,0 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.po;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
|
||||
/**
|
||||
* * 工作流履历表
|
||||
* </p>
|
||||
*
|
||||
* @author : xiaowoniu
|
||||
* @date : 2023-12-12
|
||||
* @since : 2.6.0
|
||||
*/
|
||||
@Data
|
||||
@TableName("sj_workflow_history")
|
||||
@EqualsAndHashCode(callSuper=true)
|
||||
public class WorkflowHistory extends CreateUpdateDt{
|
||||
|
||||
/**
|
||||
* 主键
|
||||
*/
|
||||
@TableId(value = "id")
|
||||
private Long id;
|
||||
|
||||
/**
|
||||
* 工作流名称
|
||||
*/
|
||||
private String workflowName;
|
||||
|
||||
/**
|
||||
* 命名空间id
|
||||
*/
|
||||
private String namespaceId;
|
||||
|
||||
/**
|
||||
* 组名称
|
||||
*/
|
||||
private String groupName;
|
||||
|
||||
/**
|
||||
* 触发类型
|
||||
*/
|
||||
private Integer triggerType;
|
||||
|
||||
/**
|
||||
* 阻塞策略
|
||||
*/
|
||||
private Integer blockStrategy;
|
||||
|
||||
/**
|
||||
* 触发间隔
|
||||
*/
|
||||
private String triggerInterval;
|
||||
|
||||
/**
|
||||
* 执行超时时间
|
||||
*/
|
||||
private Integer executorTimeout;
|
||||
|
||||
/**
|
||||
* 工作流状态 0、关闭、1、开启
|
||||
*/
|
||||
private Integer workflowStatus;
|
||||
|
||||
/**
|
||||
* 任务执行时间
|
||||
*/
|
||||
private Long nextTriggerAt;
|
||||
|
||||
/**
|
||||
* 流程信息
|
||||
*/
|
||||
private String flowInfo;
|
||||
|
||||
/**
|
||||
* bucket
|
||||
*/
|
||||
private Integer bucketIndex;
|
||||
|
||||
/**
|
||||
* 描述
|
||||
*/
|
||||
private String description;
|
||||
|
||||
/**
|
||||
* 工作流上下文
|
||||
*/
|
||||
private String wfContext;
|
||||
|
||||
/**
|
||||
* 版本号
|
||||
*/
|
||||
private Integer version;
|
||||
|
||||
/**
|
||||
* 扩展字段
|
||||
*/
|
||||
private String extAttrs;
|
||||
|
||||
/**
|
||||
* 逻辑删除 1、删除
|
||||
*/
|
||||
private Integer deleted;
|
||||
|
||||
/**
|
||||
* 通知告警场景配置id列表
|
||||
*/
|
||||
private String notifyIds;
|
||||
}
|
@ -7,7 +7,6 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.DecisionConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
|
||||
import com.aizuda.snailjob.server.common.util.DateUtils;
|
||||
import com.aizuda.snailjob.server.common.vo.WorkflowBatchResponseVO;
|
||||
import com.aizuda.snailjob.server.common.vo.WorkflowResponseVO;
|
||||
@ -55,8 +54,7 @@ public interface WorkflowConverter {
|
||||
@Mappings({
|
||||
@Mapping(target = "decision", expression = "java(WorkflowConverter.parseDecisionConfig(workflowNode))"),
|
||||
@Mapping(target = "callback", expression = "java(WorkflowConverter.parseCallbackConfig(workflowNode))"),
|
||||
@Mapping(target = "jobTask", expression = "java(WorkflowConverter.parseJobTaskConfig(workflowNode))"),
|
||||
@Mapping(target = "subWorkflow", expression = "java(WorkflowConverter.parseWorkflowConfig(workflowNode))")
|
||||
@Mapping(target = "jobTask", expression = "java(WorkflowConverter.parseJobTaskConfig(workflowNode))")
|
||||
})
|
||||
WorkflowDetailResponseVO.NodeInfo convert(WorkflowNode workflowNode);
|
||||
|
||||
@ -112,13 +110,6 @@ public interface WorkflowConverter {
|
||||
return null;
|
||||
}
|
||||
|
||||
static WorkflowConfig parseWorkflowConfig(WorkflowNode workflowNode) {
|
||||
if (WorkflowNodeTypeEnum.WORKFLOW.getType() == workflowNode.getNodeType()){
|
||||
return JsonUtil.parseObject(workflowNode.getNodeInfo(), WorkflowConfig.class);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
static Set<Long> toNotifyIds(String notifyIds) {
|
||||
if (StrUtil.isBlank(notifyIds)) {
|
||||
return new HashSet<>();
|
||||
|
@ -1,20 +0,0 @@
|
||||
package com.aizuda.snailjob.server.common.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 工作流配置
|
||||
*/
|
||||
@Data
|
||||
public class WorkflowConfig {
|
||||
|
||||
/**
|
||||
* ID
|
||||
*/
|
||||
private Long id;
|
||||
|
||||
/**
|
||||
* 名称
|
||||
*/
|
||||
private String name;
|
||||
}
|
@ -14,26 +14,10 @@ import lombok.Getter;
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum JobTaskExecutorSceneEnum {
|
||||
/**
|
||||
* 自动触发任务
|
||||
*/
|
||||
AUTO_JOB(1, SyetemTaskTypeEnum.JOB),
|
||||
/**
|
||||
* 手动触发任务
|
||||
*/
|
||||
MANUAL_JOB(2, SyetemTaskTypeEnum.JOB),
|
||||
/**
|
||||
* 自动触发工作流
|
||||
*/
|
||||
AUTO_WORKFLOW(3, SyetemTaskTypeEnum.WORKFLOW),
|
||||
/**
|
||||
* 手动触发工作流
|
||||
*/
|
||||
MANUAL_WORKFLOW(4, SyetemTaskTypeEnum.WORKFLOW),
|
||||
/**
|
||||
* 工作流触发工作流
|
||||
*/
|
||||
WORKFLOW_WORKFLOW(5, SyetemTaskTypeEnum.WORKFLOW),
|
||||
;
|
||||
|
||||
private final Integer type;
|
||||
|
@ -8,7 +8,6 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.DecisionConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
|
||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||
import com.aizuda.snailjob.server.common.vo.request.WorkflowRequestVO;
|
||||
import com.aizuda.snailjob.server.common.vo.WorkflowDetailResponseVO;
|
||||
@ -162,7 +161,9 @@ public class WorkflowHandler {
|
||||
Assert.notNull(decision.getDefaultDecision(), () -> new SnailJobServerException("Default decision for [{}] cannot be empty", nodeInfo.getNodeName()));
|
||||
Assert.notNull(decision.getExpressionType(), () -> new SnailJobServerException("Expression type for [{}] cannot be empty", nodeInfo.getNodeName()));
|
||||
workflowNode.setNodeInfo(JsonUtil.toJsonString(decision));
|
||||
}else if (WorkflowNodeTypeEnum.CALLBACK.getType() == nodeConfig.getNodeType()) {
|
||||
}
|
||||
|
||||
if (WorkflowNodeTypeEnum.CALLBACK.getType() == nodeConfig.getNodeType()) {
|
||||
workflowNode.setJobId(SystemConstants.CALLBACK_JOB_ID);
|
||||
CallbackConfig callback = nodeInfo.getCallback();
|
||||
Assert.notNull(callback, () -> new SnailJobServerException("Configuration information for [{}] cannot be empty", nodeInfo.getNodeName()));
|
||||
@ -170,18 +171,13 @@ public class WorkflowHandler {
|
||||
Assert.notNull(callback.getContentType(), () -> new SnailJobServerException("Request type for [{}] cannot be empty", nodeInfo.getNodeName()));
|
||||
Assert.notBlank(callback.getSecret(), () -> new SnailJobServerException("Secret key for [{}] cannot be empty", nodeInfo.getNodeName()));
|
||||
workflowNode.setNodeInfo(JsonUtil.toJsonString(callback));
|
||||
}else if (WorkflowNodeTypeEnum.JOB_TASK.getType() == nodeConfig.getNodeType()) {
|
||||
}
|
||||
|
||||
if (WorkflowNodeTypeEnum.JOB_TASK.getType() == nodeConfig.getNodeType()) {
|
||||
JobTaskConfig jobTask = nodeInfo.getJobTask();
|
||||
Assert.notNull(jobTask, () -> new SnailJobServerException("Configuration information for [{}] cannot be empty", nodeInfo.getNodeName()));
|
||||
Assert.notNull(jobTask.getJobId(), () -> new SnailJobServerException("Associated task for [{}] cannot be empty", nodeInfo.getNodeName()));
|
||||
workflowNode.setJobId(jobTask.getJobId());
|
||||
}else if (WorkflowNodeTypeEnum.WORKFLOW.getType() == nodeConfig.getNodeType()) {
|
||||
// TODO 工作流结点
|
||||
WorkflowConfig workflow = nodeInfo.getSubWorkflow();
|
||||
workflowNode.setJobId(SystemConstants.WORKFLOW_JOB_ID);
|
||||
workflowNode.setNodeInfo(JsonUtil.toJsonString(workflow));
|
||||
} else {
|
||||
throw new SnailJobServerException("Unsupported node type [{}]", nodeConfig.getNodeType());
|
||||
}
|
||||
|
||||
Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode),
|
||||
|
@ -230,7 +230,6 @@ public class ActorGenerator {
|
||||
/**
|
||||
* Job调度准备阶段actor
|
||||
*
|
||||
* @see com.aizuda.snailjob.server.job.task.support.dispatch.JobTaskPrepareActor
|
||||
* @return actor 引用
|
||||
*/
|
||||
public static ActorRef jobTaskPrepareActor() {
|
||||
@ -261,7 +260,7 @@ public class ActorGenerator {
|
||||
|
||||
/**
|
||||
* Job任务执行阶段actor
|
||||
* @see com.aizuda.snailjob.server.job.task.support.dispatch.JobExecutorActor
|
||||
*
|
||||
* @return actor 引用
|
||||
*/
|
||||
public static ActorRef jobTaskExecutorActor() {
|
||||
@ -275,7 +274,6 @@ public class ActorGenerator {
|
||||
/**
|
||||
* Job任务执行阶段actor
|
||||
*
|
||||
* @see com.aizuda.snailjob.server.job.task.support.dispatch.WorkflowExecutorActor
|
||||
* @return actor 引用
|
||||
*/
|
||||
public static ActorRef workflowTaskExecutorActor() {
|
||||
@ -288,7 +286,7 @@ public class ActorGenerator {
|
||||
|
||||
/**
|
||||
* Job任务执行结果actor
|
||||
* @see com.aizuda.snailjob.server.job.task.support.dispatch.JobExecutorResultActor
|
||||
*
|
||||
* @return actor 引用
|
||||
*/
|
||||
public static ActorRef jobTaskExecutorResultActor() {
|
||||
@ -300,8 +298,6 @@ public class ActorGenerator {
|
||||
/**
|
||||
* Job任务向客户端发起请求阶段actor
|
||||
*
|
||||
* @see com.aizuda.snailjob.server.job.task.support.executor.job.RequestClientActor
|
||||
*
|
||||
* @return actor 引用
|
||||
*/
|
||||
public static ActorRef jobRealTaskExecutorActor() {
|
||||
|
@ -3,7 +3,6 @@ package com.aizuda.snailjob.server.common.vo;
|
||||
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.DecisionConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
@ -145,11 +144,6 @@ public class WorkflowDetailResponseVO {
|
||||
*/
|
||||
private JobTaskConfig jobTask;
|
||||
|
||||
/**
|
||||
* 工作流配置
|
||||
*/
|
||||
private WorkflowConfig subWorkflow;
|
||||
|
||||
/**
|
||||
* 定时任务批次信息
|
||||
*/
|
||||
|
@ -3,7 +3,6 @@ package com.aizuda.snailjob.server.common.vo.request;
|
||||
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.DecisionConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
|
||||
import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@ -108,7 +107,7 @@ public class WorkflowRequestVO {
|
||||
private Integer priorityLevel;
|
||||
|
||||
/**
|
||||
* 子节点(下一个节点)
|
||||
* 子节点
|
||||
*/
|
||||
private NodeConfig childNode;
|
||||
|
||||
@ -131,11 +130,6 @@ public class WorkflowRequestVO {
|
||||
* 回调配置
|
||||
*/
|
||||
private CallbackConfig callback;
|
||||
|
||||
/**
|
||||
* 工作流配置
|
||||
*/
|
||||
private WorkflowConfig subWorkflow;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -19,9 +19,6 @@ public class WorkflowNodeTaskExecuteDTO {
|
||||
*/
|
||||
private Integer taskExecutorScene;
|
||||
|
||||
/**
|
||||
* 前一个节点 id
|
||||
*/
|
||||
private Long parentId;
|
||||
|
||||
/**
|
||||
|
@ -1,6 +1,5 @@
|
||||
package com.aizuda.snailjob.server.job.task.dto;
|
||||
|
||||
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
@ -16,7 +15,7 @@ public class WorkflowTaskPrepareDTO {
|
||||
private Long workflowId;
|
||||
|
||||
/**
|
||||
* 执行策略 1、auto 2、manual 3、auto_workflow 4、manual_workflow
|
||||
* 执行策略 1、auto 2、manual 3、workflow
|
||||
*/
|
||||
private Integer taskExecutorScene;
|
||||
|
||||
@ -65,11 +64,6 @@ public class WorkflowTaskPrepareDTO {
|
||||
*/
|
||||
private long nextTriggerAt;
|
||||
|
||||
/**
|
||||
* 操作原因
|
||||
*/
|
||||
private Integer operationReason;
|
||||
|
||||
/**
|
||||
* 任务执行时间
|
||||
*/
|
||||
@ -84,9 +78,4 @@ public class WorkflowTaskPrepareDTO {
|
||||
* 工作流上下文
|
||||
*/
|
||||
private String wfContext;
|
||||
|
||||
/**
|
||||
* 父工作流上下文
|
||||
*/
|
||||
private WorkflowExecutorContext parentWfContext;
|
||||
}
|
||||
|
@ -28,8 +28,6 @@ import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 客户端执行任务完成回调处理
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2023-10-03 23:12:33
|
||||
* @since 2.4.0
|
||||
@ -130,9 +128,6 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 判定是否需要重试
|
||||
*/
|
||||
private boolean isNeedRetry(ClientCallbackContext context) {
|
||||
|
||||
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
|
||||
|
@ -18,9 +18,6 @@ public class ClientCallbackFactory {
|
||||
CACHE.put(taskInstanceType, callbackHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据任务类型获取任务执行器
|
||||
*/
|
||||
public static ClientCallbackHandler getClientCallback(Integer type) {
|
||||
return CACHE.get(JobTaskTypeEnum.valueOf(type));
|
||||
}
|
||||
|
@ -15,8 +15,6 @@ import org.springframework.stereotype.Component;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 集群任务执行结果回调处理
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2023-10-03 23:12:12
|
||||
* @since 2.4.0
|
||||
@ -46,10 +44,6 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler
|
||||
return ClientInfoUtils.generate(serverNode);
|
||||
}
|
||||
|
||||
/**
|
||||
* 调用Job任务执行结果actor
|
||||
* @see com.aizuda.snailjob.server.job.task.support.dispatch.JobExecutorResultActor
|
||||
*/
|
||||
@Override
|
||||
protected void doCallback(ClientCallbackContext context) {
|
||||
|
||||
|
@ -50,7 +50,6 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
import scala.Int;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
import java.time.Duration;
|
||||
@ -110,7 +109,6 @@ public class JobExecutorActor extends AbstractActor {
|
||||
Job job = jobMapper.selectOne(queryWrapper.eq(Job::getId, taskExecute.getJobId()));
|
||||
int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
|
||||
try {
|
||||
// 操作原因
|
||||
int operationReason = JobOperationReasonEnum.NONE.getReason();
|
||||
if (Objects.isNull(job)) {
|
||||
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
|
||||
|
@ -22,8 +22,6 @@ import org.springframework.stereotype.Component;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 任务执行结果处理
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2023-10-05 17:16:35
|
||||
* @since 2.4.0
|
||||
@ -76,9 +74,6 @@ public class JobExecutorResultActor extends AbstractActor {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 尝试完成任务
|
||||
*/
|
||||
private void tryCompleteAndStop(JobExecutorResultDTO jobExecutorResultDTO) {
|
||||
CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(jobExecutorResultDTO);
|
||||
jobTaskBatchHandler.handleResult(completeJobBatchDTO);
|
||||
|
@ -53,7 +53,7 @@ public class JobTaskPrepareActor extends AbstractActor {
|
||||
private void doPrepare(JobTaskPrepareDTO prepare) {
|
||||
LambdaQueryWrapper<JobTaskBatch> queryWrapper = new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.eq(JobTaskBatch::getJobId, prepare.getJobId())
|
||||
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE);//未完成状态
|
||||
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE);
|
||||
|
||||
JobTaskExecutorSceneEnum jobTaskExecutorSceneEnum = JobTaskExecutorSceneEnum.get(
|
||||
prepare.getTaskExecutorScene());
|
||||
|
@ -47,8 +47,6 @@ import java.util.stream.Collectors;
|
||||
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
|
||||
|
||||
/**
|
||||
* 工作流执行器
|
||||
*
|
||||
* @author: xiaowoniu
|
||||
* @date : 2023-12-22 10:34
|
||||
* @since : 2.6.0
|
||||
@ -93,7 +91,7 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
|
||||
private void doExecutor(WorkflowNodeTaskExecuteDTO taskExecute) {
|
||||
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId());
|
||||
Assert.notNull(workflowTaskBatch, () -> new SnailJobServerException("Task does not exist,WorkflowTaskBatchId:"+taskExecute.getWorkflowTaskBatchId()));
|
||||
Assert.notNull(workflowTaskBatch, () -> new SnailJobServerException("Task does not exist"));
|
||||
|
||||
if (SystemConstants.ROOT.equals(taskExecute.getParentId())
|
||||
&& JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) {
|
||||
@ -222,16 +220,6 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 填充父工作流节点的执行原因
|
||||
* 该方法旨在处理父工作流节点的执行原因,以决定是否继续执行后续节点
|
||||
* 它通过检查是否存在不应跳过的执行原因来确定是否应该继续执行
|
||||
*
|
||||
* @param allJobTaskBatchList 包含所有作业任务批次的列表
|
||||
* @param parentJobTaskBatchList 包含父工作流节点相关作业任务批次的列表
|
||||
* @param parentWorkflowNode 父工作流节点
|
||||
* @param context 工作流执行的上下文环境
|
||||
*/
|
||||
private static void fillParentOperationReason(final List<JobTaskBatch> allJobTaskBatchList,
|
||||
final List<JobTaskBatch> parentJobTaskBatchList, final WorkflowNode parentWorkflowNode,
|
||||
final WorkflowExecutorContext context) {
|
||||
@ -257,9 +245,6 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 前置任务是否已完成
|
||||
*/
|
||||
private boolean arePredecessorsComplete(final WorkflowNodeTaskExecuteDTO taskExecute, Set<Long> predecessors,
|
||||
Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode,
|
||||
Map<Long, WorkflowNode> workflowNodeMap) {
|
||||
@ -308,12 +293,6 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新工作流任务批次
|
||||
*
|
||||
* @param taskStatus 任务批次状态
|
||||
* @param operationReason 操作原因
|
||||
*/
|
||||
private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
|
||||
|
||||
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
|
||||
|
@ -10,15 +10,6 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
* @author opensnail
|
||||
* @date 2023-10-03 22:13:04
|
||||
* @since 2.4.0
|
||||
*
|
||||
* 任务类型
|
||||
* 1. 集群任务 ClusterJobExecutor
|
||||
* @see com.aizuda.snailjob.server.job.task.support.executor.job.ClusterJobExecutor
|
||||
* 2. 广播任务 BroadcastTaskJobExecutor
|
||||
* 3. 静态分片任务 ShardingJobExecutor
|
||||
* 4. Map 任务 MapJobExecutor
|
||||
* 5. MapReduce 任务 MapReduceJobExecutor
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractJobExecutor implements JobExecutor, InitializingBean {
|
||||
|
||||
|
@ -17,8 +17,6 @@ import java.util.List;
|
||||
* @author opensnail
|
||||
* @date 2023-10-06 10:27:26
|
||||
* @since 2.4.0
|
||||
*
|
||||
* 广播任务执行器
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
|
@ -18,7 +18,6 @@ import java.util.List;
|
||||
* @author opensnail
|
||||
* @date 2023-10-03 22:12:40
|
||||
* @since 2.4.0
|
||||
* 集群任务执行器
|
||||
*/
|
||||
@Component
|
||||
public class ClusterJobExecutor extends AbstractJobExecutor {
|
||||
|
@ -6,8 +6,6 @@ import org.springframework.stereotype.Component;
|
||||
/**
|
||||
* @author: shuguang.zhang
|
||||
* @date : 2024-06-19
|
||||
*
|
||||
* Map 任务
|
||||
*/
|
||||
@Component
|
||||
public class MapJobExecutor extends MapReduceJobExecutor {
|
||||
|
@ -16,8 +16,6 @@ import java.util.List;
|
||||
* @author: opensnail
|
||||
* @date : 2024-06-12
|
||||
* @since : sj_1.1.0
|
||||
*
|
||||
* MapReduce 任务执行器
|
||||
*/
|
||||
@Component
|
||||
public class MapReduceJobExecutor extends AbstractJobExecutor {
|
||||
|
@ -40,8 +40,6 @@ import java.util.Objects;
|
||||
* @author opensnail
|
||||
* @date 2023-10-06 16:42:08
|
||||
* @since 2.4.0
|
||||
*
|
||||
* 调用客户端执行任务
|
||||
*/
|
||||
@Component(ActorGenerator.REAL_JOB_EXECUTOR_ACTOR)
|
||||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||
|
@ -17,8 +17,6 @@ import java.util.List;
|
||||
* @author opensnail
|
||||
* @date 2023-10-06 17:33:51
|
||||
* @since 2.4.0
|
||||
*
|
||||
* 分片任务执行器
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
|
@ -37,12 +37,6 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
|
||||
* @author xiaowoniu
|
||||
* @date 2023-12-24 08:15:19
|
||||
* @since 2.6.0
|
||||
*
|
||||
* 抽象工作流执行器
|
||||
*<br/>
|
||||
* 1.任务节点 JobTaskWorkflowExecutor
|
||||
* 2.决策节点 @see com.aizuda.snailjob.server.job.task.support.executor.workflow.DecisionWorkflowExecutor
|
||||
* 3.回调通知节点 @see com.aizuda.snailjob.server.job.task.support.executor.workflow.CallbackWorkflowExecutor
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean {
|
||||
|
@ -34,8 +34,6 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
|
||||
* @author xiaowoniu
|
||||
* @date 2023-12-24 08:18:06
|
||||
* @since 2.6.0
|
||||
*
|
||||
* 回调通知节点
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
|
@ -34,8 +34,6 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
|
||||
* @author xiaowoniu
|
||||
* @date 2023-12-24 08:17:11
|
||||
* @since 2.6.0
|
||||
*
|
||||
* 决策节点执行器
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
|
@ -22,8 +22,6 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
|
||||
* @author xiaowoniu
|
||||
* @date 2023-12-24 08:09:14
|
||||
* @since 2.6.0
|
||||
*
|
||||
* 任务节点执行器
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@ -75,9 +73,6 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @see com.aizuda.snailjob.server.job.task.support.dispatch.JobTaskPrepareActor
|
||||
*/
|
||||
private static void invokeJobTask(final WorkflowExecutorContext context) {
|
||||
// 生成任务批次
|
||||
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob(), context);
|
||||
|
@ -11,9 +11,6 @@ import lombok.Data;
|
||||
@Data
|
||||
public class WorkflowExecutorContext {
|
||||
|
||||
/**
|
||||
* 命名空间id
|
||||
*/
|
||||
private String namespaceId;
|
||||
|
||||
/**
|
||||
|
@ -12,9 +12,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
*/
|
||||
public class WorkflowExecutorFactory {
|
||||
|
||||
/**
|
||||
* Map<节点类型,节点执行类>
|
||||
*/
|
||||
private static final ConcurrentHashMap<WorkflowNodeTypeEnum, WorkflowExecutor> CACHE = new ConcurrentHashMap<>();
|
||||
|
||||
protected static void registerJobExecutor(WorkflowNodeTypeEnum workflowNodeTypeEnum, WorkflowExecutor executor) {
|
||||
|
@ -1,104 +0,0 @@
|
||||
package com.aizuda.snailjob.server.job.task.support.executor.workflow;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
|
||||
import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
|
||||
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
|
||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||
import com.aizuda.snailjob.server.common.util.DateUtils;
|
||||
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
|
||||
import com.aizuda.snailjob.server.job.task.support.WorkflowPrePareHandler;
|
||||
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
|
||||
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
|
||||
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
|
||||
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class WorkflowWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||
|
||||
private final JobTaskBatchMapper jobTaskBatchMapper;
|
||||
private final WorkflowBatchHandler workflowBatchHandler;
|
||||
private final JobTaskBatchHandler jobTaskBatchHandler;
|
||||
private final JobMapper jobMapper;
|
||||
private final WorkflowMapper workflowMapper;
|
||||
private final WorkflowPrePareHandler terminalWorkflowPrepareHandler;
|
||||
|
||||
@Override
|
||||
public WorkflowNodeTypeEnum getWorkflowNodeType() {
|
||||
return WorkflowNodeTypeEnum.WORKFLOW;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean doPreValidate(WorkflowExecutorContext context) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterExecute(WorkflowExecutorContext context) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void beforeExecute(WorkflowExecutorContext context) {
|
||||
context.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
|
||||
context.setOperationReason(JobOperationReasonEnum.NONE.getReason());
|
||||
context.setJobTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus());
|
||||
|
||||
JobTaskBatch jobTaskBatch = generateJobTaskBatch(context);
|
||||
|
||||
JobTask jobTask = generateJobTask(context, jobTaskBatch);
|
||||
|
||||
JobLogMetaDTO jobLogMetaDTO = new JobLogMetaDTO();
|
||||
jobLogMetaDTO.setNamespaceId(context.getNamespaceId());
|
||||
jobLogMetaDTO.setGroupName(context.getGroupName());
|
||||
jobLogMetaDTO.setTaskBatchId(jobTaskBatch.getId());
|
||||
jobLogMetaDTO.setJobId(SystemConstants.WORKFLOW_JOB_ID);
|
||||
jobLogMetaDTO.setTaskId(jobTask.getId());
|
||||
if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) {
|
||||
SnailJobLog.REMOTE.info("Node [{}] workflow success.\nworkflow params: {} \nworkflow result: [{}] <|>{}<|>", context.getWorkflowNodeId(), context.getWfContext(), context.getEvaluationResult(), jobLogMetaDTO);
|
||||
} else if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.CANCEL.getStatus()) {
|
||||
if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(context.getParentOperationReason())) {
|
||||
SnailJobLog.REMOTE.warn("Node [{}] cancels workflow. Cancellation reason: Current task does not require processing <|>{}<|>", context.getWorkflowNodeId(), jobLogMetaDTO);
|
||||
} else {
|
||||
SnailJobLog.REMOTE.warn("Node [{}] cancels workflow. Cancellation reason: Task status is closed <|>{}<|>", context.getWorkflowNodeId(), jobLogMetaDTO);
|
||||
}
|
||||
} else {
|
||||
SnailJobLog.REMOTE.error("Node [{}] fail to workflow.\nReason: {} <|>{}<|>", context.getWorkflowNodeId(), context.getLogMessage(), jobLogMetaDTO);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(WorkflowExecutorContext context) {
|
||||
WorkflowConfig nodeInfo = JsonUtil.parseObject(context.getNodeInfo(), WorkflowConfig.class);
|
||||
Workflow workflow = workflowMapper.selectById(nodeInfo.getId());
|
||||
Assert.notNull(workflow, () -> new SnailJobServerException("workflow can not be null."));
|
||||
|
||||
WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow);
|
||||
// 下次触发时间:设置now表示立即执行
|
||||
prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
|
||||
// 执行策略:工作流触发工作流
|
||||
prepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.WORKFLOW_WORKFLOW.getType());
|
||||
// 操作原因:工作流调用
|
||||
prepareDTO.setOperationReason(JobOperationReasonEnum.WORKFLOW_CALLED.getReason());
|
||||
prepareDTO.setParentWfContext(context);
|
||||
terminalWorkflowPrepareHandler.handler(prepareDTO);
|
||||
}
|
||||
|
||||
}
|
@ -2,7 +2,6 @@ package com.aizuda.snailjob.server.job.task.support.generator.batch;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||
import com.aizuda.snailjob.server.common.util.DateUtils;
|
||||
import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO;
|
||||
@ -29,10 +28,6 @@ import java.util.Optional;
|
||||
public class WorkflowBatchGenerator {
|
||||
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
||||
|
||||
/**
|
||||
* 生成工作流批次,插入数据库
|
||||
* 开始执行工作流
|
||||
*/
|
||||
public void generateJobTaskBatch(WorkflowTaskBatchGeneratorContext context) {
|
||||
|
||||
// 生成任务批次
|
||||
@ -40,7 +35,6 @@ public class WorkflowBatchGenerator {
|
||||
workflowTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus()));
|
||||
workflowTaskBatch.setOperationReason(context.getOperationReason());
|
||||
workflowTaskBatch.setWfContext(context.getWfContext());
|
||||
workflowTaskBatch.setExtAttrs(JsonUtil.toJsonString(context.getParentWorkflowContext()));
|
||||
|
||||
Assert.isTrue(1 == workflowTaskBatchMapper.insert(workflowTaskBatch), () -> new SnailJobServerException("Adding new scheduling task failed. [{}]", context.getWorkflowId()));
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
package com.aizuda.snailjob.server.job.task.support.generator.batch;
|
||||
|
||||
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
@ -39,7 +38,7 @@ public class WorkflowTaskBatchGeneratorContext {
|
||||
private Integer taskBatchStatus;
|
||||
|
||||
/**
|
||||
* 执行策略 1、auto 2、manual 3、auto_workflow 4、manual_workflow
|
||||
* 执行策略 1、auto 2、manual 3、workflow
|
||||
*/
|
||||
private Integer taskExecutorScene;
|
||||
|
||||
@ -53,9 +52,5 @@ public class WorkflowTaskBatchGeneratorContext {
|
||||
*/
|
||||
private String wfContext;
|
||||
|
||||
/**
|
||||
* 父工作流执行上下文
|
||||
*/
|
||||
private WorkflowExecutorContext parentWorkflowContext;
|
||||
|
||||
}
|
||||
|
@ -54,9 +54,6 @@ public class JobTaskBatchHandler {
|
||||
private final GroupConfigMapper groupConfigMapper;
|
||||
private final List<JobExecutorResultHandler> resultHandlerList;
|
||||
|
||||
/**
|
||||
* 处理任务批次结果
|
||||
*/
|
||||
@Transactional
|
||||
public boolean handleResult(CompleteJobBatchDTO completeJobBatchDTO) {
|
||||
Assert.notNull(completeJobBatchDTO.getTaskType(), ()-> new SnailJobServerException("taskType can not be null"));
|
||||
|
@ -99,16 +99,10 @@ public class WorkflowBatchHandler {
|
||||
return isNeedProcess;
|
||||
}
|
||||
|
||||
/**
|
||||
* 工作流执行完成
|
||||
*/
|
||||
public boolean complete(Long workflowTaskBatchId) {
|
||||
return complete(workflowTaskBatchId, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 工作流执行完成
|
||||
*/
|
||||
public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) {
|
||||
workflowTaskBatch = Optional.ofNullable(workflowTaskBatch)
|
||||
.orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
|
||||
@ -181,15 +175,12 @@ public class WorkflowBatchHandler {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改工作流任务批次状态
|
||||
*/
|
||||
private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) {
|
||||
|
||||
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
|
||||
jobTaskBatch.setId(workflowTaskBatchId);
|
||||
jobTaskBatch.setTaskBatchStatus(taskStatus);
|
||||
// jobTaskBatch.setOperationReason(operationReason);
|
||||
jobTaskBatch.setOperationReason(operationReason);
|
||||
workflowTaskBatchMapper.updateById(jobTaskBatch);
|
||||
}
|
||||
|
||||
@ -321,9 +312,6 @@ public class WorkflowBatchHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 开启下一个工作流节点
|
||||
*/
|
||||
public void openNextNode(WorkflowNodeTaskExecuteDTO taskExecuteDTO) {
|
||||
if (Objects.isNull(taskExecuteDTO.getParentId()) || Objects.isNull(taskExecuteDTO.getWorkflowTaskBatchId()) || Long.valueOf(0).equals(taskExecuteDTO.getWorkflowTaskBatchId())) {
|
||||
return;
|
||||
@ -342,16 +330,12 @@ public class WorkflowBatchHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 通知工作流执行器
|
||||
* @see com.aizuda.snailjob.server.job.task.support.dispatch.WorkflowExecutorActor
|
||||
*/
|
||||
private void tellWorkflowTaskExecutor(WorkflowNodeTaskExecuteDTO taskExecuteDTO) {
|
||||
try {
|
||||
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
||||
actorRef.tell(taskExecuteDTO, actorRef);
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.LOCAL.error("Task scheduling execution failed", e); //任务调度执行失败
|
||||
SnailJobLog.LOCAL.error("Task scheduling execution failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,7 @@ import java.util.Objects;
|
||||
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
|
||||
|
||||
/**
|
||||
* 处理处于已完成 {@link JobTaskBatchStatusEnum::COMPLETED} 完成状态的任务
|
||||
* 处理处于已完成 {@link JobTaskBatchStatusEnum::COMPLETED} 状态的任务
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2023-10-02 10:16:28
|
||||
@ -29,9 +29,6 @@ public class TerminalJobPrepareHandler extends AbstractJobPrepareHandler {
|
||||
@Autowired
|
||||
private JobTaskBatchGenerator jobTaskBatchGenerator;
|
||||
|
||||
/**
|
||||
* 任务完成状态
|
||||
*/
|
||||
@Override
|
||||
public boolean matches(Integer status) {
|
||||
return COMPLETED.contains(status);
|
||||
|
@ -13,7 +13,7 @@ import java.text.MessageFormat;
|
||||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* 处理处于{@link JobTaskBatchStatusEnum::WAIT}待处理状态的任务
|
||||
* 处理处于{@link JobTaskBatchStatusEnum::WAIT}状态的任务
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2023-10-05 18:29:22
|
||||
|
@ -32,9 +32,6 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle
|
||||
|
||||
private final WorkflowBatchHandler workflowBatchHandler;
|
||||
|
||||
/**
|
||||
* 运行中的任务处理
|
||||
*/
|
||||
@Override
|
||||
public boolean matches(Integer status) {
|
||||
return Objects.nonNull(status) && JobTaskBatchStatusEnum.RUNNING.getStatus() == status;
|
||||
|
@ -14,7 +14,7 @@ import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 处理处于{@link JobTaskBatchStatusEnum::WAITING}待处理 状态的任务
|
||||
* 处理处于{@link JobTaskBatchStatusEnum::WAIT}状态的任务
|
||||
*
|
||||
* @author xiaowoniu
|
||||
* @date 2023-10-05 18:29:22
|
||||
|
@ -23,8 +23,6 @@ import java.util.Optional;
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.REPORT_JOB_DISPATCH_RESULT;
|
||||
|
||||
/**
|
||||
* 客户端执行完成后上报结果
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2023-09-30 23:01:58
|
||||
* @since 2.4.0
|
||||
|
@ -84,7 +84,7 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes
|
||||
doHandleSuccess(context);
|
||||
}
|
||||
|
||||
// 开启下一个工作流节点(如果需要)
|
||||
// 开启下一个工作流节点
|
||||
openNextWorkflowNode(context);
|
||||
|
||||
boolean res = updateStatus(context, taskBatchStatus);
|
||||
@ -104,9 +104,6 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes
|
||||
workflowBatchHandler.openNextNode(taskExecuteDTO);
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新任务批次状态
|
||||
*/
|
||||
protected boolean updateStatus(final JobExecutorResultContext context, final Integer taskBatchStatus) {
|
||||
JobTaskBatch jobTaskBatch = new JobTaskBatch();
|
||||
jobTaskBatch.setId(context.getTaskBatchId());
|
||||
@ -138,9 +135,6 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes
|
||||
instanceInterrupt.stop(stopJobContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* 成功(除MapReduce任务外啥也没干)
|
||||
*/
|
||||
protected abstract void doHandleSuccess(final JobExecutorResultContext context);
|
||||
|
||||
protected abstract void doHandleStop(final JobExecutorResultContext context);
|
||||
|
@ -18,7 +18,6 @@ import java.text.MessageFormat;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 工作流任务超时检查任务
|
||||
* @author opensnail
|
||||
* @date 2024-05-20 22:25:12
|
||||
* @since sj_1.0.0
|
||||
|
@ -33,7 +33,7 @@ public class WorkflowTimerTask implements TimerTask<String> {
|
||||
|
||||
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
|
||||
taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId());
|
||||
taskExecuteDTO.setTaskExecutorScene(workflowTimerTaskDTO.getTaskExecutorScene());//执行策略
|
||||
taskExecuteDTO.setTaskExecutorScene(workflowTimerTaskDTO.getTaskExecutorScene());
|
||||
taskExecuteDTO.setParentId(SystemConstants.ROOT);
|
||||
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
||||
actorRef.tell(taskExecuteDTO, actorRef);
|
||||
|
@ -4,18 +4,6 @@ server:
|
||||
context-path: /snail-job
|
||||
|
||||
spring:
|
||||
# mail:
|
||||
# host: smtp.qq.com
|
||||
# port: 465
|
||||
# username: nianqing_16@qq.com
|
||||
# password: yojeelyaxrvacihg
|
||||
# protocol: smtp
|
||||
# ssl:
|
||||
# enable: true
|
||||
# properties:
|
||||
# mail.smtp.auth: true
|
||||
# mail.smtp.starttls.enable: true
|
||||
|
||||
main:
|
||||
banner-mode: off
|
||||
profiles:
|
||||
@ -94,19 +82,3 @@ snail-job:
|
||||
server-port: 17888 # 服务器端口
|
||||
log-storage: 7 # 日志保存时间(单位: day)
|
||||
rpc-type: grpc
|
||||
|
||||
mail:
|
||||
enabled: true
|
||||
host: smtp.qq.com
|
||||
port: 465
|
||||
user: nianqing_16@qq.com
|
||||
pass: yojeelyaxrvacihg
|
||||
from: nianqing_16@qq.com
|
||||
sslEnable: true
|
||||
starttlsEnable: false
|
||||
timeout: 5000
|
||||
connectionTimeout: 5000
|
||||
properties:
|
||||
mail.smtp.auth: true
|
||||
auth: true
|
||||
|
||||
|
@ -12,7 +12,6 @@ import com.aizuda.snailjob.server.common.vo.WorkflowResponseVO;
|
||||
import com.aizuda.snailjob.server.web.service.WorkflowService;
|
||||
import com.aizuda.snailjob.server.web.util.ExportUtils;
|
||||
import com.aizuda.snailjob.server.web.util.ImportUtils;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowHistory;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.http.MediaType;
|
||||
@ -49,36 +48,6 @@ public class WorkflowController {
|
||||
return workflowService.listPage(queryVO);
|
||||
}
|
||||
|
||||
/***
|
||||
* 获取当前id历史版本
|
||||
* @param id
|
||||
* @return
|
||||
*/
|
||||
@GetMapping("/history/{id}")
|
||||
@LoginRequired(role = RoleEnum.USER)
|
||||
public List<WorkflowHistory> historyVos(@PathVariable("id") Long id) {
|
||||
return workflowService.getWorkflowHistory(id);
|
||||
}
|
||||
|
||||
/***
|
||||
* 获取当前id历史版本 详细
|
||||
* @param id
|
||||
* @param version
|
||||
* @return
|
||||
*/
|
||||
@GetMapping("/historyDetail/{id}")
|
||||
@LoginRequired(role = RoleEnum.USER)
|
||||
public WorkflowDetailResponseVO historyDetail(@PathVariable("id") Long id, @RequestParam("version") String version) {
|
||||
return workflowService.getWorkflowHistoryDetail(id,version);
|
||||
}
|
||||
|
||||
@GetMapping("/history/del/{id}")
|
||||
@LoginRequired(role = RoleEnum.USER)
|
||||
public Boolean deleteById(@PathVariable("id") Long id, @RequestParam("version") String version) {
|
||||
return workflowService.deleteHistoryById(id,version);
|
||||
}
|
||||
|
||||
|
||||
@PutMapping
|
||||
@LoginRequired(role = RoleEnum.USER)
|
||||
public Boolean updateWorkflow(@RequestBody @Validated WorkflowRequestVO workflowRequestVO) {
|
||||
|
@ -85,11 +85,6 @@ public class JobResponseVO {
|
||||
*/
|
||||
private Integer executorTimeout;
|
||||
|
||||
/**
|
||||
* 前置任务执行超时时间,单位秒
|
||||
*/
|
||||
private Integer executorTimeoutFront;
|
||||
|
||||
/**
|
||||
* 最大重试次数
|
||||
*/
|
||||
|
@ -6,7 +6,6 @@ import com.aizuda.snailjob.server.web.model.base.PageResult;
|
||||
import com.aizuda.snailjob.server.web.model.request.*;
|
||||
import com.aizuda.snailjob.server.common.vo.WorkflowDetailResponseVO;
|
||||
import com.aizuda.snailjob.server.common.vo.WorkflowResponseVO;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowHistory;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
|
||||
@ -43,10 +42,4 @@ public interface WorkflowService {
|
||||
);
|
||||
|
||||
Boolean deleteByIds(Set<Long> ids);
|
||||
|
||||
List<WorkflowHistory> getWorkflowHistory(Long id);
|
||||
|
||||
WorkflowDetailResponseVO getWorkflowHistoryDetail(Long id, String version);
|
||||
|
||||
Boolean deleteHistoryById(Long id, String version);
|
||||
}
|
||||
|
@ -24,24 +24,18 @@ public interface JobConverter {
|
||||
|
||||
JobConverter INSTANCE = Mappers.getMapper(JobConverter.class);
|
||||
|
||||
@Mappings({
|
||||
@Mapping(source = "executorTimeoutFront", target = "executorTimeout")
|
||||
})
|
||||
List<JobRequestVO> convertList(List<Job> jobs);
|
||||
|
||||
@Mappings({
|
||||
@Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIdsStr(jobRequestVO.getNotifyIds()))"),
|
||||
@Mapping(source = "executorTimeout", target = "executorTimeoutFront"),
|
||||
@Mapping(target = "executorTimeout", expression = "java( (jobRequestVO.getExecutorTimeout() != null&&jobRequestVO.getExecutorTimeout() <= 0) ? Integer.MAX_VALUE : jobRequestVO.getExecutorTimeout())")
|
||||
})
|
||||
Job convert(JobRequestVO jobRequestVO);
|
||||
|
||||
@Mappings({
|
||||
@Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))"),
|
||||
@Mapping(source = "executorTimeoutFront", target = "executorTimeout")
|
||||
@Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))")
|
||||
})
|
||||
JobRequestVO convert(Job job);
|
||||
|
||||
@Mappings({
|
||||
@Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIdsStr(jobRequestVO.getNotifyIds()))")
|
||||
})
|
||||
Job convert(JobRequestVO jobRequestVO);
|
||||
|
||||
static Set<Long> toNotifyIds(String notifyIds) {
|
||||
if (StrUtil.isBlank(notifyIds)) {
|
||||
return new HashSet<>();
|
||||
@ -57,5 +51,4 @@ public interface JobConverter {
|
||||
|
||||
return JsonUtil.toJsonString(notifyIds);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -23,15 +23,13 @@ public interface JobResponseVOConverter {
|
||||
JobResponseVOConverter INSTANCE = Mappers.getMapper(JobResponseVOConverter.class);
|
||||
|
||||
@Mappings({
|
||||
@Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))"),
|
||||
@Mapping(source = "executorTimeoutFront", target = "executorTimeout")
|
||||
@Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))")
|
||||
})
|
||||
List<JobResponseVO> convertList(List<Job> jobs);
|
||||
|
||||
@Mappings({
|
||||
@Mapping(target = "nextTriggerAt", expression = "java(JobResponseVOConverter.toLocalDateTime(job.getNextTriggerAt()))"),
|
||||
@Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))"),
|
||||
@Mapping(source = "executorTimeoutFront", target = "executorTimeout")
|
||||
@Mapping(target = "notifyIds", expression = "java(JobConverter.toNotifyIds(job.getNotifyIds()))")
|
||||
})
|
||||
JobResponseVO convert(Job job);
|
||||
|
||||
|
@ -6,7 +6,6 @@ import cn.hutool.core.lang.Pair;
|
||||
import cn.hutool.core.util.HashUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||
import com.aizuda.snailjob.common.core.expression.ExpressionEngine;
|
||||
import com.aizuda.snailjob.common.core.expression.ExpressionFactory;
|
||||
@ -43,7 +42,10 @@ import com.aizuda.snailjob.server.web.service.handler.GroupHandler;
|
||||
import com.aizuda.snailjob.server.common.handler.WorkflowHandler;
|
||||
import com.aizuda.snailjob.server.web.util.UserSessionUtils;
|
||||
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.*;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobSummaryMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowNodeMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.*;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||
@ -56,12 +58,10 @@ import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.stream.Collectors;
|
||||
@ -78,7 +78,6 @@ import java.util.stream.Collectors;
|
||||
public class WorkflowServiceImpl implements WorkflowService {
|
||||
|
||||
private final WorkflowMapper workflowMapper;
|
||||
private final WorkflowHistoryMapper workflowHistoryMapper;
|
||||
private final WorkflowNodeMapper workflowNodeMapper;
|
||||
private final SystemProperties systemProperties;
|
||||
private final WorkflowHandler workflowHandler;
|
||||
@ -148,14 +147,6 @@ public class WorkflowServiceImpl implements WorkflowService {
|
||||
workflow.setVersion(null);
|
||||
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
|
||||
Assert.isTrue(1 == workflowMapper.updateById(workflow), () -> new SnailJobServerException("Failed to save workflow graph"));
|
||||
|
||||
//准备数据到履历表add 20250519
|
||||
WorkflowHistory history = new WorkflowHistory();
|
||||
Workflow workflow1 = workflowMapper.selectById(workflow.getId());
|
||||
BeanUtils.copyProperties(workflow1, history);
|
||||
|
||||
Assert.isTrue(1 == workflowHistoryMapper.insert(history), () -> new SnailJobServerException("Failed to save workflowHistory graph"));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -244,14 +235,6 @@ public class WorkflowServiceImpl implements WorkflowService {
|
||||
.eq(Workflow::getVersion, version)) > 0,
|
||||
() -> new SnailJobServerException("Update failed"));
|
||||
|
||||
//工作流表更新后插入履历表
|
||||
//准备数据到履历表add 20250520
|
||||
WorkflowHistory history = new WorkflowHistory();
|
||||
Workflow workflow1 = workflowMapper.selectById(workflow.getId());
|
||||
BeanUtils.copyProperties(workflow1, history);
|
||||
history.setCreateDt(LocalDateTime.now());
|
||||
Assert.isTrue(1 == workflowHistoryMapper.insert(history), () -> new SnailJobServerException("Failed to save workflowHistory graph"));
|
||||
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
@ -272,9 +255,6 @@ public class WorkflowServiceImpl implements WorkflowService {
|
||||
return 1 == workflowMapper.updateById(workflow);
|
||||
}
|
||||
|
||||
/**
|
||||
* 手动触发工作流
|
||||
*/
|
||||
@Override
|
||||
public Boolean trigger(WorkflowTriggerVO triggerVO) {
|
||||
Workflow workflow = workflowMapper.selectById(triggerVO.getWorkflowId());
|
||||
@ -294,8 +274,6 @@ public class WorkflowServiceImpl implements WorkflowService {
|
||||
// 设置now表示立即执行
|
||||
prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
|
||||
prepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType());
|
||||
// 操作原因:手动触发
|
||||
prepareDTO.setOperationReason(JobOperationReasonEnum.MANUAL_TRIGGER.getReason());
|
||||
String tmpWfContext = triggerVO.getTmpWfContext();
|
||||
if (StrUtil.isNotBlank(tmpWfContext) && !JsonUtil.isEmptyJson(tmpWfContext)){
|
||||
prepareDTO.setWfContext(tmpWfContext);
|
||||
@ -399,35 +377,6 @@ public class WorkflowServiceImpl implements WorkflowService {
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WorkflowHistory> getWorkflowHistory(Long id) {
|
||||
return workflowHistoryMapper.selectList(new LambdaQueryWrapper<WorkflowHistory>().eq(WorkflowHistory::getId, id).orderByDesc(WorkflowHistory::getCreateDt));
|
||||
}
|
||||
|
||||
@Override
|
||||
public WorkflowDetailResponseVO getWorkflowHistoryDetail(Long id, String version) {
|
||||
WorkflowHistory workflowHistory = workflowHistoryMapper.selectOne(
|
||||
new LambdaQueryWrapper<WorkflowHistory>()
|
||||
.eq(WorkflowHistory::getId, id)
|
||||
.eq(WorkflowHistory::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())
|
||||
.eq(WorkflowHistory::getVersion, version)
|
||||
);
|
||||
if (Objects.isNull(workflowHistory)) {
|
||||
return null;
|
||||
}
|
||||
Workflow workflow = new Workflow();
|
||||
BeanUtils.copyProperties(workflowHistory, workflow);
|
||||
return doGetWorkflowDetail(workflow);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean deleteHistoryById(Long id, String version) {
|
||||
|
||||
return workflowHistoryMapper.delete(new LambdaQueryWrapper<WorkflowHistory>()
|
||||
.eq(WorkflowHistory::getId, id)
|
||||
.eq(WorkflowHistory::getVersion, version)) > 0;
|
||||
}
|
||||
|
||||
private void batchSaveWorkflowTask(final List<WorkflowRequestVO> workflowRequestVOList, final String namespaceId) {
|
||||
|
||||
Set<String> groupNameSet = StreamUtils.toSet(workflowRequestVOList, WorkflowRequestVO::getGroupName);
|
||||
|
Loading…
Reference in New Issue
Block a user