From aeaa73776f8810dff3a688b549735491419254c9 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Tue, 18 Jun 2024 18:38:01 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0):=20=E4=BC=98=E5=8C=96=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=B5=81=E5=85=A8=E5=B1=80=E4=B8=8A=E4=B8=8B=E6=96=87?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/job/core/client/JobEndPoint.java | 16 +++++ .../snailjob/client/job/core/dto/JobArgs.java | 15 +++++ .../core/executor/AbstractJobExecutor.java | 5 ++ .../executor/JobExecutorFutureCallback.java | 8 +++ .../model/request/DispatchJobRequest.java | 2 +- .../request/DispatchJobResultRequest.java | 2 +- .../common/core/model/JobContext.java | 11 ++++ .../snailjob/common/core/util/JsonUtil.java | 11 ++++ .../job/task/dto/RealJobExecutorDTO.java | 2 +- .../job/task/support/JobTaskConverter.java | 3 +- .../callback/ClientCallbackContext.java | 2 +- .../support/dispatch/JobExecutorActor.java | 2 +- .../dispatch/JobExecutorResultActor.java | 1 - .../dispatch/WorkflowExecutorActor.java | 6 +- .../executor/job/JobExecutorContext.java | 2 +- .../workflow/AbstractWorkflowExecutor.java | 8 ++- .../support/handler/JobTaskBatchHandler.java | 61 +++++++++++-------- .../support/handler/WorkflowBatchHandler.java | 54 +++++++++++----- 18 files changed, 161 insertions(+), 50 deletions(-) diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java index 3025ab44b..84c892455 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.client.job.core.client; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.client.common.annotation.Mapping; import com.aizuda.snailjob.client.common.annotation.SnailEndPoint; import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager; @@ -19,12 +20,15 @@ import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.model.JobContext; import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.log.enums.LogTypeEnum; +import com.google.common.collect.Maps; import jakarta.validation.Valid; import org.springframework.validation.annotation.Validated; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH; @@ -116,6 +120,18 @@ public class JobEndPoint { jobContext.setRetryScene(dispatchJob.getRetryScene()); jobContext.setTaskName(dispatchJob.getTaskName()); jobContext.setMrStage(dispatchJob.getMrStage()); + + String wfContext = dispatchJob.getWfContext(); + if (StrUtil.isNotBlank(wfContext)) { + try { + jobContext.setWfContext(JsonUtil.parseConcurrentHashMap(wfContext)); + } catch (Exception e) { + SnailJobLog.REMOTE.warn("workflow context parse error", e); + } + } else { + jobContext.setWfContext(Maps.newConcurrentMap()); + } + return jobContext; } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java index d183bc642..e7523b19a 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java @@ -1,7 +1,11 @@ package com.aizuda.snailjob.client.job.core.dto; +import cn.hutool.core.util.StrUtil; import lombok.Data; +import java.util.Map; +import java.util.Objects; + /** * @author: opensnail * @date : 2023-10-18 16:53 @@ -15,4 +19,15 @@ public class JobArgs { private String executorInfo; private Long taskBatchId; + + private Map wfContext; + + public void appendContext(String key, Object value) { + if (Objects.isNull(wfContext) || StrUtil.isBlank(key) || Objects.isNull(value)) { + return; + } + + wfContext.put(key, value); + } + } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index 22bbd034d..d1e281613 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.client.job.core.executor; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager; import com.aizuda.snailjob.client.job.core.IJobExecutor; import com.aizuda.snailjob.client.job.core.cache.FutureCache; @@ -12,6 +13,7 @@ import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import com.aizuda.snailjob.common.core.model.JobContext; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.enums.LogTypeEnum; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; @@ -60,6 +62,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor { jobArgs = buildJobArgs(jobContext); } + jobArgs.setWfContext(jobContext.getWfContext()); + try { // 初始化调度信息(日志上报LogUtil) initLogContext(jobContext); @@ -117,6 +121,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { jobArgs.setArgsStr(jobContext.getArgsStr()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); + jobArgs.setWfContext(jobContext.getWfContext()); return jobArgs; } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java index 923852143..1d0641df5 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.client.job.core.executor; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DatePattern; import com.aizuda.snailjob.client.common.cache.GroupVersionCache; import com.aizuda.snailjob.client.common.config.SnailJobProperties; @@ -32,6 +33,7 @@ import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CancellationException; @@ -160,6 +162,12 @@ public class JobExecutorFutureCallback implements FutureCallback dispatchJobRequest.setTaskStatus(status); dispatchJobRequest.setRetry(jobContext.isRetry()); dispatchJobRequest.setRetryScene(jobContext.getRetryScene()); + // 传递上下文 + Map wfContext = jobContext.getWfContext(); + if (CollUtil.isNotEmpty(wfContext)) { + dispatchJobRequest.setWfContext(JsonUtil.toJsonString(wfContext)); + } + return dispatchJobRequest; } diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java index 48cccb939..d3b5cb56b 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java @@ -73,5 +73,5 @@ public class DispatchJobRequest { /** * 工作流上下文 */ - private String wkContext; + private String wfContext; } diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java index f3d723de8..701d4fa2f 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java @@ -44,5 +44,5 @@ public class DispatchJobResultRequest { /** * 工作流上下文 */ - private String wkContext; + private String wfContext; } diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java index 5b976dc1f..e9ee837d0 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.common.core.model; import lombok.Data; import java.util.List; +import java.util.Map; /** * @author: opensnail @@ -59,5 +60,15 @@ public class JobContext { */ private String taskName; + /** + * 动态分片所处的阶段 + */ private Integer mrStage; + + /** + * 工作流全局上下文 + */ + private Map wfContext; + + } diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/util/JsonUtil.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/util/JsonUtil.java index 6332bcdb2..a54307d69 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/util/JsonUtil.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/util/JsonUtil.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import static com.aizuda.snailjob.common.core.constant.SystemConstants.YYYY_MM_DD; import static com.aizuda.snailjob.common.core.constant.SystemConstants.YYYY_MM_DD_HH_MM_SS; @@ -72,6 +73,16 @@ public class JsonUtil { return JsonMapper.toJavaObject(jsonString, HashMap.class); } + /** + * 将JSON字符串转ConcurrentHashMap 对象 + * + * @param jsonString + * @return + */ + public static Map parseConcurrentHashMap(String jsonString) { + return JsonMapper.toJavaObject(jsonString, ConcurrentHashMap.class); + } + /** * 将JSON字符串转Map 对象 * diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java index fa1a47f1d..620e144c4 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java @@ -102,5 +102,5 @@ public class RealJobExecutorDTO extends BaseDTO { /** * 工作流上下文 */ - private String wkContext; + private String wfContext; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java index 0b9069b25..f98052657 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java @@ -106,7 +106,8 @@ public interface JobTaskConverter { @Mapping(source = "jobTask.extAttrs", target = "extAttrs"), @Mapping(source = "jobTask.namespaceId", target = "namespaceId"), @Mapping(source = "jobTask.taskName", target = "taskName"), - @Mapping(source = "jobTask.mrStage", target = "mrStage") + @Mapping(source = "jobTask.mrStage", target = "mrStage"), + @Mapping(source = "context.wfContext", target = "wfContext") }) RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java index ed3f4f7ec..178df6eed 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java @@ -50,5 +50,5 @@ public class ClientCallbackContext { /** * 工作流上下文 */ - private String wkContext; + private String wfContext; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 47f795fa5..e7b5118fd 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -198,7 +198,7 @@ public class JobExecutorActor extends AbstractActor { context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); context.setWorkflowNodeId(taskExecute.getWorkflowNodeId()); if (Objects.nonNull(workflowTaskBatch)) { - context.setWkContext(workflowTaskBatch.getWfContext()); + context.setWfContext(workflowTaskBatch.getWfContext()); } return context; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index 483fe7269..3710f644b 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -41,7 +41,6 @@ public class JobExecutorResultActor extends AbstractActor { private static final String KEY = "job_complete_{0}_{1}"; private final JobTaskMapper jobTaskMapper; private final JobTaskBatchHandler jobTaskBatchHandler; - private final WorkflowBatchHandler workflowBatchHandler; private final DistributedLockHandler distributedLockHandler; @Override diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index 157a88fd8..fdb2dbb7a 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -118,7 +118,7 @@ public class WorkflowExecutorActor extends AbstractActor { // 添加父节点,为了判断父节点的处理状态 List allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, - JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason) + JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason, JobTaskBatch::getId) .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) .in(JobTaskBatch::getWorkflowNodeId, Sets.union(brotherNode, Sets.newHashSet(taskExecute.getParentId()))) @@ -167,6 +167,10 @@ public class WorkflowExecutorActor extends AbstractActor { List jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId)); Map jobMap = StreamUtils.toIdentityMap(jobs, Job::getId); + // TODO 合并job task的结果到全局上下文中 + workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch, + StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId)); + // 只会条件节点会使用 Object evaluationResult = null; for (WorkflowNode workflowNode : workflowNodes) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java index 8a1e67345..14e8c982c 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java @@ -93,6 +93,6 @@ public class JobExecutorContext { /** * 工作流上下文 */ - private String wkContext; + private String wfContext; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java index 06a23f81d..2bb1d608d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java @@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum; import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum; +import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.support.WorkflowExecutor; @@ -56,6 +57,9 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init @Override public void execute(WorkflowExecutorContext context) { + + // 若多个兄弟节点的情况下,同时处理完成则每个节点都有可能来执行后继节点, + // 因此这里这里添加分布式锁 distributedLockHandler.lockWithDisposableAndRetry( () -> { long total = 0; @@ -69,6 +73,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init if (CollUtil.isNotEmpty(jobTaskBatches)) { total = jobTaskBatches.size(); + // ToDo JobTaskBatch jobTaskBatch = jobTaskBatches.get(0); if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) { context.setEvaluationResult(Boolean.FALSE); @@ -89,9 +94,6 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init return; } - // 合并job task的结果到全局上下文中 - workflowBatchHandler.mergeAllWorkflowContext(context.getWorkflowTaskBatchId(), context.getTaskBatchId()); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(final TransactionStatus status) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index 8c941e547..f64613a22 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; +import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED; import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*; /** @@ -59,13 +60,23 @@ public class JobTaskBatchHandler { @Transactional public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) { + // 幂等处理 + Long countJobTaskBatch = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper() + .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) + .in(JobTaskBatch::getTaskBatchStatus, COMPLETED) + ); + if (countJobTaskBatch > 0) { + // 批次已经完成了,不需要重复更新 + return true; + } + List jobTasks = jobTaskMapper.selectList( - new LambdaQueryWrapper() - .select(JobTask::getTaskStatus, JobTask::getMrStage) - .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); + new LambdaQueryWrapper() + .select(JobTask::getTaskStatus, JobTask::getMrStage) + .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); if (CollUtil.isEmpty(jobTasks) || - jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { + jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { return false; } @@ -73,7 +84,7 @@ public class JobTaskBatchHandler { jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId()); Map statusCountMap = jobTasks.stream() - .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting())); + .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting())); long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); @@ -85,7 +96,8 @@ public class JobTaskBatchHandler { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus()); } else { // todo 调试完成删除 - SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks)); + SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), + JsonUtil.toJsonString(jobTasks)); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); if (needReduceTask(completeJobBatchDTO, jobTasks)) { @@ -107,9 +119,9 @@ public class JobTaskBatchHandler { jobTaskBatch.setUpdateDt(LocalDateTime.now()); return 1 == jobTaskBatchMapper.update(jobTaskBatch, - new LambdaUpdateWrapper() - .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) - .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) + new LambdaUpdateWrapper() + .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) ); } @@ -149,14 +161,15 @@ public class JobTaskBatchHandler { private static boolean isAllMapTask(final List jobTasks) { return jobTasks.size() == jobTasks.stream() - .filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage()) - .count(); + .filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage()) + .count(); } private static boolean isALeastOneReduceTask(final List jobTasks) { return jobTasks.stream() - .filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && REDUCE.getStage() == jobTask.getMrStage()) - .count() > 1; + .filter( + jobTask -> Objects.nonNull(jobTask.getMrStage()) && REDUCE.getStage() == jobTask.getMrStage()) + .count() > 1; } /** @@ -167,21 +180,21 @@ public class JobTaskBatchHandler { */ public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { if (Objects.isNull(job) - || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - // 是否是常驻任务 - || Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) - // 防止任务已经分配到其他节点导致的任务重复执行 - || !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex()) + || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + // 是否是常驻任务 + || Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) + // 防止任务已经分配到其他节点导致的任务重复执行 + || !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex()) ) { return; } long count = groupConfigMapper.selectCount(new LambdaQueryWrapper() - .eq(GroupConfig::getNamespaceId, job.getNamespaceId()) - .eq(GroupConfig::getGroupName, job.getGroupName()) - .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())); + .eq(GroupConfig::getNamespaceId, job.getNamespaceId()) + .eq(GroupConfig::getGroupName, job.getGroupName()) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())); if (count == 0) { return; } @@ -208,7 +221,7 @@ public class JobTaskBatchHandler { Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000); log.debug("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds, - DateUtils.toNowMilli() % 1000); + DateUtils.toNowMilli() % 1000); job.setNextTriggerAt(nextTriggerAt); JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration); ResidentTaskCache.refresh(job.getId(), nextTriggerAt); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java index 85d781b86..3d2de4765 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java @@ -61,7 +61,7 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.NOT_C @Component @RequiredArgsConstructor public class WorkflowBatchHandler { - private static final String KEY = "update_wf_context_{}"; + private static final String KEY = "update_wf_context_{0}"; private final DistributedLockHandler distributedLockHandler; private final WorkflowTaskBatchMapper workflowTaskBatchMapper; @@ -320,8 +320,14 @@ public class WorkflowBatchHandler { } } - public void mergeWorkflowContextAndRetry(Long workflowTaskBatchId, String waitMergeContext) { - if (StrUtil.isBlank(waitMergeContext) || Objects.isNull(workflowTaskBatchId)) { + /** + * 合并工作流上下文若合并失败先自旋3次1.5s, 若失败了升级到悲观锁 + * + * @param workflowTaskBatch 工作流批次 + * @param taskBatchIds 批次列表 + */ + public void mergeWorkflowContextAndRetry(WorkflowTaskBatch workflowTaskBatch, Set taskBatchIds) { + if (CollUtil.isEmpty(taskBatchIds)) { return; } @@ -349,42 +355,62 @@ public class WorkflowBatchHandler { }).build(); try { - retryer.call(() -> mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class))); + retryer.call(() -> mergeAllWorkflowContext(workflowTaskBatch, taskBatchIds)); } catch (Exception e) { - SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] waitMergeContext:[{}]", - workflowTaskBatchId, waitMergeContext, e); + SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] taskBatchIds:[{}]", + workflowTaskBatch.getId(), taskBatchIds, e); if (e.getClass().isAssignableFrom(RetryException.class)) { // 如果自旋失败,就使用悲观锁 distributedLockHandler.lockWithDisposableAndRetry(() -> { - mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class)); - }, MessageFormat.format(KEY, workflowTaskBatchId), Duration.ofSeconds(1), Duration.ofSeconds(1), 3); + mergeAllWorkflowContext(workflowTaskBatch, taskBatchIds); + }, MessageFormat.format(KEY, workflowTaskBatch.getId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3); } } } - public boolean mergeAllWorkflowContext(Long workflowTaskBatchId, Long taskBatchId) { + public boolean mergeAllWorkflowContext(WorkflowTaskBatch workflowTaskBatch, Set taskBatchIds) { + if (CollUtil.isEmpty(taskBatchIds)) { + return true; + } + List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() .select(JobTask::getWfContext, JobTask::getId) - .eq(JobTask::getTaskBatchId, taskBatchId)); + .in(JobTask::getTaskBatchId, taskBatchIds) + ); if (CollUtil.isEmpty(jobTasks)) { return true; } Set> maps = jobTasks.stream().map(r -> { try { - return JsonUtil.parseHashMap(r.getWfContext(), Object.class); + if (StrUtil.isNotBlank(r.getWfContext())) { + return JsonUtil.parseHashMap(r.getWfContext(), Object.class); + } } catch (Exception e) { - SnailJobLog.LOCAL.warn("taskId:[{}] result value 不是一个json对象. result:[{}]", r.getId(), r.getResultMessage()); + SnailJobLog.LOCAL.warn("taskId:[{}] result value is not a JSON object. result:[{}]", r.getId(), r.getResultMessage()); } return new HashMap(); }).collect(Collectors.toSet()); - Map mergeMap = Maps.newHashMap(); + Map mergeMap; + if (StrUtil.isBlank(workflowTaskBatch.getWfContext())) { + mergeMap = Maps.newHashMap(); + } else { + mergeMap = JsonUtil.parseHashMap(workflowTaskBatch.getWfContext()); + } + for (Map map : maps) { mergeMaps(mergeMap, map); } - return mergeWorkflowContext(workflowTaskBatchId, mergeMap); + WorkflowTaskBatch waitUpdateWorkflowTaskBatch = new WorkflowTaskBatch(); + waitUpdateWorkflowTaskBatch.setId(workflowTaskBatch.getId()); + waitUpdateWorkflowTaskBatch.setWfContext(JsonUtil.toJsonString(mergeMap)); + waitUpdateWorkflowTaskBatch.setVersion(1); + return 1 == workflowTaskBatchMapper.update(waitUpdateWorkflowTaskBatch, new LambdaQueryWrapper() + .eq(WorkflowTaskBatch::getId, workflowTaskBatch.getId()) + .eq(WorkflowTaskBatch::getVersion, workflowTaskBatch.getVersion()) + ); } /**