From bc4ed268267486d183df0eac9226783ca11a9afe Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sat, 20 Jan 2024 22:39:46 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=E4=BF=AE=E5=A4=8D=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E9=87=8D=E8=AF=95=E6=97=B6=E5=AF=BC=E8=87=B4=E5=86=B3?= =?UTF-8?q?=E7=AD=96=E9=94=99=E8=AF=AF=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../workflow/AbstractWorkflowExecutor.java | 39 +++++++++++++++---- .../workflow/WorkflowExecutorContext.java | 4 ++ .../support/handler/WorkflowBatchHandler.java | 10 ++--- .../web/service/impl/JobLogServiceImpl.java | 10 ++++- 4 files changed, 48 insertions(+), 15 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java index 1d39a386..e6e52f62 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java @@ -3,10 +3,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow; import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; -import com.aizuda.easy.retry.common.core.enums.JobArgsTypeEnum; -import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; -import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; -import com.aizuda.easy.retry.common.core.enums.StatusEnum; +import com.aizuda.easy.retry.common.core.enums.*; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; @@ -30,12 +27,16 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionTemplate; +import org.springframework.util.CollectionUtils; import java.text.MessageFormat; import java.time.Duration; +import java.util.List; import java.util.Objects; import java.util.Optional; +import static com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION; + /** * @author xiaowoniu * @date 2023-12-24 08:15:19 @@ -64,10 +65,32 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init distributedLockHandler.lockWithDisposableAndRetry( () -> { - Long total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper() - .eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId()) - .eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId()) - ); + long total = 0; + // 条件节点存在并发问题,需要特殊处理 + if (WorkflowNodeTypeEnum.DECISION.getType() == context.getNodeType()) { + List jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() + .select(JobTaskBatch::getOperationReason) + .eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId()) + .eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId()) + ); + + if (!CollectionUtils.isEmpty(jobTaskBatches)) { + total = jobTaskBatches.size(); + JobTaskBatch jobTaskBatch = jobTaskBatches.get(0); + if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) { + context.setEvaluationResult(Boolean.FALSE); + } else { + context.setEvaluationResult(Boolean.TRUE); + } + } + + } else { + total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper() + .eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId()) + .eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId()) + ); + } + if (total > 0) { log.warn("任务节点[{}]已被执行,请勿重复执行", context.getWorkflowNodeId()); return; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java index 46c5c83a..35793444 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java @@ -98,4 +98,8 @@ public class WorkflowExecutorContext { */ private Integer taskExecutorScene; + /** + * 1、任务节点 2、条件节点 3、回调节点 + */ + private Integer nodeType; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java index ddb62c4e..716a58ca 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java @@ -37,6 +37,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import static com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION; import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE; /** @@ -56,7 +57,7 @@ public class WorkflowBatchHandler { return complete(workflowTaskBatchId, null); } - public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) { + public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) { workflowTaskBatch = Optional.ofNullable(workflowTaskBatch) .orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId)); Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在")); @@ -103,7 +104,7 @@ public class WorkflowBatchHandler { if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(jobTaskBatch.getTaskBatchStatus())) { // 只要叶子节点不是无需处理的都是失败 if (JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() != jobTaskBatch.getOperationReason() - && JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason() != jobTaskBatch.getOperationReason()) { + && JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason() != jobTaskBatch.getOperationReason()) { taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); } } @@ -131,7 +132,7 @@ public class WorkflowBatchHandler { for (JobTaskBatch jobTaskBatch : jobTaskBatchList) { // 只要是无需处理的说明后面的子节点都不需要处理了,isNeedProcess为false - if (JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) { + if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) { isNeedProcess = false; continue; } @@ -227,8 +228,7 @@ public class WorkflowBatchHandler { // 判定条件节点是否已经执行完成 JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(parentId); if (Objects.nonNull(parentJobTaskBatch) && - JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() - == parentJobTaskBatch.getOperationReason()) { + WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(parentJobTaskBatch.getOperationReason())) { return; } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java index 32982666..cda816c6 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java @@ -47,7 +47,7 @@ public class JobLogServiceImpl implements JobLogService { .ge(JobLogMessage::getJobId, queryVO.getJobId()) .eq(JobLogMessage::getTaskId, queryVO.getTaskId()); - queryWrapper.orderByAsc(JobLogMessage::getRealTime).orderByDesc(JobLogMessage::getId); + queryWrapper.orderByAsc(JobLogMessage::getRealTime).orderByAsc(JobLogMessage::getId); PageDTO selectPage = jobLogMessageMapper.selectPage(pageDTO, queryWrapper); List records = selectPage.getRecords(); if (CollectionUtils.isEmpty(records)) { @@ -86,7 +86,13 @@ public class JobLogServiceImpl implements JobLogService { long nextStartId = 0; List messages = Lists.newArrayList(); - List jobLogMessages = jobLogMessageMapper.selectBatchIds(ids); + List jobLogMessages = jobLogMessageMapper.selectList( + new LambdaQueryWrapper() + .in(JobLogMessage::getId, ids) + .orderByAsc(JobLogMessage::getRealTime) + .orderByAsc(JobLogMessage::getId) + ); + for (final JobLogMessage jobLogMessage : jobLogMessages) { List originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class);