feat: 2.6.0
1. 优化工作流执行丢弃问题
This commit is contained in:
parent
7883594fda
commit
4fb9ac010a
@ -35,6 +35,7 @@ import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
@ -62,7 +63,9 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> {
|
||||
log.info("工作流开始执行. [{}]", JsonUtil.toJsonString(taskExecute));
|
||||
try {
|
||||
|
||||
doExecutor(taskExecute);
|
||||
|
||||
} catch (Exception e) {
|
||||
EasyRetryLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e);
|
||||
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
|
||||
@ -77,6 +80,8 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId());
|
||||
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
|
||||
|
||||
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason());
|
||||
|
||||
// 获取DAG图
|
||||
String flowInfo = workflowTaskBatch.getFlowInfo();
|
||||
MutableGraph<Long> graph = MutableGraphCache.getOrDefault(workflowTaskBatch.getId(), flowInfo);
|
||||
@ -168,8 +173,10 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
|
||||
jobTaskBatch.setTaskBatchStatus(taskStatus);
|
||||
jobTaskBatch.setOperationReason(operationReason);
|
||||
jobTaskBatch.setUpdateDt(LocalDateTime.now());
|
||||
Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch),
|
||||
() -> new EasyRetryServerException("更新任务失败"));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,18 +1,10 @@
|
||||
package com.aizuda.easy.retry.server.job.task.support.timer;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
||||
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
|
||||
import io.netty.util.Timeout;
|
||||
import io.netty.util.TimerTask;
|
||||
import lombok.AllArgsConstructor;
|
||||
@ -38,10 +30,6 @@ public class WorkflowTimerTask implements TimerTask {
|
||||
|
||||
try {
|
||||
|
||||
int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
|
||||
int operationReason = JobOperationReasonEnum.NONE.getReason();
|
||||
handlerTaskBatch(workflowTimerTaskDTO.getWorkflowTaskBatchId(), taskStatus, operationReason);
|
||||
|
||||
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
|
||||
taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId());
|
||||
taskExecuteDTO.setWorkflowId(workflowTimerTaskDTO.getWorkflowId());
|
||||
@ -55,17 +43,4 @@ public class WorkflowTimerTask implements TimerTask {
|
||||
}
|
||||
}
|
||||
|
||||
private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) {
|
||||
|
||||
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
|
||||
jobTaskBatch.setId(workflowTaskBatchId);
|
||||
jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
|
||||
jobTaskBatch.setTaskBatchStatus(taskStatus);
|
||||
jobTaskBatch.setOperationReason(operationReason);
|
||||
jobTaskBatch.setUpdateDt(LocalDateTime.now());
|
||||
Assert.isTrue(1 == SpringContext.getBeanByType(WorkflowTaskBatchMapper.class).updateById(jobTaskBatch),
|
||||
() -> new EasyRetryServerException("更新任务失败"));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -6,7 +6,7 @@ akka {
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 8
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
core-pool-size-max = 128
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
@ -17,7 +17,7 @@ akka {
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 8
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
core-pool-size-max = 128
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
@ -28,7 +28,7 @@ akka {
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 16
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
core-pool-size-max = 128
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
@ -39,7 +39,7 @@ akka {
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 32
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
core-pool-size-max = 128
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
@ -48,9 +48,9 @@ akka {
|
||||
type = "Dispatcher"
|
||||
executor = "thread-pool-executor"
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 8
|
||||
core-pool-size-min = 32
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
core-pool-size-max = 128
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
@ -59,9 +59,9 @@ akka {
|
||||
type = "Dispatcher"
|
||||
executor = "thread-pool-executor"
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 32
|
||||
core-pool-size-min = 64
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
core-pool-size-max = 128
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
@ -70,9 +70,9 @@ akka {
|
||||
type = "Dispatcher"
|
||||
executor = "thread-pool-executor"
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 32
|
||||
core-pool-size-min = 64
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
core-pool-size-max = 128
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
@ -81,9 +81,9 @@ akka {
|
||||
type = "Dispatcher"
|
||||
executor = "thread-pool-executor"
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 32
|
||||
core-pool-size-min = 64
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
core-pool-size-max = 128
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
@ -92,9 +92,9 @@ akka {
|
||||
type = "Dispatcher"
|
||||
executor = "thread-pool-executor"
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 8
|
||||
core-pool-size-min = 64
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
core-pool-size-max = 128
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
@ -105,7 +105,7 @@ akka {
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 64
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
core-pool-size-max = 128
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
@ -116,7 +116,7 @@ akka {
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 128
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 128
|
||||
core-pool-size-max = 256
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user