diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/DefaultSystemVariableProvider.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/DefaultSystemVariableProvider.java new file mode 100644 index 00000000..33f665ff --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/DefaultSystemVariableProvider.java @@ -0,0 +1,134 @@ +package com.aizuda.snailjob.server.common.util; + +import cn.hutool.core.collection.CollUtil; +import com.aizuda.snailjob.common.core.util.StreamUtils; +import com.aizuda.snailjob.template.datasource.persistence.mapper.SystemUserMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.SystemUserPermissionMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.SystemVariableMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.SystemUser; +import com.aizuda.snailjob.template.datasource.persistence.po.SystemUserPermission; +import com.aizuda.snailjob.template.datasource.persistence.po.SystemVariable; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +@Component +@RequiredArgsConstructor +public class DefaultSystemVariableProvider implements SystemVariableProvider { + + private final SystemVariableMapper systemVariableMapper; + private final SystemUserMapper systemUserMapper; + private final SystemUserPermissionMapper systemUserPermissionMapper; + + @Override + public Map getUserVariables(Long userId, String namespaceId) { + // Web工程使用,需要权限控制 + SystemUser systemUser = systemUserMapper.selectById(userId); + if (Objects.isNull(systemUser)) { + return Collections.emptyMap(); + } + + List variables = getVariablesByUserPermission(systemUser, namespaceId); + return variables.stream() + .collect(Collectors.toMap( + SystemVariable::getVariableName, + SystemVariable::getVariableValue, + (existing, replacement) -> existing + )); + } + + @Override + public Map getUserVariableTypes(Long userId, String namespaceId) { + // 获取用户变量类型映射 + SystemUser systemUser = systemUserMapper.selectById(userId); + if (Objects.isNull(systemUser)) { + return Collections.emptyMap(); + } + + List variables = getVariablesByUserPermission(systemUser, namespaceId); + return variables.stream() + .collect(Collectors.toMap( + SystemVariable::getVariableName, + SystemVariable::getVariableType, + (existing, replacement) -> existing + )); + } + + @Override + public Map getGroupVariables(String groupName, String namespaceId) { + // 工作流执行时使用,不需要权限控制,直接获取组变量和admin变量 + List variables = getVariablesByGroup(groupName, namespaceId); + return variables.stream() + .collect(Collectors.toMap( + SystemVariable::getVariableName, + SystemVariable::getVariableValue, + (existing, replacement) -> existing + )); + } + + @Override + public Map getGroupVariableTypes(String groupName, String namespaceId) { + // 获取组变量类型映射 + List variables = getVariablesByGroup(groupName, namespaceId); + return variables.stream() + .collect(Collectors.toMap( + SystemVariable::getVariableName, + SystemVariable::getVariableType, + (existing, replacement) -> existing + )); + } + + /** + * 根据用户权限获取变量 + */ + private List getVariablesByUserPermission(SystemUser systemUser, String namespaceId) { + LambdaQueryWrapper wrapper = new LambdaQueryWrapper() + .eq(SystemVariable::getNamespaceId, namespaceId); + + // 根据用户角色过滤 + if (systemUser.getRole() != 2) { // 不是admin用户 + List userPermissions = systemUserPermissionMapper.selectList( + new LambdaQueryWrapper() + .select(SystemUserPermission::getGroupName) + .eq(SystemUserPermission::getSystemUserId, systemUser.getId()) + .eq(SystemUserPermission::getNamespaceId, namespaceId) + ); + + List userGroupNames = StreamUtils.toList(userPermissions, SystemUserPermission::getGroupName); + if (CollUtil.isNotEmpty(userGroupNames)) { + wrapper.and(w -> { + for (String groupName : userGroupNames) { + w.or().like(SystemVariable::getGroupNames, groupName); + } + w.or().exists("SELECT 1 FROM sj_system_user su WHERE su.id = sj_system_variable.system_user_id AND su.role = 2"); + }); + } else { + wrapper.exists("SELECT 1 FROM sj_system_user su WHERE su.id = sj_system_variable.system_user_id AND su.role = 2"); + } + } + + return systemVariableMapper.selectList(wrapper); + } + + /** + * 根据组名获取变量 + */ + private List getVariablesByGroup(String groupName, String namespaceId) { + LambdaQueryWrapper wrapper = new LambdaQueryWrapper() + .eq(SystemVariable::getNamespaceId, namespaceId) + .and(w -> { + // 该组的变量 + w.like(SystemVariable::getGroupNames, groupName) + // admin创建的变量(所有组都可见) + .or().exists("SELECT 1 FROM sj_system_user su WHERE su.id = sj_system_variable.system_user_id AND su.role = 2"); + }); + + return systemVariableMapper.selectList(wrapper); + } +} \ No newline at end of file diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/SystemVariableProvider.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/SystemVariableProvider.java new file mode 100644 index 00000000..4d04f4d5 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/SystemVariableProvider.java @@ -0,0 +1,26 @@ +package com.aizuda.snailjob.server.common.util; + +import java.util.Map; + +public interface SystemVariableProvider { + + /** + * 获取用户变量(Web工程使用,需要权限控制) + */ + Map getUserVariables(Long userId, String namespaceId); + + /** + * 获取用户变量类型 + */ + Map getUserVariableTypes(Long userId, String namespaceId); + + /** + * 根据组名获取变量(工作流执行时使用,不需要权限控制) + */ + Map getGroupVariables(String groupName, String namespaceId); + + /** + * 根据组名获取变量类型 + */ + Map getGroupVariableTypes(String groupName, String namespaceId); +} \ No newline at end of file diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/VariableReplacementUtil.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/VariableReplacementUtil.java new file mode 100644 index 00000000..63435673 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/VariableReplacementUtil.java @@ -0,0 +1,207 @@ +package com.aizuda.snailjob.server.common.util; + +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.core.expression.ExpressionEngine; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@Component +@RequiredArgsConstructor +public class VariableReplacementUtil { + + private final SystemVariableProvider systemVariableProvider; + + private static final Pattern VARIABLE_PATTERN = Pattern.compile("\\$\\{([^}]+)\\}"); + + // 变量类型常量 + private static final int STRING_TYPE = 1; + private static final int EXPRESSION_TYPE = 2; + + /** + * Web工程使用:基于用户ID替换变量(需要权限控制) + */ + public String replaceVariablesByUser(String context, Long userId, String namespaceId) { + if (StrUtil.isBlank(context)) { + return context; + } + + Map userVariables = systemVariableProvider.getUserVariables(userId, namespaceId); + Map variableTypes = systemVariableProvider.getUserVariableTypes(userId, namespaceId); + return doReplace(context, userVariables, variableTypes); + } + + /** + * 工作流执行时使用:基于组名替换变量(不需要权限控制) + */ + public String replaceVariablesByGroup(String context, String groupName, String namespaceId) { + if (StrUtil.isBlank(context)) { + return context; + } + + Map groupVariables = systemVariableProvider.getGroupVariables(groupName, namespaceId); + Map variableTypes = systemVariableProvider.getGroupVariableTypes(groupName, namespaceId); + return doReplace(context, groupVariables, variableTypes); + } + + private String doReplace(String context, Map variables, Map variableTypes) { + Matcher matcher = VARIABLE_PATTERN.matcher(context); + StringBuffer result = new StringBuffer(); + + while (matcher.find()) { + String variableName = matcher.group(1); + String replacement = getVariableValue(variableName, variables, variableTypes); + matcher.appendReplacement(result, replacement); + } + matcher.appendTail(result); + + return result.toString(); + } + + private String getVariableValue(String variableName, Map variables, Map variableTypes) { + String variableValue = variables.get(variableName); + Integer variableType = variableTypes.get(variableName); + + if (variableValue != null && variableType != null) { + if (variableType == STRING_TYPE) { + // 字符串类型,直接返回 + return variableValue; + } else if (variableType == EXPRESSION_TYPE) { + // 表达式类型,进行表达式计算 + return evaluateExpression(variableValue); + } + } + + // 系统内置变量(向后兼容) + return getBuiltInVariable(variableName); + } + + /** + * 计算表达式 + */ + private String evaluateExpression(String expression) { + try { + // 创建时间上下文 + Map timeContext = createTimeContext(); + + // 直接使用SpEL引擎,避免循环依赖 + ExpressionEngine realExpressionEngine = ExpressionTypeEnum.SPEL.getExpressionEngine(); + + // 将上下文转换为JSON字符串,因为表达式引擎期望字符串参数 + String contextJson = JsonUtil.toJsonString(timeContext); + Object result = realExpressionEngine.eval(expression, contextJson); + return String.valueOf(result); + } catch (Exception e) { + // 表达式计算失败,返回原始值 + return expression; + } + } + + + /** + * 创建时间上下文,提供时间相关的变量和函数 + */ + private Map createTimeContext() { + Map context = new HashMap<>(); + LocalDateTime now = LocalDateTime.now(); + + // 基础时间变量 + context.put("now", now); + context.put("date", now.format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD))); + context.put("time", now.format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD_HH_MM_SS))); + context.put("timestamp", System.currentTimeMillis()); + + // 时间格式化函数 + context.put("formatDate", new DateFormatFunction()); + context.put("formatTime", new TimeFormatFunction()); + + // 时间计算函数 + context.put("plusDays", new PlusDaysFunction()); + context.put("minusDays", new MinusDaysFunction()); + context.put("plusMonths", new PlusMonthsFunction()); + context.put("minusMonths", new MinusMonthsFunction()); + + // 随机数函数 + context.put("random", new RandomFunction()); + + // 系统属性 + context.put("systemProperty", new SystemPropertyFunction()); + context.put("env", System.getenv()); + + return context; + } + + /** + * 获取内置变量(向后兼容) + */ + private String getBuiltInVariable(String variableName) { + switch (variableName) { + case "date": + return LocalDateTime.now().format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD)); + case "time": + return LocalDateTime.now().format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD_HH_MM_SS)); + case "timestamp": + return String.valueOf(System.currentTimeMillis()); + default: + return "${" + variableName + "}"; + } + } + + // 内部函数类 + public static class DateFormatFunction { + public String format(String pattern) { + return LocalDateTime.now().format(DateTimeFormatter.ofPattern(pattern)); + } + } + + public static class TimeFormatFunction { + public String format(String pattern) { + return LocalDateTime.now().format(DateTimeFormatter.ofPattern(pattern)); + } + } + + public static class PlusDaysFunction { + public String apply(int days) { + return LocalDateTime.now().plusDays(days).format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD)); + } + } + + public static class MinusDaysFunction { + public String apply(int days) { + return LocalDateTime.now().minusDays(days).format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD)); + } + } + + public static class PlusMonthsFunction { + public String apply(int months) { + return LocalDateTime.now().plusMonths(months).format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD)); + } + } + + public static class MinusMonthsFunction { + public String apply(int months) { + return LocalDateTime.now().minusMonths(months).format(DateTimeFormatter.ofPattern(SystemConstants.YYYY_MM_DD)); + } + } + + public static class RandomFunction { + public int between(int min, int max) { + return (int) (Math.random() * (max - min + 1)) + min; + } + } + + public static class SystemPropertyFunction { + public String get(String key) { + return System.getProperty(key, ""); + } + } +} \ No newline at end of file diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 5324557b..655f3ed8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.dispatch; +import com.aizuda.snailjob.server.common.util.VariableReplacementUtil; import org.apache.pekko.actor.AbstractActor; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; @@ -78,6 +79,7 @@ public class JobExecutorActor extends AbstractActor { private final JobTaskBatchHandler jobTaskBatchHandler; private final WorkflowTaskBatchMapper workflowTaskBatchMapper; + private static VariableReplacementUtil variableReplacementUtil; @Override public Receive createReceive() { return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> { @@ -222,6 +224,16 @@ public class JobExecutorActor extends AbstractActor { if (Objects.nonNull(workflowTaskBatch)) { context.setWfContext(workflowTaskBatch.getWfContext()); } + //工作流上下文变量更新 + if (Objects.nonNull(workflowTaskBatch)) { + VariableReplacementUtil variableReplacementUtil = getVariableReplacementUtil(); + String replacedContext = variableReplacementUtil.replaceVariablesByGroup( + workflowTaskBatch.getWfContext(), + workflowTaskBatch.getGroupName(), + workflowTaskBatch.getNamespaceId() + ); + context.setWfContext(replacedContext); + } return context; } @@ -244,5 +256,8 @@ public class JobExecutorActor extends AbstractActor { .build())); } } + private static VariableReplacementUtil getVariableReplacementUtil() { + return SnailSpringContext.getContext().getBean(VariableReplacementUtil.class); + } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java index f516988a..ccac7a6e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java @@ -1,5 +1,7 @@ package com.aizuda.snailjob.server.job.task.support.dispatch; +import com.aizuda.snailjob.common.core.context.SnailSpringContext; +import com.aizuda.snailjob.server.common.util.VariableReplacementUtil; import org.apache.pekko.actor.AbstractActor; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; @@ -27,6 +29,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @@ -54,7 +57,6 @@ public class ReduceActor extends AbstractActor { private final JobTaskMapper jobTaskMapper; private final WorkflowTaskBatchMapper workflowTaskBatchMapper; private final JobTaskBatchHandler jobTaskBatchHandler; - @Override public Receive createReceive() { return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> { @@ -138,7 +140,18 @@ public class ReduceActor extends AbstractActor { context.setTaskBatchId(reduceTask.getTaskBatchId()); context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId()); context.setWorkflowNodeId(reduceTask.getWorkflowNodeId()); - context.setWfContext(wfContext); +// 工作流上下文变量更新 +// context.setWfContext(wfContext); + VariableReplacementUtil variableReplacementUtil = getVariableReplacementUtil(); + String replacedContext = variableReplacementUtil.replaceVariablesByGroup( + wfContext, + context.getGroupName(), + context.getNamespaceId() + ); + context.setWfContext(replacedContext); return context; } + private static VariableReplacementUtil getVariableReplacementUtil() { + return SnailSpringContext.getContext().getBean(VariableReplacementUtil.class); + } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index 3d4c251b..7ad85373 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.dispatch; +import com.aizuda.snailjob.server.common.util.VariableReplacementUtil; import org.apache.pekko.actor.AbstractActor; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; @@ -31,6 +32,7 @@ import com.google.common.collect.Sets; import com.google.common.graph.MutableGraph; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @@ -65,7 +67,8 @@ public class WorkflowExecutorActor extends AbstractActor { private final JobMapper jobMapper; private final JobTaskBatchMapper jobTaskBatchMapper; private final WorkflowBatchHandler workflowBatchHandler; - + @Autowired + private VariableReplacementUtil variableReplacementUtil; @Override public Receive createReceive() { return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> { @@ -209,7 +212,15 @@ public class WorkflowExecutorActor extends AbstractActor { context.setEvaluationResult(evaluationResult); context.setTaskBatchId(taskExecute.getTaskBatchId()); context.setTaskExecutorScene(taskExecute.getTaskExecutorScene()); - context.setWfContext(workflowTaskBatch.getWfContext()); + + //工作流上下文变量更新 +// context.setWfContext(workflowTaskBatch.getWfContext()); + String replacedWfContext = variableReplacementUtil.replaceVariablesByGroup( + workflowTaskBatch.getWfContext(), + workflowTaskBatch.getGroupName(), + workflowTaskBatch.getNamespaceId() + ); + context.setWfContext(replacedWfContext); // 这里父节点取最新的批次判断状态 if (CollUtil.isNotEmpty(parentJobTaskBatchList)) { fillParentOperationReason(allJobTaskBatchList, parentJobTaskBatchList, parentWorkflowNode, context); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java index 9ef8179a..4df06f64 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java @@ -9,6 +9,7 @@ import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.dto.CallbackConfig; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.common.rpc.okhttp.RequestInterceptor; +import com.aizuda.snailjob.server.common.util.VariableReplacementUtil; import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO; import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; import com.aizuda.snailjob.server.model.dto.CallbackParamsDTO; @@ -16,6 +17,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.github.rholder.retry.*; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -43,7 +45,8 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { private static final String CALLBACK_TIMEOUT = "10"; private final RestTemplate restTemplate; - + @Autowired + private VariableReplacementUtil variableReplacementUtil; @Override public WorkflowNodeTypeEnum getWorkflowNodeType() { return WorkflowNodeTypeEnum.CALLBACK; @@ -89,8 +92,14 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { // 设置回调超时时间 requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT); + //工作流上下文变量更新 CallbackParamsDTO callbackParamsDTO = new CallbackParamsDTO(); - callbackParamsDTO.setWfContext(context.getWfContext()); + String replacedContext = variableReplacementUtil.replaceVariablesByGroup( + context.getWfContext(), + context.getGroupName(), + context.getNamespaceId() + ); + callbackParamsDTO.setWfContext(replacedContext); try { Map uriVariables = new HashMap<>(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java index 9a17e4a6..763917bb 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java @@ -13,6 +13,7 @@ import com.aizuda.snailjob.server.common.dto.DecisionConfig; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.util.VariableReplacementUtil; import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO; import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler; @@ -23,6 +24,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Objects; @@ -42,6 +44,8 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF @Slf4j public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { private final WorkflowTaskBatchMapper workflowTaskBatchMapper; + @Autowired + private VariableReplacementUtil variableReplacementUtil; @Override public WorkflowNodeTypeEnum getWorkflowNodeType() { @@ -82,7 +86,13 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { if (Objects.isNull(workflowTaskBatch)) { operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason(); } else { - wfContext = workflowTaskBatch.getWfContext(); + //工作流上下文变量更新 + wfContext = variableReplacementUtil.replaceVariablesByGroup( + workflowTaskBatch.getWfContext(), + context.getGroupName(), + context.getNamespaceId() + ); + ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType()); Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("Expression engine does not exist")); ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java index e82c4050..e38e6fb1 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java @@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.net.url.UrlQuery; import com.aizuda.snailjob.client.model.request.MapTaskRequest; import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.SnailJobRpcResult; @@ -15,6 +16,7 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; import com.aizuda.snailjob.server.common.util.HttpHeaderUtil; import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; +import com.aizuda.snailjob.server.common.util.VariableReplacementUtil; import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext; @@ -108,7 +110,14 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { .eq(WorkflowTaskBatch::getId, mapTaskRequest.getWorkflowTaskBatchId()) ); Assert.notNull(workflowTaskBatch, ()-> new SnailJobServerException("workflowTaskBatch is null. id:[{}]", mapTaskRequest.getWorkflowTaskBatchId())); - newWfContext = workflowTaskBatch.getWfContext(); +// newWfContext = workflowTaskBatch.getWfContext(); + //工作流上下文变量更新 + VariableReplacementUtil variableReplacementUtil = getVariableReplacementUtil(); + newWfContext = variableReplacementUtil.replaceVariablesByGroup( + workflowTaskBatch.getWfContext(), + HttpHeaderUtil.getGroupName(headers), + HttpHeaderUtil.getNamespace(headers) + ); } // 执行任务 @@ -129,5 +138,8 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { context.setWfContext(newWfContext); return context; } + private static VariableReplacementUtil getVariableReplacementUtil() { + return SnailSpringContext.getContext().getBean(VariableReplacementUtil.class); + } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/WorkflowServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/WorkflowServiceImpl.java index 736ebc86..26bf7689 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/WorkflowServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/WorkflowServiceImpl.java @@ -22,10 +22,7 @@ import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.strategy.WaitStrategies; -import com.aizuda.snailjob.server.common.util.CronUtils; -import com.aizuda.snailjob.server.common.util.DateUtils; -import com.aizuda.snailjob.server.common.util.GraphUtils; -import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; +import com.aizuda.snailjob.server.common.util.*; import com.aizuda.snailjob.server.web.model.request.UserSessionVO; import com.aizuda.snailjob.server.common.vo.request.WorkflowRequestVO; import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO; @@ -57,6 +54,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; @@ -87,7 +85,8 @@ public class WorkflowServiceImpl implements WorkflowService { private final AccessTemplate accessTemplate; private final GroupHandler groupHandler; private final JobSummaryMapper jobSummaryMapper; - + @Autowired + private VariableReplacementUtil variableReplacementUtil; private static Long calculateNextTriggerAt(final WorkflowRequestVO workflowRequestVO, Long time) { checkExecuteInterval(workflowRequestVO); @@ -298,6 +297,12 @@ public class WorkflowServiceImpl implements WorkflowService { prepareDTO.setOperationReason(JobOperationReasonEnum.MANUAL_TRIGGER.getReason()); String tmpWfContext = triggerVO.getTmpWfContext(); if (StrUtil.isNotBlank(tmpWfContext) && !JsonUtil.isEmptyJson(tmpWfContext)){ + //工作流上下文变量更新 + tmpWfContext = variableReplacementUtil.replaceVariablesByGroup( + triggerVO.getTmpWfContext(), + workflow.getGroupName(), + workflow.getNamespaceId() + ); prepareDTO.setWfContext(tmpWfContext); } terminalWorkflowPrepareHandler.handler(prepareDTO);