工作流上下文变量更新++
This commit is contained in:
parent
e3d34c3ca6
commit
aa7a560a48
@ -0,0 +1,134 @@
|
|||||||
|
package com.aizuda.snailjob.server.common.util;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import com.aizuda.snailjob.common.core.util.StreamUtils;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.SystemUserMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.SystemUserPermissionMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.SystemVariableMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.SystemUser;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.SystemUserPermission;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.SystemVariable;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class DefaultSystemVariableProvider implements SystemVariableProvider {
|
||||||
|
|
||||||
|
private final SystemVariableMapper systemVariableMapper;
|
||||||
|
private final SystemUserMapper systemUserMapper;
|
||||||
|
private final SystemUserPermissionMapper systemUserPermissionMapper;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> getUserVariables(Long userId, String namespaceId) {
|
||||||
|
// Web工程使用,需要权限控制
|
||||||
|
SystemUser systemUser = systemUserMapper.selectById(userId);
|
||||||
|
if (Objects.isNull(systemUser)) {
|
||||||
|
return Collections.emptyMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
List<SystemVariable> variables = getVariablesByUserPermission(systemUser, namespaceId);
|
||||||
|
return variables.stream()
|
||||||
|
.collect(Collectors.toMap(
|
||||||
|
SystemVariable::getVariableName,
|
||||||
|
SystemVariable::getVariableValue,
|
||||||
|
(existing, replacement) -> existing
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Integer> getUserVariableTypes(Long userId, String namespaceId) {
|
||||||
|
// 获取用户变量类型映射
|
||||||
|
SystemUser systemUser = systemUserMapper.selectById(userId);
|
||||||
|
if (Objects.isNull(systemUser)) {
|
||||||
|
return Collections.emptyMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
List<SystemVariable> variables = getVariablesByUserPermission(systemUser, namespaceId);
|
||||||
|
return variables.stream()
|
||||||
|
.collect(Collectors.toMap(
|
||||||
|
SystemVariable::getVariableName,
|
||||||
|
SystemVariable::getVariableType,
|
||||||
|
(existing, replacement) -> existing
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> getGroupVariables(String groupName, String namespaceId) {
|
||||||
|
// 工作流执行时使用,不需要权限控制,直接获取组变量和admin变量
|
||||||
|
List<SystemVariable> variables = getVariablesByGroup(groupName, namespaceId);
|
||||||
|
return variables.stream()
|
||||||
|
.collect(Collectors.toMap(
|
||||||
|
SystemVariable::getVariableName,
|
||||||
|
SystemVariable::getVariableValue,
|
||||||
|
(existing, replacement) -> existing
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Integer> getGroupVariableTypes(String groupName, String namespaceId) {
|
||||||
|
// 获取组变量类型映射
|
||||||
|
List<SystemVariable> variables = getVariablesByGroup(groupName, namespaceId);
|
||||||
|
return variables.stream()
|
||||||
|
.collect(Collectors.toMap(
|
||||||
|
SystemVariable::getVariableName,
|
||||||
|
SystemVariable::getVariableType,
|
||||||
|
(existing, replacement) -> existing
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据用户权限获取变量
|
||||||
|
*/
|
||||||
|
private List<SystemVariable> getVariablesByUserPermission(SystemUser systemUser, String namespaceId) {
|
||||||
|
LambdaQueryWrapper<SystemVariable> wrapper = new LambdaQueryWrapper<SystemVariable>()
|
||||||
|
.eq(SystemVariable::getNamespaceId, namespaceId);
|
||||||
|
|
||||||
|
// 根据用户角色过滤
|
||||||
|
if (systemUser.getRole() != 2) { // 不是admin用户
|
||||||
|
List<SystemUserPermission> userPermissions = systemUserPermissionMapper.selectList(
|
||||||
|
new LambdaQueryWrapper<SystemUserPermission>()
|
||||||
|
.select(SystemUserPermission::getGroupName)
|
||||||
|
.eq(SystemUserPermission::getSystemUserId, systemUser.getId())
|
||||||
|
.eq(SystemUserPermission::getNamespaceId, namespaceId)
|
||||||
|
);
|
||||||
|
|
||||||
|
List<String> userGroupNames = StreamUtils.toList(userPermissions, SystemUserPermission::getGroupName);
|
||||||
|
if (CollUtil.isNotEmpty(userGroupNames)) {
|
||||||
|
wrapper.and(w -> {
|
||||||
|
for (String groupName : userGroupNames) {
|
||||||
|
w.or().like(SystemVariable::getGroupNames, groupName);
|
||||||
|
}
|
||||||
|
w.or().exists("SELECT 1 FROM sj_system_user su WHERE su.id = sj_system_variable.system_user_id AND su.role = 2");
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
wrapper.exists("SELECT 1 FROM sj_system_user su WHERE su.id = sj_system_variable.system_user_id AND su.role = 2");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return systemVariableMapper.selectList(wrapper);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据组名获取变量
|
||||||
|
*/
|
||||||
|
private List<SystemVariable> getVariablesByGroup(String groupName, String namespaceId) {
|
||||||
|
LambdaQueryWrapper<SystemVariable> wrapper = new LambdaQueryWrapper<SystemVariable>()
|
||||||
|
.eq(SystemVariable::getNamespaceId, namespaceId)
|
||||||
|
.and(w -> {
|
||||||
|
// 该组的变量
|
||||||
|
w.like(SystemVariable::getGroupNames, groupName)
|
||||||
|
// admin创建的变量(所有组都可见)
|
||||||
|
.or().exists("SELECT 1 FROM sj_system_user su WHERE su.id = sj_system_variable.system_user_id AND su.role = 2");
|
||||||
|
});
|
||||||
|
|
||||||
|
return systemVariableMapper.selectList(wrapper);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,26 @@
|
|||||||
|
package com.aizuda.snailjob.server.common.util;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public interface SystemVariableProvider {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取用户变量(Web工程使用,需要权限控制)
|
||||||
|
*/
|
||||||
|
Map<String, String> getUserVariables(Long userId, String namespaceId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取用户变量类型
|
||||||
|
*/
|
||||||
|
Map<String, Integer> getUserVariableTypes(Long userId, String namespaceId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据组名获取变量(工作流执行时使用,不需要权限控制)
|
||||||
|
*/
|
||||||
|
Map<String, String> getGroupVariables(String groupName, String namespaceId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据组名获取变量类型
|
||||||
|
*/
|
||||||
|
Map<String, Integer> getGroupVariableTypes(String groupName, String namespaceId);
|
||||||
|
}
|
||||||
@ -0,0 +1,207 @@
|
|||||||
|
package com.aizuda.snailjob.server.common.util;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||||
|
import com.aizuda.snailjob.common.core.expression.ExpressionEngine;
|
||||||
|
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||||
|
import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class VariableReplacementUtil {
|
||||||
|
|
||||||
|
private final SystemVariableProvider systemVariableProvider;
|
||||||
|
|
||||||
|
private static final Pattern VARIABLE_PATTERN = Pattern.compile("\\$\\{([^}]+)\\}");
|
||||||
|
|
||||||
|
// 变量类型常量
|
||||||
|
private static final int STRING_TYPE = 1;
|
||||||
|
private static final int EXPRESSION_TYPE = 2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Web工程使用:基于用户ID替换变量(需要权限控制)
|
||||||
|
*/
|
||||||
|
public String replaceVariablesByUser(String context, Long userId, String namespaceId) {
|
||||||
|
if (StrUtil.isBlank(context)) {
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, String> userVariables = systemVariableProvider.getUserVariables(userId, namespaceId);
|
||||||
|
Map<String, Integer> variableTypes = systemVariableProvider.getUserVariableTypes(userId, namespaceId);
|
||||||
|
return doReplace(context, userVariables, variableTypes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流执行时使用:基于组名替换变量(不需要权限控制)
|
||||||
|
*/
|
||||||
|
public String replaceVariablesByGroup(String context, String groupName, String namespaceId) {
|
||||||
|
if (StrUtil.isBlank(context)) {
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, String> groupVariables = systemVariableProvider.getGroupVariables(groupName, namespaceId);
|
||||||
|
Map<String, Integer> variableTypes = systemVariableProvider.getGroupVariableTypes(groupName, namespaceId);
|
||||||
|
return doReplace(context, groupVariables, variableTypes);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String doReplace(String context, Map<String, String> variables, Map<String, Integer> variableTypes) {
|
||||||
|
Matcher matcher = VARIABLE_PATTERN.matcher(context);
|
||||||
|
StringBuffer result = new StringBuffer();
|
||||||
|
|
||||||
|
while (matcher.find()) {
|
||||||
|
String variableName = matcher.group(1);
|
||||||
|
String replacement = getVariableValue(variableName, variables, variableTypes);
|
||||||
|
matcher.appendReplacement(result, replacement);
|
||||||
|
}
|
||||||
|
matcher.appendTail(result);
|
||||||
|
|
||||||
|
return result.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getVariableValue(String variableName, Map<String, String> variables, Map<String, Integer> variableTypes) {
|
||||||
|
String variableValue = variables.get(variableName);
|
||||||
|
Integer variableType = variableTypes.get(variableName);
|
||||||
|
|
||||||
|
if (variableValue != null && variableType != null) {
|
||||||
|
if (variableType == STRING_TYPE) {
|
||||||
|
// 字符串类型,直接返回
|
||||||
|
return variableValue;
|
||||||
|
} else if (variableType == EXPRESSION_TYPE) {
|
||||||
|
// 表达式类型,进行表达式计算
|
||||||
|
return evaluateExpression(variableValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 系统内置变量(向后兼容)
|
||||||
|
return getBuiltInVariable(variableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 计算表达式
|
||||||
|
*/
|
||||||
|
private String evaluateExpression(String expression) {
|
||||||
|
try {
|
||||||
|
// 创建时间上下文
|
||||||
|
Map<String, Object> timeContext = createTimeContext();
|
||||||
|
|
||||||
|
// 直接使用SpEL引擎,避免循环依赖
|
||||||
|
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.SPEL.getExpressionEngine();
|
||||||
|
|
||||||
|
// 将上下文转换为JSON字符串,因为表达式引擎期望字符串参数
|
||||||
|
String contextJson = JsonUtil.toJsonString(timeContext);
|
||||||
|
Object result = realExpressionEngine.eval(expression, contextJson);
|
||||||
|
return String.valueOf(result);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 表达式计算失败,返回原始值
|
||||||
|
return expression;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建时间上下文,提供时间相关的变量和函数
|
||||||
|
*/
|
||||||
|
private Map<String, Object> createTimeContext() {
|
||||||
|
Map<String, Object> context = new HashMap<>();
|
||||||
|
LocalDateTime now = LocalDateTime.now();
|
||||||
|
|
||||||
|
// 基础时间变量
|
||||||
|
context.put("now", now);
|
||||||
|
context.put("date", now.format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD)));
|
||||||
|
context.put("time", now.format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD_HH_MM_SS)));
|
||||||
|
context.put("timestamp", System.currentTimeMillis());
|
||||||
|
|
||||||
|
// 时间格式化函数
|
||||||
|
context.put("formatDate", new DateFormatFunction());
|
||||||
|
context.put("formatTime", new TimeFormatFunction());
|
||||||
|
|
||||||
|
// 时间计算函数
|
||||||
|
context.put("plusDays", new PlusDaysFunction());
|
||||||
|
context.put("minusDays", new MinusDaysFunction());
|
||||||
|
context.put("plusMonths", new PlusMonthsFunction());
|
||||||
|
context.put("minusMonths", new MinusMonthsFunction());
|
||||||
|
|
||||||
|
// 随机数函数
|
||||||
|
context.put("random", new RandomFunction());
|
||||||
|
|
||||||
|
// 系统属性
|
||||||
|
context.put("systemProperty", new SystemPropertyFunction());
|
||||||
|
context.put("env", System.getenv());
|
||||||
|
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取内置变量(向后兼容)
|
||||||
|
*/
|
||||||
|
private String getBuiltInVariable(String variableName) {
|
||||||
|
switch (variableName) {
|
||||||
|
case "date":
|
||||||
|
return LocalDateTime.now().format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD));
|
||||||
|
case "time":
|
||||||
|
return LocalDateTime.now().format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD_HH_MM_SS));
|
||||||
|
case "timestamp":
|
||||||
|
return String.valueOf(System.currentTimeMillis());
|
||||||
|
default:
|
||||||
|
return "${" + variableName + "}";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 内部函数类
|
||||||
|
public static class DateFormatFunction {
|
||||||
|
public String format(String pattern) {
|
||||||
|
return LocalDateTime.now().format(DateTimeFormatter.ofPattern(pattern));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TimeFormatFunction {
|
||||||
|
public String format(String pattern) {
|
||||||
|
return LocalDateTime.now().format(DateTimeFormatter.ofPattern(pattern));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class PlusDaysFunction {
|
||||||
|
public String apply(int days) {
|
||||||
|
return LocalDateTime.now().plusDays(days).format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MinusDaysFunction {
|
||||||
|
public String apply(int days) {
|
||||||
|
return LocalDateTime.now().minusDays(days).format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class PlusMonthsFunction {
|
||||||
|
public String apply(int months) {
|
||||||
|
return LocalDateTime.now().plusMonths(months).format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MinusMonthsFunction {
|
||||||
|
public String apply(int months) {
|
||||||
|
return LocalDateTime.now().minusMonths(months).format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class RandomFunction {
|
||||||
|
public int between(int min, int max) {
|
||||||
|
return (int) (Math.random() * (max - min + 1)) + min;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class SystemPropertyFunction {
|
||||||
|
public String get(String key) {
|
||||||
|
return System.getProperty(key, "");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,5 +1,6 @@
|
|||||||
package com.aizuda.snailjob.server.job.task.support.dispatch;
|
package com.aizuda.snailjob.server.job.task.support.dispatch;
|
||||||
|
|
||||||
|
import com.aizuda.snailjob.server.common.util.VariableReplacementUtil;
|
||||||
import org.apache.pekko.actor.AbstractActor;
|
import org.apache.pekko.actor.AbstractActor;
|
||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
@ -78,6 +79,7 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
private final JobTaskBatchHandler jobTaskBatchHandler;
|
private final JobTaskBatchHandler jobTaskBatchHandler;
|
||||||
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
||||||
|
|
||||||
|
private static VariableReplacementUtil variableReplacementUtil;
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> {
|
return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> {
|
||||||
@ -222,6 +224,16 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
if (Objects.nonNull(workflowTaskBatch)) {
|
if (Objects.nonNull(workflowTaskBatch)) {
|
||||||
context.setWfContext(workflowTaskBatch.getWfContext());
|
context.setWfContext(workflowTaskBatch.getWfContext());
|
||||||
}
|
}
|
||||||
|
//工作流上下文变量更新
|
||||||
|
if (Objects.nonNull(workflowTaskBatch)) {
|
||||||
|
VariableReplacementUtil variableReplacementUtil = getVariableReplacementUtil();
|
||||||
|
String replacedContext = variableReplacementUtil.replaceVariablesByGroup(
|
||||||
|
workflowTaskBatch.getWfContext(),
|
||||||
|
workflowTaskBatch.getGroupName(),
|
||||||
|
workflowTaskBatch.getNamespaceId()
|
||||||
|
);
|
||||||
|
context.setWfContext(replacedContext);
|
||||||
|
}
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -244,5 +256,8 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
.build()));
|
.build()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
private static VariableReplacementUtil getVariableReplacementUtil() {
|
||||||
|
return SnailSpringContext.getContext().getBean(VariableReplacementUtil.class);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,7 @@
|
|||||||
package com.aizuda.snailjob.server.job.task.support.dispatch;
|
package com.aizuda.snailjob.server.job.task.support.dispatch;
|
||||||
|
|
||||||
|
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
|
||||||
|
import com.aizuda.snailjob.server.common.util.VariableReplacementUtil;
|
||||||
import org.apache.pekko.actor.AbstractActor;
|
import org.apache.pekko.actor.AbstractActor;
|
||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
@ -27,6 +29,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
|
|||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
import org.springframework.context.annotation.Scope;
|
import org.springframework.context.annotation.Scope;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
@ -54,7 +57,6 @@ public class ReduceActor extends AbstractActor {
|
|||||||
private final JobTaskMapper jobTaskMapper;
|
private final JobTaskMapper jobTaskMapper;
|
||||||
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
||||||
private final JobTaskBatchHandler jobTaskBatchHandler;
|
private final JobTaskBatchHandler jobTaskBatchHandler;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
|
return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
|
||||||
@ -138,7 +140,18 @@ public class ReduceActor extends AbstractActor {
|
|||||||
context.setTaskBatchId(reduceTask.getTaskBatchId());
|
context.setTaskBatchId(reduceTask.getTaskBatchId());
|
||||||
context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId());
|
context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId());
|
||||||
context.setWorkflowNodeId(reduceTask.getWorkflowNodeId());
|
context.setWorkflowNodeId(reduceTask.getWorkflowNodeId());
|
||||||
context.setWfContext(wfContext);
|
// 工作流上下文变量更新
|
||||||
|
// context.setWfContext(wfContext);
|
||||||
|
VariableReplacementUtil variableReplacementUtil = getVariableReplacementUtil();
|
||||||
|
String replacedContext = variableReplacementUtil.replaceVariablesByGroup(
|
||||||
|
wfContext,
|
||||||
|
context.getGroupName(),
|
||||||
|
context.getNamespaceId()
|
||||||
|
);
|
||||||
|
context.setWfContext(replacedContext);
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
private static VariableReplacementUtil getVariableReplacementUtil() {
|
||||||
|
return SnailSpringContext.getContext().getBean(VariableReplacementUtil.class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
package com.aizuda.snailjob.server.job.task.support.dispatch;
|
package com.aizuda.snailjob.server.job.task.support.dispatch;
|
||||||
|
|
||||||
|
import com.aizuda.snailjob.server.common.util.VariableReplacementUtil;
|
||||||
import org.apache.pekko.actor.AbstractActor;
|
import org.apache.pekko.actor.AbstractActor;
|
||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
@ -31,6 +32,7 @@ import com.google.common.collect.Sets;
|
|||||||
import com.google.common.graph.MutableGraph;
|
import com.google.common.graph.MutableGraph;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
import org.springframework.context.annotation.Scope;
|
import org.springframework.context.annotation.Scope;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
@ -65,7 +67,8 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
private final JobMapper jobMapper;
|
private final JobMapper jobMapper;
|
||||||
private final JobTaskBatchMapper jobTaskBatchMapper;
|
private final JobTaskBatchMapper jobTaskBatchMapper;
|
||||||
private final WorkflowBatchHandler workflowBatchHandler;
|
private final WorkflowBatchHandler workflowBatchHandler;
|
||||||
|
@Autowired
|
||||||
|
private VariableReplacementUtil variableReplacementUtil;
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> {
|
return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> {
|
||||||
@ -209,7 +212,15 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
context.setEvaluationResult(evaluationResult);
|
context.setEvaluationResult(evaluationResult);
|
||||||
context.setTaskBatchId(taskExecute.getTaskBatchId());
|
context.setTaskBatchId(taskExecute.getTaskBatchId());
|
||||||
context.setTaskExecutorScene(taskExecute.getTaskExecutorScene());
|
context.setTaskExecutorScene(taskExecute.getTaskExecutorScene());
|
||||||
context.setWfContext(workflowTaskBatch.getWfContext());
|
|
||||||
|
//工作流上下文变量更新
|
||||||
|
// context.setWfContext(workflowTaskBatch.getWfContext());
|
||||||
|
String replacedWfContext = variableReplacementUtil.replaceVariablesByGroup(
|
||||||
|
workflowTaskBatch.getWfContext(),
|
||||||
|
workflowTaskBatch.getGroupName(),
|
||||||
|
workflowTaskBatch.getNamespaceId()
|
||||||
|
);
|
||||||
|
context.setWfContext(replacedWfContext);
|
||||||
// 这里父节点取最新的批次判断状态
|
// 这里父节点取最新的批次判断状态
|
||||||
if (CollUtil.isNotEmpty(parentJobTaskBatchList)) {
|
if (CollUtil.isNotEmpty(parentJobTaskBatchList)) {
|
||||||
fillParentOperationReason(allJobTaskBatchList, parentJobTaskBatchList, parentWorkflowNode, context);
|
fillParentOperationReason(allJobTaskBatchList, parentJobTaskBatchList, parentWorkflowNode, context);
|
||||||
|
|||||||
@ -9,6 +9,7 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
|
|||||||
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
|
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
|
||||||
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
|
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
|
||||||
import com.aizuda.snailjob.server.common.rpc.okhttp.RequestInterceptor;
|
import com.aizuda.snailjob.server.common.rpc.okhttp.RequestInterceptor;
|
||||||
|
import com.aizuda.snailjob.server.common.util.VariableReplacementUtil;
|
||||||
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
|
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
|
||||||
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
|
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
|
||||||
import com.aizuda.snailjob.server.model.dto.CallbackParamsDTO;
|
import com.aizuda.snailjob.server.model.dto.CallbackParamsDTO;
|
||||||
@ -16,6 +17,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
|||||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
|
||||||
import com.github.rholder.retry.*;
|
import com.github.rholder.retry.*;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.http.HttpEntity;
|
import org.springframework.http.HttpEntity;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
@ -43,7 +45,8 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
|
|
||||||
private static final String CALLBACK_TIMEOUT = "10";
|
private static final String CALLBACK_TIMEOUT = "10";
|
||||||
private final RestTemplate restTemplate;
|
private final RestTemplate restTemplate;
|
||||||
|
@Autowired
|
||||||
|
private VariableReplacementUtil variableReplacementUtil;
|
||||||
@Override
|
@Override
|
||||||
public WorkflowNodeTypeEnum getWorkflowNodeType() {
|
public WorkflowNodeTypeEnum getWorkflowNodeType() {
|
||||||
return WorkflowNodeTypeEnum.CALLBACK;
|
return WorkflowNodeTypeEnum.CALLBACK;
|
||||||
@ -89,8 +92,14 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
// 设置回调超时时间
|
// 设置回调超时时间
|
||||||
requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT);
|
requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT);
|
||||||
|
|
||||||
|
//工作流上下文变量更新
|
||||||
CallbackParamsDTO callbackParamsDTO = new CallbackParamsDTO();
|
CallbackParamsDTO callbackParamsDTO = new CallbackParamsDTO();
|
||||||
callbackParamsDTO.setWfContext(context.getWfContext());
|
String replacedContext = variableReplacementUtil.replaceVariablesByGroup(
|
||||||
|
context.getWfContext(),
|
||||||
|
context.getGroupName(),
|
||||||
|
context.getNamespaceId()
|
||||||
|
);
|
||||||
|
callbackParamsDTO.setWfContext(replacedContext);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Map<String, String> uriVariables = new HashMap<>();
|
Map<String, String> uriVariables = new HashMap<>();
|
||||||
|
|||||||
@ -13,6 +13,7 @@ import com.aizuda.snailjob.server.common.dto.DecisionConfig;
|
|||||||
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
|
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
|
||||||
import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum;
|
import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum;
|
||||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||||
|
import com.aizuda.snailjob.server.common.util.VariableReplacementUtil;
|
||||||
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
|
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
|
||||||
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
|
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
|
||||||
import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler;
|
import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler;
|
||||||
@ -23,6 +24,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
|
|||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
@ -42,6 +44,8 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
|
public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||||
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
||||||
|
@Autowired
|
||||||
|
private VariableReplacementUtil variableReplacementUtil;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public WorkflowNodeTypeEnum getWorkflowNodeType() {
|
public WorkflowNodeTypeEnum getWorkflowNodeType() {
|
||||||
@ -82,7 +86,13 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
if (Objects.isNull(workflowTaskBatch)) {
|
if (Objects.isNull(workflowTaskBatch)) {
|
||||||
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
|
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
|
||||||
} else {
|
} else {
|
||||||
wfContext = workflowTaskBatch.getWfContext();
|
//工作流上下文变量更新
|
||||||
|
wfContext = variableReplacementUtil.replaceVariablesByGroup(
|
||||||
|
workflowTaskBatch.getWfContext(),
|
||||||
|
context.getGroupName(),
|
||||||
|
context.getNamespaceId()
|
||||||
|
);
|
||||||
|
|
||||||
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType());
|
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType());
|
||||||
Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("Expression engine does not exist"));
|
Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("Expression engine does not exist"));
|
||||||
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
|
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert;
|
|||||||
import cn.hutool.core.net.url.UrlQuery;
|
import cn.hutool.core.net.url.UrlQuery;
|
||||||
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
|
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
|
||||||
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||||
|
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
|
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
|
||||||
@ -15,6 +16,7 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
|||||||
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
|
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
|
||||||
import com.aizuda.snailjob.server.common.util.HttpHeaderUtil;
|
import com.aizuda.snailjob.server.common.util.HttpHeaderUtil;
|
||||||
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
||||||
|
import com.aizuda.snailjob.server.common.util.VariableReplacementUtil;
|
||||||
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
|
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
|
||||||
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
|
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
|
||||||
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
|
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
|
||||||
@ -108,7 +110,14 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
|
|||||||
.eq(WorkflowTaskBatch::getId, mapTaskRequest.getWorkflowTaskBatchId())
|
.eq(WorkflowTaskBatch::getId, mapTaskRequest.getWorkflowTaskBatchId())
|
||||||
);
|
);
|
||||||
Assert.notNull(workflowTaskBatch, ()-> new SnailJobServerException("workflowTaskBatch is null. id:[{}]", mapTaskRequest.getWorkflowTaskBatchId()));
|
Assert.notNull(workflowTaskBatch, ()-> new SnailJobServerException("workflowTaskBatch is null. id:[{}]", mapTaskRequest.getWorkflowTaskBatchId()));
|
||||||
newWfContext = workflowTaskBatch.getWfContext();
|
// newWfContext = workflowTaskBatch.getWfContext();
|
||||||
|
//工作流上下文变量更新
|
||||||
|
VariableReplacementUtil variableReplacementUtil = getVariableReplacementUtil();
|
||||||
|
newWfContext = variableReplacementUtil.replaceVariablesByGroup(
|
||||||
|
workflowTaskBatch.getWfContext(),
|
||||||
|
HttpHeaderUtil.getGroupName(headers),
|
||||||
|
HttpHeaderUtil.getNamespace(headers)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 执行任务
|
// 执行任务
|
||||||
@ -129,5 +138,8 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
|
|||||||
context.setWfContext(newWfContext);
|
context.setWfContext(newWfContext);
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
private static VariableReplacementUtil getVariableReplacementUtil() {
|
||||||
|
return SnailSpringContext.getContext().getBean(VariableReplacementUtil.class);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,10 +22,7 @@ import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
|
|||||||
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
|
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
|
||||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||||
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
|
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
|
||||||
import com.aizuda.snailjob.server.common.util.CronUtils;
|
import com.aizuda.snailjob.server.common.util.*;
|
||||||
import com.aizuda.snailjob.server.common.util.DateUtils;
|
|
||||||
import com.aizuda.snailjob.server.common.util.GraphUtils;
|
|
||||||
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
|
|
||||||
import com.aizuda.snailjob.server.web.model.request.UserSessionVO;
|
import com.aizuda.snailjob.server.web.model.request.UserSessionVO;
|
||||||
import com.aizuda.snailjob.server.common.vo.request.WorkflowRequestVO;
|
import com.aizuda.snailjob.server.common.vo.request.WorkflowRequestVO;
|
||||||
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
|
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
|
||||||
@ -57,6 +54,7 @@ import lombok.RequiredArgsConstructor;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.springframework.beans.BeanUtils;
|
import org.springframework.beans.BeanUtils;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.springframework.validation.annotation.Validated;
|
import org.springframework.validation.annotation.Validated;
|
||||||
@ -87,7 +85,8 @@ public class WorkflowServiceImpl implements WorkflowService {
|
|||||||
private final AccessTemplate accessTemplate;
|
private final AccessTemplate accessTemplate;
|
||||||
private final GroupHandler groupHandler;
|
private final GroupHandler groupHandler;
|
||||||
private final JobSummaryMapper jobSummaryMapper;
|
private final JobSummaryMapper jobSummaryMapper;
|
||||||
|
@Autowired
|
||||||
|
private VariableReplacementUtil variableReplacementUtil;
|
||||||
private static Long calculateNextTriggerAt(final WorkflowRequestVO workflowRequestVO, Long time) {
|
private static Long calculateNextTriggerAt(final WorkflowRequestVO workflowRequestVO, Long time) {
|
||||||
checkExecuteInterval(workflowRequestVO);
|
checkExecuteInterval(workflowRequestVO);
|
||||||
|
|
||||||
@ -298,6 +297,12 @@ public class WorkflowServiceImpl implements WorkflowService {
|
|||||||
prepareDTO.setOperationReason(JobOperationReasonEnum.MANUAL_TRIGGER.getReason());
|
prepareDTO.setOperationReason(JobOperationReasonEnum.MANUAL_TRIGGER.getReason());
|
||||||
String tmpWfContext = triggerVO.getTmpWfContext();
|
String tmpWfContext = triggerVO.getTmpWfContext();
|
||||||
if (StrUtil.isNotBlank(tmpWfContext) && !JsonUtil.isEmptyJson(tmpWfContext)){
|
if (StrUtil.isNotBlank(tmpWfContext) && !JsonUtil.isEmptyJson(tmpWfContext)){
|
||||||
|
//工作流上下文变量更新
|
||||||
|
tmpWfContext = variableReplacementUtil.replaceVariablesByGroup(
|
||||||
|
triggerVO.getTmpWfContext(),
|
||||||
|
workflow.getGroupName(),
|
||||||
|
workflow.getNamespaceId()
|
||||||
|
);
|
||||||
prepareDTO.setWfContext(tmpWfContext);
|
prepareDTO.setWfContext(tmpWfContext);
|
||||||
}
|
}
|
||||||
terminalWorkflowPrepareHandler.handler(prepareDTO);
|
terminalWorkflowPrepareHandler.handler(prepareDTO);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user