feat: 2.6.0
1. 优化工作流线程池配置
This commit is contained in:
parent
09f73cf943
commit
c20a63a7cc
@ -55,6 +55,8 @@ public class ActorGenerator {
|
||||
private static final String JOB_TASK_EXECUTOR_DISPATCHER = "akka.actor.job-task-executor-dispatcher";
|
||||
private static final String JOB_TASK_EXECUTOR_RESULT_DISPATCHER = "akka.actor.job-task-executor-result-dispatcher";
|
||||
private static final String JOB_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER = "akka.actor.job-task-executor-call-client-dispatcher";
|
||||
private static final String WORKFLOW_TASK_DISPATCHER = "akka.actor.workflow-task-prepare-dispatcher";
|
||||
private static final String WORKFLOW_TASK_EXECUTOR_DISPATCHER = "akka.actor.workflow-task-executor-dispatcher";
|
||||
|
||||
/*----------------------------------------分布式任务调度 END----------------------------------------*/
|
||||
|
||||
@ -203,7 +205,7 @@ public class ActorGenerator {
|
||||
*/
|
||||
public static ActorRef workflowTaskPrepareActor() {
|
||||
return getJobActorSystem().actorOf(getSpringExtension().props(WORKFLOW_TASK_PREPARE_ACTOR)
|
||||
.withDispatcher(JOB_TASK_DISPATCHER));
|
||||
.withDispatcher(WORKFLOW_TASK_DISPATCHER));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -228,7 +230,7 @@ public class ActorGenerator {
|
||||
return getJobActorSystem()
|
||||
.actorOf(getSpringExtension()
|
||||
.props(WORKFLOW_EXECUTOR_ACTOR)
|
||||
.withDispatcher(JOB_TASK_EXECUTOR_DISPATCHER)
|
||||
.withDispatcher(WORKFLOW_TASK_EXECUTOR_DISPATCHER)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,6 @@ import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.easy.retry.common.log.EasyRetryLog;
|
||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
|
||||
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
|
||||
@ -106,33 +105,6 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
|
||||
return null;
|
||||
}
|
||||
|
||||
private void failRetry(ClientCallbackContext context) {
|
||||
|
||||
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
|
||||
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
|
||||
Job job = jobMapper.selectById(context.getJobId());
|
||||
if (jobTask == null || job == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
|
||||
// 更新重试次数
|
||||
JobTask updateJobTask = new JobTask();
|
||||
updateJobTask.setRetryCount(1);
|
||||
boolean success = SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
|
||||
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
|
||||
.eq(JobTask::getId, context.getTaskId())
|
||||
));
|
||||
|
||||
if (success) {
|
||||
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void doCallback(ClientCallbackContext context);
|
||||
|
||||
@Override
|
||||
|
@ -91,7 +91,6 @@ public class JobTaskBatchHandler {
|
||||
taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId());
|
||||
taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
|
||||
taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
|
||||
// 这里取第一个的任务执行结果
|
||||
taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId());
|
||||
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
||||
actorRef.tell(taskExecuteDTO, actorRef);
|
||||
|
@ -99,5 +99,26 @@ akka {
|
||||
throughput = 10
|
||||
}
|
||||
|
||||
workflow-task-prepare-dispatcher {
|
||||
type = "Dispatcher"
|
||||
executor = "thread-pool-executor"
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 32
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
|
||||
workflow-task-executor-dispatcher {
|
||||
type = "Dispatcher"
|
||||
executor = "thread-pool-executor"
|
||||
thread-pool-executor {
|
||||
core-pool-size-min = 32
|
||||
core-pool-size-factor = 2.0
|
||||
core-pool-size-max = 64
|
||||
}
|
||||
throughput = 10
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user