From 83c771f74ec6c17efe1b0a54e6b990534549e008 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Mon, 17 Jun 2024 18:32:26 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0):=20=E6=94=AF=E6=8C=81=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=B5=81=E5=85=A8=E5=B1=80=E4=B8=8A=E4=B8=8B=E6=96=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/snail_job_mysql.sql | 3 ++ .../model/request/DispatchJobRequest.java | 4 ++ .../request/DispatchJobResultRequest.java | 4 ++ .../server/model/dto/CallbackParamsDTO.java | 9 +--- .../datasource/persistence/po/JobTask.java | 6 +++ .../job/task/dto/RealJobExecutorDTO.java | 4 ++ .../callback/ClientCallbackContext.java | 5 ++ .../support/dispatch/JobExecutorActor.java | 22 ++++++++- .../dispatch/JobExecutorResultActor.java | 1 + .../dispatch/WorkflowExecutorActor.java | 2 +- .../executor/job/JobExecutorContext.java | 5 ++ .../workflow/AbstractWorkflowExecutor.java | 1 + .../workflow/CallbackWorkflowExecutor.java | 14 ++---- .../workflow/DecisionWorkflowExecutor.java | 49 ++----------------- .../workflow/WorkflowExecutorContext.java | 5 ++ .../batch/JobTaskBatchGeneratorContext.java | 5 ++ .../support/handler/WorkflowBatchHandler.java | 18 +++++-- 17 files changed, 89 insertions(+), 68 deletions(-) diff --git a/doc/sql/snail_job_mysql.sql b/doc/sql/snail_job_mysql.sql index af1af5911..940156313 100644 --- a/doc/sql/snail_job_mysql.sql +++ b/doc/sql/snail_job_mysql.sql @@ -364,6 +364,7 @@ CREATE TABLE `sj_job_task` `leaf` tinyint NOT NULL DEFAULT '1' COMMENT '叶子节点', `task_name` varchar(255) NOT NULL DEFAULT '' COMMENT '任务名称', `client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port', + `wf_context` text DEFAULT NULL COMMENT '工作流全局上下文', `result_message` text NOT NULL COMMENT '执行结果', `args_str` text DEFAULT NULL COMMENT '执行方法参数', `args_type` tinyint NOT NULL DEFAULT 1 COMMENT '参数类型 ', @@ -511,8 +512,10 @@ CREATE TABLE `sj_workflow_task_batch` `task_batch_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '任务批次状态 0、失败 1、成功', `operation_reason` tinyint(4) NOT NULL DEFAULT 0 COMMENT '操作原因', `flow_info` text DEFAULT NULL COMMENT '流程信息', + `wf_context` text DEFAULT NULL COMMENT '全局上下文', `execution_at` bigint(13) NOT NULL DEFAULT 0 COMMENT '任务执行时间', `ext_attrs` varchar(256) NULL DEFAULT '' COMMENT '扩展字段', + `version` int(11) NOT NULL DEFAULT 1 COMMENT '版本号', `deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除 1、删除', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java index b6ccf2d41..48cccb939 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java @@ -70,4 +70,8 @@ public class DispatchJobRequest { */ private boolean isRetry; + /** + * 工作流上下文 + */ + private String wkContext; } diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java index 61c35fc13..f3d723de8 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java @@ -41,4 +41,8 @@ public class DispatchJobResultRequest { */ private boolean isRetry; + /** + * 工作流上下文 + */ + private String wkContext; } diff --git a/snail-job-common/snail-job-common-server-api/src/main/java/com/aizuda/snailjob/server/model/dto/CallbackParamsDTO.java b/snail-job-common/snail-job-common-server-api/src/main/java/com/aizuda/snailjob/server/model/dto/CallbackParamsDTO.java index 562ee12df..060407f0d 100644 --- a/snail-job-common/snail-job-common-server-api/src/main/java/com/aizuda/snailjob/server/model/dto/CallbackParamsDTO.java +++ b/snail-job-common/snail-job-common-server-api/src/main/java/com/aizuda/snailjob/server/model/dto/CallbackParamsDTO.java @@ -13,13 +13,8 @@ import lombok.Data; public class CallbackParamsDTO { /** - * 执行结果 + * 工作流上下文 */ - private String resultMessage; - - /** - * 客户端ID - */ - private String clientInfo; + private String wfContext; } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java index 00891a6c8..358f68fb9 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java @@ -101,6 +101,12 @@ public class JobTask implements Serializable { */ private Integer mrStage; + /** + * 冗余工作流上下文 + * 注: 采用空间换时间的方式冗余部分上下文,减少更新并发 + */ + private String wfContext; + /** * 扩展字段 */ diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java index a7e865908..fa1a47f1d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java @@ -99,4 +99,8 @@ public class RealJobExecutorDTO extends BaseDTO { */ private boolean isRetry; + /** + * 工作流上下文 + */ + private String wkContext; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java index e2a5b4bcf..ed3f4f7ec 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java @@ -46,4 +46,9 @@ public class ClientCallbackContext { private Integer retryScene; private boolean isRetry; + + /** + * 工作流上下文 + */ + private String wkContext; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index d49a1873e..3d84440b8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -34,9 +34,11 @@ import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; +import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; @@ -72,6 +74,7 @@ public class JobExecutorActor extends AbstractActor { private final TransactionTemplate transactionTemplate; private final WorkflowBatchHandler workflowBatchHandler; private final JobTaskBatchHandler jobTaskBatchHandler; + private final WorkflowTaskBatchMapper workflowTaskBatchMapper; @Override public Receive createReceive() { @@ -145,9 +148,20 @@ public class JobExecutorActor extends AbstractActor { return; } + // 获取工作流的上下文 + WorkflowTaskBatch workflowTaskBatch = null; + Long workflowTaskBatchId = taskExecute.getWorkflowTaskBatchId(); + if (Objects.nonNull(workflowTaskBatchId)) { + workflowTaskBatch = workflowTaskBatchMapper.selectOne( + new LambdaQueryWrapper() + .select(WorkflowTaskBatch::getWfContext) + .eq(WorkflowTaskBatch::getId, taskExecute.getWorkflowTaskBatchId()) + ); + } + // 执行任务 JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType()); - jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList)); + jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList, workflowTaskBatch)); } finally { log.debug("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute)); final int finalTaskStatus = taskStatus; @@ -174,13 +188,17 @@ public class JobExecutorActor extends AbstractActor { } @NotNull - private static JobExecutorContext buildJobExecutorContext(TaskExecuteDTO taskExecute, Job job, List taskList) { + private static JobExecutorContext buildJobExecutorContext(TaskExecuteDTO taskExecute, Job job, List taskList, + final WorkflowTaskBatch workflowTaskBatch) { JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); context.setTaskList(taskList); context.setTaskBatchId(taskExecute.getTaskBatchId()); context.setJobId(job.getId()); context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); context.setWorkflowNodeId(taskExecute.getWorkflowNodeId()); + if (Objects.nonNull(workflowTaskBatch)) { + context.setWkContext(workflowTaskBatch.getWfContext()); + } return context; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index db8146270..483fe7269 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -55,6 +55,7 @@ public class JobExecutorResultActor extends AbstractActor { JobTask jobTask = new JobTask(); jobTask.setTaskStatus(result.getTaskStatus()); + jobTask.setWfContext(result.getWfContext()); if (Objects.nonNull(result.getResult())) { jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult())); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index 596992b81..c316a2020 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -182,7 +182,7 @@ public class WorkflowExecutorActor extends AbstractActor { context.setEvaluationResult(evaluationResult); context.setTaskBatchId(taskExecute.getTaskBatchId()); context.setTaskExecutorScene(taskExecute.getTaskExecutorScene()); - + context.setWfContext(workflowTaskBatch.getWfContext()); workflowExecutor.execute(context); evaluationResult = context.getEvaluationResult(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java index dc0b01336..8a1e67345 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java @@ -90,4 +90,9 @@ public class JobExecutorContext { private Long workflowNodeId; + /** + * 工作流上下文 + */ + private String wkContext; + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java index 6c26f76b6..06a23f81d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java @@ -146,6 +146,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init jobTask.setClientInfo(StrUtil.EMPTY); jobTask.setTaskBatchId(jobTaskBatch.getId()); jobTask.setArgsType(JobArgsTypeEnum.TEXT.getArgsType()); + // TODO 待定是否删除 jobTask.setArgsStr(Optional.ofNullable(context.getTaskResult()).orElse(StrUtil.EMPTY)); jobTask.setTaskStatus(context.getJobTaskStatus()); jobTask.setResultMessage(String.valueOf(context.getEvaluationResult())); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java index 8b38cac50..7d9a96455 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java @@ -87,12 +87,8 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { // 设置回调超时时间 requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT); - List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() - .select(JobTask::getResultMessage, JobTask::getClientInfo) - .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); - List callbackParamsList = WorkflowTaskConverter.INSTANCE.toCallbackParamsDTO(jobTasks); - - context.setTaskResult(JsonUtil.toJsonString(callbackParamsList)); + CallbackParamsDTO callbackParamsDTO = new CallbackParamsDTO(); + callbackParamsDTO.setWfContext(context.getWfContext()); try { Map uriVariables = new HashMap<>(); @@ -100,13 +96,13 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { ResponseEntity response = buildRetryer(decisionConfig).call( () -> restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST, - new HttpEntity<>(callbackParamsList, requestHeaders), String.class, uriVariables)); + new HttpEntity<>(callbackParamsDTO, requestHeaders), String.class, uriVariables)); result = response.getBody(); SnailJobLog.LOCAL.info("回调结果. webHook:[{}],结果: [{}]", decisionConfig.getWebhook(), result); } catch (Exception e) { SnailJobLog.LOCAL.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), - context.getTaskResult(), e); + context.getWfContext(), e); context.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus()); context.setOperationReason(JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTION_ERROR.getReason()); @@ -163,7 +159,7 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { jobLogMetaDTO.setTaskId(jobTask.getId()); if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) { SnailJobLog.REMOTE.info("节点[{}]回调成功.\n回调参数:{} \n回调结果:[{}] <|>{}<|>", - context.getWorkflowNodeId(), context.getTaskResult(), context.getEvaluationResult(), jobLogMetaDTO); + context.getWorkflowNodeId(), context.getWfContext(), context.getEvaluationResult(), jobLogMetaDTO); } else if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.CANCEL.getStatus()) { SnailJobLog.REMOTE.warn("节点[{}]取消回调. 取消原因: 任务状态已关闭 <|>{}<|>", context.getWorkflowNodeId(), jobLogMetaDTO); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java index 035d64b97..61b478fc5 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java @@ -1,6 +1,5 @@ package com.aizuda.snailjob.server.job.task.support.executor.workflow; -import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.constant.SystemConstants; @@ -13,23 +12,16 @@ import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.dto.DecisionConfig; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum; -import com.aizuda.snailjob.server.common.enums.LogicalConditionEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler; -import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.util.List; -import java.util.Objects; import java.util.Optional; /** @@ -76,45 +68,12 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("表达式引擎不存在")); ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine); ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler); - - List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() - .select(JobTask::getResultMessage) - .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); - - - List taskResult = Lists.newArrayList(); - Boolean tempResult = null; - if (CollUtil.isEmpty(jobTasks)) { - tempResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), StrUtil.EMPTY)).orElse(Boolean.FALSE); - } else { - for (JobTask jobTask : jobTasks) { - taskResult.add(jobTask.getResultMessage()); - boolean execResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), jobTask.getResultMessage())).orElse(Boolean.FALSE); - - if (Objects.isNull(tempResult)) { - tempResult = execResult; - } - - if (Objects.equals(decisionConfig.getLogicalCondition(), LogicalConditionEnum.AND.getCode())) { - tempResult = tempResult && execResult; - } else { - tempResult = tempResult || execResult; - if (tempResult) { - break; - } - } - - log.debug("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result); - } - } - - context.setTaskResult(JsonUtil.toJsonString(taskResult)); - result = Optional.ofNullable(tempResult).orElse(Boolean.FALSE); + result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), context.getWfContext())).orElse(Boolean.FALSE); if (!result) { operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason(); } } catch (Exception e) { - log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getTaskResult(), e); + log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getWfContext(), e); taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason(); jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus(); @@ -160,10 +119,10 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus() || JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() == context.getOperationReason()) { SnailJobLog.REMOTE.info("节点Id:[{}] 决策完成. 上下文:[{}] 决策结果:[{}] <|>{}<|>", - context.getWorkflowNodeId(), context.getTaskResult(), context.getEvaluationResult(), jobLogMetaDTO); + context.getWorkflowNodeId(), context.getWfContext(), context.getEvaluationResult(), jobLogMetaDTO); } else { SnailJobLog.REMOTE.error("节点Id:[{}] 决策失败. 上下文:[{}] 失败原因:[{}] <|>{}<|>", - context.getWorkflowNodeId(), context.getTaskResult(), context.getLogMessage(), jobLogMetaDTO); + context.getWorkflowNodeId(), context.getWfContext(), context.getLogMessage(), jobLogMetaDTO); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/WorkflowExecutorContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/WorkflowExecutorContext.java index fb1abc51e..bd3491c0a 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/WorkflowExecutorContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/WorkflowExecutorContext.java @@ -102,4 +102,9 @@ public class WorkflowExecutorContext { * 1、任务节点 2、条件节点 3、回调节点 */ private Integer nodeType; + + /** + * 工作流全局上下文 + */ + private String wfContext; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java index a9055d762..099b36bf3 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java @@ -58,5 +58,10 @@ public class JobTaskBatchGeneratorContext { */ private Long parentWorkflowNodeId; + /** + * 工作流上下文 + */ + private String wfContext; + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java index 8b9699b96..85d781b86 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java @@ -43,6 +43,8 @@ import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; import java.io.IOException; +import java.text.MessageFormat; +import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -59,7 +61,9 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.NOT_C @Component @RequiredArgsConstructor public class WorkflowBatchHandler { + private static final String KEY = "update_wf_context_{}"; + private final DistributedLockHandler distributedLockHandler; private final WorkflowTaskBatchMapper workflowTaskBatchMapper; private final JobMapper jobMapper; private final JobTaskBatchMapper jobTaskBatchMapper; @@ -321,12 +325,13 @@ public class WorkflowBatchHandler { return; } + // 自旋更新 Retryer retryer = RetryerBuilder.newBuilder() .retryIfResult(result -> result.equals(Boolean.FALSE)) .retryIfException(ex -> true) .withWaitStrategy(WaitStrategies.fixedWait(500, TimeUnit.MILLISECONDS)) // 重试3秒 - .withStopStrategy(StopStrategies.stopAfterAttempt(6)) + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) .withRetryListener(new RetryListener() { @Override public void onRetry(final Attempt attempt) { @@ -348,12 +353,18 @@ public class WorkflowBatchHandler { } catch (Exception e) { SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] waitMergeContext:[{}]", workflowTaskBatchId, waitMergeContext, e); + if (e.getClass().isAssignableFrom(RetryException.class)) { + // 如果自旋失败,就使用悲观锁 + distributedLockHandler.lockWithDisposableAndRetry(() -> { + mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class)); + }, MessageFormat.format(KEY, workflowTaskBatchId), Duration.ofSeconds(1), Duration.ofSeconds(1), 3); + } } } public boolean mergeAllWorkflowContext(Long workflowTaskBatchId, Long taskBatchId) { List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() - .select(JobTask::getResultMessage, JobTask::getId) + .select(JobTask::getWfContext, JobTask::getId) .eq(JobTask::getTaskBatchId, taskBatchId)); if (CollUtil.isEmpty(jobTasks)) { return true; @@ -361,7 +372,7 @@ public class WorkflowBatchHandler { Set> maps = jobTasks.stream().map(r -> { try { - return JsonUtil.parseHashMap(r.getResultMessage(), Object.class); + return JsonUtil.parseHashMap(r.getWfContext(), Object.class); } catch (Exception e) { SnailJobLog.LOCAL.warn("taskId:[{}] result value 不是一个json对象. result:[{}]", r.getId(), r.getResultMessage()); } @@ -387,7 +398,6 @@ public class WorkflowBatchHandler { return true; } - WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne( new LambdaQueryWrapper() .select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getVersion)