feat(1.5.0):增加日志查询时间轮,使用时间轮完成延迟查询功能

This commit is contained in:
srzou 2025-03-24 14:46:43 +08:00 committed by opensnail
parent 319275b9b8
commit 383f21ff8d
5 changed files with 160 additions and 23 deletions

View File

@ -1,4 +1,4 @@
package com.aizuda.snailjob.server.job.task.support.idempotent;
package com.aizuda.snailjob.server.common.idempotent;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.google.common.cache.Cache;

View File

@ -9,6 +9,8 @@ import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.common.convert.JobLogMessageConverter;
import com.aizuda.snailjob.server.common.timer.JobTaskLogTimerTask;
import com.aizuda.snailjob.server.common.timer.LogTimerWheel;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
@ -22,13 +24,13 @@ import com.google.common.collect.Lists;
import jakarta.websocket.Session;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
@ -41,14 +43,12 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPL
* @FilenameDatabaseLogService
* @since 1.5.0
*/
@Slf4j
@RequiredArgsConstructor
public class DatabaseLogService implements LogService {
private static final Long DELAY_MILLS = 5000L;
private final JobLogMessageMapper jobLogMessageMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
// 创建一个调度线程池
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/**
* 保存单调日志
@ -123,11 +123,8 @@ public class DatabaseLogService implements LogService {
jobLogResponseVO.setNextStartId(queryVO.getStartId());
jobLogResponseVO.setFromIndex(0);
session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO));
System.out.println("结束了");
return;
} else {
// 如果没有完成就等五秒执行
System.out.println("异步执行");
scheduleNextAttempt(queryVO, session);
return;
}
@ -191,15 +188,22 @@ public class DatabaseLogService implements LogService {
}
/**
* 使用时间轮5秒再进行日志查询
*
* @param queryVO
* @param session
*/
private void scheduleNextAttempt(JobLogQueryVO queryVO, Session session) {
scheduler.schedule(() -> {
try {
// 再次调用查询
getJobLogPage(queryVO, session);
} catch (IOException e) {
e.printStackTrace();
if (TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {
LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, session), Duration.ofMillis(DELAY_MILLS));
}
});
} else {
LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, session), Duration.ofMillis(DELAY_MILLS));
}
// 5秒后执行
}, 5, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,54 @@
package com.aizuda.snailjob.server.common.timer;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import io.netty.util.Timeout;
import jakarta.websocket.Session;
import lombok.AllArgsConstructor;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.Map;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.common.timer
* @Projectsnail-job
* @Date2025/3/24 14:08
* @FilenameJobTaskLogTimerTask
* @since 1.5.0
*/
@AllArgsConstructor
public class JobTaskLogTimerTask implements TimerTask<String> {
private static final String SID = "sid";
private static final String SCENE = "scene";
private static final String IDEMPOTENT_KEY_PREFIX = "jobTaskLog_{0}_{1}_{2}";
private JobLogQueryVO logQueryVO;
private Session session;
@Override
public void run(final Timeout timeout) throws Exception {
SnailJobLog.LOCAL.debug("开始执行定时任务日志查询. 当前时间:[{}] jobTaskId:[{}]", LocalDateTime.now(), logQueryVO.getTaskBatchId());
try {
LogTimerWheel.clearCache(idempotentKey());
LogService logService = SnailSpringContext.getBean(LogService.class);
logService.getJobLogPage(logQueryVO, session);
} catch (Exception e) {
SnailJobLog.LOCAL.error("定时任务日志查询执行失败", e);
}
}
@Override
public String idempotentKey() {
Map<String, Object> userProperties = session.getUserProperties();
String sid = (String) userProperties.get(SID);
String scene = (String) userProperties.get(SCENE);
Long jobTaskId = logQueryVO.getTaskBatchId();
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, scene, jobTaskId);
}
}

View File

@ -0,0 +1,79 @@
package com.aizuda.snailjob.server.common.timer;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.idempotent.TimerIdempotent;
import io.netty.util.HashedWheelTimer;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.job.task.support.timer
* @Projectsnail-job
* @Date2025/3/24 14:00
* @FilenameLogTimerWheel
* @since 1.5.0
*/
public class LogTimerWheel {
private static final int TICK_DURATION = 100;
private static final String THREAD_NAME_PREFIX = "log-timer-wheel-";
private static HashedWheelTimer timer = null;
private static final ThreadPoolExecutor executor =
new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), new CustomizableThreadFactory(THREAD_NAME_PREFIX));
private static final TimerIdempotent idempotent = new TimerIdempotent();
static {
timer = new HashedWheelTimer(
new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION,
TimeUnit.MILLISECONDS, 512, true, -1, executor);
timer.start();
}
/**
* 定时任务批次日志添加时间轮
*
* @param task 任务
* @param delay 延迟时间
*/
public static synchronized void registerWithJobTaskLog(Supplier<TimerTask<String>> task, Duration delay) {
TimerTask<String> timerTask = task.get();
register(timerTask.idempotentKey(), timerTask, delay);
}
public static synchronized void register(String idempotentKey, TimerTask<String> task, Duration delay) {
register(idempotentKey, hashedWheelTimer -> {
SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] idempotentKey:[{}]", delay, idempotentKey);
timer.newTimeout(task, Math.max(delay.toMillis(), 0), TimeUnit.MILLISECONDS);
});
}
public static synchronized void register(String idempotentKey, Consumer<HashedWheelTimer> consumer) {
if (!isExisted(idempotentKey)) {
try {
consumer.accept(timer);
idempotent.set(idempotentKey);
} catch (Exception e) {
SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", idempotentKey, e);
}
}
}
public static boolean isExisted(String idempotentKey) {
return idempotent.isExist(idempotentKey);
}
public static void clearCache(String idempotentKey) {
idempotent.clear(idempotentKey);
}
}

View File

@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.timer;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.job.task.support.idempotent.TimerIdempotent;
import com.aizuda.snailjob.server.common.idempotent.TimerIdempotent;
import io.netty.util.HashedWheelTimer;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;