feat(sj_1.1.0): workflow global context

opensnail 2024-06-16 23:54:53 +08:00
parent 8d6213d165
commit a7029da547
6 changed files with 143 additions and 2 deletions

View File

@@ -1,10 +1,15 @@
package com.aizuda.snailjob.template.datasource.persistence.po;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* Workflow batch
*
@@ -66,4 +71,15 @@ public class WorkflowTaskBatch extends CreateUpdateDt {
*/
private Integer deleted;
/**
* Version number (optimistic lock)
*/
@TableField(value = "version", update = "%s+1")
private Integer version;
/**
* Workflow global context
*/
private String wfContext;
}
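For orientation, a hedged sketch of the statement this mapping yields. Assumptions, since they sit outside this hunk: MyBatis-Plus' default camel-to-snake column mapping (wfContext -> wf_context), an illustrative table name in place of the real @TableName, and the WHERE version predicate that WorkflowBatchHandler adds further below.

// Illustrative effective SQL, not generated output:
// UPDATE workflow_task_batch
// SET wf_context = ?, version = version + 1   -- the "+1" comes from update = "%s+1"
// WHERE id = ? AND version = ?                -- version predicate added by the handler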

View File

@@ -42,4 +42,6 @@ public class JobExecutorResultDTO {
private Integer isLeaf;
private String wfContext;
}
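A hedged illustration of how a client-reported result carries the new field. Assumptions: Lombok-generated setters as on the PO above, StatusEnum.YES by symmetry with the StatusEnum.NO used in the actor below; all values are made up.

// Illustrative only: a leaf task reporting a JSON object destined for the global context.
JobExecutorResultDTO result = new JobExecutorResultDTO();
result.setIsLeaf(StatusEnum.YES.getStatus()); // every type except MAP/MAP_REDUCE is a leaf
result.setWfContext("{\"orderId\": 10086, \"stage\": \"paid\"}"); // merged into wf_context server-side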

View File

@@ -14,14 +14,13 @@ import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@@ -42,6 +41,7 @@ public class JobExecutorResultActor extends AbstractActor {
private static final String KEY = "job_complete_{0}_{1}";
private final JobTaskMapper jobTaskMapper;
private final JobTaskBatchHandler jobTaskBatchHandler;
private final WorkflowBatchHandler workflowBatchHandler;
private final DistributedLockHandler distributedLockHandler;
@Override
@@ -63,6 +63,9 @@ public class JobExecutorResultActor extends AbstractActor {
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
() -> new SnailJobServerException("Failed to update the job task instance"));
// Update the workflow's global context; if a concurrent update fails, spin-retry the merge
// workflowBatchHandler.mergeWorkflowContextAndRetry(result.getWorkflowTaskBatchId(), result.getWfContext());
// Every task type except MAP and MAP_REDUCE is a leaf node
if (Objects.nonNull(result.getIsLeaf()) && StatusEnum.NO.getStatus().equals(result.getIsLeaf())) {
return;

View File

@@ -89,6 +89,9 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
return;
}
// Merge the job task results into the global context
workflowBatchHandler.mergeAllWorkflowContext(context.getWorkflowTaskBatchId(), context.getTaskBatchId());
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {

View File

@@ -17,6 +17,8 @@ import com.aizuda.snailjob.server.common.enums.LogicalConditionEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
@@ -41,6 +43,7 @@ import java.util.Optional;
public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
private final JobTaskMapper jobTaskMapper;
@Override
public WorkflowNodeTypeEnum getWorkflowNodeType() {
return WorkflowNodeTypeEnum.DECISION;
@@ -77,6 +80,8 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
List<String> taskResult = Lists.newArrayList();
Boolean tempResult = null;
if (CollUtil.isEmpty(jobTasks)) {

View File

@@ -3,11 +3,13 @@ package com.aizuda.snailjob.server.job.task.support.handler;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
@@ -24,12 +26,16 @@ import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.github.rholder.retry.*;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@@ -38,6 +44,9 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE;
@@ -54,6 +63,7 @@ public class WorkflowBatchHandler {
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final JobMapper jobMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final JobTaskMapper jobTaskMapper;
private static boolean checkLeafCompleted(MutableGraph<Long> graph, Map<Long,
List<JobTaskBatch>> currentWorkflowNodeMap, Set<Long> parentIds) {
@@ -305,4 +315,106 @@ public class WorkflowBatchHandler {
SnailJobLog.LOCAL.error("Task scheduling execution failed", e);
}
}
public void mergeWorkflowContextAndRetry(Long workflowTaskBatchId, String waitMergeContext) {
if (StrUtil.isBlank(waitMergeContext) || Objects.isNull(workflowTaskBatchId)) {
return;
}
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(Boolean.FALSE::equals)
.retryIfException(ex -> true)
.withWaitStrategy(WaitStrategies.fixedWait(500, TimeUnit.MILLISECONDS))
// retry for up to 3 seconds in total (6 attempts x 500 ms)
.withStopStrategy(StopStrategies.stopAfterAttempt(6))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(final Attempt<V> attempt) {
Object result = null;
if (attempt.hasResult()) {
try {
result = attempt.get();
} catch (ExecutionException ignored) {
}
}
SnailJobLog.LOCAL.info("第【{}】次尝试更新上下文. result:[{}] treadName:[{}]",
attempt.getAttemptNumber(), result, Thread.currentThread().getName());
}
}).build();
try {
retryer.call(() -> mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class)));
} catch (Exception e) {
SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] waitMergeContext:[{}]",
workflowTaskBatchId, waitMergeContext, e);
}
}
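A self-contained sketch of this retry policy in isolation, using the same guava-retrying builder as above; the class name and the simulated conflict counter are illustrative.

import com.github.rholder.retry.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SpinRetryDemo {
    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
                .retryIfResult(Boolean.FALSE::equals)
                .withWaitStrategy(WaitStrategies.fixedWait(500, TimeUnit.MILLISECONDS))
                .withStopStrategy(StopStrategies.stopAfterAttempt(6))
                .build();
        // Lose the optimistic update twice, then win on the third attempt.
        Boolean ok = retryer.call(() -> attempts.incrementAndGet() >= 3);
        System.out.println("attempts=" + attempts.get() + " ok=" + ok); // attempts=3 ok=true
    }
}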
public boolean mergeAllWorkflowContext(Long workflowTaskBatchId, Long taskBatchId) {
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage, JobTask::getId)
.eq(JobTask::getTaskBatchId, taskBatchId));
if (CollUtil.isEmpty(jobTasks)) {
return true;
}
Set<Map<String, Object>> maps = jobTasks.stream().map(r -> {
try {
return JsonUtil.parseHashMap(r.getResultMessage(), Object.class);
} catch (Exception e) {
SnailJobLog.LOCAL.warn("taskId:[{}] result value 不是一个json对象. result:[{}]", r.getId(), r.getResultMessage());
}
return new HashMap<String, Object>();
}).collect(Collectors.toSet());
Map<String, Object> mergeMap = Maps.newHashMap();
for (Map<String, Object> map : maps) {
mergeMaps(mergeMap, map);
}
return mergeWorkflowContext(workflowTaskBatchId, mergeMap);
}
/**
* Merge the context information reported by the client.
*
* @param workflowTaskBatchId workflow batch id
* @param waitMergeContext context to merge
* @return true if the merge succeeded or there was nothing to merge; false if the optimistic update lost
*/
public boolean mergeWorkflowContext(Long workflowTaskBatchId, Map<String, Object> waitMergeContext) {
if (CollUtil.isEmpty(waitMergeContext) || Objects.isNull(workflowTaskBatchId)) {
return true;
}
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
new LambdaQueryWrapper<WorkflowTaskBatch>()
.select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getVersion)
.eq(WorkflowTaskBatch::getId, workflowTaskBatchId)
);
if (Objects.isNull(workflowTaskBatch)) {
return true;
}
String wfContext = workflowTaskBatch.getWfContext();
if (StrUtil.isNotBlank(wfContext)) {
mergeMaps(waitMergeContext, JsonUtil.parseHashMap(wfContext));
}
int version = workflowTaskBatch.getVersion();
workflowTaskBatch.setWfContext(JsonUtil.toJsonString(waitMergeContext));
workflowTaskBatch.setVersion(null);
return 1 == workflowTaskBatchMapper.update(workflowTaskBatch, new LambdaQueryWrapper<WorkflowTaskBatch>()
.eq(WorkflowTaskBatch::getId, workflowTaskBatchId)
.eq(WorkflowTaskBatch::getVersion, version)
);
}
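The update above is a compare-and-set on the version column: it only takes effect if no concurrent writer bumped the version between the read and the write. A standalone, database-free analogy (all names illustrative):

import java.util.concurrent.atomic.AtomicReference;

public class OptimisticContextDemo {
    record Row(int version, String wfContext) {}

    public static void main(String[] args) {
        AtomicReference<Row> row = new AtomicReference<>(new Row(1, "{\"a\":1}"));
        Row snapshot = row.get();              // SELECT wf_context, version ...
        String merged = "{\"a\":1,\"b\":2}";   // the merge happens in memory
        // UPDATE ... SET wf_context = ?, version = version + 1 WHERE id = ? AND version = ?
        boolean won = row.compareAndSet(snapshot, new Row(snapshot.version() + 1, merged));
        // won == false would mean another writer got there first;
        // mergeWorkflowContextAndRetry then re-reads and retries.
        System.out.println(won + " -> " + row.get());
    }
}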
public static void mergeMaps(Map<String, Object> mainMap, Map<String, Object> waitMergeMap) {
for (Map.Entry<String, Object> entry : waitMergeMap.entrySet()) {
mainMap.merge(entry.getKey(), entry.getValue(), (v1, v2) -> v2);
}
}
}
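The merge itself is shallow and per-key last-writer-wins. Note the argument order in mergeWorkflowContext above: the persisted wfContext is passed as the second map, so on a key conflict the stored value survives and only new keys from the reported context are added. A standalone sketch of the semantics (class name illustrative):

import java.util.HashMap;
import java.util.Map;

public class MergeMapsDemo {
    public static void main(String[] args) {
        Map<String, Object> main = new HashMap<>(Map.of("stage", "init", "orderId", 1));
        Map<String, Object> incoming = Map.of("stage", "paid", "amount", 99);
        // Same loop as mergeMaps: on a collision the second map's value wins.
        incoming.forEach((k, v) -> main.merge(k, v, (v1, v2) -> v2));
        System.out.println(main); // e.g. {orderId=1, amount=99, stage=paid} (HashMap order unspecified)
    }
}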