fix(1.5.0-beta1): 优化重试日志信息
This commit is contained in:
parent
310d9bc580
commit
33b6f35916
@ -7,12 +7,12 @@ import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*;
|
|||||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
|
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageQueryDO;
|
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageQueryDO;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage;
|
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -51,14 +51,15 @@ public class RetryTaskLogMessageAccess implements RetryLogAccess<RetryTaskLogMes
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PageResponseDO listPage(PageQueryDO queryDO) {
|
public PageResponseDO<RetryTaskLogMessageDO> listPage(PageQueryDO queryDO) {
|
||||||
RetryTaskLogMessageQueryDO logPageQueryDO = (RetryTaskLogMessageQueryDO) queryDO;
|
RetryTaskLogMessageQueryDO logPageQueryDO = (RetryTaskLogMessageQueryDO) queryDO;
|
||||||
PageDTO<RetryTaskLogMessage> selectPage = retryTaskLogMessageMapper.selectPage(
|
PageDTO<RetryTaskLogMessage> selectPage = retryTaskLogMessageMapper.selectPage(
|
||||||
new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize()),
|
new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize(), logPageQueryDO.isSearchCount()),
|
||||||
new LambdaQueryWrapper<RetryTaskLogMessage>()
|
new LambdaQueryWrapper<RetryTaskLogMessage>()
|
||||||
.ge(RetryTaskLogMessage::getId, logPageQueryDO.getStartId())
|
.gt(RetryTaskLogMessage::getRealTime, logPageQueryDO.getStartRealTime())
|
||||||
.eq(RetryTaskLogMessage::getRetryTaskId, logPageQueryDO.getRetryTaskId())
|
.eq(RetryTaskLogMessage::getRetryTaskId, logPageQueryDO.getRetryTaskId())
|
||||||
.orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime));
|
.orderByAsc(RetryTaskLogMessage::getId)
|
||||||
|
.orderByAsc(RetryTaskLogMessage::getRealTime));
|
||||||
List<RetryTaskLogMessage> records = selectPage.getRecords();
|
List<RetryTaskLogMessage> records = selectPage.getRecords();
|
||||||
|
|
||||||
PageResponseDO<RetryTaskLogMessageDO> responseDO = new PageResponseDO<>();
|
PageResponseDO<RetryTaskLogMessageDO> responseDO = new PageResponseDO<>();
|
||||||
|
@ -11,9 +11,9 @@ public class RetryTaskLogMessageQueryDO extends PageQueryDO {
|
|||||||
|
|
||||||
private Long retryTaskId;
|
private Long retryTaskId;
|
||||||
|
|
||||||
private Long startId;
|
private Long startRealTime;
|
||||||
|
|
||||||
private Integer fromIndex;
|
private boolean searchCount;
|
||||||
|
|
||||||
private String sid;
|
private String sid;
|
||||||
}
|
}
|
@ -33,12 +33,6 @@ public class RetryTaskController {
|
|||||||
return retryTaskService.getRetryTaskLogPage(queryVO);
|
return retryTaskService.getRetryTaskLogPage(queryVO);
|
||||||
}
|
}
|
||||||
|
|
||||||
@LoginRequired
|
|
||||||
@GetMapping("/message/list")
|
|
||||||
public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO) {
|
|
||||||
return retryTaskService.getRetryTaskLogMessagePage(queryVO);
|
|
||||||
}
|
|
||||||
|
|
||||||
@LoginRequired
|
@LoginRequired
|
||||||
@GetMapping("{id}")
|
@GetMapping("{id}")
|
||||||
public RetryTaskResponseVO getRetryTaskById(@PathVariable("id") Long id) {
|
public RetryTaskResponseVO getRetryTaskById(@PathVariable("id") Long id) {
|
||||||
|
@ -61,9 +61,9 @@ public class WsRequestListener {
|
|||||||
String message = requestVO.getMessage();
|
String message = requestVO.getMessage();
|
||||||
RetryTaskLogMessageQueryVO retryTaskLogMessageQueryVO = JsonUtil.parseObject(message, RetryTaskLogMessageQueryVO.class);
|
RetryTaskLogMessageQueryVO retryTaskLogMessageQueryVO = JsonUtil.parseObject(message, RetryTaskLogMessageQueryVO.class);
|
||||||
retryTaskLogMessageQueryVO.setSid(requestVO.getSid());
|
retryTaskLogMessageQueryVO.setSid(requestVO.getSid());
|
||||||
retryTaskLogMessageQueryVO.setStartId(0L);
|
retryTaskLogMessageQueryVO.setStartRealTime(0L);
|
||||||
try {
|
try {
|
||||||
retryTaskService.getRetryTaskLogMessagePageV2(retryTaskLogMessageQueryVO);
|
retryTaskService.getRetryTaskLogMessagePage(retryTaskLogMessageQueryVO);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("send log error", e);
|
log.warn("send log error", e);
|
||||||
}
|
}
|
||||||
|
@ -13,12 +13,8 @@ import lombok.EqualsAndHashCode;
|
|||||||
public class RetryTaskLogMessageQueryVO extends BaseQueryVO {
|
public class RetryTaskLogMessageQueryVO extends BaseQueryVO {
|
||||||
|
|
||||||
private String groupName;
|
private String groupName;
|
||||||
|
|
||||||
private Long retryTaskId;
|
private Long retryTaskId;
|
||||||
|
private Long startRealTime;
|
||||||
private Long startId;
|
|
||||||
|
|
||||||
private Integer fromIndex;
|
|
||||||
|
|
||||||
private String sid;
|
private String sid;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ package com.aizuda.snailjob.server.web.service;
|
|||||||
import com.aizuda.snailjob.server.web.model.base.PageResult;
|
import com.aizuda.snailjob.server.web.model.base.PageResult;
|
||||||
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
|
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
|
||||||
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
|
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
|
||||||
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO;
|
|
||||||
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
|
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -18,9 +17,7 @@ public interface RetryTaskService {
|
|||||||
|
|
||||||
PageResult<List<RetryTaskResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO);
|
PageResult<List<RetryTaskResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO);
|
||||||
|
|
||||||
RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO);
|
void getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO);
|
||||||
|
|
||||||
void getRetryTaskLogMessagePageV2(RetryTaskLogMessageQueryVO queryVO);
|
|
||||||
|
|
||||||
RetryTaskResponseVO getRetryTaskById(Long id);
|
RetryTaskResponseVO getRetryTaskById(Long id);
|
||||||
|
|
||||||
|
@ -96,116 +96,33 @@ public class RetryTaskServiceImpl implements RetryTaskService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(
|
public void getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO) {
|
||||||
RetryTaskLogMessageQueryVO queryVO) {
|
|
||||||
if (queryVO.getRetryTaskId() == null || StrUtil.isBlank(queryVO.getGroupName())) {
|
|
||||||
RetryTaskLogMessageResponseVO jobLogResponseVO = new RetryTaskLogMessageResponseVO();
|
|
||||||
jobLogResponseVO.setNextStartId(0L);
|
|
||||||
jobLogResponseVO.setFromIndex(0);
|
|
||||||
return jobLogResponseVO;
|
|
||||||
}
|
|
||||||
|
|
||||||
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
|
|
||||||
|
|
||||||
PageDTO<RetryTaskLogMessage> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
|
|
||||||
PageDTO<RetryTaskLogMessage> selectPage = retryTaskLogMessageMapper.selectPage(pageDTO,
|
|
||||||
new LambdaQueryWrapper<RetryTaskLogMessage>()
|
|
||||||
.select(RetryTaskLogMessage::getId, RetryTaskLogMessage::getLogNum)
|
|
||||||
.ge(RetryTaskLogMessage::getId, queryVO.getStartId())
|
|
||||||
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
|
|
||||||
.eq(RetryTaskLogMessage::getRetryTaskId, queryVO.getRetryTaskId())
|
|
||||||
.eq(RetryTaskLogMessage::getGroupName, queryVO.getGroupName())
|
|
||||||
.orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime)
|
|
||||||
.orderByDesc(RetryTaskLogMessage::getCreateDt));
|
|
||||||
|
|
||||||
List<RetryTaskLogMessage> records = selectPage.getRecords();
|
|
||||||
|
|
||||||
if (CollUtil.isEmpty(records)) {
|
|
||||||
return new RetryTaskLogMessageResponseVO()
|
|
||||||
.setFinished(Boolean.TRUE)
|
|
||||||
.setNextStartId(queryVO.getStartId())
|
|
||||||
.setFromIndex(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0);
|
|
||||||
RetryTaskLogMessage firstRecord = records.get(0);
|
|
||||||
List<Long> ids = Lists.newArrayList(firstRecord.getId());
|
|
||||||
int total = firstRecord.getLogNum() - fromIndex;
|
|
||||||
for (int i = 1; i < records.size(); i++) {
|
|
||||||
RetryTaskLogMessage record = records.get(i);
|
|
||||||
if (total + record.getLogNum() > queryVO.getSize()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
total += record.getLogNum();
|
|
||||||
ids.add(record.getId());
|
|
||||||
}
|
|
||||||
|
|
||||||
long nextStartId = 0;
|
|
||||||
List<Map<String, String>> messages = Lists.newArrayList();
|
|
||||||
List<RetryTaskLogMessage> jobLogMessages = retryTaskLogMessageMapper.selectList(
|
|
||||||
new LambdaQueryWrapper<RetryTaskLogMessage>()
|
|
||||||
.in(RetryTaskLogMessage::getId, ids)
|
|
||||||
.orderByAsc(RetryTaskLogMessage::getId)
|
|
||||||
.orderByAsc(RetryTaskLogMessage::getRealTime)
|
|
||||||
);
|
|
||||||
|
|
||||||
for (final RetryTaskLogMessage retryTaskLogMessage : jobLogMessages) {
|
|
||||||
|
|
||||||
List<Map<String, String>> originalList = JsonUtil.parseObject(retryTaskLogMessage.getMessage(), List.class);
|
|
||||||
int size = originalList.size() - fromIndex;
|
|
||||||
List<Map<String, String>> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize())
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
if (messages.size() + size >= queryVO.getSize()) {
|
|
||||||
messages.addAll(pageList);
|
|
||||||
nextStartId = retryTaskLogMessage.getId();
|
|
||||||
fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
messages.addAll(pageList);
|
|
||||||
nextStartId = retryTaskLogMessage.getId() + 1;
|
|
||||||
fromIndex = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
messages = messages.stream()
|
|
||||||
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
RetryTaskLogMessageResponseVO responseVO = new RetryTaskLogMessageResponseVO();
|
|
||||||
responseVO.setMessage(messages);
|
|
||||||
responseVO.setNextStartId(nextStartId);
|
|
||||||
responseVO.setFromIndex(fromIndex);
|
|
||||||
|
|
||||||
return responseVO;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void getRetryTaskLogMessagePageV2(RetryTaskLogMessageQueryVO queryVO) {
|
|
||||||
String sid = queryVO.getSid();
|
String sid = queryVO.getSid();
|
||||||
RetryTaskLogMessageQueryDO pageQueryDO = new RetryTaskLogMessageQueryDO();
|
RetryTaskLogMessageQueryDO pageQueryDO = new RetryTaskLogMessageQueryDO();
|
||||||
pageQueryDO.setPage(1);
|
pageQueryDO.setPage(1);
|
||||||
pageQueryDO.setSize(queryVO.getSize());
|
pageQueryDO.setSize(queryVO.getSize());
|
||||||
pageQueryDO.setRetryTaskId(queryVO.getRetryTaskId());
|
pageQueryDO.setRetryTaskId(queryVO.getRetryTaskId());
|
||||||
pageQueryDO.setStartId(queryVO.getStartId());
|
pageQueryDO.setStartRealTime(queryVO.getStartRealTime());
|
||||||
PartitionTaskUtils.process(startId -> {
|
pageQueryDO.setSearchCount(true);
|
||||||
// 记录下次起始时间
|
// 拉取数据
|
||||||
queryVO.setStartId(startId);
|
|
||||||
pageQueryDO.setStartId(startId);
|
|
||||||
// 拉去数据
|
|
||||||
PageResponseDO<RetryTaskLogMessageDO> pageResponseDO = accessTemplate.getRetryTaskLogMessageAccess()
|
PageResponseDO<RetryTaskLogMessageDO> pageResponseDO = accessTemplate.getRetryTaskLogMessageAccess()
|
||||||
.listPage(pageQueryDO);
|
.listPage(pageQueryDO);
|
||||||
List<RetryTaskLogMessageDO> rows = pageResponseDO.getRows();
|
|
||||||
return LogMessagePartitionTaskConverter.INSTANCE.toLogMessagePartitionTask(rows);
|
|
||||||
}, new Consumer<>() {
|
|
||||||
@Override
|
|
||||||
public void accept(List<? extends PartitionTask> partitionTasks) {
|
|
||||||
|
|
||||||
List<LogMessagePartitionTask> partitionTaskList = (List<LogMessagePartitionTask>) partitionTasks;
|
long total = pageResponseDO.getTotal();
|
||||||
|
|
||||||
for (LogMessagePartitionTask logMessagePartitionTask : partitionTaskList) {
|
int totalPage = (int) ((total + queryVO.getSize() - 1) / queryVO.getSize());
|
||||||
|
|
||||||
|
Long lastRealTime = 0L;
|
||||||
|
|
||||||
|
if (0 == totalPage &&
|
||||||
|
(null != pageQueryDO.getStartRealTime() && 0 != pageQueryDO.getStartRealTime())){
|
||||||
|
lastRealTime = pageQueryDO.getStartRealTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 1; i <= totalPage;) {
|
||||||
|
for (RetryTaskLogMessageDO retryTaskLogMessageDO : pageResponseDO.getRows()) {
|
||||||
// 发生日志内容到前端
|
// 发生日志内容到前端
|
||||||
String message = logMessagePartitionTask.getMessage();
|
String message = retryTaskLogMessageDO.getMessage();
|
||||||
List<Map<String, String>> logContents = JsonUtil.parseObject(message, List.class);
|
List<Map<String, String>> logContents = JsonUtil.parseObject(message, List.class);
|
||||||
logContents = logContents.stream()
|
logContents = logContents.stream()
|
||||||
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
|
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
|
||||||
@ -219,14 +136,11 @@ public class RetryTaskServiceImpl implements RetryTaskService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
// 继续查询下一页
|
||||||
}, new Predicate<>() {
|
pageQueryDO.setSearchCount(false);
|
||||||
@Override
|
pageQueryDO.setPage(++i);
|
||||||
public boolean apply(List<? extends PartitionTask> rows) {
|
pageResponseDO = accessTemplate.getRetryTaskLogMessageAccess()
|
||||||
|
.listPage(pageQueryDO);
|
||||||
// 决策是否完成
|
|
||||||
if (!CollUtil.isEmpty(rows)) {
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RetryTask retryTask = retryTaskMapper.selectOne(
|
RetryTask retryTask = retryTaskMapper.selectOne(
|
||||||
@ -240,15 +154,12 @@ public class RetryTaskServiceImpl implements RetryTaskService {
|
|||||||
sendEvent.setMessage("END");
|
sendEvent.setMessage("END");
|
||||||
sendEvent.setSid(sid);
|
sendEvent.setSid(sid);
|
||||||
SnailSpringContext.getContext().publishEvent(sendEvent);
|
SnailSpringContext.getContext().publishEvent(sendEvent);
|
||||||
return true;
|
|
||||||
} else {
|
} else {
|
||||||
|
// 覆盖作为下次查询的起始条件
|
||||||
|
queryVO.setStartRealTime(lastRealTime);
|
||||||
scheduleNextAttempt(queryVO, sid);
|
scheduleNextAttempt(queryVO, sid);
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, queryVO.getStartId());
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 使用时间轮5秒再进行日志查询
|
* 使用时间轮5秒再进行日志查询
|
||||||
|
@ -33,7 +33,7 @@ public class RetryTaskLogTimerTask implements TimerTask<String> {
|
|||||||
try {
|
try {
|
||||||
LogTimerWheel.clearCache(idempotentKey());
|
LogTimerWheel.clearCache(idempotentKey());
|
||||||
RetryTaskService logService = SnailSpringContext.getBean(RetryTaskService.class);
|
RetryTaskService logService = SnailSpringContext.getBean(RetryTaskService.class);
|
||||||
logService.getRetryTaskLogMessagePageV2(logQueryVO);
|
logService.getRetryTaskLogMessagePage(logQueryVO);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SnailJobLog.LOCAL.error("Scheduled task log query execution failed", e);
|
SnailJobLog.LOCAL.error("Scheduled task log query execution failed", e);
|
||||||
}
|
}
|
||||||
@ -41,7 +41,6 @@ public class RetryTaskLogTimerTask implements TimerTask<String> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String idempotentKey() {
|
public String idempotentKey() {
|
||||||
|
|
||||||
Long taskId = logQueryVO.getRetryTaskId();
|
Long taskId = logQueryVO.getRetryTaskId();
|
||||||
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, WebSocketSceneEnum.JOB_LOG_SCENE, taskId);
|
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, WebSocketSceneEnum.JOB_LOG_SCENE, taskId);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user