feat: 2.6.0

1. 优化工作流线程池配置
This commit is contained in:
byteblogs168 2024-01-18 23:14:03 +08:00
parent 6a7d3900d3
commit e9067eda6b
4 changed files with 25 additions and 31 deletions

View File

@ -55,6 +55,8 @@ public class ActorGenerator {
private static final String JOB_TASK_EXECUTOR_DISPATCHER = "akka.actor.job-task-executor-dispatcher";
private static final String JOB_TASK_EXECUTOR_RESULT_DISPATCHER = "akka.actor.job-task-executor-result-dispatcher";
private static final String JOB_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER = "akka.actor.job-task-executor-call-client-dispatcher";
private static final String WORKFLOW_TASK_DISPATCHER = "akka.actor.workflow-task-prepare-dispatcher";
private static final String WORKFLOW_TASK_EXECUTOR_DISPATCHER = "akka.actor.workflow-task-executor-dispatcher";
/*----------------------------------------分布式任务调度 END----------------------------------------*/
@ -203,7 +205,7 @@ public class ActorGenerator {
*/
public static ActorRef workflowTaskPrepareActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(WORKFLOW_TASK_PREPARE_ACTOR)
.withDispatcher(JOB_TASK_DISPATCHER));
.withDispatcher(WORKFLOW_TASK_DISPATCHER));
}
/**
@ -228,7 +230,7 @@ public class ActorGenerator {
return getJobActorSystem()
.actorOf(getSpringExtension()
.props(WORKFLOW_EXECUTOR_ACTOR)
.withDispatcher(JOB_TASK_EXECUTOR_DISPATCHER)
.withDispatcher(WORKFLOW_TASK_EXECUTOR_DISPATCHER)
);
}

View File

@ -5,7 +5,6 @@ import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
@ -106,33 +105,6 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
return null;
}
private void failRetry(ClientCallbackContext context) {
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
Job job = jobMapper.selectById(context.getJobId());
if (jobTask == null || job == null) {
return;
}
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
// 更新重试次数
JobTask updateJobTask = new JobTask();
updateJobTask.setRetryCount(1);
boolean success = SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
.eq(JobTask::getId, context.getTaskId())
));
if (success) {
}
return;
}
}
}
protected abstract void doCallback(ClientCallbackContext context);
@Override

View File

@ -91,7 +91,6 @@ public class JobTaskBatchHandler {
taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
// 这里取第一个的任务执行结果
taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);

View File

@ -99,5 +99,26 @@ akka {
throughput = 10
}
workflow-task-prepare-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 32
core-pool-size-factor = 2.0
core-pool-size-max = 64
}
throughput = 10
}
workflow-task-executor-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 32
core-pool-size-factor = 2.0
core-pool-size-max = 64
}
throughput = 10
}
}
}