From 4fb9ac010a472ba7ffbfeabceae2a7f4ec1d7613 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 26 Jan 2024 14:29:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=B7=A5=E4=BD=9C=E6=B5=81=E6=89=A7=E8=A1=8C=E4=B8=A2=E5=BC=83?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispatch/WorkflowExecutorActor.java | 7 ++++ .../task/support/timer/WorkflowTimerTask.java | 25 --------------- .../src/main/resources/easyretry.conf | 32 +++++++++---------- 3 files changed, 23 insertions(+), 41 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java index 715f410c..578acc11 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -35,6 +35,7 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.Objects; @@ -62,7 +63,9 @@ public class WorkflowExecutorActor extends AbstractActor { return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> { log.info("工作流开始执行. [{}]", JsonUtil.toJsonString(taskExecute)); try { + doExecutor(taskExecute); + } catch (Exception e) { EasyRetryLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e); handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason()); @@ -77,6 +80,8 @@ public class WorkflowExecutorActor extends AbstractActor { WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId()); Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在")); + handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason()); + // 获取DAG图 String flowInfo = workflowTaskBatch.getFlowInfo(); MutableGraph graph = MutableGraphCache.getOrDefault(workflowTaskBatch.getId(), flowInfo); @@ -168,8 +173,10 @@ public class WorkflowExecutorActor extends AbstractActor { jobTaskBatch.setExecutionAt(DateUtils.toNowMilli()); jobTaskBatch.setTaskBatchStatus(taskStatus); jobTaskBatch.setOperationReason(operationReason); + jobTaskBatch.setUpdateDt(LocalDateTime.now()); Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch), () -> new EasyRetryServerException("更新任务失败")); } + } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java index 9f817743..5077f610 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java @@ -1,18 +1,10 @@ package com.aizuda.easy.retry.server.job.task.support.timer; import akka.actor.ActorRef; -import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.constant.SystemConstants; -import com.aizuda.easy.retry.common.core.context.SpringContext; -import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; -import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; -import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; -import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO; -import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; -import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; import io.netty.util.Timeout; import io.netty.util.TimerTask; import lombok.AllArgsConstructor; @@ -38,10 +30,6 @@ public class WorkflowTimerTask implements TimerTask { try { - int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus(); - int operationReason = JobOperationReasonEnum.NONE.getReason(); - handlerTaskBatch(workflowTimerTaskDTO.getWorkflowTaskBatchId(), taskStatus, operationReason); - WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId()); taskExecuteDTO.setWorkflowId(workflowTimerTaskDTO.getWorkflowId()); @@ -55,17 +43,4 @@ public class WorkflowTimerTask implements TimerTask { } } - private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) { - - WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch(); - jobTaskBatch.setId(workflowTaskBatchId); - jobTaskBatch.setExecutionAt(DateUtils.toNowMilli()); - jobTaskBatch.setTaskBatchStatus(taskStatus); - jobTaskBatch.setOperationReason(operationReason); - jobTaskBatch.setUpdateDt(LocalDateTime.now()); - Assert.isTrue(1 == SpringContext.getBeanByType(WorkflowTaskBatchMapper.class).updateById(jobTaskBatch), - () -> new EasyRetryServerException("更新任务失败")); - - } - } diff --git a/easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf b/easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf index 83b3e1af..8b51461a 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf +++ b/easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf @@ -6,7 +6,7 @@ akka { thread-pool-executor { core-pool-size-min = 8 core-pool-size-factor = 2.0 - core-pool-size-max = 64 + core-pool-size-max = 128 } throughput = 10 } @@ -17,7 +17,7 @@ akka { thread-pool-executor { core-pool-size-min = 8 core-pool-size-factor = 2.0 - core-pool-size-max = 64 + core-pool-size-max = 128 } throughput = 10 } @@ -28,7 +28,7 @@ akka { thread-pool-executor { core-pool-size-min = 16 core-pool-size-factor = 2.0 - core-pool-size-max = 64 + core-pool-size-max = 128 } throughput = 10 } @@ -39,7 +39,7 @@ akka { thread-pool-executor { core-pool-size-min = 32 core-pool-size-factor = 2.0 - core-pool-size-max = 64 + core-pool-size-max = 128 } throughput = 10 } @@ -48,9 +48,9 @@ akka { type = "Dispatcher" executor = "thread-pool-executor" thread-pool-executor { - core-pool-size-min = 8 + core-pool-size-min = 32 core-pool-size-factor = 2.0 - core-pool-size-max = 64 + core-pool-size-max = 128 } throughput = 10 } @@ -59,9 +59,9 @@ akka { type = "Dispatcher" executor = "thread-pool-executor" thread-pool-executor { - core-pool-size-min = 32 + core-pool-size-min = 64 core-pool-size-factor = 2.0 - core-pool-size-max = 64 + core-pool-size-max = 128 } throughput = 10 } @@ -70,9 +70,9 @@ akka { type = "Dispatcher" executor = "thread-pool-executor" thread-pool-executor { - core-pool-size-min = 32 + core-pool-size-min = 64 core-pool-size-factor = 2.0 - core-pool-size-max = 64 + core-pool-size-max = 128 } throughput = 10 } @@ -81,9 +81,9 @@ akka { type = "Dispatcher" executor = "thread-pool-executor" thread-pool-executor { - core-pool-size-min = 32 + core-pool-size-min = 64 core-pool-size-factor = 2.0 - core-pool-size-max = 64 + core-pool-size-max = 128 } throughput = 10 } @@ -92,9 +92,9 @@ akka { type = "Dispatcher" executor = "thread-pool-executor" thread-pool-executor { - core-pool-size-min = 8 + core-pool-size-min = 64 core-pool-size-factor = 2.0 - core-pool-size-max = 64 + core-pool-size-max = 128 } throughput = 10 } @@ -105,7 +105,7 @@ akka { thread-pool-executor { core-pool-size-min = 64 core-pool-size-factor = 2.0 - core-pool-size-max = 64 + core-pool-size-max = 128 } throughput = 10 } @@ -116,7 +116,7 @@ akka { thread-pool-executor { core-pool-size-min = 128 core-pool-size-factor = 2.0 - core-pool-size-max = 128 + core-pool-size-max = 256 } throughput = 10 }