feat: 2.6.0
1. 优化工作流执行丢弃问题
This commit is contained in:
parent
555600f7d9
commit
a989640eca
@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
|
|||||||
|
|
||||||
import akka.actor.AbstractActor;
|
import akka.actor.AbstractActor;
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
|
import cn.hutool.core.util.RandomUtil;
|
||||||
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
||||||
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
||||||
import com.aizuda.easy.retry.common.log.EasyRetryLog;
|
import com.aizuda.easy.retry.common.log.EasyRetryLog;
|
||||||
@ -21,7 +22,6 @@ import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
|
|||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.GroupConfigMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.GroupConfigMapper;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||||
@ -32,9 +32,11 @@ import org.springframework.context.annotation.Scope;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
import java.math.RoundingMode;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -75,6 +77,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
|
|||||||
long now = DateUtils.toNowMilli();
|
long now = DateUtils.toNowMilli();
|
||||||
for (PartitionTask partitionTask : partitionTasks) {
|
for (PartitionTask partitionTask : partitionTasks) {
|
||||||
WorkflowPartitionTaskDTO workflowPartitionTaskDTO = (WorkflowPartitionTaskDTO) partitionTask;
|
WorkflowPartitionTaskDTO workflowPartitionTaskDTO = (WorkflowPartitionTaskDTO) partitionTask;
|
||||||
|
log.warn("监控时间. workflowId:[{}] now:[{}], dbnextTriggerAt:[{}]", workflowPartitionTaskDTO.getId(), now, workflowPartitionTaskDTO.getNextTriggerAt());
|
||||||
processWorkflow(workflowPartitionTaskDTO, waitUpdateJobs, waitExecWorkflows, now);
|
processWorkflow(workflowPartitionTaskDTO, waitUpdateJobs, waitExecWorkflows, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,6 +85,8 @@ public class ScanWorkflowTaskActor extends AbstractActor {
|
|||||||
workflowMapper.updateBatchNextTriggerAtById(waitUpdateJobs);
|
workflowMapper.updateBatchNextTriggerAtById(waitUpdateJobs);
|
||||||
|
|
||||||
for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
|
for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
|
||||||
|
|
||||||
|
log.warn("监控时间. workflowId:[{}] now:[{}], nextTriggerAt:[{}]", waitExecTask.getWorkflowId(), now, waitExecTask.getNextTriggerAt());
|
||||||
// 执行预处理阶段
|
// 执行预处理阶段
|
||||||
ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor();
|
ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor();
|
||||||
waitExecTask.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
|
waitExecTask.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
|
||||||
@ -93,13 +98,14 @@ public class ScanWorkflowTaskActor extends AbstractActor {
|
|||||||
List<WorkflowTaskPrepareDTO> waitExecJobs, long now) {
|
List<WorkflowTaskPrepareDTO> waitExecJobs, long now) {
|
||||||
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId());
|
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId());
|
||||||
|
|
||||||
Workflow workflow = new Workflow();
|
|
||||||
workflow.setId(partitionTask.getId());
|
|
||||||
|
|
||||||
// 更新下次触发时间
|
// 更新下次触发时间
|
||||||
Long nextTriggerAt = calculateNextTriggerTime(partitionTask, now);
|
Long nextTriggerAt = calculateNextTriggerTime(partitionTask, now);
|
||||||
|
|
||||||
|
Workflow workflow = new Workflow();
|
||||||
|
workflow.setId(partitionTask.getId());
|
||||||
workflow.setNextTriggerAt(nextTriggerAt);
|
workflow.setNextTriggerAt(nextTriggerAt);
|
||||||
waitUpdateWorkflows.add(workflow);
|
waitUpdateWorkflows.add(workflow);
|
||||||
|
|
||||||
waitExecJobs.add(WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(partitionTask));
|
waitExecJobs.add(WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(partitionTask));
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -108,7 +114,8 @@ public class ScanWorkflowTaskActor extends AbstractActor {
|
|||||||
|
|
||||||
long nextTriggerAt = partitionTask.getNextTriggerAt();
|
long nextTriggerAt = partitionTask.getNextTriggerAt();
|
||||||
if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|
if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|
||||||
nextTriggerAt = now;
|
long randomMs = (long) (RandomUtil.randomDouble(0, 4, 2, RoundingMode.UP) * 1000);
|
||||||
|
nextTriggerAt = now + randomMs;
|
||||||
partitionTask.setNextTriggerAt(nextTriggerAt);
|
partitionTask.setNextTriggerAt(nextTriggerAt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,7 +28,7 @@ public class JobTimerWheel implements Lifecycle {
|
|||||||
private static final String THREAD_NAME_PREFIX = "job-task-timer-wheel-";
|
private static final String THREAD_NAME_PREFIX = "job-task-timer-wheel-";
|
||||||
private static HashedWheelTimer timer = null;
|
private static HashedWheelTimer timer = null;
|
||||||
private static final ThreadPoolExecutor executor =
|
private static final ThreadPoolExecutor executor =
|
||||||
new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS,
|
new ThreadPoolExecutor(32, 32, 10, TimeUnit.SECONDS,
|
||||||
new LinkedBlockingQueue<>(), new CustomizableThreadFactory(THREAD_NAME_PREFIX));
|
new LinkedBlockingQueue<>(), new CustomizableThreadFactory(THREAD_NAME_PREFIX));
|
||||||
|
|
||||||
private static final TimerIdempotent idempotent = new TimerIdempotent();
|
private static final TimerIdempotent idempotent = new TimerIdempotent();
|
||||||
@ -44,8 +44,8 @@ public class JobTimerWheel implements Lifecycle {
|
|||||||
public static void register(Integer taskType, Long uniqueId, TimerTask task, long delay, TimeUnit unit) {
|
public static void register(Integer taskType, Long uniqueId, TimerTask task, long delay, TimeUnit unit) {
|
||||||
|
|
||||||
if (!isExisted(taskType, uniqueId)) {
|
if (!isExisted(taskType, uniqueId)) {
|
||||||
|
log.info("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId);
|
||||||
delay = delay < 0 ? 0 : delay;
|
delay = delay < 0 ? 0 : delay;
|
||||||
log.info("加入时间轮. delay:[{}ms] uniqueId:[{}]", delay, uniqueId);
|
|
||||||
try {
|
try {
|
||||||
timer.newTimeout(task, delay, unit);
|
timer.newTimeout(task, delay, unit);
|
||||||
idempotent.set(uniqueId, uniqueId);
|
idempotent.set(uniqueId, uniqueId);
|
||||||
|
@ -62,6 +62,7 @@ public class WorkflowTimerTask implements TimerTask {
|
|||||||
jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
|
jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
|
||||||
jobTaskBatch.setTaskBatchStatus(taskStatus);
|
jobTaskBatch.setTaskBatchStatus(taskStatus);
|
||||||
jobTaskBatch.setOperationReason(operationReason);
|
jobTaskBatch.setOperationReason(operationReason);
|
||||||
|
jobTaskBatch.setUpdateDt(LocalDateTime.now());
|
||||||
Assert.isTrue(1 == SpringContext.getBeanByType(WorkflowTaskBatchMapper.class).updateById(jobTaskBatch),
|
Assert.isTrue(1 == SpringContext.getBeanByType(WorkflowTaskBatchMapper.class).updateById(jobTaskBatch),
|
||||||
() -> new EasyRetryServerException("更新任务失败"));
|
() -> new EasyRetryServerException("更新任务失败"));
|
||||||
|
|
||||||
|
@ -114,9 +114,9 @@ akka {
|
|||||||
type = "Dispatcher"
|
type = "Dispatcher"
|
||||||
executor = "thread-pool-executor"
|
executor = "thread-pool-executor"
|
||||||
thread-pool-executor {
|
thread-pool-executor {
|
||||||
core-pool-size-min = 32
|
core-pool-size-min = 64
|
||||||
core-pool-size-factor = 2.0
|
core-pool-size-factor = 2.0
|
||||||
core-pool-size-max = 64
|
core-pool-size-max = 128
|
||||||
}
|
}
|
||||||
throughput = 10
|
throughput = 10
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user