From 383f21ff8d89ae6c9b3c65ac07697d7c1cdefda7 Mon Sep 17 00:00:00 2001 From: srzou Date: Mon, 24 Mar 2025 14:46:43 +0800 Subject: [PATCH] =?UTF-8?q?feat(1.5.0):=E5=A2=9E=E5=8A=A0=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E6=9F=A5=E8=AF=A2=E6=97=B6=E9=97=B4=E8=BD=AE=EF=BC=8C?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E6=97=B6=E9=97=B4=E8=BD=AE=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E5=BB=B6=E8=BF=9F=E6=9F=A5=E8=AF=A2=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common}/idempotent/TimerIdempotent.java | 2 +- .../service/impl/DatabaseLogService.java | 46 ++++++----- .../common/timer/JobTaskLogTimerTask.java | 54 +++++++++++++ .../server/common/timer/LogTimerWheel.java | 79 +++++++++++++++++++ .../job/task/support/timer/JobTimerWheel.java | 2 +- 5 files changed, 160 insertions(+), 23 deletions(-) rename snail-job-server/{snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support => snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common}/idempotent/TimerIdempotent.java (94%) create mode 100644 snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/JobTaskLogTimerTask.java create mode 100644 snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/LogTimerWheel.java diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/idempotent/TimerIdempotent.java similarity index 94% rename from snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java rename to snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/idempotent/TimerIdempotent.java index f361165c2..27264b033 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/idempotent/TimerIdempotent.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/idempotent/TimerIdempotent.java @@ -1,4 +1,4 @@ -package com.aizuda.snailjob.server.job.task.support.idempotent; +package com.aizuda.snailjob.server.common.idempotent; import com.aizuda.snailjob.server.common.IdempotentStrategy; import com.google.common.cache.Cache; diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java index 8639d9941..6a08dde84 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java @@ -9,6 +9,8 @@ import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO; import com.aizuda.snailjob.server.common.dto.JobLogDTO; import com.aizuda.snailjob.server.common.service.LogService; import com.aizuda.snailjob.server.common.convert.JobLogMessageConverter; +import com.aizuda.snailjob.server.common.timer.JobTaskLogTimerTask; +import com.aizuda.snailjob.server.common.timer.LogTimerWheel; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO; @@ -22,13 +24,13 @@ import com.google.common.collect.Lists; import jakarta.websocket.Session; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; import java.io.IOException; +import java.time.Duration; import java.time.LocalDateTime; import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED; @@ -41,14 +43,12 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPL * @Filename:DatabaseLogService * @since 1.5.0 */ - @Slf4j @RequiredArgsConstructor public class DatabaseLogService implements LogService { + private static final Long DELAY_MILLS = 5000L; private final JobLogMessageMapper jobLogMessageMapper; private final JobTaskBatchMapper jobTaskBatchMapper; - // 创建一个调度线程池 - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); /** * 保存单调日志 @@ -95,7 +95,7 @@ public class DatabaseLogService implements LogService { @Override public void getJobLogPage(JobLogQueryVO queryVO, Session session) throws IOException { Boolean taskBatchComplete = false; - while (!taskBatchComplete){ + while (!taskBatchComplete) { PageDTO pageDTO = new PageDTO<>(1, queryVO.getSize()); PageDTO selectPage = jobLogMessageMapper.selectPage(pageDTO, @@ -116,18 +116,15 @@ public class DatabaseLogService implements LogService { JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); if (Objects.isNull(jobTaskBatch) - || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) && + || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) && jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now())) ) { jobLogResponseVO.setFinished(Boolean.TRUE); jobLogResponseVO.setNextStartId(queryVO.getStartId()); jobLogResponseVO.setFromIndex(0); session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO)); - System.out.println("结束了"); return; - }else { - // 如果没有完成,就等五秒执行 - System.out.println("异步执行"); + } else { scheduleNextAttempt(queryVO, session); return; } @@ -191,15 +188,22 @@ public class DatabaseLogService implements LogService { } + /** + * 使用时间轮5秒再进行日志查询 + * + * @param queryVO + * @param session + */ private void scheduleNextAttempt(JobLogQueryVO queryVO, Session session) { - scheduler.schedule(() -> { - try { - // 再次调用查询 - getJobLogPage(queryVO, session); - } catch (IOException e) { - e.printStackTrace(); - } - // 5秒后执行 - }, 5, TimeUnit.SECONDS); + if (TransactionSynchronizationManager.isActualTransactionActive()) { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCompletion(int status) { + LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, session), Duration.ofMillis(DELAY_MILLS)); + } + }); + } else { + LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, session), Duration.ofMillis(DELAY_MILLS)); + } } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/JobTaskLogTimerTask.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/JobTaskLogTimerTask.java new file mode 100644 index 000000000..2072deb9a --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/JobTaskLogTimerTask.java @@ -0,0 +1,54 @@ +package com.aizuda.snailjob.server.common.timer; + +import com.aizuda.snailjob.common.core.context.SnailSpringContext; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.TimerTask; +import com.aizuda.snailjob.server.common.service.LogService; +import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; +import io.netty.util.Timeout; +import jakarta.websocket.Session; +import lombok.AllArgsConstructor; + +import java.text.MessageFormat; +import java.time.LocalDateTime; +import java.util.Map; + +/** + * @Author:srzou + * @Package:com.aizuda.snailjob.server.common.timer + * @Project:snail-job + * @Date:2025/3/24 14:08 + * @Filename:JobTaskLogTimerTask + * @since 1.5.0 + */ +@AllArgsConstructor +public class JobTaskLogTimerTask implements TimerTask { + private static final String SID = "sid"; + private static final String SCENE = "scene"; + private static final String IDEMPOTENT_KEY_PREFIX = "jobTaskLog_{0}_{1}_{2}"; + private JobLogQueryVO logQueryVO; + private Session session; + + @Override + public void run(final Timeout timeout) throws Exception { + SnailJobLog.LOCAL.debug("开始执行定时任务日志查询. 当前时间:[{}] jobTaskId:[{}]", LocalDateTime.now(), logQueryVO.getTaskBatchId()); + + try { + LogTimerWheel.clearCache(idempotentKey()); + LogService logService = SnailSpringContext.getBean(LogService.class); + logService.getJobLogPage(logQueryVO, session); + + } catch (Exception e) { + SnailJobLog.LOCAL.error("定时任务日志查询执行失败", e); + } + } + + @Override + public String idempotentKey() { + Map userProperties = session.getUserProperties(); + String sid = (String) userProperties.get(SID); + String scene = (String) userProperties.get(SCENE); + Long jobTaskId = logQueryVO.getTaskBatchId(); + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, scene, jobTaskId); + } +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/LogTimerWheel.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/LogTimerWheel.java new file mode 100644 index 000000000..f8518e19f --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/LogTimerWheel.java @@ -0,0 +1,79 @@ +package com.aizuda.snailjob.server.common.timer; + +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.TimerTask; +import com.aizuda.snailjob.server.common.idempotent.TimerIdempotent; +import io.netty.util.HashedWheelTimer; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; + +import java.time.Duration; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * @Author:srzou + * @Package:com.aizuda.snailjob.server.job.task.support.timer + * @Project:snail-job + * @Date:2025/3/24 14:00 + * @Filename:LogTimerWheel + * @since 1.5.0 + */ +public class LogTimerWheel { + private static final int TICK_DURATION = 100; + private static final String THREAD_NAME_PREFIX = "log-timer-wheel-"; + private static HashedWheelTimer timer = null; + private static final ThreadPoolExecutor executor = + new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), new CustomizableThreadFactory(THREAD_NAME_PREFIX)); + + private static final TimerIdempotent idempotent = new TimerIdempotent(); + + static { + timer = new HashedWheelTimer( + new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, + TimeUnit.MILLISECONDS, 512, true, -1, executor); + timer.start(); + } + + /** + * 定时任务批次日志添加时间轮 + * + * @param task 任务 + * @param delay 延迟时间 + */ + public static synchronized void registerWithJobTaskLog(Supplier> task, Duration delay) { + TimerTask timerTask = task.get(); + register(timerTask.idempotentKey(), timerTask, delay); + } + + public static synchronized void register(String idempotentKey, TimerTask task, Duration delay) { + + register(idempotentKey, hashedWheelTimer -> { + SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] idempotentKey:[{}]", delay, idempotentKey); + timer.newTimeout(task, Math.max(delay.toMillis(), 0), TimeUnit.MILLISECONDS); + }); + } + + public static synchronized void register(String idempotentKey, Consumer consumer) { + + if (!isExisted(idempotentKey)) { + try { + consumer.accept(timer); + idempotent.set(idempotentKey); + } catch (Exception e) { + SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", idempotentKey, e); + } + } + } + + public static boolean isExisted(String idempotentKey) { + return idempotent.isExist(idempotentKey); + } + + public static void clearCache(String idempotentKey) { + idempotent.clear(idempotentKey); + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java index c8dcf3d97..733466ac1 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/JobTimerWheel.java @@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.timer; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.TimerTask; -import com.aizuda.snailjob.server.job.task.support.idempotent.TimerIdempotent; +import com.aizuda.snailjob.server.common.idempotent.TimerIdempotent; import io.netty.util.HashedWheelTimer; import org.springframework.scheduling.concurrent.CustomizableThreadFactory;