feat(1.5.0-beta1): 日志推送改为分页查询
This commit is contained in:
parent
aadd0c3c03
commit
6d62ba29da
@ -31,7 +31,6 @@ import static com.aizuda.snailjob.template.datasource.utils.DbUtils.getDbType;
|
|||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class JobLogMessageAccess implements JobLogAccess<JobLogMessageDO> {
|
public class JobLogMessageAccess implements JobLogAccess<JobLogMessageDO> {
|
||||||
private final JobLogMessageMapper jobLogMessageMapper;
|
private final JobLogMessageMapper jobLogMessageMapper;
|
||||||
private final JobTaskBatchMapper jobTaskBatchMapper;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean supports(String operationType) {
|
public boolean supports(String operationType) {
|
||||||
@ -55,12 +54,13 @@ public class JobLogMessageAccess implements JobLogAccess<JobLogMessageDO> {
|
|||||||
LogPageQueryDO logPageQueryDO = (LogPageQueryDO) queryDO;
|
LogPageQueryDO logPageQueryDO = (LogPageQueryDO) queryDO;
|
||||||
|
|
||||||
PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(
|
PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(
|
||||||
new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize()),
|
new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize(), logPageQueryDO.isSearchCount()),
|
||||||
new LambdaQueryWrapper<JobLogMessage>()
|
new LambdaQueryWrapper<JobLogMessage>()
|
||||||
.ge(JobLogMessage::getId, logPageQueryDO.getStartId())
|
.ge(JobLogMessage::getRealTime, logPageQueryDO.getStartRealTime())
|
||||||
.eq(JobLogMessage::getTaskBatchId, logPageQueryDO.getTaskBatchId())
|
.eq(JobLogMessage::getTaskBatchId, logPageQueryDO.getTaskBatchId())
|
||||||
.eq(JobLogMessage::getTaskId, logPageQueryDO.getTaskId())
|
.eq(JobLogMessage::getTaskId, logPageQueryDO.getTaskId())
|
||||||
.orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime));
|
.orderByAsc(JobLogMessage::getId)
|
||||||
|
.orderByAsc(JobLogMessage::getRealTime));
|
||||||
List<JobLogMessage> records = selectPage.getRecords();
|
List<JobLogMessage> records = selectPage.getRecords();
|
||||||
|
|
||||||
PageResponseDO<JobLogMessageDO> responseDO = new PageResponseDO<>();
|
PageResponseDO<JobLogMessageDO> responseDO = new PageResponseDO<>();
|
||||||
|
@ -18,8 +18,8 @@ public class LogConfig {
|
|||||||
|
|
||||||
@ConditionalOnMissingBean
|
@ConditionalOnMissingBean
|
||||||
@Bean
|
@Bean
|
||||||
public JobLogAccess<JobLogMessageDO> defaultJobLogAccess(JobLogMessageMapper jobLogMessageMapper, JobTaskBatchMapper jobTaskBatchMapper) {
|
public JobLogAccess<JobLogMessageDO> defaultJobLogAccess(JobLogMessageMapper jobLogMessageMapper) {
|
||||||
return new JobLogMessageAccess(jobLogMessageMapper, jobTaskBatchMapper);
|
return new JobLogMessageAccess(jobLogMessageMapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ConditionalOnMissingBean
|
@ConditionalOnMissingBean
|
||||||
|
@ -15,9 +15,9 @@ import lombok.EqualsAndHashCode;
|
|||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@Data
|
@Data
|
||||||
public class LogPageQueryDO extends PageQueryDO {
|
public class LogPageQueryDO extends PageQueryDO {
|
||||||
private Long startId;
|
private Long startRealTime;
|
||||||
private Long jobId;
|
private Long jobId;
|
||||||
private Long taskBatchId;
|
private Long taskBatchId;
|
||||||
private Long taskId;
|
private Long taskId;
|
||||||
private Integer fromIndex;
|
private boolean searchCount;
|
||||||
}
|
}
|
||||||
|
@ -12,10 +12,9 @@ import lombok.EqualsAndHashCode;
|
|||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@Data
|
@Data
|
||||||
public class JobLogQueryVO extends BaseQueryVO {
|
public class JobLogQueryVO extends BaseQueryVO {
|
||||||
private Long startId;
|
|
||||||
private Long jobId;
|
private Long jobId;
|
||||||
private Long taskBatchId;
|
private Long taskBatchId;
|
||||||
private Long taskId;
|
private Long taskId;
|
||||||
private Integer fromIndex;
|
|
||||||
private String sid;
|
private String sid;
|
||||||
|
private Long startRealTime;
|
||||||
}
|
}
|
||||||
|
@ -1,30 +0,0 @@
|
|||||||
package com.aizuda.snailjob.server.web.controller;
|
|
||||||
|
|
||||||
import com.aizuda.snailjob.server.web.annotation.LoginRequired;
|
|
||||||
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
|
|
||||||
import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
|
|
||||||
import com.aizuda.snailjob.server.web.service.JobLogService;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author: opensnail
|
|
||||||
* @date : 2023-10-12 09:56
|
|
||||||
* @since : 2.4.0
|
|
||||||
*/
|
|
||||||
@RestController
|
|
||||||
@RequestMapping("/job")
|
|
||||||
public class JobLogController {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private JobLogService jobLogService;
|
|
||||||
|
|
||||||
@GetMapping("/log/list")
|
|
||||||
@LoginRequired
|
|
||||||
public JobLogResponseVO getJobLogPage(JobLogQueryVO jobQueryVO) {
|
|
||||||
return jobLogService.getJobLogPage(jobQueryVO);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -40,9 +40,9 @@ public class WsRequestListener {
|
|||||||
String message = requestVO.getMessage();
|
String message = requestVO.getMessage();
|
||||||
JobLogQueryVO jobLogQueryVO = JsonUtil.parseObject(message, JobLogQueryVO.class);
|
JobLogQueryVO jobLogQueryVO = JsonUtil.parseObject(message, JobLogQueryVO.class);
|
||||||
jobLogQueryVO.setSid(requestVO.getSid());
|
jobLogQueryVO.setSid(requestVO.getSid());
|
||||||
jobLogQueryVO.setStartId(0L);
|
jobLogQueryVO.setStartRealTime(0L);
|
||||||
try {
|
try {
|
||||||
jobLogService.getJobLogPageV2(jobLogQueryVO);
|
jobLogService.getJobLogPage(jobLogQueryVO);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("send log error", e);
|
log.warn("send log error", e);
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package com.aizuda.snailjob.server.web.service;
|
package com.aizuda.snailjob.server.web.service;
|
||||||
|
|
||||||
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
|
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
|
||||||
import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author: opensnail
|
* @author: opensnail
|
||||||
@ -10,8 +9,6 @@ import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
|
|||||||
*/
|
*/
|
||||||
public interface JobLogService {
|
public interface JobLogService {
|
||||||
|
|
||||||
JobLogResponseVO getJobLogPage(JobLogQueryVO jobQueryVO);
|
void getJobLogPage(JobLogQueryVO jobQueryVO);
|
||||||
|
|
||||||
void getJobLogPageV2(JobLogQueryVO jobQueryVO);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,39 +1,26 @@
|
|||||||
package com.aizuda.snailjob.server.web.service.impl;
|
package com.aizuda.snailjob.server.web.service.impl;
|
||||||
|
|
||||||
import cn.hutool.core.collection.CollUtil;
|
|
||||||
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
|
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
|
||||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||||
import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
|
import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
|
||||||
import com.aizuda.snailjob.server.common.dto.PartitionTask;
|
|
||||||
import com.aizuda.snailjob.server.web.timer.JobTaskLogTimerTask;
|
import com.aizuda.snailjob.server.web.timer.JobTaskLogTimerTask;
|
||||||
import com.aizuda.snailjob.server.web.timer.LogTimerWheel;
|
import com.aizuda.snailjob.server.web.timer.LogTimerWheel;
|
||||||
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
|
|
||||||
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
|
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
|
||||||
import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
|
|
||||||
import com.aizuda.snailjob.server.web.model.dto.JobLogMessagePartitionTask;
|
|
||||||
import com.aizuda.snailjob.server.web.model.event.WsSendEvent;
|
import com.aizuda.snailjob.server.web.model.event.WsSendEvent;
|
||||||
import com.aizuda.snailjob.server.web.service.JobLogService;
|
import com.aizuda.snailjob.server.web.service.JobLogService;
|
||||||
import com.aizuda.snailjob.server.web.service.convert.LogMessagePartitionTaskConverter;
|
|
||||||
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
|
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageResponseDO;
|
import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageResponseDO;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
|
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.LogPageQueryDO;
|
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.LogPageQueryDO;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
|
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
|
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.function.Consumer;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
|
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
|
||||||
|
|
||||||
@ -46,167 +33,74 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPL
|
|||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class JobLogServiceImpl implements JobLogService {
|
public class JobLogServiceImpl implements JobLogService {
|
||||||
private static final Long DELAY_MILLS = 5000L;
|
private static final Long DELAY_MILLS = 5000L;
|
||||||
private final JobLogMessageMapper jobLogMessageMapper;
|
|
||||||
private final JobTaskBatchMapper jobTaskBatchMapper;
|
private final JobTaskBatchMapper jobTaskBatchMapper;
|
||||||
private final AccessTemplate accessTemplate;
|
private final AccessTemplate accessTemplate;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public JobLogResponseVO getJobLogPage(final JobLogQueryVO queryVO) {
|
public void getJobLogPage(JobLogQueryVO queryVO) {
|
||||||
|
|
||||||
PageDTO<JobLogMessage> pageDTO = new PageDTO<>(1, queryVO.getSize());
|
|
||||||
|
|
||||||
PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(pageDTO,
|
|
||||||
new LambdaQueryWrapper<JobLogMessage>()
|
|
||||||
.select(JobLogMessage::getId, JobLogMessage::getLogNum)
|
|
||||||
.ge(JobLogMessage::getId, queryVO.getStartId())
|
|
||||||
.eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId())
|
|
||||||
// .ge(JobLogMessage::getJobId, queryVO.getJobId())
|
|
||||||
.eq(JobLogMessage::getTaskId, queryVO.getTaskId())
|
|
||||||
.orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime));
|
|
||||||
List<JobLogMessage> records = selectPage.getRecords();
|
|
||||||
if (CollUtil.isEmpty(records)) {
|
|
||||||
|
|
||||||
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(
|
|
||||||
new LambdaQueryWrapper<JobTaskBatch>()
|
|
||||||
.eq(JobTaskBatch::getId, queryVO.getTaskBatchId())
|
|
||||||
);
|
|
||||||
|
|
||||||
JobLogResponseVO jobLogResponseVO = new JobLogResponseVO();
|
|
||||||
|
|
||||||
if (Objects.isNull(jobTaskBatch)
|
|
||||||
|| (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) &&
|
|
||||||
jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))
|
|
||||||
|
|
||||||
) {
|
|
||||||
jobLogResponseVO.setFinished(Boolean.TRUE);
|
|
||||||
}
|
|
||||||
|
|
||||||
jobLogResponseVO.setNextStartId(queryVO.getStartId());
|
|
||||||
jobLogResponseVO.setFromIndex(0);
|
|
||||||
return jobLogResponseVO;
|
|
||||||
}
|
|
||||||
|
|
||||||
Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0);
|
|
||||||
JobLogMessage firstRecord = records.get(0);
|
|
||||||
List<Long> ids = Lists.newArrayList(firstRecord.getId());
|
|
||||||
int total = firstRecord.getLogNum() - fromIndex;
|
|
||||||
for (int i = 1; i < records.size(); i++) {
|
|
||||||
JobLogMessage record = records.get(i);
|
|
||||||
if (total + record.getLogNum() > queryVO.getSize()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
total += record.getLogNum();
|
|
||||||
ids.add(record.getId());
|
|
||||||
}
|
|
||||||
|
|
||||||
long nextStartId = 0;
|
|
||||||
List<Map<String, String>> messages = Lists.newArrayList();
|
|
||||||
List<JobLogMessage> jobLogMessages = jobLogMessageMapper.selectList(
|
|
||||||
new LambdaQueryWrapper<JobLogMessage>()
|
|
||||||
.in(JobLogMessage::getId, ids)
|
|
||||||
.orderByAsc(JobLogMessage::getId)
|
|
||||||
.orderByAsc(JobLogMessage::getRealTime)
|
|
||||||
);
|
|
||||||
|
|
||||||
for (final JobLogMessage jobLogMessage : jobLogMessages) {
|
|
||||||
|
|
||||||
List<Map<String, String>> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class);
|
|
||||||
int size = originalList.size() - fromIndex;
|
|
||||||
List<Map<String, String>> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize())
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
if (messages.size() + size >= queryVO.getSize()) {
|
|
||||||
messages.addAll(pageList);
|
|
||||||
nextStartId = jobLogMessage.getId();
|
|
||||||
fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
messages.addAll(pageList);
|
|
||||||
nextStartId = jobLogMessage.getId() + 1;
|
|
||||||
fromIndex = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
messages = messages.stream()
|
|
||||||
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
JobLogResponseVO jobLogResponseVO = new JobLogResponseVO();
|
|
||||||
jobLogResponseVO.setMessage(messages);
|
|
||||||
jobLogResponseVO.setNextStartId(nextStartId);
|
|
||||||
jobLogResponseVO.setFromIndex(fromIndex);
|
|
||||||
return jobLogResponseVO;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void getJobLogPageV2(JobLogQueryVO queryVO) {
|
|
||||||
String sid = queryVO.getSid();
|
String sid = queryVO.getSid();
|
||||||
LogPageQueryDO pageQueryDO = new LogPageQueryDO();
|
LogPageQueryDO pageQueryDO = new LogPageQueryDO();
|
||||||
pageQueryDO.setPage(1);
|
pageQueryDO.setPage(1);
|
||||||
pageQueryDO.setSize(queryVO.getSize());
|
pageQueryDO.setSize(queryVO.getSize());
|
||||||
pageQueryDO.setTaskBatchId(queryVO.getTaskBatchId());
|
pageQueryDO.setTaskBatchId(queryVO.getTaskBatchId());
|
||||||
pageQueryDO.setTaskId(queryVO.getTaskId());
|
pageQueryDO.setTaskId(queryVO.getTaskId());
|
||||||
pageQueryDO.setStartId(queryVO.getStartId());
|
pageQueryDO.setStartRealTime(queryVO.getStartRealTime());
|
||||||
PartitionTaskUtils.process(startId -> {
|
pageQueryDO.setSearchCount(true);
|
||||||
// 记录下次起始时间
|
PageResponseDO<JobLogMessageDO> pageResponseDO = accessTemplate.getJobLogMessageAccess()
|
||||||
queryVO.setStartId(startId);
|
.listPage(pageQueryDO);
|
||||||
pageQueryDO.setStartId(startId);
|
|
||||||
// 拉去数据
|
|
||||||
PageResponseDO<JobLogMessageDO> pageResponseDO = accessTemplate.getJobLogMessageAccess()
|
|
||||||
.listPage(pageQueryDO);
|
|
||||||
List<JobLogMessageDO> rows = pageResponseDO.getRows();
|
|
||||||
return LogMessagePartitionTaskConverter.INSTANCE.toJobLogMessagePartitionTasks(rows);
|
|
||||||
}, new Consumer<>() {
|
|
||||||
@Override
|
|
||||||
public void accept(List<? extends PartitionTask> partitionTasks) {
|
|
||||||
|
|
||||||
List<JobLogMessagePartitionTask> partitionTaskList = (List<JobLogMessagePartitionTask>) partitionTasks;
|
long total = pageResponseDO.getTotal();
|
||||||
|
|
||||||
for (JobLogMessagePartitionTask logMessagePartitionTask : partitionTaskList) {
|
int totalPage = (int) ((total + queryVO.getSize() - 1) / queryVO.getSize());
|
||||||
// 发生日志内容到前端
|
|
||||||
String message = logMessagePartitionTask.getMessage();
|
|
||||||
List<Map<String, String>> logContents = JsonUtil.parseObject(message, List.class);
|
|
||||||
logContents = logContents.stream()
|
|
||||||
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
|
|
||||||
.toList();
|
|
||||||
for (Map<String, String> logContent : logContents) {
|
|
||||||
// send发消息
|
|
||||||
WsSendEvent sendEvent = new WsSendEvent(this);
|
|
||||||
sendEvent.setSid(sid);
|
|
||||||
sendEvent.setMessage(JsonUtil.toJsonString(logContent));
|
|
||||||
SnailSpringContext.getContext().publishEvent(sendEvent);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
Long lastRealTime = null;
|
||||||
}, new Predicate<>() {
|
for (int i = 1; i < totalPage; i++) {
|
||||||
@Override
|
for (JobLogMessageDO jobLogMessageDO : pageResponseDO.getRows()) {
|
||||||
public boolean apply(List<? extends PartitionTask> rows) {
|
// 循环覆盖,最后一个肯定是最大的
|
||||||
|
lastRealTime = jobLogMessageDO.getRealTime();
|
||||||
// 决策是否完成
|
// 发生日志内容到前端
|
||||||
if (!CollUtil.isEmpty(rows)) {
|
String message = jobLogMessageDO.getMessage();
|
||||||
return false;
|
List<Map<String, String>> logContents = JsonUtil.parseObject(message, List.class);
|
||||||
}
|
logContents = logContents.stream()
|
||||||
|
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
|
||||||
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(
|
.toList();
|
||||||
new LambdaQueryWrapper<JobTaskBatch>().eq(JobTaskBatch::getId, queryVO.getTaskBatchId()));
|
for (Map<String, String> logContent : logContents) {
|
||||||
|
// send发消息
|
||||||
if (Objects.isNull(jobTaskBatch)
|
|
||||||
|| (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) &&
|
|
||||||
jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) {
|
|
||||||
// 发生完成标识
|
|
||||||
WsSendEvent sendEvent = new WsSendEvent(this);
|
WsSendEvent sendEvent = new WsSendEvent(this);
|
||||||
sendEvent.setMessage("END");
|
|
||||||
sendEvent.setSid(sid);
|
sendEvent.setSid(sid);
|
||||||
|
sendEvent.setMessage(JsonUtil.toJsonString(logContent));
|
||||||
SnailSpringContext.getContext().publishEvent(sendEvent);
|
SnailSpringContext.getContext().publishEvent(sendEvent);
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
scheduleNextAttempt(queryVO, sid);
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, queryVO.getStartId());
|
|
||||||
|
// 继续查询下一页
|
||||||
|
pageQueryDO.setSearchCount(false);
|
||||||
|
pageQueryDO.setPage((i - 1) * queryVO.getSize());
|
||||||
|
pageResponseDO = accessTemplate.getJobLogMessageAccess()
|
||||||
|
.listPage(pageQueryDO);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// 这里判断是否继续查询
|
||||||
|
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(
|
||||||
|
new LambdaQueryWrapper<JobTaskBatch>().eq(JobTaskBatch::getId, queryVO.getTaskBatchId()));
|
||||||
|
|
||||||
|
// 结束查询
|
||||||
|
if (Objects.isNull(jobTaskBatch)
|
||||||
|
|| (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) &&
|
||||||
|
jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) {
|
||||||
|
// 发生完成标识
|
||||||
|
WsSendEvent sendEvent = new WsSendEvent(this);
|
||||||
|
sendEvent.setMessage("END");
|
||||||
|
sendEvent.setSid(sid);
|
||||||
|
SnailSpringContext.getContext().publishEvent(sendEvent);
|
||||||
|
} else {
|
||||||
|
// 覆盖作为下次查询的起始条件
|
||||||
|
pageQueryDO.setStartRealTime(lastRealTime);
|
||||||
|
// 继续查询
|
||||||
|
scheduleNextAttempt(queryVO, sid);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,7 +33,7 @@ public class JobTaskLogTimerTask implements TimerTask<String> {
|
|||||||
try {
|
try {
|
||||||
LogTimerWheel.clearCache(idempotentKey());
|
LogTimerWheel.clearCache(idempotentKey());
|
||||||
JobLogService logService = SnailSpringContext.getBean(JobLogService.class);
|
JobLogService logService = SnailSpringContext.getBean(JobLogService.class);
|
||||||
logService.getJobLogPageV2(logQueryVO);
|
logService.getJobLogPage(logQueryVO);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SnailJobLog.LOCAL.error("Scheduled task log query execution failed", e);
|
SnailJobLog.LOCAL.error("Scheduled task log query execution failed", e);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user