fix(1.5.0-beta1): 优化重试日志信息

This commit is contained in:
opensnail 2025-04-19 16:38:18 +08:00
parent 407474ea7f
commit 5d0675536d
8 changed files with 61 additions and 163 deletions

View File

@ -7,12 +7,12 @@ import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO; import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageQueryDO; import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageQueryDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
@ -51,14 +51,15 @@ public class RetryTaskLogMessageAccess implements RetryLogAccess<RetryTaskLogMes
} }
@Override @Override
public PageResponseDO listPage(PageQueryDO queryDO) { public PageResponseDO<RetryTaskLogMessageDO> listPage(PageQueryDO queryDO) {
RetryTaskLogMessageQueryDO logPageQueryDO = (RetryTaskLogMessageQueryDO) queryDO; RetryTaskLogMessageQueryDO logPageQueryDO = (RetryTaskLogMessageQueryDO) queryDO;
PageDTO<RetryTaskLogMessage> selectPage = retryTaskLogMessageMapper.selectPage( PageDTO<RetryTaskLogMessage> selectPage = retryTaskLogMessageMapper.selectPage(
new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize()), new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize(), logPageQueryDO.isSearchCount()),
new LambdaQueryWrapper<RetryTaskLogMessage>() new LambdaQueryWrapper<RetryTaskLogMessage>()
.ge(RetryTaskLogMessage::getId, logPageQueryDO.getStartId()) .gt(RetryTaskLogMessage::getRealTime, logPageQueryDO.getStartRealTime())
.eq(RetryTaskLogMessage::getRetryTaskId, logPageQueryDO.getRetryTaskId()) .eq(RetryTaskLogMessage::getRetryTaskId, logPageQueryDO.getRetryTaskId())
.orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime)); .orderByAsc(RetryTaskLogMessage::getId)
.orderByAsc(RetryTaskLogMessage::getRealTime));
List<RetryTaskLogMessage> records = selectPage.getRecords(); List<RetryTaskLogMessage> records = selectPage.getRecords();
PageResponseDO<RetryTaskLogMessageDO> responseDO = new PageResponseDO<>(); PageResponseDO<RetryTaskLogMessageDO> responseDO = new PageResponseDO<>();

View File

@ -11,9 +11,9 @@ public class RetryTaskLogMessageQueryDO extends PageQueryDO {
private Long retryTaskId; private Long retryTaskId;
private Long startId; private Long startRealTime;
private Integer fromIndex; private boolean searchCount;
private String sid; private String sid;
} }

View File

@ -33,12 +33,6 @@ public class RetryTaskController {
return retryTaskService.getRetryTaskLogPage(queryVO); return retryTaskService.getRetryTaskLogPage(queryVO);
} }
@LoginRequired
@GetMapping("/message/list")
public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO) {
return retryTaskService.getRetryTaskLogMessagePage(queryVO);
}
@LoginRequired @LoginRequired
@GetMapping("{id}") @GetMapping("{id}")
public RetryTaskResponseVO getRetryTaskById(@PathVariable("id") Long id) { public RetryTaskResponseVO getRetryTaskById(@PathVariable("id") Long id) {

View File

@ -61,9 +61,9 @@ public class WsRequestListener {
String message = requestVO.getMessage(); String message = requestVO.getMessage();
RetryTaskLogMessageQueryVO retryTaskLogMessageQueryVO = JsonUtil.parseObject(message, RetryTaskLogMessageQueryVO.class); RetryTaskLogMessageQueryVO retryTaskLogMessageQueryVO = JsonUtil.parseObject(message, RetryTaskLogMessageQueryVO.class);
retryTaskLogMessageQueryVO.setSid(requestVO.getSid()); retryTaskLogMessageQueryVO.setSid(requestVO.getSid());
retryTaskLogMessageQueryVO.setStartId(0L); retryTaskLogMessageQueryVO.setStartRealTime(0L);
try { try {
retryTaskService.getRetryTaskLogMessagePageV2(retryTaskLogMessageQueryVO); retryTaskService.getRetryTaskLogMessagePage(retryTaskLogMessageQueryVO);
} catch (Exception e) { } catch (Exception e) {
log.warn("send log error", e); log.warn("send log error", e);
} }

View File

@ -13,12 +13,8 @@ import lombok.EqualsAndHashCode;
public class RetryTaskLogMessageQueryVO extends BaseQueryVO { public class RetryTaskLogMessageQueryVO extends BaseQueryVO {
private String groupName; private String groupName;
private Long retryTaskId; private Long retryTaskId;
private Long startRealTime;
private Long startId;
private Integer fromIndex;
private String sid; private String sid;
} }

View File

@ -3,7 +3,6 @@ package com.aizuda.snailjob.server.web.service;
import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponseVO;
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
import java.util.List; import java.util.List;
@ -18,9 +17,7 @@ public interface RetryTaskService {
PageResult<List<RetryTaskResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO); PageResult<List<RetryTaskResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO);
RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO); void getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO);
void getRetryTaskLogMessagePageV2(RetryTaskLogMessageQueryVO queryVO);
RetryTaskResponseVO getRetryTaskById(Long id); RetryTaskResponseVO getRetryTaskById(Long id);

