diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java index 89112cd7..029afa9d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch; import akka.actor.AbstractActor; import akka.actor.ActorRef; +import cn.hutool.core.util.RandomUtil; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.log.EasyRetryLog; @@ -21,7 +22,6 @@ import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.template.datasource.persistence.mapper.GroupConfigMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig; -import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; @@ -32,9 +32,11 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.math.RoundingMode; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Random; import java.util.stream.Collectors; /** @@ -75,6 +77,7 @@ public class ScanWorkflowTaskActor extends AbstractActor { long now = DateUtils.toNowMilli(); for (PartitionTask partitionTask : partitionTasks) { WorkflowPartitionTaskDTO workflowPartitionTaskDTO = (WorkflowPartitionTaskDTO) partitionTask; + log.warn("监控时间. workflowId:[{}] now:[{}], dbnextTriggerAt:[{}]", workflowPartitionTaskDTO.getId(), now, workflowPartitionTaskDTO.getNextTriggerAt()); processWorkflow(workflowPartitionTaskDTO, waitUpdateJobs, waitExecWorkflows, now); } @@ -82,6 +85,8 @@ public class ScanWorkflowTaskActor extends AbstractActor { workflowMapper.updateBatchNextTriggerAtById(waitUpdateJobs); for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) { + + log.warn("监控时间. workflowId:[{}] now:[{}], nextTriggerAt:[{}]", waitExecTask.getWorkflowId(), now, waitExecTask.getNextTriggerAt()); // 执行预处理阶段 ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor(); waitExecTask.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType()); @@ -93,13 +98,14 @@ public class ScanWorkflowTaskActor extends AbstractActor { List waitExecJobs, long now) { CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId()); - Workflow workflow = new Workflow(); - workflow.setId(partitionTask.getId()); - // 更新下次触发时间 Long nextTriggerAt = calculateNextTriggerTime(partitionTask, now); + + Workflow workflow = new Workflow(); + workflow.setId(partitionTask.getId()); workflow.setNextTriggerAt(nextTriggerAt); waitUpdateWorkflows.add(workflow); + waitExecJobs.add(WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(partitionTask)); } @@ -108,7 +114,8 @@ public class ScanWorkflowTaskActor extends AbstractActor { long nextTriggerAt = partitionTask.getNextTriggerAt(); if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) { - nextTriggerAt = now; + long randomMs = (long) (RandomUtil.randomDouble(0, 4, 2, RoundingMode.UP) * 1000); + nextTriggerAt = now + randomMs; partitionTask.setNextTriggerAt(nextTriggerAt); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java index 978897c8..6e89d0f1 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java @@ -28,7 +28,7 @@ public class JobTimerWheel implements Lifecycle { private static final String THREAD_NAME_PREFIX = "job-task-timer-wheel-"; private static HashedWheelTimer timer = null; private static final ThreadPoolExecutor executor = - new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS, + new ThreadPoolExecutor(32, 32, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new CustomizableThreadFactory(THREAD_NAME_PREFIX)); private static final TimerIdempotent idempotent = new TimerIdempotent(); @@ -44,8 +44,8 @@ public class JobTimerWheel implements Lifecycle { public static void register(Integer taskType, Long uniqueId, TimerTask task, long delay, TimeUnit unit) { if (!isExisted(taskType, uniqueId)) { + log.info("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId); delay = delay < 0 ? 0 : delay; - log.info("加入时间轮. delay:[{}ms] uniqueId:[{}]", delay, uniqueId); try { timer.newTimeout(task, delay, unit); idempotent.set(uniqueId, uniqueId); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java index 39b7d896..9f817743 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java @@ -62,6 +62,7 @@ public class WorkflowTimerTask implements TimerTask { jobTaskBatch.setExecutionAt(DateUtils.toNowMilli()); jobTaskBatch.setTaskBatchStatus(taskStatus); jobTaskBatch.setOperationReason(operationReason); + jobTaskBatch.setUpdateDt(LocalDateTime.now()); Assert.isTrue(1 == SpringContext.getBeanByType(WorkflowTaskBatchMapper.class).updateById(jobTaskBatch), () -> new EasyRetryServerException("更新任务失败")); diff --git a/easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf b/easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf index ea1844ea..19b1c9aa 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf +++ b/easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf @@ -114,9 +114,9 @@ akka { type = "Dispatcher" executor = "thread-pool-executor" thread-pool-executor { - core-pool-size-min = 32 + core-pool-size-min = 64 core-pool-size-factor = 2.0 - core-pool-size-max = 64 + core-pool-size-max = 128 } throughput = 10 }