diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java
index 1d39a386..e6e52f62 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java
@@ -3,10 +3,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
-import com.aizuda.easy.retry.common.core.enums.JobArgsTypeEnum;
-import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
-import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
-import com.aizuda.easy.retry.common.core.enums.StatusEnum;
+import com.aizuda.easy.retry.common.core.enums.*;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
@@ -30,12 +27,16 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
+import org.springframework.util.CollectionUtils;
import java.text.MessageFormat;
import java.time.Duration;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import static com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
+
/**
* @author xiaowoniu
* @date 2023-12-24 08:15:19
@@ -64,10 +65,32 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
distributedLockHandler.lockWithDisposableAndRetry(
() -> {
- Long total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
- .eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId())
- .eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId())
- );
+ long total = 0;
+ // 条件节点存在并发问题,需要特殊处理
+ if (WorkflowNodeTypeEnum.DECISION.getType() == context.getNodeType()) {
+ List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
+ .select(JobTaskBatch::getOperationReason)
+ .eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId())
+ .eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId())
+ );
+
+ if (!CollectionUtils.isEmpty(jobTaskBatches)) {
+ total = jobTaskBatches.size();
+ JobTaskBatch jobTaskBatch = jobTaskBatches.get(0);
+ if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) {
+ context.setEvaluationResult(Boolean.FALSE);
+ } else {
+ context.setEvaluationResult(Boolean.TRUE);
+ }
+ }
+
+ } else {
+ total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
+ .eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId())
+ .eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId())
+ );
+ }
+
if (total > 0) {
log.warn("任务节点[{}]已被执行,请勿重复执行", context.getWorkflowNodeId());
return;
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java
index 46c5c83a..35793444 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java
@@ -98,4 +98,8 @@ public class WorkflowExecutorContext {
*/
private Integer taskExecutorScene;
+ /**
+ * 1、任务节点 2、条件节点 3、回调节点
+ */
+ private Integer nodeType;
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java
index ddb62c4e..716a58ca 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java
@@ -37,6 +37,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE;
/**
@@ -56,7 +57,7 @@ public class WorkflowBatchHandler {
return complete(workflowTaskBatchId, null);
}
- public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) {
+ public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) {
workflowTaskBatch = Optional.ofNullable(workflowTaskBatch)
.orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
@@ -103,7 +104,7 @@ public class WorkflowBatchHandler {
if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(jobTaskBatch.getTaskBatchStatus())) {
// 只要叶子节点不是无需处理的都是失败
if (JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() != jobTaskBatch.getOperationReason()
- && JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason() != jobTaskBatch.getOperationReason()) {
+ && JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason() != jobTaskBatch.getOperationReason()) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
}
}
@@ -131,7 +132,7 @@ public class WorkflowBatchHandler {
for (JobTaskBatch jobTaskBatch : jobTaskBatchList) {
// 只要是无需处理的说明后面的子节点都不需要处理了,isNeedProcess为false
- if (JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) {
+ if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) {
isNeedProcess = false;
continue;
}
@@ -227,8 +228,7 @@ public class WorkflowBatchHandler {
// 判定条件节点是否已经执行完成
JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(parentId);
if (Objects.nonNull(parentJobTaskBatch) &&
- JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason()
- == parentJobTaskBatch.getOperationReason()) {
+ WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(parentJobTaskBatch.getOperationReason())) {
return;
}
diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java
index 32982666..cda816c6 100644
--- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java
+++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java
@@ -47,7 +47,7 @@ public class JobLogServiceImpl implements JobLogService {
.ge(JobLogMessage::getJobId, queryVO.getJobId())
.eq(JobLogMessage::getTaskId, queryVO.getTaskId());
- queryWrapper.orderByAsc(JobLogMessage::getRealTime).orderByDesc(JobLogMessage::getId);
+ queryWrapper.orderByAsc(JobLogMessage::getRealTime).orderByAsc(JobLogMessage::getId);
PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(pageDTO, queryWrapper);
List<JobLogMessage> records = selectPage.getRecords();
if (CollectionUtils.isEmpty(records)) {
@@ -86,7 +86,13 @@ public class JobLogServiceImpl implements JobLogService {
long nextStartId = 0;
List<String> messages = Lists.newArrayList();
- List<JobLogMessage> jobLogMessages = jobLogMessageMapper.selectBatchIds(ids);
+ List<JobLogMessage> jobLogMessages = jobLogMessageMapper.selectList(
+ new LambdaQueryWrapper<JobLogMessage>()
+ .in(JobLogMessage::getId, ids)
+ .orderByAsc(JobLogMessage::getRealTime)
+ .orderByAsc(JobLogMessage::getId)
+ );
+
for (final JobLogMessage jobLogMessage : jobLogMessages) {
List<String> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class);