fix(sj_1.1.0): 支持工作流全局上下文
This commit is contained in:
parent
58efe3db69
commit
dd479f2811
@ -364,6 +364,7 @@ CREATE TABLE `sj_job_task`
|
|||||||
`leaf` tinyint NOT NULL DEFAULT '1' COMMENT '叶子节点',
|
`leaf` tinyint NOT NULL DEFAULT '1' COMMENT '叶子节点',
|
||||||
`task_name` varchar(255) NOT NULL DEFAULT '' COMMENT '任务名称',
|
`task_name` varchar(255) NOT NULL DEFAULT '' COMMENT '任务名称',
|
||||||
`client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port',
|
`client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port',
|
||||||
|
`wf_context` text DEFAULT NULL COMMENT '工作流全局上下文',
|
||||||
`result_message` text NOT NULL COMMENT '执行结果',
|
`result_message` text NOT NULL COMMENT '执行结果',
|
||||||
`args_str` text DEFAULT NULL COMMENT '执行方法参数',
|
`args_str` text DEFAULT NULL COMMENT '执行方法参数',
|
||||||
`args_type` tinyint NOT NULL DEFAULT 1 COMMENT '参数类型 ',
|
`args_type` tinyint NOT NULL DEFAULT 1 COMMENT '参数类型 ',
|
||||||
@ -511,8 +512,10 @@ CREATE TABLE `sj_workflow_task_batch`
|
|||||||
`task_batch_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '任务批次状态 0、失败 1、成功',
|
`task_batch_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT '任务批次状态 0、失败 1、成功',
|
||||||
`operation_reason` tinyint(4) NOT NULL DEFAULT 0 COMMENT '操作原因',
|
`operation_reason` tinyint(4) NOT NULL DEFAULT 0 COMMENT '操作原因',
|
||||||
`flow_info` text DEFAULT NULL COMMENT '流程信息',
|
`flow_info` text DEFAULT NULL COMMENT '流程信息',
|
||||||
|
`wf_context` text DEFAULT NULL COMMENT '全局上下文',
|
||||||
`execution_at` bigint(13) NOT NULL DEFAULT 0 COMMENT '任务执行时间',
|
`execution_at` bigint(13) NOT NULL DEFAULT 0 COMMENT '任务执行时间',
|
||||||
`ext_attrs` varchar(256) NULL DEFAULT '' COMMENT '扩展字段',
|
`ext_attrs` varchar(256) NULL DEFAULT '' COMMENT '扩展字段',
|
||||||
|
`version` int(11) NOT NULL DEFAULT 1 COMMENT '版本号',
|
||||||
`deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除 1、删除',
|
`deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT '逻辑删除 1、删除',
|
||||||
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
|
@ -70,4 +70,8 @@ public class DispatchJobRequest {
|
|||||||
*/
|
*/
|
||||||
private boolean isRetry;
|
private boolean isRetry;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流上下文
|
||||||
|
*/
|
||||||
|
private String wkContext;
|
||||||
}
|
}
|
||||||
|
@ -41,4 +41,8 @@ public class DispatchJobResultRequest {
|
|||||||
*/
|
*/
|
||||||
private boolean isRetry;
|
private boolean isRetry;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流上下文
|
||||||
|
*/
|
||||||
|
private String wkContext;
|
||||||
}
|
}
|
||||||
|
@ -13,13 +13,8 @@ import lombok.Data;
|
|||||||
public class CallbackParamsDTO {
|
public class CallbackParamsDTO {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 执行结果
|
* 工作流上下文
|
||||||
*/
|
*/
|
||||||
private String resultMessage;
|
private String wfContext;
|
||||||
|
|
||||||
/**
|
|
||||||
* 客户端ID
|
|
||||||
*/
|
|
||||||
private String clientInfo;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -98,6 +98,12 @@ public class JobTask extends CreateUpdateDt {
|
|||||||
*/
|
*/
|
||||||
private Integer mrStage;
|
private Integer mrStage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 冗余工作流上下文
|
||||||
|
* 注: 采用空间换时间的方式冗余部分上下文,减少更新并发
|
||||||
|
*/
|
||||||
|
private String wfContext;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 扩展字段
|
* 扩展字段
|
||||||
*/
|
*/
|
||||||
|
@ -99,4 +99,8 @@ public class RealJobExecutorDTO extends BaseDTO {
|
|||||||
*/
|
*/
|
||||||
private boolean isRetry;
|
private boolean isRetry;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流上下文
|
||||||
|
*/
|
||||||
|
private String wkContext;
|
||||||
}
|
}
|
||||||
|
@ -46,4 +46,9 @@ public class ClientCallbackContext {
|
|||||||
private Integer retryScene;
|
private Integer retryScene;
|
||||||
|
|
||||||
private boolean isRetry;
|
private boolean isRetry;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流上下文
|
||||||
|
*/
|
||||||
|
private String wkContext;
|
||||||
}
|
}
|
||||||
|
@ -34,9 +34,11 @@ import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask;
|
|||||||
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
|
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
|
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
@ -72,6 +74,7 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
private final TransactionTemplate transactionTemplate;
|
private final TransactionTemplate transactionTemplate;
|
||||||
private final WorkflowBatchHandler workflowBatchHandler;
|
private final WorkflowBatchHandler workflowBatchHandler;
|
||||||
private final JobTaskBatchHandler jobTaskBatchHandler;
|
private final JobTaskBatchHandler jobTaskBatchHandler;
|
||||||
|
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
@ -145,9 +148,20 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 获取工作流的上下文
|
||||||
|
WorkflowTaskBatch workflowTaskBatch = null;
|
||||||
|
Long workflowTaskBatchId = taskExecute.getWorkflowTaskBatchId();
|
||||||
|
if (Objects.nonNull(workflowTaskBatchId)) {
|
||||||
|
workflowTaskBatch = workflowTaskBatchMapper.selectOne(
|
||||||
|
new LambdaQueryWrapper<WorkflowTaskBatch>()
|
||||||
|
.select(WorkflowTaskBatch::getWfContext)
|
||||||
|
.eq(WorkflowTaskBatch::getId, taskExecute.getWorkflowTaskBatchId())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// 执行任务
|
// 执行任务
|
||||||
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
|
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
|
||||||
jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList));
|
jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList, workflowTaskBatch));
|
||||||
} finally {
|
} finally {
|
||||||
log.debug("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute));
|
log.debug("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute));
|
||||||
final int finalTaskStatus = taskStatus;
|
final int finalTaskStatus = taskStatus;
|
||||||
@ -174,13 +188,17 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@NotNull
|
@NotNull
|
||||||
private static JobExecutorContext buildJobExecutorContext(TaskExecuteDTO taskExecute, Job job, List<JobTask> taskList) {
|
private static JobExecutorContext buildJobExecutorContext(TaskExecuteDTO taskExecute, Job job, List<JobTask> taskList,
|
||||||
|
final WorkflowTaskBatch workflowTaskBatch) {
|
||||||
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
|
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
|
||||||
context.setTaskList(taskList);
|
context.setTaskList(taskList);
|
||||||
context.setTaskBatchId(taskExecute.getTaskBatchId());
|
context.setTaskBatchId(taskExecute.getTaskBatchId());
|
||||||
context.setJobId(job.getId());
|
context.setJobId(job.getId());
|
||||||
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
|
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
|
||||||
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
|
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
|
||||||
|
if (Objects.nonNull(workflowTaskBatch)) {
|
||||||
|
context.setWkContext(workflowTaskBatch.getWfContext());
|
||||||
|
}
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,6 +55,7 @@ public class JobExecutorResultActor extends AbstractActor {
|
|||||||
|
|
||||||
JobTask jobTask = new JobTask();
|
JobTask jobTask = new JobTask();
|
||||||
jobTask.setTaskStatus(result.getTaskStatus());
|
jobTask.setTaskStatus(result.getTaskStatus());
|
||||||
|
jobTask.setWfContext(result.getWfContext());
|
||||||
if (Objects.nonNull(result.getResult())) {
|
if (Objects.nonNull(result.getResult())) {
|
||||||
jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult()));
|
jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult()));
|
||||||
}
|
}
|
||||||
|
@ -182,7 +182,7 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
context.setEvaluationResult(evaluationResult);
|
context.setEvaluationResult(evaluationResult);
|
||||||
context.setTaskBatchId(taskExecute.getTaskBatchId());
|
context.setTaskBatchId(taskExecute.getTaskBatchId());
|
||||||
context.setTaskExecutorScene(taskExecute.getTaskExecutorScene());
|
context.setTaskExecutorScene(taskExecute.getTaskExecutorScene());
|
||||||
|
context.setWfContext(workflowTaskBatch.getWfContext());
|
||||||
workflowExecutor.execute(context);
|
workflowExecutor.execute(context);
|
||||||
|
|
||||||
evaluationResult = context.getEvaluationResult();
|
evaluationResult = context.getEvaluationResult();
|
||||||
|
@ -90,4 +90,9 @@ public class JobExecutorContext {
|
|||||||
|
|
||||||
private Long workflowNodeId;
|
private Long workflowNodeId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流上下文
|
||||||
|
*/
|
||||||
|
private String wkContext;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -146,6 +146,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
|
|||||||
jobTask.setClientInfo(StrUtil.EMPTY);
|
jobTask.setClientInfo(StrUtil.EMPTY);
|
||||||
jobTask.setTaskBatchId(jobTaskBatch.getId());
|
jobTask.setTaskBatchId(jobTaskBatch.getId());
|
||||||
jobTask.setArgsType(JobArgsTypeEnum.TEXT.getArgsType());
|
jobTask.setArgsType(JobArgsTypeEnum.TEXT.getArgsType());
|
||||||
|
// TODO 待定是否删除
|
||||||
jobTask.setArgsStr(Optional.ofNullable(context.getTaskResult()).orElse(StrUtil.EMPTY));
|
jobTask.setArgsStr(Optional.ofNullable(context.getTaskResult()).orElse(StrUtil.EMPTY));
|
||||||
jobTask.setTaskStatus(context.getJobTaskStatus());
|
jobTask.setTaskStatus(context.getJobTaskStatus());
|
||||||
jobTask.setResultMessage(String.valueOf(context.getEvaluationResult()));
|
jobTask.setResultMessage(String.valueOf(context.getEvaluationResult()));
|
||||||
|
@ -87,12 +87,8 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
// 设置回调超时时间
|
// 设置回调超时时间
|
||||||
requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT);
|
requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT);
|
||||||
|
|
||||||
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
CallbackParamsDTO callbackParamsDTO = new CallbackParamsDTO();
|
||||||
.select(JobTask::getResultMessage, JobTask::getClientInfo)
|
callbackParamsDTO.setWfContext(context.getWfContext());
|
||||||
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
|
|
||||||
List<CallbackParamsDTO> callbackParamsList = WorkflowTaskConverter.INSTANCE.toCallbackParamsDTO(jobTasks);
|
|
||||||
|
|
||||||
context.setTaskResult(JsonUtil.toJsonString(callbackParamsList));
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Map<String, String> uriVariables = new HashMap<>();
|
Map<String, String> uriVariables = new HashMap<>();
|
||||||
@ -100,13 +96,13 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
|
|
||||||
ResponseEntity<String> response = buildRetryer(decisionConfig).call(
|
ResponseEntity<String> response = buildRetryer(decisionConfig).call(
|
||||||
() -> restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST,
|
() -> restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST,
|
||||||
new HttpEntity<>(callbackParamsList, requestHeaders), String.class, uriVariables));
|
new HttpEntity<>(callbackParamsDTO, requestHeaders), String.class, uriVariables));
|
||||||
|
|
||||||
result = response.getBody();
|
result = response.getBody();
|
||||||
SnailJobLog.LOCAL.info("回调结果. webHook:[{}],结果: [{}]", decisionConfig.getWebhook(), result);
|
SnailJobLog.LOCAL.info("回调结果. webHook:[{}],结果: [{}]", decisionConfig.getWebhook(), result);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SnailJobLog.LOCAL.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(),
|
SnailJobLog.LOCAL.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(),
|
||||||
context.getTaskResult(), e);
|
context.getWfContext(), e);
|
||||||
|
|
||||||
context.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
|
context.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
|
||||||
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTION_ERROR.getReason());
|
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTION_ERROR.getReason());
|
||||||
@ -163,7 +159,7 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
jobLogMetaDTO.setTaskId(jobTask.getId());
|
jobLogMetaDTO.setTaskId(jobTask.getId());
|
||||||
if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) {
|
if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) {
|
||||||
SnailJobLog.REMOTE.info("节点[{}]回调成功.\n回调参数:{} \n回调结果:[{}] <|>{}<|>",
|
SnailJobLog.REMOTE.info("节点[{}]回调成功.\n回调参数:{} \n回调结果:[{}] <|>{}<|>",
|
||||||
context.getWorkflowNodeId(), context.getTaskResult(), context.getEvaluationResult(), jobLogMetaDTO);
|
context.getWorkflowNodeId(), context.getWfContext(), context.getEvaluationResult(), jobLogMetaDTO);
|
||||||
} else if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.CANCEL.getStatus()) {
|
} else if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.CANCEL.getStatus()) {
|
||||||
SnailJobLog.REMOTE.warn("节点[{}]取消回调. 取消原因: 任务状态已关闭 <|>{}<|>",
|
SnailJobLog.REMOTE.warn("节点[{}]取消回调. 取消原因: 任务状态已关闭 <|>{}<|>",
|
||||||
context.getWorkflowNodeId(), jobLogMetaDTO);
|
context.getWorkflowNodeId(), jobLogMetaDTO);
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
package com.aizuda.snailjob.server.job.task.support.executor.workflow;
|
package com.aizuda.snailjob.server.job.task.support.executor.workflow;
|
||||||
|
|
||||||
import cn.hutool.core.collection.CollUtil;
|
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||||
@ -13,23 +12,16 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
|
|||||||
import com.aizuda.snailjob.server.common.dto.DecisionConfig;
|
import com.aizuda.snailjob.server.common.dto.DecisionConfig;
|
||||||
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
|
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
|
||||||
import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum;
|
import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum;
|
||||||
import com.aizuda.snailjob.server.common.enums.LogicalConditionEnum;
|
|
||||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||||
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
|
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
|
||||||
import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler;
|
import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler;
|
||||||
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
|
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -76,45 +68,12 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("表达式引擎不存在"));
|
Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("表达式引擎不存在"));
|
||||||
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
|
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
|
||||||
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
|
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
|
||||||
|
result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), context.getWfContext())).orElse(Boolean.FALSE);
|
||||||
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
|
||||||
.select(JobTask::getResultMessage)
|
|
||||||
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
|
|
||||||
|
|
||||||
|
|
||||||
List<String> taskResult = Lists.newArrayList();
|
|
||||||
Boolean tempResult = null;
|
|
||||||
if (CollUtil.isEmpty(jobTasks)) {
|
|
||||||
tempResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), StrUtil.EMPTY)).orElse(Boolean.FALSE);
|
|
||||||
} else {
|
|
||||||
for (JobTask jobTask : jobTasks) {
|
|
||||||
taskResult.add(jobTask.getResultMessage());
|
|
||||||
boolean execResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), jobTask.getResultMessage())).orElse(Boolean.FALSE);
|
|
||||||
|
|
||||||
if (Objects.isNull(tempResult)) {
|
|
||||||
tempResult = execResult;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Objects.equals(decisionConfig.getLogicalCondition(), LogicalConditionEnum.AND.getCode())) {
|
|
||||||
tempResult = tempResult && execResult;
|
|
||||||
} else {
|
|
||||||
tempResult = tempResult || execResult;
|
|
||||||
if (tempResult) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.debug("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
context.setTaskResult(JsonUtil.toJsonString(taskResult));
|
|
||||||
result = Optional.ofNullable(tempResult).orElse(Boolean.FALSE);
|
|
||||||
if (!result) {
|
if (!result) {
|
||||||
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
|
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getTaskResult(), e);
|
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getWfContext(), e);
|
||||||
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
|
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
|
||||||
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason();
|
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason();
|
||||||
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
|
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
|
||||||
@ -160,10 +119,10 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()
|
if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()
|
||||||
|| JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() == context.getOperationReason()) {
|
|| JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() == context.getOperationReason()) {
|
||||||
SnailJobLog.REMOTE.info("节点Id:[{}] 决策完成. 上下文:[{}] 决策结果:[{}] <|>{}<|>",
|
SnailJobLog.REMOTE.info("节点Id:[{}] 决策完成. 上下文:[{}] 决策结果:[{}] <|>{}<|>",
|
||||||
context.getWorkflowNodeId(), context.getTaskResult(), context.getEvaluationResult(), jobLogMetaDTO);
|
context.getWorkflowNodeId(), context.getWfContext(), context.getEvaluationResult(), jobLogMetaDTO);
|
||||||
} else {
|
} else {
|
||||||
SnailJobLog.REMOTE.error("节点Id:[{}] 决策失败. 上下文:[{}] 失败原因:[{}] <|>{}<|>",
|
SnailJobLog.REMOTE.error("节点Id:[{}] 决策失败. 上下文:[{}] 失败原因:[{}] <|>{}<|>",
|
||||||
context.getWorkflowNodeId(), context.getTaskResult(), context.getLogMessage(), jobLogMetaDTO);
|
context.getWorkflowNodeId(), context.getWfContext(), context.getLogMessage(), jobLogMetaDTO);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -102,4 +102,9 @@ public class WorkflowExecutorContext {
|
|||||||
* 1、任务节点 2、条件节点 3、回调节点
|
* 1、任务节点 2、条件节点 3、回调节点
|
||||||
*/
|
*/
|
||||||
private Integer nodeType;
|
private Integer nodeType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流全局上下文
|
||||||
|
*/
|
||||||
|
private String wfContext;
|
||||||
}
|
}
|
||||||
|
@ -58,5 +58,10 @@ public class JobTaskBatchGeneratorContext {
|
|||||||
*/
|
*/
|
||||||
private Long parentWorkflowNodeId;
|
private Long parentWorkflowNodeId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流上下文
|
||||||
|
*/
|
||||||
|
private String wfContext;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -43,6 +43,8 @@ import org.springframework.transaction.support.TransactionSynchronization;
|
|||||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.text.MessageFormat;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
@ -59,7 +61,9 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.NOT_C
|
|||||||
@Component
|
@Component
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class WorkflowBatchHandler {
|
public class WorkflowBatchHandler {
|
||||||
|
private static final String KEY = "update_wf_context_{}";
|
||||||
|
|
||||||
|
private final DistributedLockHandler distributedLockHandler;
|
||||||
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
||||||
private final JobMapper jobMapper;
|
private final JobMapper jobMapper;
|
||||||
private final JobTaskBatchMapper jobTaskBatchMapper;
|
private final JobTaskBatchMapper jobTaskBatchMapper;
|
||||||
@ -321,12 +325,13 @@ public class WorkflowBatchHandler {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 自旋更新
|
||||||
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
|
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
|
||||||
.retryIfResult(result -> result.equals(Boolean.FALSE))
|
.retryIfResult(result -> result.equals(Boolean.FALSE))
|
||||||
.retryIfException(ex -> true)
|
.retryIfException(ex -> true)
|
||||||
.withWaitStrategy(WaitStrategies.fixedWait(500, TimeUnit.MILLISECONDS))
|
.withWaitStrategy(WaitStrategies.fixedWait(500, TimeUnit.MILLISECONDS))
|
||||||
// 重试3秒
|
// 重试3秒
|
||||||
.withStopStrategy(StopStrategies.stopAfterAttempt(6))
|
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
|
||||||
.withRetryListener(new RetryListener() {
|
.withRetryListener(new RetryListener() {
|
||||||
@Override
|
@Override
|
||||||
public <V> void onRetry(final Attempt<V> attempt) {
|
public <V> void onRetry(final Attempt<V> attempt) {
|
||||||
@ -348,12 +353,18 @@ public class WorkflowBatchHandler {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] waitMergeContext:[{}]",
|
SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] waitMergeContext:[{}]",
|
||||||
workflowTaskBatchId, waitMergeContext, e);
|
workflowTaskBatchId, waitMergeContext, e);
|
||||||
|
if (e.getClass().isAssignableFrom(RetryException.class)) {
|
||||||
|
// 如果自旋失败,就使用悲观锁
|
||||||
|
distributedLockHandler.lockWithDisposableAndRetry(() -> {
|
||||||
|
mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class));
|
||||||
|
}, MessageFormat.format(KEY, workflowTaskBatchId), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean mergeAllWorkflowContext(Long workflowTaskBatchId, Long taskBatchId) {
|
public boolean mergeAllWorkflowContext(Long workflowTaskBatchId, Long taskBatchId) {
|
||||||
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
||||||
.select(JobTask::getResultMessage, JobTask::getId)
|
.select(JobTask::getWfContext, JobTask::getId)
|
||||||
.eq(JobTask::getTaskBatchId, taskBatchId));
|
.eq(JobTask::getTaskBatchId, taskBatchId));
|
||||||
if (CollUtil.isEmpty(jobTasks)) {
|
if (CollUtil.isEmpty(jobTasks)) {
|
||||||
return true;
|
return true;
|
||||||
@ -361,7 +372,7 @@ public class WorkflowBatchHandler {
|
|||||||
|
|
||||||
Set<Map<String, Object>> maps = jobTasks.stream().map(r -> {
|
Set<Map<String, Object>> maps = jobTasks.stream().map(r -> {
|
||||||
try {
|
try {
|
||||||
return JsonUtil.parseHashMap(r.getResultMessage(), Object.class);
|
return JsonUtil.parseHashMap(r.getWfContext(), Object.class);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SnailJobLog.LOCAL.warn("taskId:[{}] result value 不是一个json对象. result:[{}]", r.getId(), r.getResultMessage());
|
SnailJobLog.LOCAL.warn("taskId:[{}] result value 不是一个json对象. result:[{}]", r.getId(), r.getResultMessage());
|
||||||
}
|
}
|
||||||
@ -387,7 +398,6 @@ public class WorkflowBatchHandler {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
|
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
|
||||||
new LambdaQueryWrapper<WorkflowTaskBatch>()
|
new LambdaQueryWrapper<WorkflowTaskBatch>()
|
||||||
.select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getVersion)
|
.select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getVersion)
|
||||||
|
Loading…
Reference in New Issue
Block a user