fix(sj_1.1.0): support workflow global context

opensnail 2024-06-17 18:32:26 +08:00
parent e394d65ec0
commit 83c771f74e
17 changed files with 89 additions and 68 deletions
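For orientation: this commit threads a workflow-wide wf_context (a JSON object stored on sj_workflow_task_batch and copied onto each sj_job_task) through dispatch, execution and result reporting, then merges the per-task copies back into the shared row. A minimal, self-contained sketch of the merge semantics in plain Java (names are illustrative, not project API; the conflict policy shown, last-writer-wins per key, is an assumption since the actual reduce step falls outside this diff):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: each task reports a partial context map; the server
// folds the partial maps into the batch-level context.
class WfContextMergeSketch {
    static Map<String, Object> merge(Map<String, Object> batchContext,
                                     List<Map<String, Object>> taskContexts) {
        Map<String, Object> merged = new HashMap<>(batchContext);
        for (Map<String, Object> partial : taskContexts) {
            merged.putAll(partial); // assumption: last writer wins per key
        }
        return merged;
    }
}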

View File

@ -364,6 +364,7 @@ CREATE TABLE `sj_job_task`
`leaf` tinyint NOT NULL DEFAULT '1' COMMENT 'leaf node',
`task_name` varchar(255) NOT NULL DEFAULT '' COMMENT 'task name',
`client_info` varchar(128) DEFAULT NULL COMMENT 'client address clientId#ip:port',
`wf_context` text DEFAULT NULL COMMENT 'workflow global context',
`result_message` text NOT NULL COMMENT 'execution result',
`args_str` text DEFAULT NULL COMMENT 'execution method arguments',
`args_type` tinyint NOT NULL DEFAULT 1 COMMENT 'argument type',
@ -511,8 +512,10 @@ CREATE TABLE `sj_workflow_task_batch`
`task_batch_status` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'task batch status: 0 failed, 1 succeeded',
`operation_reason` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'operation reason',
`flow_info` text DEFAULT NULL COMMENT 'flow info',
`wf_context` text DEFAULT NULL COMMENT 'workflow global context',
`execution_at` bigint(13) NOT NULL DEFAULT 0 COMMENT 'task execution time',
`ext_attrs` varchar(256) NULL DEFAULT '' COMMENT 'extended attributes',
`version` int(11) NOT NULL DEFAULT 1 COMMENT 'version number',
`deleted` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'logical delete: 1 deleted',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',

View File

@ -70,4 +70,8 @@ public class DispatchJobRequest {
*/
private boolean isRetry;
/**
* Workflow context
*/
private String wkContext;
}

View File

@ -41,4 +41,8 @@ public class DispatchJobResultRequest {
*/
private boolean isRetry;
/**
* Workflow context
*/
private String wkContext;
}

View File

@ -13,13 +13,8 @@ import lombok.Data;
public class CallbackParamsDTO {
/**
* Execution result
* Workflow context
*/
private String resultMessage;
/**
* Client ID
*/
private String clientInfo;
private String wfContext;
}

View File

@ -101,6 +101,12 @@ public class JobTask implements Serializable {
*/
private Integer mrStage;
/**
* Redundant copy of the workflow context.
* Space-for-time trade-off: the partial context is stored redundantly on each task to reduce concurrent updates.
*/
private String wfContext;
/**
* Extended attributes
*/
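The redundancy noted in the wfContext comment above keeps the hot path contention-free: while a node runs, each task writes only its own sj_job_task.wf_context cell, and the shared sj_workflow_task_batch.wf_context row is written once when the batch is merged. A rough JDBC sketch of the two write paths (table and column names come from the DDL above; the real project goes through MyBatis-Plus mappers, so everything else here is illustrative):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class WfContextWriteSketch {
    // Hot path: every task updates only its own row, so concurrent tasks never collide.
    void reportTaskContext(Connection conn, long jobTaskId, String partialContextJson) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(
                "UPDATE sj_job_task SET wf_context = ? WHERE id = ?")) {
            ps.setString(1, partialContextJson);
            ps.setLong(2, jobTaskId);
            ps.executeUpdate();
        }
    }

    // Cold path: a single merge per node batch touches the shared workflow row
    // (the real handler additionally guards this with the version column; see the
    // sketch in the WorkflowBatchHandler section further down).
    void publishMergedContext(Connection conn, long workflowTaskBatchId, String mergedContextJson) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(
                "UPDATE sj_workflow_task_batch SET wf_context = ? WHERE id = ?")) {
            ps.setString(1, mergedContextJson);
            ps.setLong(2, workflowTaskBatchId);
            ps.executeUpdate();
        }
    }
}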

View File

@ -99,4 +99,8 @@ public class RealJobExecutorDTO extends BaseDTO {
*/
private boolean isRetry;
/**
* Workflow context
*/
private String wkContext;
}

View File

@ -46,4 +46,9 @@ public class ClientCallbackContext {
private Integer retryScene;
private boolean isRetry;
/**
* Workflow context
*/
private String wkContext;
}

View File

@ -34,9 +34,11 @@ import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
@ -72,6 +74,7 @@ public class JobExecutorActor extends AbstractActor {
private final TransactionTemplate transactionTemplate;
private final WorkflowBatchHandler workflowBatchHandler;
private final JobTaskBatchHandler jobTaskBatchHandler;
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
@Override
public Receive createReceive() {
@ -145,9 +148,20 @@ public class JobExecutorActor extends AbstractActor {
return;
}
// Fetch the workflow's global context
WorkflowTaskBatch workflowTaskBatch = null;
Long workflowTaskBatchId = taskExecute.getWorkflowTaskBatchId();
if (Objects.nonNull(workflowTaskBatchId)) {
workflowTaskBatch = workflowTaskBatchMapper.selectOne(
new LambdaQueryWrapper<WorkflowTaskBatch>()
.select(WorkflowTaskBatch::getWfContext)
.eq(WorkflowTaskBatch::getId, taskExecute.getWorkflowTaskBatchId())
);
}
// Execute the task
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList));
jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList, workflowTaskBatch));
} finally {
log.debug("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute));
final int finalTaskStatus = taskStatus;
@ -174,13 +188,17 @@ public class JobExecutorActor extends AbstractActor {
}
@NotNull
private static JobExecutorContext buildJobExecutorContext(TaskExecuteDTO taskExecute, Job job, List<JobTask> taskList) {
private static JobExecutorContext buildJobExecutorContext(TaskExecuteDTO taskExecute, Job job, List<JobTask> taskList,
final WorkflowTaskBatch workflowTaskBatch) {
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
context.setTaskList(taskList);
context.setTaskBatchId(taskExecute.getTaskBatchId());
context.setJobId(job.getId());
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
if (Objects.nonNull(workflowTaskBatch)) {
context.setWkContext(workflowTaskBatch.getWfContext());
}
return context;
}

View File

@ -55,6 +55,7 @@ public class JobExecutorResultActor extends AbstractActor {
JobTask jobTask = new JobTask();
jobTask.setTaskStatus(result.getTaskStatus());
jobTask.setWfContext(result.getWfContext());
if (Objects.nonNull(result.getResult())) {
jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult()));
}

View File

@ -182,7 +182,7 @@ public class WorkflowExecutorActor extends AbstractActor {
context.setEvaluationResult(evaluationResult);
context.setTaskBatchId(taskExecute.getTaskBatchId());
context.setTaskExecutorScene(taskExecute.getTaskExecutorScene());
context.setWfContext(workflowTaskBatch.getWfContext());
workflowExecutor.execute(context);
evaluationResult = context.getEvaluationResult();

View File

@ -90,4 +90,9 @@ public class JobExecutorContext {
private Long workflowNodeId;
/**
* Workflow context
*/
private String wkContext;
}

View File

@ -146,6 +146,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
jobTask.setClientInfo(StrUtil.EMPTY);
jobTask.setTaskBatchId(jobTaskBatch.getId());
jobTask.setArgsType(JobArgsTypeEnum.TEXT.getArgsType());
// TODO: decide whether this should be removed
jobTask.setArgsStr(Optional.ofNullable(context.getTaskResult()).orElse(StrUtil.EMPTY));
jobTask.setTaskStatus(context.getJobTaskStatus());
jobTask.setResultMessage(String.valueOf(context.getEvaluationResult()));

View File

@ -87,12 +87,8 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
// Set the callback timeout
requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT);
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage, JobTask::getClientInfo)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
List<CallbackParamsDTO> callbackParamsList = WorkflowTaskConverter.INSTANCE.toCallbackParamsDTO(jobTasks);
context.setTaskResult(JsonUtil.toJsonString(callbackParamsList));
CallbackParamsDTO callbackParamsDTO = new CallbackParamsDTO();
callbackParamsDTO.setWfContext(context.getWfContext());
try {
Map<String, String> uriVariables = new HashMap<>();
@ -100,13 +96,13 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
ResponseEntity<String> response = buildRetryer(decisionConfig).call(
() -> restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST,
new HttpEntity<>(callbackParamsList, requestHeaders), String.class, uriVariables));
new HttpEntity<>(callbackParamsDTO, requestHeaders), String.class, uriVariables));
result = response.getBody();
SnailJobLog.LOCAL.info("回调结果. webHook:[{}],结果: [{}]", decisionConfig.getWebhook(), result);
} catch (Exception e) {
SnailJobLog.LOCAL.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(),
context.getTaskResult(), e);
context.getWfContext(), e);
context.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTION_ERROR.getReason());
@ -163,7 +159,7 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
jobLogMetaDTO.setTaskId(jobTask.getId());
if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) {
SnailJobLog.REMOTE.info("节点[{}]回调成功.\n回调参数:{} \n回调结果:[{}] <|>{}<|>",
context.getWorkflowNodeId(), context.getTaskResult(), context.getEvaluationResult(), jobLogMetaDTO);
context.getWorkflowNodeId(), context.getWfContext(), context.getEvaluationResult(), jobLogMetaDTO);
} else if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.CANCEL.getStatus()) {
SnailJobLog.REMOTE.warn("节点[{}]取消回调. 取消原因: 任务状态已关闭 <|>{}<|>",
context.getWorkflowNodeId(), jobLogMetaDTO);
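With this change the callback node posts a single CallbackParamsDTO carrying the workflow context instead of the per-task result list, so a webhook now receives one JSON object rather than an array. A minimal Spring receiver sketch for the new shape (the endpoint path and payload class here are assumptions; only the wfContext field is taken from the DTO earlier in this commit):

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical receiver for the callback-node webhook after this commit.
@RestController
class WorkflowCallbackReceiver {

    // Mirrors CallbackParamsDTO as it serializes over HTTP; field name from the diff.
    static class CallbackPayload {
        private String wfContext; // workflow global context as a JSON string
        public String getWfContext() { return wfContext; }
        public void setWfContext(String wfContext) { this.wfContext = wfContext; }
    }

    // Example path only; the real URL is whatever decisionConfig.getWebhook() points at.
    @PostMapping("/workflow/callback")
    public String onCallback(@RequestBody CallbackPayload payload) {
        // Consume payload.getWfContext() here; the executor stores the response body as the node result.
        return "ok";
    }
}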

View File

@ -1,6 +1,5 @@
package com.aizuda.snailjob.server.job.task.support.executor.workflow;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
@ -13,23 +12,16 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.dto.DecisionConfig;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum;
import com.aizuda.snailjob.server.common.enums.LogicalConditionEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
@ -76,45 +68,12 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("The expression engine does not exist"));
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
List<String> taskResult = Lists.newArrayList();
Boolean tempResult = null;
if (CollUtil.isEmpty(jobTasks)) {
tempResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), StrUtil.EMPTY)).orElse(Boolean.FALSE);
} else {
for (JobTask jobTask : jobTasks) {
taskResult.add(jobTask.getResultMessage());
boolean execResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), jobTask.getResultMessage())).orElse(Boolean.FALSE);
if (Objects.isNull(tempResult)) {
tempResult = execResult;
}
if (Objects.equals(decisionConfig.getLogicalCondition(), LogicalConditionEnum.AND.getCode())) {
tempResult = tempResult && execResult;
} else {
tempResult = tempResult || execResult;
if (tempResult) {
break;
}
}
log.debug("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result);
}
}
context.setTaskResult(JsonUtil.toJsonString(taskResult));
result = Optional.ofNullable(tempResult).orElse(Boolean.FALSE);
result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), context.getWfContext())).orElse(Boolean.FALSE);
if (!result) {
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
}
} catch (Exception e) {
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getTaskResult(), e);
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getWfContext(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
@ -160,10 +119,10 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()
|| JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() == context.getOperationReason()) {
SnailJobLog.REMOTE.info("节点Id:[{}] 决策完成. 上下文:[{}] 决策结果:[{}] <|>{}<|>",
context.getWorkflowNodeId(), context.getTaskResult(), context.getEvaluationResult(), jobLogMetaDTO);
context.getWorkflowNodeId(), context.getWfContext(), context.getEvaluationResult(), jobLogMetaDTO);
} else {
SnailJobLog.REMOTE.error("节点Id:[{}] 决策失败. 上下文:[{}] 失败原因:[{}] <|>{}<|>",
context.getWorkflowNodeId(), context.getTaskResult(), context.getLogMessage(), jobLogMetaDTO);
context.getWorkflowNodeId(), context.getWfContext(), context.getLogMessage(), jobLogMetaDTO);
}
}

View File

@ -102,4 +102,9 @@ public class WorkflowExecutorContext {
* 1 task node, 2 condition node, 3 callback node
*/
private Integer nodeType;
/**
* Workflow global context
*/
private String wfContext;
}

View File

@ -58,5 +58,10 @@ public class JobTaskBatchGeneratorContext {
*/
private Long parentWorkflowNodeId;
/**
* Workflow context
*/
private String wfContext;
}

View File

@ -43,6 +43,8 @@ import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.io.IOException;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -59,7 +61,9 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.NOT_C
@Component
@RequiredArgsConstructor
public class WorkflowBatchHandler {
private static final String KEY = "update_wf_context_{0}";
private final DistributedLockHandler distributedLockHandler;
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final JobMapper jobMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
@ -321,12 +325,13 @@ public class WorkflowBatchHandler {
return;
}
// Spin update (optimistic retry)
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(result -> result.equals(Boolean.FALSE))
.retryIfException(ex -> true)
.withWaitStrategy(WaitStrategies.fixedWait(500, TimeUnit.MILLISECONDS))
// retry for 3 seconds
.withStopStrategy(StopStrategies.stopAfterAttempt(6))
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(final Attempt<V> attempt) {
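The spin update above retries what is, in effect, an optimistic version-guarded write: each attempt re-reads wf_context and version, merges the pending keys, and issues an UPDATE conditioned on the old version; zero affected rows means another node won the race, the callable returns false, and the Retryer tries again (falling back to the distributed lock once retries are exhausted, as the next hunk shows). A rough JDBC sketch of what a single attempt amounts to, using the wf_context and version columns from the DDL (the project itself does this through MyBatis-Plus, so the rest is illustrative):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// One optimistic attempt; the surrounding Retryer re-invokes it while it returns false.
class WfContextCasSketch {
    boolean tryMerge(Connection conn, long workflowTaskBatchId, String waitMergeContextJson) throws SQLException {
        String currentContext;
        int version;
        try (PreparedStatement read = conn.prepareStatement(
                "SELECT wf_context, version FROM sj_workflow_task_batch WHERE id = ?")) {
            read.setLong(1, workflowTaskBatchId);
            try (ResultSet rs = read.executeQuery()) {
                if (!rs.next()) {
                    return true; // batch row is gone; nothing to merge
                }
                currentContext = rs.getString(1);
                version = rs.getInt(2);
            }
        }
        // Merge waitMergeContextJson into currentContext here (a JSON map merge,
        // as in the sketch near the top of this commit); omitted for brevity.
        String merged = currentContext;
        try (PreparedStatement write = conn.prepareStatement(
                "UPDATE sj_workflow_task_batch SET wf_context = ?, version = version + 1 "
                        + "WHERE id = ? AND version = ?")) {
            write.setString(1, merged);
            write.setLong(2, workflowTaskBatchId);
            write.setInt(3, version);
            return write.executeUpdate() == 1; // false => lost the race, spin again
        }
    }
}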
@ -348,12 +353,18 @@ public class WorkflowBatchHandler {
} catch (Exception e) {
SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] waitMergeContext:[{}]",
workflowTaskBatchId, waitMergeContext, e);
if (e.getClass().isAssignableFrom(RetryException.class)) {
// If the optimistic spin fails, fall back to a pessimistic (distributed) lock
distributedLockHandler.lockWithDisposableAndRetry(() -> {
mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class));
}, MessageFormat.format(KEY, workflowTaskBatchId), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
}
}
}
public boolean mergeAllWorkflowContext(Long workflowTaskBatchId, Long taskBatchId) {
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage, JobTask::getId)
.select(JobTask::getWfContext, JobTask::getId)
.eq(JobTask::getTaskBatchId, taskBatchId));
if (CollUtil.isEmpty(jobTasks)) {
return true;
@ -361,7 +372,7 @@ public class WorkflowBatchHandler {
Set<Map<String, Object>> maps = jobTasks.stream().map(r -> {
try {
return JsonUtil.parseHashMap(r.getResultMessage(), Object.class);
return JsonUtil.parseHashMap(r.getWfContext(), Object.class);
} catch (Exception e) {
SnailJobLog.LOCAL.warn("taskId:[{}] result value 不是一个json对象. result:[{}]", r.getId(), r.getResultMessage());
}
@ -387,7 +398,6 @@ public class WorkflowBatchHandler {
return true;
}
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
new LambdaQueryWrapper<WorkflowTaskBatch>()
.select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getVersion)