View File

@ -96,158 +96,69 @@ public class RetryTaskServiceImpl implements RetryTaskService {
} }
@Override @Override
public RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage( public void getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO) {
RetryTaskLogMessageQueryVO queryVO) {
if (queryVO.getRetryTaskId() == null || StrUtil.isBlank(queryVO.getGroupName())) {
RetryTaskLogMessageResponseVO jobLogResponseVO = new RetryTaskLogMessageResponseVO();
jobLogResponseVO.setNextStartId(0L);
jobLogResponseVO.setFromIndex(0);
return jobLogResponseVO;
}
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
PageDTO<RetryTaskLogMessage> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
PageDTO<RetryTaskLogMessage> selectPage = retryTaskLogMessageMapper.selectPage(pageDTO,
new LambdaQueryWrapper<RetryTaskLogMessage>()
.select(RetryTaskLogMessage::getId, RetryTaskLogMessage::getLogNum)
.ge(RetryTaskLogMessage::getId, queryVO.getStartId())
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getRetryTaskId, queryVO.getRetryTaskId())
.eq(RetryTaskLogMessage::getGroupName, queryVO.getGroupName())
.orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime)
.orderByDesc(RetryTaskLogMessage::getCreateDt));
List<RetryTaskLogMessage> records = selectPage.getRecords();
if (CollUtil.isEmpty(records)) {
return new RetryTaskLogMessageResponseVO()
.setFinished(Boolean.TRUE)
.setNextStartId(queryVO.getStartId())
.setFromIndex(0);
}
Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0);
RetryTaskLogMessage firstRecord = records.get(0);
List<Long> ids = Lists.newArrayList(firstRecord.getId());
int total = firstRecord.getLogNum() - fromIndex;
for (int i = 1; i < records.size(); i++) {
RetryTaskLogMessage record = records.get(i);
if (total + record.getLogNum() > queryVO.getSize()) {
break;
}
total += record.getLogNum();
ids.add(record.getId());
}
long nextStartId = 0;
List<Map<String, String>> messages = Lists.newArrayList();
List<RetryTaskLogMessage> jobLogMessages = retryTaskLogMessageMapper.selectList(
new LambdaQueryWrapper<RetryTaskLogMessage>()
.in(RetryTaskLogMessage::getId, ids)
.orderByAsc(RetryTaskLogMessage::getId)
.orderByAsc(RetryTaskLogMessage::getRealTime)
);
for (final RetryTaskLogMessage retryTaskLogMessage : jobLogMessages) {
List<Map<String, String>> originalList = JsonUtil.parseObject(retryTaskLogMessage.getMessage(), List.class);
int size = originalList.size() - fromIndex;
List<Map<String, String>> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize())
.collect(Collectors.toList());
if (messages.size() + size >= queryVO.getSize()) {
messages.addAll(pageList);
nextStartId = retryTaskLogMessage.getId();
fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1;
break;
}
messages.addAll(pageList);
nextStartId = retryTaskLogMessage.getId() + 1;
fromIndex = 0;
}
messages = messages.stream()
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
.collect(Collectors.toList());
RetryTaskLogMessageResponseVO responseVO = new RetryTaskLogMessageResponseVO();
responseVO.setMessage(messages);
responseVO.setNextStartId(nextStartId);
responseVO.setFromIndex(fromIndex);
return responseVO;
}
@Override
public void getRetryTaskLogMessagePageV2(RetryTaskLogMessageQueryVO queryVO) {
String sid = queryVO.getSid(); String sid = queryVO.getSid();
RetryTaskLogMessageQueryDO pageQueryDO = new RetryTaskLogMessageQueryDO(); RetryTaskLogMessageQueryDO pageQueryDO = new RetryTaskLogMessageQueryDO();
pageQueryDO.setPage(1); pageQueryDO.setPage(1);
pageQueryDO.setSize(queryVO.getSize()); pageQueryDO.setSize(queryVO.getSize());
pageQueryDO.setRetryTaskId(queryVO.getRetryTaskId()); pageQueryDO.setRetryTaskId(queryVO.getRetryTaskId());
pageQueryDO.setStartId(queryVO.getStartId()); pageQueryDO.setStartRealTime(queryVO.getStartRealTime());
PartitionTaskUtils.process(startId -> { pageQueryDO.setSearchCount(true);
// 记录下次起始时间 // 拉取数据
queryVO.setStartId(startId); PageResponseDO<RetryTaskLogMessageDO> pageResponseDO = accessTemplate.getRetryTaskLogMessageAccess()
pageQueryDO.setStartId(startId); .listPage(pageQueryDO);
// 拉去数据
PageResponseDO<RetryTaskLogMessageDO> pageResponseDO = accessTemplate.getRetryTaskLogMessageAccess()
.listPage(pageQueryDO);
List<RetryTaskLogMessageDO> rows = pageResponseDO.getRows();
return LogMessagePartitionTaskConverter.INSTANCE.toLogMessagePartitionTask(rows);
}, new Consumer<>() {
@Override
public void accept(List<? extends PartitionTask> partitionTasks) {
List<LogMessagePartitionTask> partitionTaskList = (List<LogMessagePartitionTask>) partitionTasks; long total = pageResponseDO.getTotal();
for (LogMessagePartitionTask logMessagePartitionTask : partitionTaskList) { int totalPage = (int) ((total + queryVO.getSize() - 1) / queryVO.getSize());
// 发生日志内容到前端
String message = logMessagePartitionTask.getMessage();
List<Map<String, String>> logContents = JsonUtil.parseObject(message, List.class);
logContents = logContents.stream()
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
.toList();
for (Map<String, String> logContent : logContents) {
// send发消息
WsSendEvent sendEvent = new WsSendEvent(this);
sendEvent.setSid(sid);
sendEvent.setMessage(JsonUtil.toJsonString(logContent));
SnailSpringContext.getContext().publishEvent(sendEvent);
}
}
} Long lastRealTime = 0L;
}, new Predicate<>() {
@Override
public boolean apply(List<? extends PartitionTask> rows) {
// 决策是否完成 if (0 == totalPage &&
if (!CollUtil.isEmpty(rows)) { (null != pageQueryDO.getStartRealTime() && 0 != pageQueryDO.getStartRealTime())){
return false; lastRealTime = pageQueryDO.getStartRealTime();
} }
RetryTask retryTask = retryTaskMapper.selectOne( for (int i = 1; i <= totalPage;) {
new LambdaQueryWrapper<RetryTask>().eq(RetryTask::getId, queryVO.getRetryTaskId())); for (RetryTaskLogMessageDO retryTaskLogMessageDO : pageResponseDO.getRows()) {
// 发生日志内容到前端
if (Objects.isNull(retryTask) String message = retryTaskLogMessageDO.getMessage();
|| (RetryTaskStatusEnum.TERMINAL_STATUS_SET.contains(retryTask.getTaskStatus()) && List<Map<String, String>> logContents = JsonUtil.parseObject(message, List.class);
retryTask.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) { logContents = logContents.stream()
// 发生完成标识 .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
.toList();
for (Map<String, String> logContent : logContents) {
// send发消息
WsSendEvent sendEvent = new WsSendEvent(this); WsSendEvent sendEvent = new WsSendEvent(this);
sendEvent.setMessage("END");
sendEvent.setSid(sid); sendEvent.setSid(sid);
sendEvent.setMessage(JsonUtil.toJsonString(logContent));
SnailSpringContext.getContext().publishEvent(sendEvent); SnailSpringContext.getContext().publishEvent(sendEvent);
return true;
} else {
scheduleNextAttempt(queryVO, sid);
return true;
} }
} }
}, queryVO.getStartId());
// 继续查询下一页
pageQueryDO.setSearchCount(false);
pageQueryDO.setPage(++i);
pageResponseDO = accessTemplate.getRetryTaskLogMessageAccess()
.listPage(pageQueryDO);
}
RetryTask retryTask = retryTaskMapper.selectOne(
new LambdaQueryWrapper<RetryTask>().eq(RetryTask::getId, queryVO.getRetryTaskId()));
if (Objects.isNull(retryTask)
|| (RetryTaskStatusEnum.TERMINAL_STATUS_SET.contains(retryTask.getTaskStatus()) &&
retryTask.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) {
// 发生完成标识
WsSendEvent sendEvent = new WsSendEvent(this);
sendEvent.setMessage("END");
sendEvent.setSid(sid);
SnailSpringContext.getContext().publishEvent(sendEvent);
} else {
// 覆盖作为下次查询的起始条件
queryVO.setStartRealTime(lastRealTime);
scheduleNextAttempt(queryVO, sid);
}
} }
/** /**

View File

@ -33,7 +33,7 @@ public class RetryTaskLogTimerTask implements TimerTask<String> {
try { try {
LogTimerWheel.clearCache(idempotentKey()); LogTimerWheel.clearCache(idempotentKey());
RetryTaskService logService = SnailSpringContext.getBean(RetryTaskService.class); RetryTaskService logService = SnailSpringContext.getBean(RetryTaskService.class);
logService.getRetryTaskLogMessagePageV2(logQueryVO); logService.getRetryTaskLogMessagePage(logQueryVO);
} catch (Exception e) { } catch (Exception e) {
SnailJobLog.LOCAL.error("Scheduled task log query execution failed", e); SnailJobLog.LOCAL.error("Scheduled task log query execution failed", e);
} }
@ -41,7 +41,6 @@ public class RetryTaskLogTimerTask implements TimerTask<String> {
@Override @Override
public String idempotentKey() { public String idempotentKey() {
Long taskId = logQueryVO.getRetryTaskId(); Long taskId = logQueryVO.getRetryTaskId();
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, WebSocketSceneEnum.JOB_LOG_SCENE, taskId); return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, WebSocketSceneEnum.JOB_LOG_SCENE, taskId);
} }