feat: 2.6.0

1. 重构表达式引擎
2. 添加回调对象
This commit is contained in:
byteblogs168 2023-12-30 18:38:51 +08:00
parent 9ad8e1bb1b
commit 8e9f932c16
50 changed files with 912 additions and 546 deletions

View File

@ -8,14 +8,14 @@ USE
CREATE TABLE `namespace`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`name` varchar(64) NOT NULL COMMENT '名称',
`unique_id` varchar(64) NOT NULL COMMENT '唯一id',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
`name` varchar(64) NOT NULL COMMENT '名称',
`unique_id` varchar(64) NOT NULL COMMENT '唯一id',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
PRIMARY KEY (`id`),
KEY `idx_name` (`name`),
KEY `idx_name` (`name`),
UNIQUE KEY `uk_unique_id` (`unique_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='命名空间';
@ -27,7 +27,7 @@ VALUES (1, 'Default', '764d604ec6fc45f68cd92514c40e9e1a', now(), now(), 0);
CREATE TABLE `group_config`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL DEFAULT '' COMMENT '组名称',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '组描述',
`group_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '组状态 0、未启用 1、启用',
@ -47,8 +47,8 @@ CREATE TABLE `group_config`
CREATE TABLE `notify_config`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`scene_name` varchar(64) NOT NULL COMMENT '场景名称',
`notify_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知状态 0、未启用 1、启用',
@ -71,7 +71,7 @@ CREATE TABLE `notify_config`
CREATE TABLE `retry_dead_letter_0`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`scene_name` varchar(64) NOT NULL COMMENT '场景名称',
@ -96,7 +96,7 @@ CREATE TABLE `retry_dead_letter_0`
CREATE TABLE `retry_task_0`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`scene_name` varchar(64) NOT NULL COMMENT '场景名称',
@ -112,9 +112,9 @@ CREATE TABLE `retry_task_0`
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`,`group_name`, `scene_name`),
KEY `idx_namespace_id_group_name_task_type` (`namespace_id`,`group_name`, `task_type`),
KEY `idx_namespace_id_group_name_retry_status` (`namespace_id`,`group_name`, `retry_status`),
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`),
KEY `idx_namespace_id_group_name_task_type` (`namespace_id`, `group_name`, `task_type`),
KEY `idx_namespace_id_group_name_retry_status` (`namespace_id`, `group_name`, `retry_status`),
KEY `idx_idempotent_id` (`idempotent_id`),
KEY `idx_biz_no` (`biz_no`),
KEY `idx_create_dt` (`create_dt`),
@ -127,7 +127,7 @@ CREATE TABLE `retry_task_0`
CREATE TABLE `retry_task_log`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`scene_name` varchar(64) NOT NULL COMMENT '场景名称',
@ -140,7 +140,7 @@ CREATE TABLE `retry_task_log`
`task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `idx_group_name_scene_name` (`namespace_id`,`group_name`, `scene_name`),
KEY `idx_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`),
KEY `idx_retry_status` (`retry_status`),
KEY `idx_idempotent_id` (`idempotent_id`),
KEY `idx_unique_id` (`unique_id`),
@ -153,15 +153,15 @@ CREATE TABLE `retry_task_log`
CREATE TABLE `retry_task_log_message`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`message` text NOT NULL COMMENT '异常信息',
`client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port',
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`message` text NOT NULL COMMENT '异常信息',
`client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port',
PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`,`group_name`, `unique_id`),
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `unique_id`),
KEY `idx_create_dt` (`create_dt`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
@ -171,7 +171,7 @@ CREATE TABLE `retry_task_log_message`
CREATE TABLE `scene_config`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`scene_name` varchar(64) NOT NULL COMMENT '场景名称',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`scene_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '组状态 0、未启用 1、启用',
@ -194,7 +194,7 @@ CREATE TABLE `scene_config`
CREATE TABLE `server_node`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`host_id` varchar(64) NOT NULL COMMENT '主机id',
`host_ip` varchar(64) NOT NULL COMMENT '机器ip',
@ -206,7 +206,7 @@ CREATE TABLE `server_node`
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name` (`namespace_id`,`group_name`),
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`),
KEY `idx_expire_at_node_type` (`expire_at`, `node_type`),
UNIQUE KEY `uk_host_id_host_ip` (`host_id`, `host_ip`)
) ENGINE = InnoDB
@ -251,7 +251,7 @@ CREATE TABLE `system_user_permission`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`system_user_id` bigint(20) NOT NULL COMMENT '系统用户id',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
@ -263,11 +263,11 @@ CREATE TABLE `system_user_permission`
CREATE TABLE `sequence_alloc`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL DEFAULT '' COMMENT '组名称',
`max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '最大id',
`step` int(11) NOT NULL DEFAULT '100' COMMENT '步长',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL DEFAULT '' COMMENT '组名称',
`max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '最大id',
`step` int(11) NOT NULL DEFAULT '100' COMMENT '步长',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_namespace_id_group_name` (`namespace_id`, `group_name`)
) ENGINE = InnoDB
@ -277,7 +277,7 @@ CREATE TABLE `sequence_alloc`
CREATE TABLE `job`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_name` varchar(64) NOT NULL COMMENT '名称',
`args_str` text DEFAULT NULL COMMENT '执行方法参数',
@ -313,7 +313,7 @@ CREATE TABLE `job`
CREATE TABLE `job_log_message`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务信息id',
`task_batch_id` bigint(20) NOT NULL COMMENT '任务批次id',
@ -332,7 +332,7 @@ CREATE TABLE `job_log_message`
CREATE TABLE `job_task`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务信息id',
`task_batch_id` bigint(20) NOT NULL COMMENT '调度任务id',
@ -356,26 +356,26 @@ CREATE TABLE `job_task`
CREATE TABLE `job_task_batch`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务id',
`workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流节点id',
`parent_workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务父批次id',
`workflow_task_batch_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务批次id',
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
`parent_id` varchar(64) NOT NULL DEFAULT '' COMMENT '父节点',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务id',
`workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流节点id',
`parent_workflow_node_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务父批次id',
`workflow_task_batch_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '工作流任务批次id',
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
`parent_id` varchar(64) NOT NULL DEFAULT '' COMMENT '父节点',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
PRIMARY KEY (`id`),
KEY `idx_job_id_task_batch_status` (`job_id`, `task_batch_status`),
KEY `idx_create_dt` (`create_dt`),
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`),
UNIQUE KEY `uk_workflow_task_batch_id_workflow_node_id` (`workflow_task_batch_id`, `workflow_node_id`)
KEY `idx_job_id_task_batch_status` (`job_id`, `task_batch_status`),
KEY `idx_create_dt` (`create_dt`),
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`),
UNIQUE KEY `uk_workflow_task_batch_id_workflow_node_id` (`workflow_task_batch_id`, `workflow_node_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='任务批次';
@ -383,24 +383,24 @@ CREATE TABLE `job_task_batch`
CREATE TABLE `job_notify_config`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务id',
`notify_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知状态 0、未启用 1、启用',
`notify_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知类型 1、钉钉 2、邮件 3、企业微信',
`notify_attribute` varchar(512) NOT NULL COMMENT '配置属性',
`notify_threshold` int(11) NOT NULL DEFAULT '0' COMMENT '通知阈值',
`notify_scene` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知场景',
`rate_limiter_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '限流状态 0、未启用 1、启用',
`rate_limiter_threshold` int(11) NOT NULL DEFAULT '0' COMMENT '每秒限流阈值',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务id',
`notify_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知状态 0、未启用 1、启用',
`notify_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知类型 1、钉钉 2、邮件 3、企业微信',
`notify_attribute` varchar(512) NOT NULL COMMENT '配置属性',
`notify_threshold` int(11) NOT NULL DEFAULT '0' COMMENT '通知阈值',
`notify_scene` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知场景',
`rate_limiter_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '限流状态 0、未启用 1、启用',
`rate_limiter_threshold` int(11) NOT NULL DEFAULT '0' COMMENT '每秒限流阈值',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name_job_id` (`namespace_id`,`group_name`, job_id)
) ENGINE=InnoDB
AUTO_INCREMENT=4
DEFAULT CHARSET=utf8mb4 COMMENT='job通知配置';
KEY `idx_namespace_id_group_name_job_id` (`namespace_id`, `group_name`, job_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 4
DEFAULT CHARSET = utf8mb4 COMMENT ='job通知配置';
CREATE TABLE `job_summary`
(
@ -447,25 +447,26 @@ CREATE TABLE `retry_summary`
CREATE TABLE `workflow`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`workflow_name` varchar(64) NOT NULL COMMENT '工作流名称',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`workflow_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '工作流状态 0、关闭、1、开启',
`trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间',
`trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
`next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间',
`workflow_name` varchar(64) NOT NULL COMMENT '工作流名称',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`workflow_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '工作流状态 0、关闭、1、开启',
`trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间',
`trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
`next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间',
`block_strategy` tinyint(4) NOT NULL DEFAULT '1' COMMENT '阻塞策略 1、丢弃 2、覆盖 3、并行',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`flow_info` text DEFAULT NULL COMMENT '流程信息',
`bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`flow_info` text DEFAULT NULL COMMENT '流程信息',
`bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket',
`version` int(11) NOT NULL COMMENT '版本号',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
PRIMARY KEY (`id`),
KEY `idx_create_dt` (`create_dt`),
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
KEY `idx_create_dt` (`create_dt`),
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='工作流';
@ -473,24 +474,25 @@ CREATE TABLE `workflow`
CREATE TABLE `workflow_node`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`node_name` varchar(64) NOT NULL COMMENT '节点名称',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL DEFAULT -1 COMMENT '任务信息id',
`workflow_id` bigint(20) NOT NULL COMMENT '工作流ID',
`node_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、任务节点 2、条件节点',
`expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL',
`fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞',
`workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启',
`priority_level` int(11) NOT NULL DEFAULT 1 COMMENT '优先级',
`node_expression` text DEFAULT NULL COMMENT '节点表达式',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`node_name` varchar(64) NOT NULL COMMENT '节点名称',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务信息id',
`workflow_id` bigint(20) NOT NULL COMMENT '工作流ID',
`node_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、任务节点 2、条件节点',
`expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL',
`fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞',
`workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启',
`priority_level` int(11) NOT NULL DEFAULT 1 COMMENT '优先级',
`node_info` text DEFAULT NULL COMMENT '节点信息 ',
`version` int(11) NOT NULL COMMENT '版本号',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
PRIMARY KEY (`id`),
KEY `idx_create_dt` (`create_dt`),
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
KEY `idx_create_dt` (`create_dt`),
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='工作流节点';
@ -498,21 +500,21 @@ CREATE TABLE `workflow_node`
CREATE TABLE `workflow_task_batch`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`workflow_id` bigint(20) NOT NULL COMMENT '工作流任务id',
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
`flow_info` text DEFAULT NULL COMMENT '流程信息',
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`workflow_id` bigint(20) NOT NULL COMMENT '工作流任务id',
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
`flow_info` text DEFAULT NULL COMMENT '流程信息',
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
PRIMARY KEY (`id`),
KEY `idx_job_id_task_batch_status` (`workflow_id`, `task_batch_status`),
KEY `idx_create_dt` (`create_dt`),
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
KEY `idx_job_id_task_batch_status` (`workflow_id`, `task_batch_status`),
KEY `idx_create_dt` (`create_dt`),
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='工作流批次';

View File

@ -1,24 +0,0 @@
package com.aizuda.easy.retry.client.core;
import java.lang.reflect.Method;
import java.util.Map;
/**
* 参数表达式解析引擎
*
* @author www.byteblogs.com
* @date 2023-09-10 12:30:23
* @since 2.3.0
*/
public interface ExpressionEngine {
/**
 * Evaluates the given expression against the retried method's arguments.
 *
 * @param expression the expression string to evaluate; engines decide how a blank value is handled
 * @param args the runtime argument values of the retried method
 * @param method the retried method, used to resolve parameter names for the evaluation context
 * @return the evaluation result
 */
Object eval(String expression, Object[] args, Method method);
}

View File

@ -1,43 +0,0 @@
package com.aizuda.easy.retry.client.core.expression;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.core.ExpressionEngine;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.core.DefaultParameterNameDiscoverer;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
* @author www.byteblogs.com
* @date 2023-09-10 12:31:17
* @since 2.3.0
*/
/**
 * Template base class for expression engines: it turns the retried method's
 * positional arguments into a name-to-value context and hands the actual
 * evaluation to the concrete engine via {@link #doEval(String, Map)}.
 */
public abstract class AbstractExpressionEngine implements ExpressionEngine {

    private static final DefaultParameterNameDiscoverer DISCOVERER = new DefaultParameterNameDiscoverer();

    @Override
    public Object eval(String expression, Object[] args, Method method) {
        // A blank expression short-circuits to the empty string instead of failing.
        if (StringUtils.isBlank(expression)) {
            return StrUtil.EMPTY;
        }
        // Without parameter names (e.g. compiled without -parameters and no debug
        // info) no named context can be built, so evaluation is skipped.
        String[] parameterNames = DISCOVERER.getParameterNames(method);
        if (ArrayUtils.isEmpty(parameterNames)) {
            return null;
        }
        // Bind each argument value to its declared parameter name.
        Map<String, Object> evaluationContext = new HashMap<>(args.length);
        int position = 0;
        for (String parameterName : parameterNames) {
            evaluationContext.put(parameterName, args[position]);
            position++;
        }
        return doEval(expression, evaluationContext);
    }

    /**
     * Evaluates the expression against the named-argument context.
     *
     * @param expression the expression string (never blank here)
     * @param context parameter name to argument value bindings
     * @return the evaluation result
     */
    protected abstract Object doEval(String expression, Map<String, Object> context);
}

View File

@ -0,0 +1,53 @@
package com.aizuda.easy.retry.client.core.expression;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import org.springframework.core.DefaultParameterNameDiscoverer;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
* @author xiaowoniu
* @date 2023-12-30 17:22:51
* @since 2.6.0
*/
/**
 * Dynamic-proxy handler that adapts calls to {@code ExpressionEngine.eval(expression, args...)}:
 * it resolves the retried method's parameter names, binds them to the actual argument
 * values, and reflectively delegates evaluation to the wrapped engine instance.
 *
 * @author xiaowoniu
 * @date 2023-12-30 17:22:51
 * @since 2.6.0
 */
public class ExpressionInvocationHandler implements InvocationHandler {

    private static final DefaultParameterNameDiscoverer DISCOVERER = new DefaultParameterNameDiscoverer();

    // The concrete expression engine every proxied eval() call is forwarded to.
    private final Object expressionEngine;

    public ExpressionInvocationHandler(Object expressionEngine) {
        this.expressionEngine = expressionEngine;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // args[0]: the expression string to evaluate
        String expression = (String) args[0];
        if (StrUtil.isBlank(expression)) {
            return StrUtil.EMPTY;
        }
        // args[1]: the eval() varargs => 0: the retried method's arguments, 1: the retried Method
        Object[] params = (Object[]) args[1];
        // Resolve parameter names; without them no named context can be built.
        String[] paramNameArr = DISCOVERER.getParameterNames((Method) params[1]);
        if (ArrayUtil.isEmpty(paramNameArr)) {
            return null;
        }
        // Bind each retried-method argument to its parameter name.
        Object[] methodArgs = (Object[]) params[0];
        Map<String, Object> context = new HashMap<>(methodArgs.length);
        for (int i = 0; i < paramNameArr.length; i++) {
            context.put(paramNameArr[i], methodArgs[i]);
        }
        // Fix: the reflective call must target the wrapped engine (the original code
        // invoked eval on the expression String itself) and must wrap the context in
        // an Object[] so it matches eval(String, Object...)'s varargs parameter.
        return method.invoke(expressionEngine, expression, new Object[]{context});
    }
}

View File

@ -1,16 +1,20 @@
package com.aizuda.easy.retry.client.core.loader;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.ServiceLoaderUtil;
import com.aizuda.easy.retry.client.core.ExpressionEngine;
import com.aizuda.easy.retry.client.core.expression.ExpressionInvocationHandler;
import com.aizuda.easy.retry.common.core.expression.ExpressionEngine;
import com.aizuda.easy.retry.client.core.RetryArgSerializer;
import com.aizuda.easy.retry.client.core.event.EasyRetryListener;
import com.aizuda.easy.retry.client.core.event.SimpleEasyRetryListener;
import com.aizuda.easy.retry.client.core.RetrySiteSnapshotContext;
import com.aizuda.easy.retry.client.core.expression.SpELExpressionEngine;
import com.aizuda.easy.retry.common.core.expression.ExpressionFactory;
import com.aizuda.easy.retry.common.core.expression.strategy.SpELExpressionEngine;
import com.aizuda.easy.retry.client.core.intercepter.ThreadLockRetrySiteSnapshotContext;
import com.aizuda.easy.retry.client.core.serializer.JacksonSerializer;
import org.springframework.util.CollectionUtils;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@ -66,7 +70,8 @@ public class EasyRetrySpiLoader {
* @return {@link SpELExpressionEngine} 默认序列化类为SpELExpressionEngine
*/
public static ExpressionEngine loadExpressionEngine() {
return Optional.ofNullable(ServiceLoaderUtil.loadFirst(ExpressionEngine.class)).orElse(new SpELExpressionEngine());
ExpressionEngine expressionEngine = Optional.ofNullable(ServiceLoaderUtil.loadFirst(ExpressionEngine.class)).orElse(new SpELExpressionEngine());
return ExpressionFactory.getExpressionEngine(new ExpressionInvocationHandler(expressionEngine));
}
}

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.client.core.report;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.core.ExpressionEngine;
import com.aizuda.easy.retry.common.core.expression.ExpressionEngine;
import com.aizuda.easy.retry.client.core.IdempotentIdGenerate;
import com.aizuda.easy.retry.client.core.Report;
import com.aizuda.easy.retry.client.core.RetryArgSerializer;

View File

@ -49,6 +49,20 @@
<groupId>cn.hutool</groupId>
<artifactId>hutool-crypto</artifactId>
</dependency>
<dependency>
<groupId>com.googlecode.aviator</groupId>
<artifactId>aviator</artifactId>
<version>5.3.3</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>QLExpress</artifactId>
<version>3.3.1</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
</dependencies>
<build>

View File

@ -103,7 +103,12 @@ public interface SystemConstants {
Long ROOT = -1L;
/**
* 系统内置的条件任务ID
* 系统内置的决策任务ID
*/
Long CONDITION_JOB_ID = -1000L;
Long DECISION_JOB_ID = -1000L;
/**
* 系统内置的回调任务ID
*/
Long CALLBACK_JOB_ID = -2000L;
}

View File

@ -12,7 +12,7 @@ import lombok.Getter;
@Getter
public enum WorkflowNodeTypeEnum {
JOB_TASK(1, "JOB任务"),
CONDITION(2, "条件节点"),
DECISION(2, "决策节点"),
CALLBACK(3, "回调节点"),
;

View File

@ -0,0 +1,20 @@
package com.aizuda.easy.retry.common.core.expression;
/**
* 参数表达式解析引擎
*
* @author www.byteblogs.com
* @date 2023-09-10 12:30:23
* @since 2.3.0
*/
public interface ExpressionEngine{
/**
 * Evaluates the given expression.
 *
 * @param expression the expression string to evaluate
 * @param t evaluation arguments; by the proxy/strategy contract in this commit the
 *          first element carries the evaluation context Map — TODO confirm all
 *          engine implementations and callers honor this layout
 * @return the evaluation result
 */
Object eval(String expression, Object ...t);
}

View File

@ -0,0 +1,31 @@
package com.aizuda.easy.retry.common.core.expression;
import com.aizuda.easy.retry.common.core.exception.EasyRetryCommonException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
/**
* @author xiaowoniu
* @date 2023-12-30 17:56:58
* @since 2.6.0
*/
/**
 * Creates JDK dynamic-proxy instances of {@link ExpressionEngine} whose calls are
 * routed through a caller-supplied {@link InvocationHandler}.
 *
 * @author xiaowoniu
 * @date 2023-12-30 17:56:58
 * @since 2.6.0
 */
public class ExpressionFactory {
    /**
     * Returns an {@link ExpressionEngine} proxy backed by the given handler.
     *
     * @param invocationHandler handler that receives every eval() invocation
     * @return a proxy implementing {@link ExpressionEngine}
     * @throws EasyRetryCommonException if the proxy cannot be created
     */
    public static ExpressionEngine getExpressionEngine(InvocationHandler invocationHandler) {
        try {
            return (ExpressionEngine) Proxy.newProxyInstance(ExpressionEngine.class.getClassLoader(),
                    new Class[]{ExpressionEngine.class}, invocationHandler);
        } catch (Exception e) {
            // Fix: Proxy.newProxyInstance does not throw ClassNotFoundException; the old
            // "class not found exception" message misdescribed any real failure here.
            throw new EasyRetryCommonException("failed to create ExpressionEngine proxy: [{}]", e);
        }
    }
}

View File

@ -0,0 +1,25 @@
package com.aizuda.easy.retry.common.core.expression.strategy;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.expression.ExpressionEngine;
import org.springframework.core.DefaultParameterNameDiscoverer;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
* @author www.byteblogs.com
* @date 2023-09-10 12:31:17
* @since 2.3.0
*/
public abstract class AbstractExpressionEngine implements ExpressionEngine {
@Override
public Object eval(String expression, Object... t) {
// Unchecked cast: assumes t[0] is the Map<String, Object> evaluation context
// built by the proxy-side handler — TODO confirm every caller supplies it;
// an empty varargs call would throw ArrayIndexOutOfBoundsException here.
return doEval(expression, (Map<String, Object>) t[0]);
}
// Concrete engines (SpEL/Aviator/QL) evaluate the expression against the
// parameter-name-to-value context.
protected abstract Object doEval(String expression, Map<String, Object> context);
}

View File

@ -1,6 +1,6 @@
package com.aizuda.easy.retry.client.core.expression;
package com.aizuda.easy.retry.common.core.expression.strategy;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.common.core.exception.EasyRetryCommonException;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.AviatorEvaluatorInstance;
@ -14,7 +14,7 @@ import java.util.Map;
* @date 2023-09-10 17:34:07
* @since 2.3.0
*/
public class AviatorExpressionEngine extends AbstractExpressionEngine{
public class AviatorExpressionEngine extends AbstractExpressionEngine {
private static final AviatorEvaluatorInstance ENGINE = AviatorEvaluator.getInstance();
@ -24,7 +24,7 @@ public class AviatorExpressionEngine extends AbstractExpressionEngine{
try {
return ENGINE.execute(expression, context);
} catch (Exception e) {
throw new EasyRetryClientException("Aviator表达式解析异常. expression:[{}] context:[{}]",
throw new EasyRetryCommonException("Aviator表达式解析异常. expression:[{}] context:[{}]",
expression, JsonUtil.toJsonString(context), e);
}
}

View File

@ -1,7 +1,6 @@
package com.aizuda.easy.retry.client.core.expression;
package com.aizuda.easy.retry.common.core.expression.strategy;
import cn.hutool.extra.expression.ExpressionException;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.common.core.exception.EasyRetryCommonException;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.ql.util.express.DefaultContext;
import com.ql.util.express.ExpressRunner;
@ -15,7 +14,7 @@ import java.util.Map;
* @date 2023-09-10 17:40:34
* @since 2.3.0
*/
public class QLExpressEngine extends AbstractExpressionEngine {
public class QLExpressEngine extends AbstractExpressionEngine {
private static final ExpressRunner ENGINE = new ExpressRunner();
@ -27,7 +26,7 @@ public class QLExpressEngine extends AbstractExpressionEngine {
try {
return ENGINE.execute(expression, defaultContext, null, true, false);
} catch (Exception e) {
throw new EasyRetryClientException("QL表达式解析异常. expression:[{}] context:[{}]",
throw new EasyRetryCommonException("QL表达式解析异常. expression:[{}] context:[{}]",
expression, JsonUtil.toJsonString(context), e);
}

View File

@ -1,6 +1,6 @@
package com.aizuda.easy.retry.client.core.expression;
package com.aizuda.easy.retry.common.core.expression.strategy;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.common.core.exception.EasyRetryCommonException;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
@ -28,7 +28,7 @@ public class SpELExpressionEngine extends AbstractExpressionEngine {
context.forEach(evaluationContext::setVariable);
return ENGINE.parseExpression(expression).getValue(evaluationContext, String.class);
} catch (Exception e) {
throw new EasyRetryClientException("SpEL表达式解析异常. expression:[{}] context:[{}]",
throw new EasyRetryCommonException("SpEL表达式解析异常. expression:[{}] context:[{}]",
expression, JsonUtil.toJsonString(context), e);
}

View File

@ -75,6 +75,15 @@ public class JsonUtil {
});
}
/**
* 将JSON字符串转Map 对象
* @param jsonString
* @return
*/
public static <T> T parseObject(String jsonString, TypeReference<T> reference) {
return (T) JsonMapper.toJavaObject(jsonString, reference);
}
/**
* 将JSON字符串转JSON 对象
* @param jsonString
@ -205,7 +214,7 @@ public class JsonUtil {
* @param typeReference
* @return
*/
private static Object toJavaObject(String jsonString, TypeReference typeReference) {
public static Object toJavaObject(String jsonString, TypeReference typeReference) {
try {
return objectMapper.readValue(jsonString, typeReference);
} catch (Exception e) {

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.template.datasource.persistence.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
@ -90,6 +91,12 @@ public class Workflow implements Serializable {
*/
private String description;
/**
* 版本号
*/
@TableField(value = "version", update= "%s+1")
private Integer version;
/**
* 创建时间
*/

View File

@ -55,14 +55,14 @@ public class WorkflowNode implements Serializable {
private Long workflowId;
/**
* 1任务节点 2条件节点
* 1任务节点 2条件节点 3回调节点
*/
private Integer nodeType;
/**
* 1SpEl2Aviator 3QL
* 节点信息
*/
private Integer expressionType;
private String nodeInfo;
/**
* 失败策略 1跳过 2阻塞
@ -80,9 +80,9 @@ public class WorkflowNode implements Serializable {
private Integer workflowNodeStatus;
/**
* 节点表达式
* 版本号
*/
private String nodeExpression;
private Integer version;
/**
* 创建时间

View File

@ -0,0 +1,29 @@
package com.aizuda.easy.retry.server.common.dto;
import lombok.Data;
/**
* 回调节点配置
*
* @author xiaowoniu
* @date 2023-12-30 11:18:14
* @since 2.6.0
*/
@Data
public class CallbackConfig {

    /**
     * Webhook identifier.
     * NOTE(review): typed {@code Integer}, although a webhook is usually a URL —
     * confirm whether this is an id referencing a stored webhook definition.
     */
    private Integer webhook;

    /**
     * Content type of the callback request (per the original "请求类型" comment).
     */
    private String contentType;

    /**
     * Secret used to authenticate/sign the callback.
     */
    private String secret;
}

View File

@ -0,0 +1,25 @@
package com.aizuda.easy.retry.server.common.dto;
import lombok.Data;
/**
* 决策节点配置
*
* @author xiaowoniu
* @date 2023-12-30 11:17:30
* @since 2.6.0
*/
@Data
public class DecisionConfig {

    /**
     * Expression dialect: 1 = SpEL, 2 = Aviator, 3 = QL.
     */
    private Integer expressionType;

    /**
     * Expression evaluated by the decision (condition) node.
     */
    private String nodeExpression;
}

View File

@ -0,0 +1,21 @@
package com.aizuda.easy.retry.server.common.dto;
import lombok.Data;
import javax.validation.constraints.NotNull;
/**
* @author xiaowoniu
* @date 2023-12-30 21:42:59
* @since 2.6.0
*/
@Data
public class JobTaskConfig {

    /**
     * Id of the job this workflow task node executes; must be provided.
     */
    @NotNull(message = "任务ID不能为空")
    private Long jobId;
}

View File

@ -0,0 +1,35 @@
package com.aizuda.easy.retry.server.common.enums;
import com.aizuda.easy.retry.common.core.expression.ExpressionEngine;
import com.aizuda.easy.retry.common.core.expression.strategy.AviatorExpressionEngine;
import com.aizuda.easy.retry.common.core.expression.strategy.QLExpressEngine;
import com.aizuda.easy.retry.common.core.expression.strategy.SpELExpressionEngine;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author xiaowoniu
* @date 2023-12-30 10:50:05
* @since 2.6.0
*/
@Getter
@AllArgsConstructor
public enum ExpressionTypeEnum {

    SPEL(1, new SpELExpressionEngine()),
    AVIATOR(2, new AviatorExpressionEngine()),
    QL(3, new QLExpressEngine());

    // Numeric code persisted in configuration.
    private final Integer type;
    // Engine instance evaluating this dialect.
    private final ExpressionEngine expressionEngine;

    /**
     * Resolves the engine registered for the given code.
     *
     * @param type dialect code (1 = SpEL, 2 = Aviator, 3 = QL)
     * @return the matching engine, or null when the code is unknown
     */
    public static ExpressionEngine valueOf(Integer type) {
        for (ExpressionTypeEnum item : values()) {
            if (item.type.equals(type)) {
                return item.expressionEngine;
            }
        }
        return null;
    }
}

View File

@ -12,7 +12,7 @@ import lombok.Getter;
*/
@AllArgsConstructor
@Getter
public enum IdGeneratorMode {
public enum IdGeneratorModeEnum {
SEGMENT(1,"号段模式"),
SNOWFLAKE(2, "雪花算法模式");
@ -21,8 +21,8 @@ public enum IdGeneratorMode {
private final String desc;
public static IdGeneratorMode modeOf(int mode) {
for (IdGeneratorMode value : IdGeneratorMode.values()) {
public static IdGeneratorModeEnum modeOf(int mode) {
for (IdGeneratorModeEnum value : IdGeneratorModeEnum.values()) {
if (value.getMode() == mode) {
return value;
}

View File

@ -13,7 +13,7 @@ import lombok.Getter;
*/
@AllArgsConstructor
@Getter
public enum TaskGeneratorScene {
public enum TaskGeneratorSceneEnum {
CLIENT_REPORT(1,"客户端匹配上报"),
MANA_BATCH(2, "控制台手动批量新增"),
@ -24,8 +24,8 @@ public enum TaskGeneratorScene {
private final String desc;
public static TaskGeneratorScene modeOf(int scene) {
for (TaskGeneratorScene value : TaskGeneratorScene.values()) {
public static TaskGeneratorSceneEnum modeOf(int scene) {
for (TaskGeneratorSceneEnum value : TaskGeneratorSceneEnum.values()) {
if (value.getScene() == scene) {
return value;
}

View File

@ -1,29 +0,0 @@
package com.aizuda.easy.retry.server.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author xiaowoniu
* @date 2023-12-15 22:42:17
* @since 2.6.0
*/
@Getter
@AllArgsConstructor
public enum WorkflowNodeType {

    TASK(1, "任务节点"),
    // BUG fix: Condition and callback previously reused code 1, so get(...)
    // could never resolve them — get(1) always returned TASK and get(2)/get(3)
    // returned null. Codes 1/2/3 match the node-type comment
    // "1任务节点 2条件节点 3回调节点" used elsewhere in this change set.
    Condition(2, "条件节点"),
    callback(3, "回调节点");

    // NOTE(review): constant names should be UPPER_SNAKE_CASE (CONDITION,
    // CALLBACK); left unchanged so existing references keep compiling.

    private final Integer nodeType;
    private final String desc;

    /**
     * Resolves the constant for the given node-type code.
     *
     * @param nodeType node-type code
     * @return the matching constant, or null when unknown
     */
    public static WorkflowNodeType get(Integer nodeType) {
        for (WorkflowNodeType workflowNodeType : WorkflowNodeType.values()) {
            if (workflowNodeType.nodeType.equals(nodeType)) {
                return workflowNodeType;
            }
        }
        return null;
    }
}

View File

@ -0,0 +1,29 @@
package com.aizuda.easy.retry.server.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author xiaowoniu
* @date 2023-12-15 22:42:17
* @since 2.6.0
*/
@Getter
@AllArgsConstructor
public enum WorkflowNodeTypeEnum {

    TASK(1, "任务节点"),
    CONDITION(2, "条件节点"),
    CALLBACK(3, "回调节点");

    // Numeric code persisted on the workflow node row.
    private final Integer nodeType;
    // Human-readable description.
    private final String desc;

    /**
     * Resolves the constant for the given node-type code.
     *
     * @param nodeType node-type code (1 = task, 2 = condition, 3 = callback)
     * @return the matching constant, or null when unknown
     */
    public static WorkflowNodeTypeEnum get(Integer nodeType) {
        for (WorkflowNodeTypeEnum item : values()) {
            if (item.nodeType.equals(nodeType)) {
                return item;
            }
        }
        return null;
    }
}

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.server.common.generator.id;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.enums.IdGeneratorMode;
import com.aizuda.easy.retry.server.common.enums.IdGeneratorModeEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.SequenceAllocMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.SequenceAlloc;
@ -305,7 +305,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
@Override
public boolean supports(int mode) {
return IdGeneratorMode.SEGMENT.getMode() == mode;
return IdGeneratorModeEnum.SEGMENT.getMode() == mode;
}
@Override

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.common.generator.id;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.IdUtil;
import com.aizuda.easy.retry.server.common.enums.IdGeneratorMode;
import com.aizuda.easy.retry.server.common.enums.IdGeneratorModeEnum;
import org.springframework.stereotype.Component;
/**
@ -20,7 +20,7 @@ public class SnowflakeIdGenerator implements IdGenerator {
@Override
public boolean supports(int mode) {
return IdGeneratorMode.SNOWFLAKE.getMode() == mode;
return IdGeneratorModeEnum.SNOWFLAKE.getMode() == mode;
}
@Override

View File

@ -1,11 +1,11 @@
package com.aizuda.easy.retry.server.common.util;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -17,14 +17,19 @@ import java.util.Map;
public class GraphUtils {
// 从JSON反序列化为Guava图
public static MutableGraph<Long> deserializeJsonToGraph(String jsonGraph) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
/**
* 从JSON反序列化为Guava图
*
* @param jsonGraph 图的json串
* @return {@link MutableGraph} 图对象
*/
public static MutableGraph<Long> deserializeJsonToGraph(String jsonGraph) {
if (StrUtil.isBlank(jsonGraph)) {
return null;
}
// 将JSON字符串转换为Map<Long, Iterable<Long>>
Map<Long, Iterable<Long>> adjacencyList = objectMapper.readValue(
jsonGraph, new TypeReference<Map<Long, Iterable<Long>>>() {
});
Map<Long, Iterable<Long>> adjacencyList = JsonUtil.parseObject(jsonGraph, new TypeReference<Map<Long, Iterable<Long>>>() {
});
// 创建Guava图并添加节点和边
MutableGraph<Long> graph = GraphBuilder.directed().build();

View File

@ -61,6 +61,22 @@
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-server-retry-task</artifactId>
</dependency>
<dependency>
<groupId>com.googlecode.aviator</groupId>
<artifactId>aviator</artifactId>
<version>5.3.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>QLExpress</artifactId>
<version>3.3.1</version>
<exclusions>
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,49 @@
package com.aizuda.easy.retry.server.job.task.support.cache;
import com.aizuda.easy.retry.server.common.util.GraphUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.graph.MutableGraph;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* @author xiaowoniu
* @date 2023-12-30 13:18:07
* @since 2.6.0
*/
public class MutableGraphCache {

    // Workflow-batch id -> DAG; entries evicted 5 minutes after being written.
    // NOTE(review): nothing in this class ever populates the cache — writes
    // must happen elsewhere; confirm.
    private static final Cache<Long, MutableGraph<Long>> GRAPH_CACHE = CacheBuilder.newBuilder()
            // concurrency level for the segmented cache
            .concurrencyLevel(8)
            // expire entries 5 minutes after write
            .expireAfterWrite(5, TimeUnit.MINUTES)
            .build();

    /**
     * Returns the cached graph for the given workflow batch, falling back to a
     * graph deserialized from {@code jsonGraph} on a cache miss.
     *
     * @param workflowBatchId workflow batch id
     * @param jsonGraph JSON adjacency-list form of the graph
     * @return {@link MutableGraph} the graph object
     */
    public static MutableGraph<Long> getOrDefault(Long workflowBatchId, String jsonGraph) {
        MutableGraph<Long> cached = GRAPH_CACHE.getIfPresent(workflowBatchId);
        // Deserialize unconditionally to mirror the original eager
        // Optional.orElse(...) semantics (orElse always evaluates its argument).
        MutableGraph<Long> parsed = GraphUtils.deserializeJsonToGraph(jsonGraph);
        return cached != null ? cached : parsed;
    }

    /**
     * Returns the cached graph for the given workflow batch.
     * Blank JSON deserializes to null, so a cache miss yields null here.
     *
     * @param workflowBatchId workflow batch id
     * @return the cached graph, or null when absent
     */
    public static MutableGraph<Long> get(Long workflowBatchId) {
        return getOrDefault(workflowBatchId, "");
    }
}

View File

@ -21,6 +21,7 @@ import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.cache.MutableGraphCache;
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorFactory;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
@ -90,7 +91,7 @@ public class WorkflowExecutorActor extends AbstractActor {
// 获取DAG图
String flowInfo = workflowTaskBatch.getFlowInfo();
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
MutableGraph<Long> graph = MutableGraphCache.getOrDefault(workflowTaskBatch.getId(), flowInfo);
Set<Long> successors = graph.successors(taskExecute.getParentId());
if (CollectionUtils.isEmpty(successors)) {

View File

@ -2,20 +2,23 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.expression.ExpressionEngine;
import com.aizuda.easy.retry.common.core.expression.ExpressionFactory;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.ExpressionTypeEnum;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.expression.ExpressionInvocationHandler;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
@ -50,7 +53,7 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
@Override
public WorkflowNodeTypeEnum getWorkflowNodeType() {
return WorkflowNodeTypeEnum.CONDITION;
return WorkflowNodeTypeEnum.DECISION;
}
@Override
@ -67,12 +70,11 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
} else {
try {
Map<String, Object> contextMap = new HashMap<>();
// 根据配置的表达式执行
if (StrUtil.isNotBlank(context.getResult())) {
contextMap = JsonUtil.parseHashMap(context.getResult());
}
result = Optional.ofNullable(doEval(context.getNodeExpression(), contextMap)).orElse(Boolean.FALSE);
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(context.getExpressionType());
Assert.notNull(realExpressionEngine, () -> new EasyRetryServerException("表达式引擎不存在"));
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
result = (Boolean) Optional.ofNullable(expressionEngine.eval(context.getNodeExpression(), context.getResult())).orElse(Boolean.FALSE);
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", context.getNodeExpression(), context.getResult(), result);
} catch (Exception e) {
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", context.getNodeExpression(), context.getResult(), e);
@ -104,14 +106,14 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(taskBatchStatus);
generatorContext.setOperationReason(operationReason);
generatorContext.setJobId(SystemConstants.CONDITION_JOB_ID);
generatorContext.setJobId(SystemConstants.DECISION_JOB_ID);
JobTaskBatch jobTaskBatch = jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
// 生成执行任务实例
JobTask jobTask = new JobTask();
jobTask.setGroupName(context.getGroupName());
jobTask.setNamespaceId(context.getNamespaceId());
jobTask.setJobId(SystemConstants.CONDITION_JOB_ID);
jobTask.setJobId(SystemConstants.DECISION_JOB_ID);
jobTask.setClientInfo(StrUtil.EMPTY);
jobTask.setTaskBatchId(jobTaskBatch.getId());
jobTask.setArgsType(1);
@ -125,7 +127,7 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
// TODO 等实时日志处理完毕后再处理
jobLogDTO.setMessage(message);
jobLogDTO.setTaskId(jobTask.getId());
jobLogDTO.setJobId(SystemConstants.CONDITION_JOB_ID);
jobLogDTO.setJobId(SystemConstants.DECISION_JOB_ID);
jobLogDTO.setGroupName(context.getGroupName());
jobLogDTO.setNamespaceId(context.getNamespaceId());
jobLogDTO.setTaskBatchId(jobTaskBatch.getId());

View File

@ -0,0 +1,35 @@
package com.aizuda.easy.retry.server.job.task.support.expression;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
* @author xiaowoniu
* @date 2023-12-30 17:22:51
* @since 2.6.0
*/
public class ExpressionInvocationHandler implements InvocationHandler {

    // The concrete engine (e.g. an AbstractExpressionEngine subclass) to delegate to.
    private final Object expressionEngine;

    public ExpressionInvocationHandler(Object expressionEngine) {
        this.expressionEngine = expressionEngine;
    }

    /**
     * Intercepts {@code eval(String expression, Object... t)}: parses the first
     * vararg (a JSON string) into a context map and forwards the call to the
     * wrapped engine.
     *
     * @param args {@code args[0]} is the expression, {@code args[1]} is the
     *             varargs array whose first element is the JSON parameter string
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object[] expressionParams = (Object[]) args[1];
        String params = (String) expressionParams[0];
        Map<String, Object> contextMap = new HashMap<>();
        if (StrUtil.isNotBlank(params)) {
            contextMap = JsonUtil.parseHashMap(params);
        }
        // BUG fix: the original invoked the engine with only the context map,
        // but the reflective target eval(String, Object...) takes two
        // parameters (the expression and the varargs array), so every call
        // failed with IllegalArgumentException. Forward the expression and wrap
        // the context map as the varargs array.
        return method.invoke(expressionEngine, args[0], new Object[]{contextMap});
    }
}

View File

@ -5,7 +5,6 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
@ -16,6 +15,7 @@ import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.cache.MutableGraphCache;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
@ -23,7 +23,6 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatch
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
@ -42,7 +41,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE;
/**
@ -69,7 +67,7 @@ public class WorkflowBatchHandler {
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
String flowInfo = workflowTaskBatch.getFlowInfo();
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
MutableGraph<Long> graph = MutableGraphCache.getOrDefault(workflowTaskBatchId, flowInfo);
// 说明没有后继节点了, 此时需要判断整个DAG是否全部执行完成
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
@ -104,7 +102,7 @@ public class WorkflowBatchHandler {
Set<Long> predecessors = graph.predecessors(jobTaskBatch.getWorkflowNodeId());
WorkflowNode workflowNode = workflowNodeMap.get(jobTaskBatch.getWorkflowNodeId());
// 条件节点是或的关系一个成功就代表成功
if (WorkflowNodeTypeEnum.CONDITION.getType() == workflowNode.getNodeType()) {
if (WorkflowNodeTypeEnum.DECISION.getType() == workflowNode.getNodeType()) {
for (final Long predecessor : predecessors) {
List<JobTaskBatch> jobTaskBatcheList = map.getOrDefault(predecessor, Lists.newArrayList());
Map<Integer, Long> statusCountMap = jobTaskBatcheList.stream()
@ -225,7 +223,7 @@ public class WorkflowBatchHandler {
.orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
String flowInfo = workflowTaskBatch.getFlowInfo();
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
MutableGraph<Long> graph =MutableGraphCache.getOrDefault(workflowTaskBatchId, flowInfo);
Set<Long> successors = graph.successors(SystemConstants.ROOT);
if (CollectionUtils.isEmpty(successors)) {
return;

View File

@ -1,7 +1,7 @@
package com.aizuda.easy.retry.server.retry.task.generator.task;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorScene;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorSceneEnum;
import org.springframework.stereotype.Component;
/**
@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
public class ClientReportRetryGenerator extends AbstractGenerator {
@Override
public boolean supports(int scene) {
return TaskGeneratorScene.CLIENT_REPORT.getScene() == scene;
return TaskGeneratorSceneEnum.CLIENT_REPORT.getScene() == scene;
}
@Override

View File

@ -1,7 +1,7 @@
package com.aizuda.easy.retry.server.retry.task.generator.task;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorScene;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorSceneEnum;
import org.springframework.stereotype.Component;
import java.util.Optional;
@ -17,7 +17,7 @@ import java.util.Optional;
public class ManaBatchRetryGenerator extends AbstractGenerator {
@Override
public boolean supports(int scene) {
return TaskGeneratorScene.MANA_BATCH.getScene() == scene;
return TaskGeneratorSceneEnum.MANA_BATCH.getScene() == scene;
}
@Override

View File

@ -1,7 +1,7 @@
package com.aizuda.easy.retry.server.retry.task.generator.task;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorScene;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorSceneEnum;
import org.springframework.stereotype.Component;
import java.util.Optional;
@ -17,7 +17,7 @@ import java.util.Optional;
public class ManaSingleRetryGenerator extends AbstractGenerator {
@Override
public boolean supports(int scene) {
return TaskGeneratorScene.MANA_SINGLE.getScene() == scene;
return TaskGeneratorSceneEnum.MANA_SINGLE.getScene() == scene;
}
@Override

View File

@ -9,7 +9,7 @@ import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorScene;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorSceneEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.PostHttpRequestHandler;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
@ -74,7 +74,7 @@ public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler {
syncConfig(headers);
TaskGenerator taskGenerator = taskGenerators.stream()
.filter(t -> t.supports(TaskGeneratorScene.CLIENT_REPORT.getScene()))
.filter(t -> t.supports(TaskGeneratorSceneEnum.CLIENT_REPORT.getScene()))
.findFirst().orElseThrow(() -> new EasyRetryServerException("没有匹配的任务生成器"));
Assert.notEmpty(args, () -> new EasyRetryServerException("上报的数据不能为空. reqId:[{}]", retryRequest.getReqId()));

View File

@ -1,6 +1,6 @@
package com.aizuda.easy.retry.server.web.model.request;
import com.aizuda.easy.retry.server.common.enums.IdGeneratorMode;
import com.aizuda.easy.retry.server.common.enums.IdGeneratorModeEnum;
import lombok.Data;
import javax.validation.constraints.NotBlank;
@ -36,7 +36,7 @@ public class GroupConfigRequestVO {
/**
* 唯一id生成模式
* {@link IdGeneratorMode}
* {@link IdGeneratorModeEnum}
*/
private Integer idGeneratorMode;

View File

@ -1,5 +1,8 @@
package com.aizuda.easy.retry.server.web.model.request;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
import com.aizuda.easy.retry.server.common.dto.JobTaskConfig;
import lombok.Data;
import javax.validation.Valid;
@ -67,6 +70,7 @@ public class WorkflowRequestVO {
/**
* 节点信息
*/
@NotEmpty(message = "节点信息不能为空")
private List<NodeInfo> conditionNodes;
/**
@ -82,48 +86,45 @@ public class WorkflowRequestVO {
/**
* 节点名称
*/
@NotBlank(message = "节点名称不能为空")
private String nodeName;
/**
* 优先级
*/
private Integer priorityLevel;
/**
* 任务ID
*/
@NotNull(message = "任务ID不能为空")
private Long jobId;
/**
* 表达式类型 1SpEl2Aviator 3QL
*/
@NotNull(message = "表达式类型不能为空")
private Integer expressionType;
/**
* 条件节点表达式
*/
private String nodeExpression;
/**
* 1跳过 2阻塞
*/
@NotNull(message = "失败策略不能为空")
private Integer failStrategy;
/**
* 工作流状态 0关闭1开启
*/
@NotNull(message = "工作流状态不能为空")
private Integer workflowNodeStatus;
/**
* 优先级
*/
@NotNull(message = "优先级不能为空")
private Integer priorityLevel;
/**
* 子节点
*/
private NodeConfig childNode;
/**
* 1跳过 2阻塞
*/
private Integer failStrategy;
/**
* 任务节点配置
*/
private JobTaskConfig jobTask;
/**
* 决策节点配置
*/
private DecisionConfig decision;
/**
* 回调配置
*/
private CallbackConfig callback;
}
}

View File

@ -4,9 +4,6 @@ import lombok.Data;
import java.time.LocalDateTime;
/**
* @author: www.byteblogs.com
* @date : 2023-10-12 10:18

View File

@ -1,5 +1,8 @@
package com.aizuda.easy.retry.server.web.model.response;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
import com.aizuda.easy.retry.server.common.dto.JobTaskConfig;
import lombok.Data;
import java.time.LocalDateTime;
@ -96,50 +99,35 @@ public class WorkflowDetailResponseVO {
*/
private Integer priorityLevel;
/**
* 任务ID
*/
private Long jobId;
/**
* 表达式类型 1SpEl2Aviator 3QL
*/
private Integer expressionType;
/**
* 条件节点表达式
*/
private String nodeExpression;
/**
* 1跳过 2阻塞
*/
private Integer failStrategy;
/**
* 工作流状态 0关闭1开启
*/
private Integer workflowNodeStatus;
/**
* 任务批次状态
* 失败策略 1跳过 2阻塞
*/
private Integer taskBatchStatus;
private Integer failStrategy;
/**
* 定时任务批次id
* 判定配置
*/
private Long jobTaskBatchId;
private DecisionConfig decision;
/**
* 任务执行时间
* 回调配置
*/
private LocalDateTime executionAt;
private CallbackConfig callback;
/**
* 操作原因
* 任务配置
*/
private Integer operationReason;
private JobTaskConfig jobTask;
/**
* 定时任务批次信息
*/
private JobBatchResponseVO jobBatch;
/**
* 子节点
@ -148,5 +136,4 @@ public class WorkflowDetailResponseVO {
}
}

View File

@ -31,6 +31,11 @@ public interface JobBatchResponseVOConverter {
})
JobBatchResponseVO toJobBatchResponseVO(JobBatchResponseDO jobBatchResponseDO);
@Mappings({
@Mapping(target = "executionAt", expression = "java(JobBatchResponseVOConverter.toLocalDateTime(jobTaskBatch.getExecutionAt()))")
})
JobBatchResponseVO toJobBatchResponseVO(JobTaskBatch jobTaskBatch);
@Mappings({
@Mapping(source = "jobBatch.groupName", target = "groupName"),
@Mapping(source = "jobBatch.id", target = "id"),

View File

@ -1,5 +1,10 @@
package com.aizuda.easy.retry.server.web.service.convert;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
import com.aizuda.easy.retry.server.common.dto.JobTaskConfig;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
import com.aizuda.easy.retry.server.web.model.response.WorkflowBatchResponseVO;
@ -36,6 +41,13 @@ public interface WorkflowConverter {
List<WorkflowDetailResponseVO.NodeInfo> toNodeInfo(List<WorkflowNode> workflowNodes);
@Mappings({
@Mapping(target = "decision", expression = "java(WorkflowConverter.parseDecisionConfig(workflowNode))"),
@Mapping(target = "callback", expression = "java(WorkflowConverter.parseCallbackConfig(workflowNode))"),
@Mapping(target = "jobTask", expression = "java(WorkflowConverter.parseJobTaskConfig(workflowNode))")
})
WorkflowDetailResponseVO.NodeInfo toNodeInfo(WorkflowNode workflowNode);
List<WorkflowResponseVO> toWorkflowResponseVO(List<Workflow> workflowList);
List<WorkflowBatchResponseVO> toWorkflowBatchResponseVO(List<WorkflowBatchResponseDO> workflowBatchResponseList);
@ -55,4 +67,30 @@ public interface WorkflowConverter {
return DateUtils.toLocalDateTime(nextTriggerAt);
}
/**
 * Extracts the decision (condition) configuration from a workflow node's
 * nodeInfo JSON, or returns null when the node is not a decision node.
 * NOTE(review): if both sides of == are Integer this compares references and
 * only works for small values inside the Integer cache — confirm getType()
 * returns a primitive, otherwise prefer equals().
 */
static DecisionConfig parseDecisionConfig(WorkflowNode workflowNode) {
    if (WorkflowNodeTypeEnum.DECISION.getType() == workflowNode.getNodeType()) {
        return JsonUtil.parseObject(workflowNode.getNodeInfo(), DecisionConfig.class);
    }
    return null;
}
/**
 * Extracts the callback configuration from a workflow node's nodeInfo JSON,
 * or returns null when the node is not a callback node.
 * NOTE(review): Integer == comparison — see the cache-range caveat; confirm
 * getType() returns a primitive, otherwise prefer equals().
 */
static CallbackConfig parseCallbackConfig(WorkflowNode workflowNode) {
    if (WorkflowNodeTypeEnum.CALLBACK.getType() == workflowNode.getNodeType()) {
        return JsonUtil.parseObject(workflowNode.getNodeInfo(), CallbackConfig.class);
    }
    return null;
}
/**
 * Extracts the job-task configuration from a workflow node's nodeInfo JSON,
 * or returns null when the node is not a job-task node.
 * NOTE(review): Integer == comparison — see the cache-range caveat; confirm
 * getType() returns a primitive, otherwise prefer equals().
 */
static JobTaskConfig parseJobTaskConfig(WorkflowNode workflowNode) {
    if (WorkflowNodeTypeEnum.JOB_TASK.getType() == workflowNode.getNodeType()) {
        return JsonUtil.parseObject(workflowNode.getNodeInfo(), JobTaskConfig.class);
    }
    return null;
}
}

View File

@ -0,0 +1,187 @@
package com.aizuda.easy.retry.server.web.service.handler;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.dto.JobTaskConfig;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
import com.aizuda.easy.retry.server.web.model.response.WorkflowDetailResponseVO;
import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @author xiaowoniu
* @date 2023-12-30 23:26:43
* @since 2.6.0
*/
@Component("webWorkflowHandler")
@Slf4j
@RequiredArgsConstructor
public class WorkflowHandler {

    // Mapper used to persist workflow nodes while the graph is being built.
    private final WorkflowNodeMapper workflowNodeMapper;

    /**
     * Rebuilds the front-end node-configuration tree from the workflow DAG.
     * Recursively walks the successors of {@code parentId}, groups siblings into
     * a shared {@code NodeConfig} (its {@code conditionNodes}), and wires fan-in
     * points back onto the config produced under their common ancestor.
     *
     * @param graph           the workflow DAG of node ids, traversed downwards from {@code parentId}
     * @param parentId        id of the node whose successors are being assembled
     * @param nodeConfigMap   accumulator: node id -> the NodeConfig that contains it (filled during recursion)
     * @param workflowNodeMap node id -> node detail view used to look up each successor's data
     * @return the NodeConfig holding all direct successors of {@code parentId},
     *         or {@code null} when {@code parentId} has no successors (leaf)
     */
    public WorkflowDetailResponseVO.NodeConfig buildNodeConfig(MutableGraph<Long> graph,
            Long parentId,
            Map<Long, WorkflowDetailResponseVO.NodeConfig> nodeConfigMap,
            Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap) {
        Set<Long> successors = graph.successors(parentId);
        if (CollectionUtils.isEmpty(successors)) {
            // Leaf node: nothing below this point.
            return null;
        }
        WorkflowDetailResponseVO.NodeInfo previousNodeInfo = workflowNodeMap.get(parentId);
        WorkflowDetailResponseVO.NodeConfig currentConfig = new WorkflowDetailResponseVO.NodeConfig();
        currentConfig.setConditionNodes(Lists.newArrayList());
        // Whether currentConfig should be mounted as the parent's child node
        // (set to false whenever a fan-in attaches it elsewhere).
        boolean mount = false;
        for (Long successor : successors) {
            Set<Long> predecessors = graph.predecessors(successor);
            WorkflowDetailResponseVO.NodeInfo nodeInfo = workflowNodeMap.get(successor);
            currentConfig.setNodeType(nodeInfo.getNodeType());
            currentConfig.getConditionNodes().add(nodeInfo);
            nodeConfigMap.put(successor, currentConfig);
            if (predecessors.size() >= 2) {
                // Fan-in node: find the common ancestor of all predecessors by
                // collecting each predecessor's ancestor chain...
                Map<Long, Set<Long>> sets = new HashMap<>();
                for (final Long predecessor : predecessors) {
                    Set<Long> set = Sets.newHashSet();
                    sets.put(predecessor, set);
                    findCommonAncestor(predecessor, set, graph);
                }
                // ...then intersecting the chains to keep only shared ancestors.
                Set<Long> intersection = sets.values().stream().findFirst().get();
                for (final Set<Long> value : sets.values()) {
                    intersection = Sets.intersection(value, intersection);
                }
                // NOTE(review): picks the "last" element by Set iteration order,
                // which HashSet does not guarantee — confirm this deterministically
                // selects the intended (nearest) common ancestor for these graphs.
                Long commonAncestor = new ArrayList<>(intersection).get(intersection.size() - 1);
                WorkflowDetailResponseVO.NodeConfig parentNodeConfig = nodeConfigMap.get(graph.successors(commonAncestor).stream().findFirst().get());
                parentNodeConfig.setChildNode(currentConfig);
                mount = false;
            } else {
                mount = true;
            }
            buildNodeConfig(graph, successor, nodeConfigMap, workflowNodeMap);
        }
        if (!parentId.equals(SystemConstants.ROOT) && mount) {
            previousNodeInfo.setChildNode(currentConfig);
        }
        // Present sibling branches in a stable order for the UI.
        currentConfig.getConditionNodes().sort(Comparator.comparing(WorkflowDetailResponseVO.NodeInfo::getPriorityLevel));
        return currentConfig;
    }

    /**
     * Collects every ancestor of {@code predecessor} into {@code set} by walking
     * upwards until a root (no predecessors) is reached. Note that only the
     * first predecessor at each level is followed; all predecessors of each
     * visited node are still added to the set.
     */
    private void findCommonAncestor(Long predecessor, Set<Long> set, MutableGraph<Long> graph) {
        Set<Long> predecessors = graph.predecessors(predecessor);
        if (CollectionUtils.isEmpty(predecessors)) {
            return;
        }
        set.addAll(predecessors);
        findCommonAncestor(new ArrayList<>(predecessors).get(0), set, graph);
    }

    /**
     * Recursively persists the nodes described by {@code nodeConfig} and wires
     * them into {@code graph} as children of every id in {@code parentIds}.
     *
     * @param parentIds  node ids the current level attaches to
     * @param deque      collects leaf node ids of the current level so the next
     *                   level can attach to all of them
     * @param groupName  workflow group name stamped onto each persisted node
     * @param workflowId workflow id stamped onto each persisted node
     * @param nodeConfig configuration of the current level (may be {@code null})
     * @param graph      the DAG being built (mutated in place)
     */
    public void buildGraph(List<Long> parentIds, LinkedBlockingDeque<Long> deque,
            String groupName, Long workflowId,
            WorkflowRequestVO.NodeConfig nodeConfig, MutableGraph<Long> graph) {
        if (Objects.isNull(nodeConfig)) {
            return;
        }
        // Sibling branches at this level.
        List<WorkflowRequestVO.NodeInfo> conditionNodes = nodeConfig.getConditionNodes();
        if (!CollectionUtils.isEmpty(conditionNodes)) {
            for (final WorkflowRequestVO.NodeInfo nodeInfo : conditionNodes) {
                WorkflowNode workflowNode = WorkflowConverter.INSTANCE.toWorkflowNode(nodeInfo);
                workflowNode.setWorkflowId(workflowId);
                workflowNode.setGroupName(groupName);
                workflowNode.setNodeType(nodeConfig.getNodeType());
                // Type-specific payload: decision/callback nodes get a synthetic
                // job id and store their config as JSON; job-task nodes reference
                // a real job id.
                if (WorkflowNodeTypeEnum.DECISION.getType() == nodeConfig.getNodeType()) {
                    workflowNode.setJobId(SystemConstants.DECISION_JOB_ID);
                    workflowNode.setNodeInfo(JsonUtil.toJsonString(nodeInfo.getDecision()));
                }
                if (WorkflowNodeTypeEnum.CALLBACK.getType() == nodeConfig.getNodeType()) {
                    workflowNode.setJobId(SystemConstants.CALLBACK_JOB_ID);
                    workflowNode.setNodeInfo(JsonUtil.toJsonString(nodeInfo.getCallback()));
                }
                if (WorkflowNodeTypeEnum.JOB_TASK.getType() == nodeConfig.getNodeType()) {
                    JobTaskConfig jobTask = nodeInfo.getJobTask();
                    workflowNode.setJobId(jobTask.getJobId());
                }
                Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode),
                        () -> new EasyRetryServerException("新增工作流节点失败"));
                // Add the freshly persisted node to the graph.
                graph.addNode(workflowNode.getId());
                for (final Long parentId : parentIds) {
                    // Add an edge from every parent to the new node.
                    graph.putEdge(parentId, workflowNode.getId());
                }
                // NOTE(review): warn level looks unintentional for a trace
                // message — consider debug.
                log.warn("workflowNodeId:[{}] parentIds:[{}]",
                        workflowNode.getId(), JsonUtil.toJsonString(parentIds));
                WorkflowRequestVO.NodeConfig childNode = nodeInfo.getChildNode();
                if (Objects.nonNull(childNode) && !CollectionUtils.isEmpty(childNode.getConditionNodes())) {
                    buildGraph(Lists.newArrayList(workflowNode.getId()), deque, groupName, workflowId, childNode,
                            graph);
                } else {
                    // Leaf of this branch: remember it so the next level can
                    // attach to it.
                    deque.add(workflowNode.getId());
                }
            }
        }
        WorkflowRequestVO.NodeConfig childNode = nodeConfig.getChildNode();
        if (Objects.nonNull(childNode) && !CollectionUtils.isEmpty(childNode.getConditionNodes())) {
            // Attach the shared child level to all leaves collected above.
            List<Long> list = Lists.newArrayList();
            deque.drainTo(list);
            buildGraph(list, deque, groupName, workflowId, childNode, graph);
        }
    }
}

View File

@ -5,7 +5,7 @@ import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.IdGeneratorMode;
import com.aizuda.easy.retry.server.common.enums.IdGeneratorModeEnum;
import com.aizuda.easy.retry.server.common.enums.SystemModeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.handler.ConfigVersionSyncHandler;
@ -28,7 +28,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import org.jetbrains.annotations.NotNull;
import org.slf4j.helpers.MessageFormatter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
@ -200,7 +199,7 @@ public class GroupConfigServiceImpl implements GroupConfigService {
records);
for (GroupConfigResponseVO groupConfigResponseVO : responseVOList) {
Optional.ofNullable(IdGeneratorMode.modeOf(groupConfigResponseVO.getIdGeneratorMode()))
Optional.ofNullable(IdGeneratorModeEnum.modeOf(groupConfigResponseVO.getIdGeneratorMode()))
.ifPresent(idGeneratorMode -> {
groupConfigResponseVO.setIdGeneratorModeName(idGeneratorMode.getDesc());
});
@ -281,7 +280,7 @@ public class GroupConfigServiceImpl implements GroupConfigService {
GroupConfigResponseVO groupConfigResponseVO = GroupConfigResponseVOConverter.INSTANCE.toGroupConfigResponseVO(
groupConfig);
Optional.ofNullable(IdGeneratorMode.modeOf(groupConfig.getIdGeneratorMode())).ifPresent(idGeneratorMode -> {
Optional.ofNullable(IdGeneratorModeEnum.modeOf(groupConfig.getIdGeneratorMode())).ifPresent(idGeneratorMode -> {
groupConfigResponseVO.setIdGeneratorModeName(idGeneratorMode.getDesc());
});

View File

@ -10,7 +10,7 @@ import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorScene;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext;
@ -45,7 +45,6 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -204,7 +203,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
}
TaskGenerator taskGenerator = taskGenerators.stream()
.filter(t -> t.supports(TaskGeneratorScene.MANA_SINGLE.getScene()))
.filter(t -> t.supports(TaskGeneratorSceneEnum.MANA_SINGLE.getScene()))
.findFirst().orElseThrow(() -> new EasyRetryServerException("没有匹配的任务生成器"));
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
@ -321,7 +320,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
Assert.isTrue(waitInsertList.size() <= 500, () -> new EasyRetryServerException("最多只能处理500条数据"));
TaskGenerator taskGenerator = taskGenerators.stream()
.filter(t -> t.supports(TaskGeneratorScene.MANA_BATCH.getScene()))
.filter(t -> t.supports(TaskGeneratorSceneEnum.MANA_BATCH.getScene()))
.findFirst().orElseThrow(() -> new EasyRetryServerException("没有匹配的任务生成器"));
boolean allMatch = waitInsertList.stream()

View File

@ -4,8 +4,7 @@ import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.GraphUtils;
import com.aizuda.easy.retry.server.job.task.support.cache.MutableGraphCache;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.UserSessionVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowBatchQueryVO;
@ -15,9 +14,8 @@ import com.aizuda.easy.retry.server.web.model.response.WorkflowDetailResponseVO;
import com.aizuda.easy.retry.server.web.service.WorkflowBatchService;
import com.aizuda.easy.retry.server.web.service.convert.JobBatchResponseVOConverter;
import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter;
import com.aizuda.easy.retry.server.web.service.handler.WorkflowHandler;
import com.aizuda.easy.retry.server.web.util.UserSessionUtils;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchQueryDO;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.WorkflowBatchQueryDO;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.WorkflowBatchResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
@ -30,18 +28,17 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
@ -57,6 +54,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
private final WorkflowMapper workflowMapper;
private final WorkflowNodeMapper workflowNodeMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final WorkflowHandler workflowHandler;
@Override
public PageResult<List<WorkflowBatchResponseVO>> listPage(WorkflowBatchQueryVO queryVO) {
@ -120,70 +118,24 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
.peek(nodeInfo -> {
JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(nodeInfo.getId());
if (Objects.nonNull(jobTaskBatch)) {
nodeInfo.setJobTaskBatchId(jobTaskBatch.getId());
nodeInfo.setExecutionAt(DateUtils.toLocalDateTime(jobTaskBatch.getExecutionAt()));
nodeInfo.setTaskBatchStatus(jobTaskBatch.getTaskBatchStatus());
nodeInfo.setOperationReason(jobTaskBatch.getOperationReason());
JobBatchResponseVO jobBatchResponseVO = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVO(jobTaskBatch);
nodeInfo.setJobBatch(jobBatchResponseVO);
}
})
.collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));
String flowInfo = workflowTaskBatch.getFlowInfo();
try {
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
MutableGraph<Long> graph = MutableGraphCache.getOrDefault(id, flowInfo);
// 反序列化构建图
WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(), workflowNodeMap);
WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(), workflowNodeMap);
responseVO.setNodeConfig(config);
} catch (Exception e) {
log.error("反序列化失败. json:[{}]", flowInfo, e);
throw new EasyRetryServerException("查询工作流批次详情失败");
}
return responseVO;
}
/**
 * Rebuilds the node-configuration tree from the workflow DAG for batch detail
 * views. Recursively walks the successors of {@code parentId} and groups
 * siblings into one {@code NodeConfig}. Fan-in nodes (>= 2 predecessors) are
 * attached under the config of the first predecessor only.
 *
 * @param graph           the workflow DAG of node ids
 * @param parentId        id of the node whose successors are being assembled
 * @param nodeConfigMap   accumulator: node id -> the NodeConfig that contains it
 * @param workflowNodeMap node id -> node detail view
 * @return the NodeConfig holding all direct successors of {@code parentId},
 *         or {@code null} when there are none
 */
private WorkflowDetailResponseVO.NodeConfig buildNodeConfig(MutableGraph<Long> graph,
        Long parentId,
        Map<Long, WorkflowDetailResponseVO.NodeConfig> nodeConfigMap,
        Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap) {
    Set<Long> successors = graph.successors(parentId);
    if (CollectionUtils.isEmpty(successors)) {
        // Leaf node: nothing below this point.
        return null;
    }
    WorkflowDetailResponseVO.NodeInfo previousNodeInfo = workflowNodeMap.get(parentId);
    WorkflowDetailResponseVO.NodeConfig currentConfig = new WorkflowDetailResponseVO.NodeConfig();
    currentConfig.setConditionNodes(Lists.newArrayList());
    // Whether currentConfig should be mounted as the parent's child node.
    boolean mount = false;
    for (Long successor : successors) {
        Set<Long> predecessors = graph.predecessors(successor);
        WorkflowDetailResponseVO.NodeInfo nodeInfo = workflowNodeMap.get(successor);
        currentConfig.setNodeType(nodeInfo.getNodeType());
        currentConfig.getConditionNodes().add(nodeInfo);
        nodeConfigMap.put(successor, currentConfig);
        if (predecessors.size() >= 2) {
            // Fan-in: attach under the config of the "first" predecessor.
            // NOTE(review): predecessors is an unordered Set — which element is
            // first is not guaranteed; confirm determinism for these graphs.
            WorkflowDetailResponseVO.NodeConfig parentNodeConfig = nodeConfigMap.get(new ArrayList<>(predecessors).get(0));
            parentNodeConfig.setChildNode(currentConfig);
            mount = false;
        } else {
            mount = true;
        }
        buildNodeConfig(graph, successor, nodeConfigMap, workflowNodeMap);
    }
    if (!parentId.equals(SystemConstants.ROOT) && mount) {
        previousNodeInfo.setChildNode(currentConfig);
    }
    return currentConfig;
}
}

View File

@ -9,12 +9,12 @@ import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.JobTaskConfig;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.GraphUtils;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.JobRequestVO;
import com.aizuda.easy.retry.server.web.model.request.UserSessionVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowQueryVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
@ -24,6 +24,7 @@ import com.aizuda.easy.retry.server.web.model.response.WorkflowDetailResponseVO;
import com.aizuda.easy.retry.server.web.model.response.WorkflowResponseVO;
import com.aizuda.easy.retry.server.web.service.WorkflowService;
import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter;
import com.aizuda.easy.retry.server.web.service.handler.WorkflowHandler;
import com.aizuda.easy.retry.server.web.util.UserSessionUtils;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
@ -31,15 +32,12 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
@ -62,6 +60,7 @@ public class WorkflowServiceImpl implements WorkflowService {
private final WorkflowMapper workflowMapper;
private final WorkflowNodeMapper workflowNodeMapper;
private final SystemProperties systemProperties;
private final WorkflowHandler workflowHandler;
@Override
@Transactional
@ -73,6 +72,7 @@ public class WorkflowServiceImpl implements WorkflowService {
// 组装工作流信息
Workflow workflow = WorkflowConverter.INSTANCE.toWorkflow(workflowRequestVO);
workflow.setVersion(1);
workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
workflow.setFlowInfo(StrUtil.EMPTY);
workflow.setBucketIndex(HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName())
@ -84,14 +84,13 @@ public class WorkflowServiceImpl implements WorkflowService {
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
// 递归构建图
buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(),
workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(),
workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph);
log.info("图构建完成. graph:[{}]", graph);
// 保存图信息
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
workflowMapper.updateById(workflow);
Assert.isTrue(1 == workflowMapper.updateById(workflow), () -> new EasyRetryServerException("保存工作流图失败"));
return true;
}
@ -104,7 +103,7 @@ public class WorkflowServiceImpl implements WorkflowService {
}
@Override
public WorkflowDetailResponseVO getWorkflowDetail(Long id) throws IOException {
public WorkflowDetailResponseVO getWorkflowDetail(Long id) {
Workflow workflow = workflowMapper.selectById(id);
if (Objects.isNull(workflow)) {
@ -114,6 +113,7 @@ public class WorkflowServiceImpl implements WorkflowService {
WorkflowDetailResponseVO responseVO = WorkflowConverter.INSTANCE.toWorkflowDetailResponseVO(workflow);
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.eq(WorkflowNode::getDeleted, 0)
.eq(WorkflowNode::getVersion, workflow.getVersion())
.eq(WorkflowNode::getWorkflowId, id)
.orderByAsc(WorkflowNode::getPriorityLevel));
@ -126,7 +126,7 @@ public class WorkflowServiceImpl implements WorkflowService {
try {
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
// 反序列化构建图
WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(),
WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(),
workflowNodeMap);
responseVO.setNodeConfig(config);
} catch (Exception e) {
@ -170,7 +170,7 @@ public class WorkflowServiceImpl implements WorkflowService {
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
// 递归构建图
buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(),
workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(),
workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph);
log.info("图构建完成. graph:[{}]", graph);
@ -178,6 +178,7 @@ public class WorkflowServiceImpl implements WorkflowService {
// 保存图信息
Workflow workflow = new Workflow();
workflow.setId(workflowRequestVO.getId());
workflow.setVersion(1);
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
Assert.isTrue(workflowMapper.updateById(workflow) > 0, () -> new EasyRetryServerException("更新失败"));
@ -209,123 +210,4 @@ public class WorkflowServiceImpl implements WorkflowService {
return 1 == workflowMapper.updateById(workflow);
}
/**
 * Rebuilds the node-configuration tree from the workflow DAG. Recursively walks
 * the successors of {@code parentId}, groups siblings into one shared
 * {@code NodeConfig}, and wires fan-in points back onto the config produced
 * under their common ancestor.
 *
 * @param graph           the workflow DAG of node ids
 * @param parentId        id of the node whose successors are being assembled
 * @param nodeConfigMap   accumulator: node id -> the NodeConfig that contains it
 * @param workflowNodeMap node id -> node detail view
 * @return the NodeConfig holding all direct successors of {@code parentId},
 *         or {@code null} when there are none
 */
private WorkflowDetailResponseVO.NodeConfig buildNodeConfig(MutableGraph<Long> graph,
        Long parentId,
        Map<Long, WorkflowDetailResponseVO.NodeConfig> nodeConfigMap,
        Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap) {
    Set<Long> successors = graph.successors(parentId);
    if (CollectionUtils.isEmpty(successors)) {
        // Leaf node: nothing below this point.
        return null;
    }
    WorkflowDetailResponseVO.NodeInfo previousNodeInfo = workflowNodeMap.get(parentId);
    WorkflowDetailResponseVO.NodeConfig currentConfig = new WorkflowDetailResponseVO.NodeConfig();
    currentConfig.setConditionNodes(Lists.newArrayList());
    // Whether currentConfig should be mounted as the parent's child node.
    boolean mount = false;
    for (Long successor : successors) {
        Set<Long> predecessors = graph.predecessors(successor);
        WorkflowDetailResponseVO.NodeInfo nodeInfo = workflowNodeMap.get(successor);
        currentConfig.setNodeType(nodeInfo.getNodeType());
        currentConfig.getConditionNodes().add(nodeInfo);
        nodeConfigMap.put(successor, currentConfig);
        if (predecessors.size() >= 2) {
            // Fan-in node: collect each predecessor's ancestor chain...
            Map<Long, Set<Long>> sets = new HashMap<>();
            for (final Long predecessor : predecessors) {
                Set<Long> set = Sets.newHashSet();
                sets.put(predecessor, set);
                findCommonAncestor(predecessor, set, graph);
            }
            // ...then intersect the chains to keep only shared ancestors.
            Set<Long> intersection = sets.values().stream().findFirst().get();
            for (final Set<Long> value : sets.values()) {
                intersection = Sets.intersection(value, intersection);
            }
            // NOTE(review): picks the "last" element by Set iteration order,
            // which HashSet does not guarantee — confirm determinism.
            Long commonAncestor = new ArrayList<>(intersection).get(intersection.size() - 1);
            WorkflowDetailResponseVO.NodeConfig parentNodeConfig = nodeConfigMap.get(graph.successors(commonAncestor).stream().findFirst().get());
            parentNodeConfig.setChildNode(currentConfig);
            mount = false;
        } else {
            mount = true;
        }
        buildNodeConfig(graph, successor, nodeConfigMap, workflowNodeMap);
    }
    if (!parentId.equals(SystemConstants.ROOT) && mount) {
        previousNodeInfo.setChildNode(currentConfig);
    }
    // Present sibling branches in a stable order for the UI.
    currentConfig.getConditionNodes().sort(Comparator.comparing(WorkflowDetailResponseVO.NodeInfo::getPriorityLevel));
    return currentConfig;
}
/**
 * Collects every ancestor of {@code predecessor} into {@code set} by walking
 * upwards until a root (no predecessors) is reached. Only the first
 * predecessor at each level is followed, but all predecessors of each visited
 * node are added to the set — same traversal as the original recursion,
 * expressed iteratively.
 */
private void findCommonAncestor(Long predecessor, Set<Long> set, MutableGraph<Long> graph) {
    Long current = predecessor;
    Set<Long> parents = graph.predecessors(current);
    while (!CollectionUtils.isEmpty(parents)) {
        set.addAll(parents);
        current = new ArrayList<>(parents).get(0);
        parents = graph.predecessors(current);
    }
}
/**
 * Recursively persists the nodes described by {@code nodeConfig} and wires them
 * into {@code graph} as children of every id in {@code parentIds}. In this
 * version only CONDITION nodes receive a synthetic job id; all other node
 * types keep the job id set by the converter.
 *
 * @param parentIds  node ids the current level attaches to
 * @param deque      collects leaf node ids of the current level so the next
 *                   level can attach to all of them
 * @param groupName  workflow group name stamped onto each persisted node
 * @param workflowId workflow id stamped onto each persisted node
 * @param nodeConfig configuration of the current level (may be {@code null})
 * @param graph      the DAG being built (mutated in place)
 */
public void buildGraph(List<Long> parentIds, LinkedBlockingDeque<Long> deque, String groupName, Long workflowId,
        NodeConfig nodeConfig, MutableGraph<Long> graph) {
    if (Objects.isNull(nodeConfig)) {
        return;
    }
    // Sibling branches at this level.
    List<NodeInfo> conditionNodes = nodeConfig.getConditionNodes();
    if (!CollectionUtils.isEmpty(conditionNodes)) {
        for (final NodeInfo nodeInfo : conditionNodes) {
            WorkflowNode workflowNode = WorkflowConverter.INSTANCE.toWorkflowNode(nodeInfo);
            workflowNode.setWorkflowId(workflowId);
            workflowNode.setGroupName(groupName);
            workflowNode.setNodeType(nodeConfig.getNodeType());
            if (WorkflowNodeTypeEnum.CONDITION.getType() == nodeConfig.getNodeType()) {
                // Condition nodes have no real job behind them.
                workflowNode.setJobId(SystemConstants.CONDITION_JOB_ID);
            }
            Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode),
                    () -> new EasyRetryServerException("新增工作流节点失败"));
            // Add the freshly persisted node to the graph.
            graph.addNode(workflowNode.getId());
            for (final Long parentId : parentIds) {
                // Add an edge from every parent to the new node.
                graph.putEdge(parentId, workflowNode.getId());
            }
            // NOTE(review): warn level looks unintentional for a trace message —
            // consider debug.
            log.warn("workflowNodeId:[{}] parentIds:[{}]",
                    workflowNode.getId(), JsonUtil.toJsonString(parentIds));
            NodeConfig childNode = nodeInfo.getChildNode();
            if (Objects.nonNull(childNode) && !CollectionUtils.isEmpty(childNode.getConditionNodes())) {
                buildGraph(Lists.newArrayList(workflowNode.getId()), deque, groupName, workflowId, childNode,
                        graph);
            } else {
                // Leaf of this branch: remember it so the next level can attach to it.
                deque.add(workflowNode.getId());
            }
        }
    }
    NodeConfig childNode = nodeConfig.getChildNode();
    if (Objects.nonNull(childNode) && !CollectionUtils.isEmpty(childNode.getConditionNodes())) {
        // Attach the shared child level to all leaves collected above.
        List<Long> list = Lists.newArrayList();
        deque.drainTo(list);
        buildGraph(list, deque, groupName, workflowId, childNode, graph);
    }
}
}