From 5d0675536d2c2c06a1d7886323956abe90fb63b7 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sat, 19 Apr 2025 16:38:18 +0800 Subject: [PATCH] =?UTF-8?q?fix(1.5.0-beta1):=20=E4=BC=98=E5=8C=96=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E6=97=A5=E5=BF=97=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../access/log/RetryTaskLogMessageAccess.java | 11 +- .../log/RetryTaskLogMessageQueryDO.java | 4 +- .../web/controller/RetryTaskController.java | 6 - .../web/listener/WsRequestListener.java | 4 +- .../request/RetryTaskLogMessageQueryVO.java | 8 +- .../server/web/service/RetryTaskService.java | 5 +- .../service/impl/RetryTaskServiceImpl.java | 183 +++++------------- .../web/timer/RetryTaskLogTimerTask.java | 3 +- 8 files changed, 61 insertions(+), 163 deletions(-) diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java index d4e2d2ac7..a398afa3d 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java @@ -7,12 +7,12 @@ import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*; import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO; import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageQueryDO; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.List; @@ -51,14 +51,15 @@ public class RetryTaskLogMessageAccess implements RetryLogAccess listPage(PageQueryDO queryDO) { RetryTaskLogMessageQueryDO logPageQueryDO = (RetryTaskLogMessageQueryDO) queryDO; PageDTO selectPage = retryTaskLogMessageMapper.selectPage( - new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize()), + new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize(), logPageQueryDO.isSearchCount()), new LambdaQueryWrapper() - .ge(RetryTaskLogMessage::getId, logPageQueryDO.getStartId()) + .gt(RetryTaskLogMessage::getRealTime, logPageQueryDO.getStartRealTime()) .eq(RetryTaskLogMessage::getRetryTaskId, logPageQueryDO.getRetryTaskId()) - .orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime)); + .orderByAsc(RetryTaskLogMessage::getId) + .orderByAsc(RetryTaskLogMessage::getRealTime)); List records = selectPage.getRecords(); PageResponseDO responseDO = new PageResponseDO<>(); diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageQueryDO.java index de6591535..7c7f9e4a6 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageQueryDO.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageQueryDO.java @@ -11,9 +11,9 @@ public class RetryTaskLogMessageQueryDO extends PageQueryDO { private Long retryTaskId; - private Long startId; + private Long startRealTime; - private Integer fromIndex; + private boolean searchCount; private String sid; } \ No newline at end of file diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskController.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskController.java index 0e540378d..58e69d4e2 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskController.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/RetryTaskController.java @@ -33,12 +33,6 @@ public class RetryTaskController { return retryTaskService.getRetryTaskLogPage(queryVO); } - @LoginRequired - @GetMapping("/message/list") - public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO) { - return retryTaskService.getRetryTaskLogMessagePage(queryVO); - } - @LoginRequired @GetMapping("{id}") public RetryTaskResponseVO getRetryTaskById(@PathVariable("id") Long id) { diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java index ed6440b43..a156aec6a 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java @@ -61,9 +61,9 @@ public class WsRequestListener { String message = requestVO.getMessage(); RetryTaskLogMessageQueryVO retryTaskLogMessageQueryVO = JsonUtil.parseObject(message, RetryTaskLogMessageQueryVO.class); retryTaskLogMessageQueryVO.setSid(requestVO.getSid()); - retryTaskLogMessageQueryVO.setStartId(0L); + retryTaskLogMessageQueryVO.setStartRealTime(0L); try { - retryTaskService.getRetryTaskLogMessagePageV2(retryTaskLogMessageQueryVO); + retryTaskService.getRetryTaskLogMessagePage(retryTaskLogMessageQueryVO); } catch (Exception e) { log.warn("send log error", e); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java index 218d75def..918773c2d 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java @@ -13,12 +13,8 @@ import lombok.EqualsAndHashCode; public class RetryTaskLogMessageQueryVO extends BaseQueryVO { private String groupName; - private Long retryTaskId; - - private Long startId; - - private Integer fromIndex; - + private Long startRealTime; private String sid; + } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java index 412724960..c40c89672 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java @@ -3,7 +3,6 @@ package com.aizuda.snailjob.server.web.service; import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; -import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO; import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; import java.util.List; @@ -18,9 +17,7 @@ public interface RetryTaskService { PageResult> getRetryTaskLogPage(RetryTaskQueryVO queryVO); - RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO); - - void getRetryTaskLogMessagePageV2(RetryTaskLogMessageQueryVO queryVO); + void getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO); RetryTaskResponseVO getRetryTaskById(Long id); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java index 6bf81449a..0211b2e10 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java @@ -96,158 +96,69 @@ public class RetryTaskServiceImpl implements RetryTaskService { } @Override - public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage( - RetryTaskLogMessageQueryVO queryVO) { - if (queryVO.getRetryTaskId() == null || StrUtil.isBlank(queryVO.getGroupName())) { - RetryTaskLogMessageResponseVO jobLogResponseVO = new RetryTaskLogMessageResponseVO(); - jobLogResponseVO.setNextStartId(0L); - jobLogResponseVO.setFromIndex(0); - return jobLogResponseVO; - } - - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); - - PageDTO pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); - PageDTO selectPage = retryTaskLogMessageMapper.selectPage(pageDTO, - new LambdaQueryWrapper() - .select(RetryTaskLogMessage::getId, RetryTaskLogMessage::getLogNum) - .ge(RetryTaskLogMessage::getId, queryVO.getStartId()) - .eq(RetryTaskLogMessage::getNamespaceId, namespaceId) - .eq(RetryTaskLogMessage::getRetryTaskId, queryVO.getRetryTaskId()) - .eq(RetryTaskLogMessage::getGroupName, queryVO.getGroupName()) - .orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime) - .orderByDesc(RetryTaskLogMessage::getCreateDt)); - - List records = selectPage.getRecords(); - - if (CollUtil.isEmpty(records)) { - return new RetryTaskLogMessageResponseVO() - .setFinished(Boolean.TRUE) - .setNextStartId(queryVO.getStartId()) - .setFromIndex(0); - } - - Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0); - RetryTaskLogMessage firstRecord = records.get(0); - List ids = Lists.newArrayList(firstRecord.getId()); - int total = firstRecord.getLogNum() - fromIndex; - for (int i = 1; i < records.size(); i++) { - RetryTaskLogMessage record = records.get(i); - if (total + record.getLogNum() > queryVO.getSize()) { - break; - } - - total += record.getLogNum(); - ids.add(record.getId()); - } - - long nextStartId = 0; - List> messages = Lists.newArrayList(); - List jobLogMessages = retryTaskLogMessageMapper.selectList( - new LambdaQueryWrapper() - .in(RetryTaskLogMessage::getId, ids) - .orderByAsc(RetryTaskLogMessage::getId) - .orderByAsc(RetryTaskLogMessage::getRealTime) - ); - - for (final RetryTaskLogMessage retryTaskLogMessage : jobLogMessages) { - - List> originalList = JsonUtil.parseObject(retryTaskLogMessage.getMessage(), List.class); - int size = originalList.size() - fromIndex; - List> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize()) - .collect(Collectors.toList()); - if (messages.size() + size >= queryVO.getSize()) { - messages.addAll(pageList); - nextStartId = retryTaskLogMessage.getId(); - fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1; - break; - } - - messages.addAll(pageList); - nextStartId = retryTaskLogMessage.getId() + 1; - fromIndex = 0; - } - - messages = messages.stream() - .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) - .collect(Collectors.toList()); - - RetryTaskLogMessageResponseVO responseVO = new RetryTaskLogMessageResponseVO(); - responseVO.setMessage(messages); - responseVO.setNextStartId(nextStartId); - responseVO.setFromIndex(fromIndex); - - return responseVO; - } - - @Override - public void getRetryTaskLogMessagePageV2(RetryTaskLogMessageQueryVO queryVO) { + public void getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO) { String sid = queryVO.getSid(); RetryTaskLogMessageQueryDO pageQueryDO = new RetryTaskLogMessageQueryDO(); pageQueryDO.setPage(1); pageQueryDO.setSize(queryVO.getSize()); pageQueryDO.setRetryTaskId(queryVO.getRetryTaskId()); - pageQueryDO.setStartId(queryVO.getStartId()); - PartitionTaskUtils.process(startId -> { - // 记录下次起始时间 - queryVO.setStartId(startId); - pageQueryDO.setStartId(startId); - // 拉去数据 - PageResponseDO pageResponseDO = accessTemplate.getRetryTaskLogMessageAccess() - .listPage(pageQueryDO); - List rows = pageResponseDO.getRows(); - return LogMessagePartitionTaskConverter.INSTANCE.toLogMessagePartitionTask(rows); - }, new Consumer<>() { - @Override - public void accept(List partitionTasks) { + pageQueryDO.setStartRealTime(queryVO.getStartRealTime()); + pageQueryDO.setSearchCount(true); + // 拉取数据 + PageResponseDO pageResponseDO = accessTemplate.getRetryTaskLogMessageAccess() + .listPage(pageQueryDO); - List partitionTaskList = (List) partitionTasks; + long total = pageResponseDO.getTotal(); - for (LogMessagePartitionTask logMessagePartitionTask : partitionTaskList) { - // 发生日志内容到前端 - String message = logMessagePartitionTask.getMessage(); - List> logContents = JsonUtil.parseObject(message, List.class); - logContents = logContents.stream() - .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) - .toList(); - for (Map logContent : logContents) { - // send发消息 - WsSendEvent sendEvent = new WsSendEvent(this); - sendEvent.setSid(sid); - sendEvent.setMessage(JsonUtil.toJsonString(logContent)); - SnailSpringContext.getContext().publishEvent(sendEvent); - } - } + int totalPage = (int) ((total + queryVO.getSize() - 1) / queryVO.getSize()); - } - }, new Predicate<>() { - @Override - public boolean apply(List rows) { + Long lastRealTime = 0L; - // 决策是否完成 - if (!CollUtil.isEmpty(rows)) { - return false; - } + if (0 == totalPage && + (null != pageQueryDO.getStartRealTime() && 0 != pageQueryDO.getStartRealTime())){ + lastRealTime = pageQueryDO.getStartRealTime(); + } - RetryTask retryTask = retryTaskMapper.selectOne( - new LambdaQueryWrapper().eq(RetryTask::getId, queryVO.getRetryTaskId())); - - if (Objects.isNull(retryTask) - || (RetryTaskStatusEnum.TERMINAL_STATUS_SET.contains(retryTask.getTaskStatus()) && - retryTask.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) { - // 发生完成标识 + for (int i = 1; i <= totalPage;) { + for (RetryTaskLogMessageDO retryTaskLogMessageDO : pageResponseDO.getRows()) { + // 发生日志内容到前端 + String message = retryTaskLogMessageDO.getMessage(); + List> logContents = JsonUtil.parseObject(message, List.class); + logContents = logContents.stream() + .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) + .toList(); + for (Map logContent : logContents) { + // send发消息 WsSendEvent sendEvent = new WsSendEvent(this); - sendEvent.setMessage("END"); sendEvent.setSid(sid); + sendEvent.setMessage(JsonUtil.toJsonString(logContent)); SnailSpringContext.getContext().publishEvent(sendEvent); - return true; - } else { - scheduleNextAttempt(queryVO, sid); - return true; } } - }, queryVO.getStartId()); + // 继续查询下一页 + pageQueryDO.setSearchCount(false); + pageQueryDO.setPage(++i); + pageResponseDO = accessTemplate.getRetryTaskLogMessageAccess() + .listPage(pageQueryDO); + } + + RetryTask retryTask = retryTaskMapper.selectOne( + new LambdaQueryWrapper().eq(RetryTask::getId, queryVO.getRetryTaskId())); + + if (Objects.isNull(retryTask) + || (RetryTaskStatusEnum.TERMINAL_STATUS_SET.contains(retryTask.getTaskStatus()) && + retryTask.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) { + // 发生完成标识 + WsSendEvent sendEvent = new WsSendEvent(this); + sendEvent.setMessage("END"); + sendEvent.setSid(sid); + SnailSpringContext.getContext().publishEvent(sendEvent); + } else { + // 覆盖作为下次查询的起始条件 + queryVO.setStartRealTime(lastRealTime); + scheduleNextAttempt(queryVO, sid); + } } /** diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/RetryTaskLogTimerTask.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/RetryTaskLogTimerTask.java index 6264d2b1c..ebccfa4d1 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/RetryTaskLogTimerTask.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/RetryTaskLogTimerTask.java @@ -33,7 +33,7 @@ public class RetryTaskLogTimerTask implements TimerTask { try { LogTimerWheel.clearCache(idempotentKey()); RetryTaskService logService = SnailSpringContext.getBean(RetryTaskService.class); - logService.getRetryTaskLogMessagePageV2(logQueryVO); + logService.getRetryTaskLogMessagePage(logQueryVO); } catch (Exception e) { SnailJobLog.LOCAL.error("Scheduled task log query execution failed", e); } @@ -41,7 +41,6 @@ public class RetryTaskLogTimerTask implements TimerTask { @Override public String idempotentKey() { - Long taskId = logQueryVO.getRetryTaskId(); return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, WebSocketSceneEnum.JOB_LOG_SCENE, taskId); }