feat: 2.6.0

1.修复节点重试时导致决策错误问题
This commit is contained in:
byteblogs168 2024-01-20 22:39:46 +08:00
parent ce68cd733d
commit 28096b946d
4 changed files with 48 additions and 15 deletions

View File

@ -3,10 +3,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.JobArgsTypeEnum; import com.aizuda.easy.retry.common.core.enums.*;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
@ -30,12 +27,16 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.time.Duration; import java.time.Duration;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import static com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
/** /**
* @author xiaowoniu * @author xiaowoniu
* @date 2023-12-24 08:15:19 * @date 2023-12-24 08:15:19
@ -64,10 +65,32 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
distributedLockHandler.lockWithDisposableAndRetry( distributedLockHandler.lockWithDisposableAndRetry(
() -> { () -> {
Long total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>() long total = 0;
// 条件节点存在并发问题需要特殊处理
if (WorkflowNodeTypeEnum.DECISION.getType() == context.getNodeType()) {
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getOperationReason)
.eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId()) .eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId())
.eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId()) .eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId())
); );
if (!CollectionUtils.isEmpty(jobTaskBatches)) {
total = jobTaskBatches.size();
JobTaskBatch jobTaskBatch = jobTaskBatches.get(0);
if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) {
context.setEvaluationResult(Boolean.FALSE);
} else {
context.setEvaluationResult(Boolean.TRUE);
}
}
} else {
total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId())
.eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId())
);
}
if (total > 0) { if (total > 0) {
log.warn("任务节点[{}]已被执行,请勿重复执行", context.getWorkflowNodeId()); log.warn("任务节点[{}]已被执行,请勿重复执行", context.getWorkflowNodeId());
return; return;

View File

@ -98,4 +98,8 @@ public class WorkflowExecutorContext {
*/ */
private Integer taskExecutorScene; private Integer taskExecutorScene;
/**
* 1任务节点 2条件节点 3回调节点
*/
private Integer nodeType;
} }

View File

@ -37,6 +37,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE; import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE;
/** /**
@ -131,7 +132,7 @@ public class WorkflowBatchHandler {
for (JobTaskBatch jobTaskBatch : jobTaskBatchList) { for (JobTaskBatch jobTaskBatch : jobTaskBatchList) {
// 只要是无需处理的说明后面的子节点都不需要处理了isNeedProcess为false // 只要是无需处理的说明后面的子节点都不需要处理了isNeedProcess为false
if (JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) { if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) {
isNeedProcess = false; isNeedProcess = false;
continue; continue;
} }
@ -227,8 +228,7 @@ public class WorkflowBatchHandler {
// 判定条件节点是否已经执行完成 // 判定条件节点是否已经执行完成
JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(parentId); JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(parentId);
if (Objects.nonNull(parentJobTaskBatch) && if (Objects.nonNull(parentJobTaskBatch) &&
JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(parentJobTaskBatch.getOperationReason())) {
== parentJobTaskBatch.getOperationReason()) {
return; return;
} }

View File

@ -47,7 +47,7 @@ public class JobLogServiceImpl implements JobLogService {
.ge(JobLogMessage::getJobId, queryVO.getJobId()) .ge(JobLogMessage::getJobId, queryVO.getJobId())
.eq(JobLogMessage::getTaskId, queryVO.getTaskId()); .eq(JobLogMessage::getTaskId, queryVO.getTaskId());
queryWrapper.orderByAsc(JobLogMessage::getRealTime).orderByDesc(JobLogMessage::getId); queryWrapper.orderByAsc(JobLogMessage::getRealTime).orderByAsc(JobLogMessage::getId);
PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(pageDTO, queryWrapper); PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(pageDTO, queryWrapper);
List<JobLogMessage> records = selectPage.getRecords(); List<JobLogMessage> records = selectPage.getRecords();
if (CollectionUtils.isEmpty(records)) { if (CollectionUtils.isEmpty(records)) {
@ -86,7 +86,13 @@ public class JobLogServiceImpl implements JobLogService {
long nextStartId = 0; long nextStartId = 0;
List<String> messages = Lists.newArrayList(); List<String> messages = Lists.newArrayList();
List<JobLogMessage> jobLogMessages = jobLogMessageMapper.selectBatchIds(ids); List<JobLogMessage> jobLogMessages = jobLogMessageMapper.selectList(
new LambdaQueryWrapper<JobLogMessage>()
.in(JobLogMessage::getId, ids)
.orderByAsc(JobLogMessage::getRealTime)
.orderByAsc(JobLogMessage::getId)
);
for (final JobLogMessage jobLogMessage : jobLogMessages) { for (final JobLogMessage jobLogMessage : jobLogMessages) {
List<String> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class); List<String> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class);