From 6d62ba29da54cffcec4870732092bd4f8d47f201 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Wed, 16 Apr 2025 23:20:55 +0800 Subject: [PATCH] =?UTF-8?q?feat(1.5.0-beta1):=20=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=8E=A8=E9=80=81=E6=94=B9=E4=B8=BA=E5=88=86=E9=A1=B5=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/log/JobLogMessageAccess.java | 8 +- .../access/log/config/LogConfig.java | 4 +- .../dataobject/log/LogPageQueryDO.java | 4 +- .../server/common/vo/JobLogQueryVO.java | 3 +- .../web/controller/JobLogController.java | 30 --- .../web/listener/WsRequestListener.java | 4 +- .../server/web/service/JobLogService.java | 5 +- .../web/service/impl/JobLogServiceImpl.java | 206 +++++------------- .../server/web/timer/JobTaskLogTimerTask.java | 2 +- 9 files changed, 63 insertions(+), 203 deletions(-) delete mode 100644 snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/JobLogController.java diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/JobLogMessageAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/JobLogMessageAccess.java index 6d2d7c089..1021a97bf 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/JobLogMessageAccess.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/JobLogMessageAccess.java @@ -31,7 +31,6 @@ import static com.aizuda.snailjob.template.datasource.utils.DbUtils.getDbType; @RequiredArgsConstructor public class JobLogMessageAccess implements JobLogAccess { private final JobLogMessageMapper jobLogMessageMapper; - private final JobTaskBatchMapper jobTaskBatchMapper; @Override public boolean supports(String operationType) { @@ -55,12 +54,13 @@ public class JobLogMessageAccess implements JobLogAccess { LogPageQueryDO logPageQueryDO = (LogPageQueryDO) queryDO; PageDTO selectPage = jobLogMessageMapper.selectPage( - new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize()), + new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize(), logPageQueryDO.isSearchCount()), new LambdaQueryWrapper() - .ge(JobLogMessage::getId, logPageQueryDO.getStartId()) + .ge(JobLogMessage::getRealTime, logPageQueryDO.getStartRealTime()) .eq(JobLogMessage::getTaskBatchId, logPageQueryDO.getTaskBatchId()) .eq(JobLogMessage::getTaskId, logPageQueryDO.getTaskId()) - .orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime)); + .orderByAsc(JobLogMessage::getId) + .orderByAsc(JobLogMessage::getRealTime)); List records = selectPage.getRecords(); PageResponseDO responseDO = new PageResponseDO<>(); diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/config/LogConfig.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/config/LogConfig.java index d88b9ba4a..eeacbd802 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/config/LogConfig.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/config/LogConfig.java @@ -18,8 +18,8 @@ public class LogConfig { @ConditionalOnMissingBean @Bean - public JobLogAccess defaultJobLogAccess(JobLogMessageMapper jobLogMessageMapper, JobTaskBatchMapper jobTaskBatchMapper) { - return new JobLogMessageAccess(jobLogMessageMapper, jobTaskBatchMapper); + public JobLogAccess defaultJobLogAccess(JobLogMessageMapper jobLogMessageMapper) { + return new JobLogMessageAccess(jobLogMessageMapper); } @ConditionalOnMissingBean diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/LogPageQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/LogPageQueryDO.java index 5038bae1e..22af52df5 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/LogPageQueryDO.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/LogPageQueryDO.java @@ -15,9 +15,9 @@ import lombok.EqualsAndHashCode; @EqualsAndHashCode(callSuper = true) @Data public class LogPageQueryDO extends PageQueryDO { - private Long startId; + private Long startRealTime; private Long jobId; private Long taskBatchId; private Long taskId; - private Integer fromIndex; + private boolean searchCount; } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java index af7376309..de63afaad 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java @@ -12,10 +12,9 @@ import lombok.EqualsAndHashCode; @EqualsAndHashCode(callSuper = true) @Data public class JobLogQueryVO extends BaseQueryVO { - private Long startId; private Long jobId; private Long taskBatchId; private Long taskId; - private Integer fromIndex; private String sid; + private Long startRealTime; } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/JobLogController.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/JobLogController.java deleted file mode 100644 index 2389d5178..000000000 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/JobLogController.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.aizuda.snailjob.server.web.controller; - -import com.aizuda.snailjob.server.web.annotation.LoginRequired; -import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; -import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; -import com.aizuda.snailjob.server.web.service.JobLogService; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -/** - * @author: opensnail - * @date : 2023-10-12 09:56 - * @since : 2.4.0 - */ -@RestController -@RequestMapping("/job") -public class JobLogController { - - @Autowired - private JobLogService jobLogService; - - @GetMapping("/log/list") - @LoginRequired - public JobLogResponseVO getJobLogPage(JobLogQueryVO jobQueryVO) { - return jobLogService.getJobLogPage(jobQueryVO); - } - -} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java index 395b906e8..ed6440b43 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java @@ -40,9 +40,9 @@ public class WsRequestListener { String message = requestVO.getMessage(); JobLogQueryVO jobLogQueryVO = JsonUtil.parseObject(message, JobLogQueryVO.class); jobLogQueryVO.setSid(requestVO.getSid()); - jobLogQueryVO.setStartId(0L); + jobLogQueryVO.setStartRealTime(0L); try { - jobLogService.getJobLogPageV2(jobLogQueryVO); + jobLogService.getJobLogPage(jobLogQueryVO); } catch (Exception e) { log.warn("send log error", e); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java index be60b5b15..858b0f470 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java @@ -1,7 +1,6 @@ package com.aizuda.snailjob.server.web.service; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; -import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; /** * @author: opensnail @@ -10,8 +9,6 @@ import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; */ public interface JobLogService { - JobLogResponseVO getJobLogPage(JobLogQueryVO jobQueryVO); - - void getJobLogPageV2(JobLogQueryVO jobQueryVO); + void getJobLogPage(JobLogQueryVO jobQueryVO); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java index 9f388c4b5..6ad178d69 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java @@ -1,39 +1,26 @@ package com.aizuda.snailjob.server.web.service.impl; -import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.constant.LogFieldConstants; -import com.aizuda.snailjob.server.common.dto.PartitionTask; import com.aizuda.snailjob.server.web.timer.JobTaskLogTimerTask; import com.aizuda.snailjob.server.web.timer.LogTimerWheel; -import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; -import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; -import com.aizuda.snailjob.server.web.model.dto.JobLogMessagePartitionTask; import com.aizuda.snailjob.server.web.model.event.WsSendEvent; import com.aizuda.snailjob.server.web.service.JobLogService; -import com.aizuda.snailjob.server.web.service.convert.LogMessagePartitionTaskConverter; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageResponseDO; import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO; import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.LogPageQueryDO; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; -import com.google.common.base.Predicate; -import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import java.time.Duration; import java.time.LocalDateTime; import java.util.*; -import java.util.function.Consumer; -import java.util.stream.Collectors; import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED; @@ -46,167 +33,74 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPL @RequiredArgsConstructor public class JobLogServiceImpl implements JobLogService { private static final Long DELAY_MILLS = 5000L; - private final JobLogMessageMapper jobLogMessageMapper; private final JobTaskBatchMapper jobTaskBatchMapper; private final AccessTemplate accessTemplate; @Override - public JobLogResponseVO getJobLogPage(final JobLogQueryVO queryVO) { - - PageDTO pageDTO = new PageDTO<>(1, queryVO.getSize()); - - PageDTO selectPage = jobLogMessageMapper.selectPage(pageDTO, - new LambdaQueryWrapper() - .select(JobLogMessage::getId, JobLogMessage::getLogNum) - .ge(JobLogMessage::getId, queryVO.getStartId()) - .eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId()) -// .ge(JobLogMessage::getJobId, queryVO.getJobId()) - .eq(JobLogMessage::getTaskId, queryVO.getTaskId()) - .orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime)); - List records = selectPage.getRecords(); - if (CollUtil.isEmpty(records)) { - - JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne( - new LambdaQueryWrapper() - .eq(JobTaskBatch::getId, queryVO.getTaskBatchId()) - ); - - JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); - - if (Objects.isNull(jobTaskBatch) - || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) && - jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now())) - - ) { - jobLogResponseVO.setFinished(Boolean.TRUE); - } - - jobLogResponseVO.setNextStartId(queryVO.getStartId()); - jobLogResponseVO.setFromIndex(0); - return jobLogResponseVO; - } - - Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0); - JobLogMessage firstRecord = records.get(0); - List ids = Lists.newArrayList(firstRecord.getId()); - int total = firstRecord.getLogNum() - fromIndex; - for (int i = 1; i < records.size(); i++) { - JobLogMessage record = records.get(i); - if (total + record.getLogNum() > queryVO.getSize()) { - break; - } - - total += record.getLogNum(); - ids.add(record.getId()); - } - - long nextStartId = 0; - List> messages = Lists.newArrayList(); - List jobLogMessages = jobLogMessageMapper.selectList( - new LambdaQueryWrapper() - .in(JobLogMessage::getId, ids) - .orderByAsc(JobLogMessage::getId) - .orderByAsc(JobLogMessage::getRealTime) - ); - - for (final JobLogMessage jobLogMessage : jobLogMessages) { - - List> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class); - int size = originalList.size() - fromIndex; - List> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize()) - .collect(Collectors.toList()); - - if (messages.size() + size >= queryVO.getSize()) { - messages.addAll(pageList); - nextStartId = jobLogMessage.getId(); - fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1; - break; - } - - messages.addAll(pageList); - nextStartId = jobLogMessage.getId() + 1; - fromIndex = 0; - } - - messages = messages.stream() - .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) - .collect(Collectors.toList()); - - JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); - jobLogResponseVO.setMessage(messages); - jobLogResponseVO.setNextStartId(nextStartId); - jobLogResponseVO.setFromIndex(fromIndex); - return jobLogResponseVO; - } - - @Override - public void getJobLogPageV2(JobLogQueryVO queryVO) { + public void getJobLogPage(JobLogQueryVO queryVO) { String sid = queryVO.getSid(); LogPageQueryDO pageQueryDO = new LogPageQueryDO(); pageQueryDO.setPage(1); pageQueryDO.setSize(queryVO.getSize()); pageQueryDO.setTaskBatchId(queryVO.getTaskBatchId()); pageQueryDO.setTaskId(queryVO.getTaskId()); - pageQueryDO.setStartId(queryVO.getStartId()); - PartitionTaskUtils.process(startId -> { - // 记录下次起始时间 - queryVO.setStartId(startId); - pageQueryDO.setStartId(startId); - // 拉去数据 - PageResponseDO pageResponseDO = accessTemplate.getJobLogMessageAccess() - .listPage(pageQueryDO); - List rows = pageResponseDO.getRows(); - return LogMessagePartitionTaskConverter.INSTANCE.toJobLogMessagePartitionTasks(rows); - }, new Consumer<>() { - @Override - public void accept(List partitionTasks) { + pageQueryDO.setStartRealTime(queryVO.getStartRealTime()); + pageQueryDO.setSearchCount(true); + PageResponseDO pageResponseDO = accessTemplate.getJobLogMessageAccess() + .listPage(pageQueryDO); - List partitionTaskList = (List) partitionTasks; + long total = pageResponseDO.getTotal(); - for (JobLogMessagePartitionTask logMessagePartitionTask : partitionTaskList) { - // 发生日志内容到前端 - String message = logMessagePartitionTask.getMessage(); - List> logContents = JsonUtil.parseObject(message, List.class); - logContents = logContents.stream() - .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) - .toList(); - for (Map logContent : logContents) { - // send发消息 - WsSendEvent sendEvent = new WsSendEvent(this); - sendEvent.setSid(sid); - sendEvent.setMessage(JsonUtil.toJsonString(logContent)); - SnailSpringContext.getContext().publishEvent(sendEvent); - } - } + int totalPage = (int) ((total + queryVO.getSize() - 1) / queryVO.getSize()); - } - }, new Predicate<>() { - @Override - public boolean apply(List rows) { - - // 决策是否完成 - if (!CollUtil.isEmpty(rows)) { - return false; - } - - JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne( - new LambdaQueryWrapper().eq(JobTaskBatch::getId, queryVO.getTaskBatchId())); - - if (Objects.isNull(jobTaskBatch) - || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) && - jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) { - // 发生完成标识 + Long lastRealTime = null; + for (int i = 1; i < totalPage; i++) { + for (JobLogMessageDO jobLogMessageDO : pageResponseDO.getRows()) { + // 循环覆盖,最后一个肯定是最大的 + lastRealTime = jobLogMessageDO.getRealTime(); + // 发生日志内容到前端 + String message = jobLogMessageDO.getMessage(); + List> logContents = JsonUtil.parseObject(message, List.class); + logContents = logContents.stream() + .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) + .toList(); + for (Map logContent : logContents) { + // send发消息 WsSendEvent sendEvent = new WsSendEvent(this); - sendEvent.setMessage("END"); sendEvent.setSid(sid); + sendEvent.setMessage(JsonUtil.toJsonString(logContent)); SnailSpringContext.getContext().publishEvent(sendEvent); - return true; - } else { - scheduleNextAttempt(queryVO, sid); - return true; } } - }, queryVO.getStartId()); + + // 继续查询下一页 + pageQueryDO.setSearchCount(false); + pageQueryDO.setPage((i - 1) * queryVO.getSize()); + pageResponseDO = accessTemplate.getJobLogMessageAccess() + .listPage(pageQueryDO); + } + + + // 这里判断是否继续查询 + JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne( + new LambdaQueryWrapper().eq(JobTaskBatch::getId, queryVO.getTaskBatchId())); + + // 结束查询 + if (Objects.isNull(jobTaskBatch) + || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) && + jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) { + // 发生完成标识 + WsSendEvent sendEvent = new WsSendEvent(this); + sendEvent.setMessage("END"); + sendEvent.setSid(sid); + SnailSpringContext.getContext().publishEvent(sendEvent); + } else { + // 覆盖作为下次查询的起始条件 + pageQueryDO.setStartRealTime(lastRealTime); + // 继续查询 + scheduleNextAttempt(queryVO, sid); + } + } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/JobTaskLogTimerTask.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/JobTaskLogTimerTask.java index 117e623bf..8c28946dd 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/JobTaskLogTimerTask.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/JobTaskLogTimerTask.java @@ -33,7 +33,7 @@ public class JobTaskLogTimerTask implements TimerTask { try { LogTimerWheel.clearCache(idempotentKey()); JobLogService logService = SnailSpringContext.getBean(JobLogService.class); - logService.getJobLogPageV2(logQueryVO); + logService.getJobLogPage(logQueryVO); } catch (Exception e) { SnailJobLog.LOCAL.error("Scheduled task log query execution failed", e); }