feat: 2.4.0

1. 优化任务执行阶段,改为在执行阶段除了
This commit is contained in:
byteblogs168 2023-10-27 18:33:26 +08:00
parent 7e3130147c
commit b15f977cf4
2 changed files with 124 additions and 106 deletions

View File

@ -1,20 +1,42 @@
package com.aizuda.easy.retry.server.job.task.support.dispatch; package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobExecutor; import com.aizuda.easy.retry.server.job.task.support.JobExecutor;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.server.job.task.support.executor.JobExecutorContext; import com.aizuda.easy.retry.server.job.task.support.executor.JobExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.executor.JobExecutorFactory; import com.aizuda.easy.retry.server.job.task.support.executor.JobExecutorFactory;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.easy.retry.server.job.task.support.timer.ResidentJobTimerTask;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/** /**
* @author: www.byteblogs.com * @author: www.byteblogs.com
@ -27,12 +49,22 @@ import org.springframework.stereotype.Component;
public class JobExecutorActor extends AbstractActor { public class JobExecutorActor extends AbstractActor {
@Autowired @Autowired
private JobMapper jobMapper; private JobMapper jobMapper;
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@Autowired
private TransactionTemplate transactionTemplate;
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> { return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> {
try { try {
doExecute(taskExecute); transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
doExecute(taskExecute);
}
});
} catch (Exception e) { } catch (Exception e) {
LogUtils.error(log, "job executor exception. [{}]", taskExecute, e); LogUtils.error(log, "job executor exception. [{}]", taskExecute, e);
} finally { } finally {
@ -42,13 +74,86 @@ public class JobExecutorActor extends AbstractActor {
} }
private void doExecute(final TaskExecuteDTO taskExecute) { private void doExecute(final TaskExecuteDTO taskExecute) {
Job job = jobMapper.selectById(taskExecute.getJobId());
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
context.setTaskBatchId(taskExecute.getTaskBatchId()); .eq(Job::getJobStatus, StatusEnum.YES.getStatus())
context.setJobId(job.getId()); .eq(Job::getId, taskExecute.getJobId())
jobExecutor.execute(context); );
try {
// 更新批次的状态
updateBatchStatus(taskExecute, job);
// 如果任务已经关闭则不需要执行
if (Objects.isNull(job)) {
return;
}
// 执行任务
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
context.setTaskBatchId(taskExecute.getTaskBatchId());
context.setJobId(job.getId());
jobExecutor.execute(context);
} finally {
doHandlerResidentTask(job, taskExecute);
}
} }
private void updateBatchStatus(final TaskExecuteDTO taskExecute, final Job job) {
int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason();
if (Objects.isNull(job)) {
log.warn("任务已经关闭不允许执行. jobId:[{}]", taskExecute.getJobId());
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason();
}
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(taskExecute.getTaskBatchId());
jobTaskBatch.setExecutionAt(LocalDateTime.now());
jobTaskBatch.setTaskBatchStatus(taskStatus);
jobTaskBatch.setOperationReason(operationReason);
Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch),
() -> new EasyRetryServerException("更新任务失败"));
}
private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job)) {
return;
}
// 是否是常驻任务
if (Objects.equals(StatusEnum.YES.getStatus(), job.getResident())) {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
jobTimerTaskDTO.setGroupName(taskExecuteDTO.getGroupName());
ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job);
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());
LocalDateTime preTriggerAt = ResidentTaskCache.get(job.getId());
if (Objects.isNull(preTriggerAt) || preTriggerAt.isBefore(job.getNextTriggerAt())) {
preTriggerAt = job.getNextTriggerAt();
}
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerType(job.getTriggerType());
waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(preTriggerAt);
LocalDateTime nextTriggerAt = waitStrategy.computeRetryTime(waitStrategyContext);
// 获取时间差的毫秒数
Duration duration = Duration.between(preTriggerAt, nextTriggerAt);
long milliseconds = duration.toMillis();
log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, System.currentTimeMillis() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - System.currentTimeMillis() % 1000, TimeUnit.MILLISECONDS);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
}
}
} }

View File

@ -1,34 +1,15 @@
package com.aizuda.easy.retry.server.job.task.support.timer; package com.aizuda.easy.retry.server.job.task.support.timer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* @author: www.byteblogs.com * @author: www.byteblogs.com
@ -41,93 +22,25 @@ public class JobTimerTask implements TimerTask {
private JobTimerTaskDTO jobTimerTaskDTO; private JobTimerTaskDTO jobTimerTaskDTO;
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
@Override @Override
public void run(final Timeout timeout) throws Exception { public void run(final Timeout timeout) throws Exception {
// 执行任务调度 // 执行任务调度
log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId());
executor.execute(() -> { try {
Job job = null; // 清除时间轮的缓存
try { JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId());
// 清除时间轮的缓存
JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId());
JobMapper jobMapper = SpringContext.getBeanByType(JobMapper.class); TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
job = jobMapper.selectOne(new LambdaQueryWrapper<Job>() taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId());
.eq(Job::getJobStatus, StatusEnum.YES.getStatus()) taskExecuteDTO.setGroupName(jobTimerTaskDTO.getGroupName());
.eq(Job::getId, jobTimerTaskDTO.getJobId()) taskExecuteDTO.setJobId(jobTimerTaskDTO.getJobId());
); ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus(); } catch (Exception e) {
int operationReason = JobOperationReasonEnum.NONE.getReason(); log.error("任务调度执行失败", e);
if (Objects.isNull(job)) {
log.warn("任务已经关闭不允许执行. jobId:[{}]", jobTimerTaskDTO.getJobId());
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason();
}
JobTaskBatchMapper jobTaskBatchMapper = SpringContext.getBeanByType(JobTaskBatchMapper.class);
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(jobTimerTaskDTO.getTaskBatchId());
jobTaskBatch.setExecutionAt(LocalDateTime.now());
jobTaskBatch.setTaskBatchStatus(taskStatus);
jobTaskBatch.setOperationReason(operationReason);
Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch),
() -> new EasyRetryServerException("更新任务失败"));
// 如果任务已经关闭则不需要执行
if (Objects.isNull(job)) {
return;
}
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId());
taskExecuteDTO.setGroupName(jobTimerTaskDTO.getGroupName());
taskExecuteDTO.setJobId(jobTimerTaskDTO.getJobId());
ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
} finally {
// 处理常驻任务
doHandlerResidentTask(job);
}
});
}
private void doHandlerResidentTask(Job job) {
if (Objects.nonNull(job)) {
// 是否是常驻任务
if (Objects.equals(StatusEnum.YES.getStatus(), job.getResident())) {
ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job);
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());
LocalDateTime preTriggerAt = ResidentTaskCache.get(jobTimerTaskDTO.getJobId());
if (Objects.isNull(preTriggerAt) || preTriggerAt.isBefore(job.getNextTriggerAt())) {
preTriggerAt = job.getNextTriggerAt();
}
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerType(job.getTriggerType());
waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(preTriggerAt);
LocalDateTime nextTriggerAt = waitStrategy.computeRetryTime(waitStrategyContext);
// 获取时间差的毫秒数
Duration duration = Duration.between(preTriggerAt, nextTriggerAt);
long milliseconds = duration.toMillis();
log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, System.currentTimeMillis() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - System.currentTimeMillis() % 1000, TimeUnit.MILLISECONDS);
ResidentTaskCache.refresh(jobTimerTaskDTO.getJobId(), nextTriggerAt);
}
} }
} }
} }