From a7029da547d312f0263619696d2da5d6d8230426 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sun, 16 Jun 2024 23:54:53 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.1.0):=20=E5=B7=A5=E4=BD=9C=E6=B5=81?= =?UTF-8?q?=E5=85=A8=E5=B1=80=E4=B8=8A=E4=B8=8B=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../persistence/po/WorkflowTaskBatch.java | 16 +++ .../job/task/dto/JobExecutorResultDTO.java | 2 + .../dispatch/JobExecutorResultActor.java | 7 +- .../workflow/AbstractWorkflowExecutor.java | 3 + .../workflow/DecisionWorkflowExecutor.java | 5 + .../support/handler/WorkflowBatchHandler.java | 112 ++++++++++++++++++ 6 files changed, 143 insertions(+), 2 deletions(-) diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/WorkflowTaskBatch.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/WorkflowTaskBatch.java index 9399b4ffe..52be09b80 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/WorkflowTaskBatch.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/WorkflowTaskBatch.java @@ -1,10 +1,15 @@ package com.aizuda.snailjob.template.datasource.persistence.po; import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; +import java.io.Serial; +import java.io.Serializable; +import java.time.LocalDateTime; + /** * 工作流批次 * @@ -66,4 +71,15 @@ public class WorkflowTaskBatch extends CreateUpdateDt { */ private Integer deleted; + /** + * 版本号 + */ + @TableField(value = "version", update = "%s+1") + private Integer version; + + /** + * 全局上下文 + */ + private String wfContext; + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java index b0951098a..41b8b8037 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java @@ -42,4 +42,6 @@ public class JobExecutorResultDTO { private Integer isLeaf; + private String wfContext; + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index ba16a26f4..db8146270 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -14,14 +14,13 @@ import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler; import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler; import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler; +import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory; import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @@ -42,6 +41,7 @@ public class JobExecutorResultActor extends AbstractActor { private static final String KEY = "job_complete_{0}_{1}"; private final JobTaskMapper jobTaskMapper; private final JobTaskBatchHandler jobTaskBatchHandler; + private final WorkflowBatchHandler workflowBatchHandler; private final DistributedLockHandler distributedLockHandler; @Override @@ -63,6 +63,9 @@ public class JobExecutorResultActor extends AbstractActor { new LambdaUpdateWrapper().eq(JobTask::getId, result.getTaskId())), () -> new SnailJobServerException("更新任务实例失败")); + // 更新工作流的全局上下文 如果并发更新失败则需要自旋重试更新 +// workflowBatchHandler.mergeWorkflowContextAndRetry(result.getWorkflowTaskBatchId(), result.getWfContext()); + // 除MAP和MAP_REDUCE 任务之外,其他任务都是叶子节点 if (Objects.nonNull(result.getIsLeaf()) && StatusEnum.NO.getStatus().equals(result.getIsLeaf())) { return; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java index badd404cd..6c26f76b6 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java @@ -89,6 +89,9 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init return; } + // 合并job task的结果到全局上下文中 + workflowBatchHandler.mergeAllWorkflowContext(context.getWorkflowTaskBatchId(), context.getTaskBatchId()); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(final TransactionStatus status) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java index 65a629746..035d64b97 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java @@ -17,6 +17,8 @@ import com.aizuda.snailjob.server.common.enums.LogicalConditionEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler; +import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; @@ -41,6 +43,7 @@ import java.util.Optional; public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { private final JobTaskMapper jobTaskMapper; + @Override public WorkflowNodeTypeEnum getWorkflowNodeType() { return WorkflowNodeTypeEnum.DECISION; @@ -77,6 +80,8 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() .select(JobTask::getResultMessage) .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); + + List taskResult = Lists.newArrayList(); Boolean tempResult = null; if (CollUtil.isEmpty(jobTasks)) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java index 7efd3f678..8b9699b96 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java @@ -3,11 +3,13 @@ package com.aizuda.snailjob.server.job.task.support.handler; import akka.actor.ActorRef; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; @@ -24,12 +26,16 @@ import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory; import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.github.rholder.retry.*; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.graph.MutableGraph; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @@ -38,6 +44,9 @@ import org.springframework.transaction.support.TransactionSynchronizationManager import java.io.IOException; import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION; import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE; @@ -54,6 +63,7 @@ public class WorkflowBatchHandler { private final WorkflowTaskBatchMapper workflowTaskBatchMapper; private final JobMapper jobMapper; private final JobTaskBatchMapper jobTaskBatchMapper; + private final JobTaskMapper jobTaskMapper; private static boolean checkLeafCompleted(MutableGraph graph, Map> currentWorkflowNodeMap, Set parentIds) { @@ -305,4 +315,106 @@ public class WorkflowBatchHandler { SnailJobLog.LOCAL.error("任务调度执行失败", e); } } + + public void mergeWorkflowContextAndRetry(Long workflowTaskBatchId, String waitMergeContext) { + if (StrUtil.isBlank(waitMergeContext) || Objects.isNull(workflowTaskBatchId)) { + return; + } + + Retryer retryer = RetryerBuilder.newBuilder() + .retryIfResult(result -> result.equals(Boolean.FALSE)) + .retryIfException(ex -> true) + .withWaitStrategy(WaitStrategies.fixedWait(500, TimeUnit.MILLISECONDS)) + // 重试3秒 + .withStopStrategy(StopStrategies.stopAfterAttempt(6)) + .withRetryListener(new RetryListener() { + @Override + public void onRetry(final Attempt attempt) { + Object result = null; + if (attempt.hasResult()) { + try { + result = attempt.get(); + } catch (ExecutionException ignored) { + } + } + + SnailJobLog.LOCAL.info("第【{}】次尝试更新上下文. result:[{}] treadName:[{}]", + attempt.getAttemptNumber(), result, Thread.currentThread().getName()); + } + }).build(); + + try { + retryer.call(() -> mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class))); + } catch (Exception e) { + SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] waitMergeContext:[{}]", + workflowTaskBatchId, waitMergeContext, e); + } + } + + public boolean mergeAllWorkflowContext(Long workflowTaskBatchId, Long taskBatchId) { + List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() + .select(JobTask::getResultMessage, JobTask::getId) + .eq(JobTask::getTaskBatchId, taskBatchId)); + if (CollUtil.isEmpty(jobTasks)) { + return true; + } + + Set> maps = jobTasks.stream().map(r -> { + try { + return JsonUtil.parseHashMap(r.getResultMessage(), Object.class); + } catch (Exception e) { + SnailJobLog.LOCAL.warn("taskId:[{}] result value 不是一个json对象. result:[{}]", r.getId(), r.getResultMessage()); + } + return new HashMap(); + }).collect(Collectors.toSet()); + + Map mergeMap = Maps.newHashMap(); + for (Map map : maps) { + mergeMaps(mergeMap, map); + } + + return mergeWorkflowContext(workflowTaskBatchId, mergeMap); + } + + /** + * 合并客户端上报的上下问题信息 + * + * @param workflowTaskBatchId 工作流批次 + * @param waitMergeContext 待合并的上下文 + */ + public boolean mergeWorkflowContext(Long workflowTaskBatchId, Map waitMergeContext) { + if (CollUtil.isEmpty(waitMergeContext) || Objects.isNull(workflowTaskBatchId)) { + return true; + } + + + WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne( + new LambdaQueryWrapper() + .select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getVersion) + .eq(WorkflowTaskBatch::getId, workflowTaskBatchId) + ); + + if (Objects.isNull(workflowTaskBatch)) { + return true; + } + + String wfContext = workflowTaskBatch.getWfContext(); + if (StrUtil.isNotBlank(wfContext)) { + mergeMaps(waitMergeContext, JsonUtil.parseHashMap(wfContext)); + } + + int version = workflowTaskBatch.getVersion(); + workflowTaskBatch.setWfContext(JsonUtil.toJsonString(waitMergeContext)); + workflowTaskBatch.setVersion(null); + return 1 == workflowTaskBatchMapper.update(workflowTaskBatch, new LambdaQueryWrapper() + .eq(WorkflowTaskBatch::getId, workflowTaskBatchId) + .eq(WorkflowTaskBatch::getVersion, version) + ); + } + + public static void mergeMaps(Map mainMap, Map waitMergeMap) { + for (Map.Entry entry : waitMergeMap.entrySet()) { + mainMap.merge(entry.getKey(), entry.getValue(), (v1, v2) -> v2); + } + } }