feat(1.5.0-beta1): 增加websocket获取重试任务日志

This commit is contained in:
xiaochaihu 2025-04-15 23:12:45 +08:00
parent f47f24e47d
commit 57e487b96d
9 changed files with 234 additions and 6 deletions

View File

@ -1,7 +1,9 @@
package com.aizuda.snailjob.template.datasource.access.log; package com.aizuda.snailjob.template.datasource.access.log;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO; import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage;
import org.mapstruct.Mapper; import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers; import org.mapstruct.factory.Mappers;
@ -28,4 +30,12 @@ public interface LogConverter {
List<JobLogMessageDO> toJobLogMessageDOList(List<JobLogMessage> logMessages); List<JobLogMessageDO> toJobLogMessageDOList(List<JobLogMessage> logMessages);
RetryTaskLogMessage toRetryTaskLogMessage(RetryTaskLogMessageDO logMessage);
List<RetryTaskLogMessage> toRetryTaskMessages(List<RetryTaskLogMessageDO> logMessages);
RetryTaskLogMessageDO toRetryTaskLogMessageDO(RetryTaskLogMessage logMessage);
List<RetryTaskLogMessageDO> toRetryTaskLogMessageDOList(List<RetryTaskLogMessage> logMessages);
} }

View File

@ -5,7 +5,13 @@ import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum; import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*; import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO; import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageQueryDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.Serializable; import java.io.Serializable;
@ -21,8 +27,11 @@ import static com.aizuda.snailjob.template.datasource.utils.DbUtils.getDbType;
* @author opensnail * @author opensnail
* @date 2025-03-29 * @date 2025-03-29
*/ */
@Slf4j
@RequiredArgsConstructor
public class RetryTaskLogMessageAccess implements RetryLogAccess<RetryTaskLogMessageDO> { public class RetryTaskLogMessageAccess implements RetryLogAccess<RetryTaskLogMessageDO> {
private final RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Override @Override
public boolean supports(String operationType) { public boolean supports(String operationType) {
@ -31,17 +40,33 @@ public class RetryTaskLogMessageAccess implements RetryLogAccess<RetryTaskLogMes
} }
@Override @Override
public int insert(RetryTaskLogMessageDO retryTaskLogMessageDO) { public int insert(RetryTaskLogMessageDO retryTaskLogMessageDO) {
return 0; RetryTaskLogMessage retryTaskLogMessage = LogConverter.INSTANCE.toRetryTaskLogMessage(retryTaskLogMessageDO);
return retryTaskLogMessageMapper.insert(retryTaskLogMessage);
} }
@Override @Override
public int insertBatch(List<RetryTaskLogMessageDO> list) { public int insertBatch(List<RetryTaskLogMessageDO> list) {
return 0; List<RetryTaskLogMessage> retryTaskMessages = LogConverter.INSTANCE.toRetryTaskMessages(list);
return retryTaskLogMessageMapper.insertBatch(retryTaskMessages);
} }
@Override @Override
public PageResponseDO listPage(PageQueryDO queryDO) { public PageResponseDO listPage(PageQueryDO queryDO) {
return null; RetryTaskLogMessageQueryDO logPageQueryDO = (RetryTaskLogMessageQueryDO) queryDO;
PageDTO<RetryTaskLogMessage> selectPage = retryTaskLogMessageMapper.selectPage(
new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize()),
new LambdaQueryWrapper<RetryTaskLogMessage>()
.ge(RetryTaskLogMessage::getId, logPageQueryDO.getStartId())
.eq(RetryTaskLogMessage::getRetryTaskId, logPageQueryDO.getRetryTaskId())
.orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime));
List<RetryTaskLogMessage> records = selectPage.getRecords();
PageResponseDO<RetryTaskLogMessageDO> responseDO = new PageResponseDO<>();
responseDO.setPage(selectPage.getCurrent());
responseDO.setSize(selectPage.getSize());
responseDO.setTotal(selectPage.getTotal());
responseDO.setRows(LogConverter.INSTANCE.toRetryTaskLogMessageDOList(records));
return responseDO;
} }
@Override @Override

View File

@ -8,6 +8,7 @@ import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLog
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO; import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -23,7 +24,7 @@ public class LogConfig {
@ConditionalOnMissingBean @ConditionalOnMissingBean
@Bean @Bean
public RetryLogAccess<RetryTaskLogMessageDO> defaultRetryLogAccess() { public RetryLogAccess<RetryTaskLogMessageDO> defaultRetryLogAccess(RetryTaskLogMessageMapper retryTaskLogMessageMapper) {
return new RetryTaskLogMessageAccess(); return new RetryTaskLogMessageAccess(retryTaskLogMessageMapper);
} }
} }

View File

@ -0,0 +1,19 @@
package com.aizuda.snailjob.template.datasource.persistence.dataobject.log;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageQueryDO;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = true)
public class RetryTaskLogMessageQueryDO extends PageQueryDO {
private String groupName;
private Long retryTaskId;
private Long startId;
private Integer fromIndex;
private String sid;
}

View File

@ -4,7 +4,9 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.web.model.event.WsRequestEvent; import com.aizuda.snailjob.server.web.model.event.WsRequestEvent;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.service.JobLogService; import com.aizuda.snailjob.server.web.service.JobLogService;
import com.aizuda.snailjob.server.web.service.RetryTaskService;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
@ -25,6 +27,8 @@ import org.springframework.stereotype.Component;
public class WsRequestListener { public class WsRequestListener {
private final JobLogService jobLogService; private final JobLogService jobLogService;
private final RetryTaskService retryTaskService;
@Async("logQueryExecutor") @Async("logQueryExecutor")
@EventListener(classes = WsRequestEvent.class) @EventListener(classes = WsRequestEvent.class)
public void getJobLogs(WsRequestEvent requestVO) { public void getJobLogs(WsRequestEvent requestVO) {
@ -45,4 +49,24 @@ public class WsRequestListener {
} }
@Async("logQueryExecutor")
@EventListener(classes = WsRequestEvent.class)
public void getRetryLogs(WsRequestEvent requestVO) {
if (!WebSocketSceneEnum.RETRY_LOG_SCENE.equals(requestVO.getSceneEnum())) {
return;
}
log.info("getRetryLogs {}", requestVO.getSid());
String message = requestVO.getMessage();
RetryTaskLogMessageQueryVO retryTaskLogMessageQueryVO = JsonUtil.parseObject(message, RetryTaskLogMessageQueryVO.class);
retryTaskLogMessageQueryVO.setSid(requestVO.getSid());
retryTaskLogMessageQueryVO.setStartId(0L);
try {
retryTaskService.getRetryTaskLogMessagePageV2(retryTaskLogMessageQueryVO);
} catch (Exception e) {
log.warn("send log error", e);
}
}
} }

View File

@ -19,4 +19,6 @@ public class RetryTaskLogMessageQueryVO extends BaseQueryVO {
private Long startId; private Long startId;
private Integer fromIndex; private Integer fromIndex;
private String sid;
} }

View File

@ -20,6 +20,8 @@ public interface RetryTaskService {
RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO); RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO);
void getRetryTaskLogMessagePageV2(RetryTaskLogMessageQueryVO queryVO);
RetryTaskResponseVO getRetryTaskById(Long id); RetryTaskResponseVO getRetryTaskById(Long id);
boolean deleteById(Long id); boolean deleteById(Long id);

View File

@ -4,14 +4,19 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.constant.LogFieldConstants; import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO; import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO;
import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler; import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler;
import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.dto.LogMessagePartitionTask;
import com.aizuda.snailjob.server.web.model.event.WsSendEvent;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO;
import com.aizuda.snailjob.server.web.model.request.UserSessionVO; import com.aizuda.snailjob.server.web.model.request.UserSessionVO;
@ -19,9 +24,16 @@ import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponse
import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO;
import com.aizuda.snailjob.server.web.service.RetryTaskService; import com.aizuda.snailjob.server.web.service.RetryTaskService;
import com.aizuda.snailjob.server.retry.task.convert.RetryConverter; import com.aizuda.snailjob.server.retry.task.convert.RetryConverter;
import com.aizuda.snailjob.server.web.service.convert.LogMessagePartitionTaskConverter;
import com.aizuda.snailjob.server.web.service.convert.RetryTaskLogResponseVOConverter; import com.aizuda.snailjob.server.web.service.convert.RetryTaskLogResponseVOConverter;
import com.aizuda.snailjob.server.web.service.convert.RetryTaskResponseVOConverter; import com.aizuda.snailjob.server.web.service.convert.RetryTaskResponseVOConverter;
import com.aizuda.snailjob.server.web.timer.LogTimerWheel;
import com.aizuda.snailjob.server.web.timer.RetryTaskLogTimerTask;
import com.aizuda.snailjob.server.web.util.UserSessionUtils; import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageQueryDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
@ -30,12 +42,16 @@ import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -45,11 +61,12 @@ import java.util.stream.Collectors;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
public class RetryTaskServiceImpl implements RetryTaskService { public class RetryTaskServiceImpl implements RetryTaskService {
private static final Long DELAY_MILLS = 5000L;
private final RetryTaskMapper retryTaskMapper; private final RetryTaskMapper retryTaskMapper;
private final RetryMapper retryMapper; private final RetryMapper retryMapper;
private final RetryTaskLogMessageMapper retryTaskLogMessageMapper; private final RetryTaskLogMessageMapper retryTaskLogMessageMapper;
private final RetryTaskStopHandler retryTaskStopHandler; private final RetryTaskStopHandler retryTaskStopHandler;
private final AccessTemplate accessTemplate;
@Override @Override
public PageResult<List<RetryTaskResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO) { public PageResult<List<RetryTaskResponseVO>> getRetryTaskLogPage(RetryTaskQueryVO queryVO) {
@ -163,6 +180,86 @@ public class RetryTaskServiceImpl implements RetryTaskService {
return responseVO; return responseVO;
} }
@Override
public void getRetryTaskLogMessagePageV2(RetryTaskLogMessageQueryVO queryVO) {
String sid = queryVO.getSid();
RetryTaskLogMessageQueryDO pageQueryDO = new RetryTaskLogMessageQueryDO();
pageQueryDO.setPage(1);
pageQueryDO.setSize(queryVO.getSize());
pageQueryDO.setRetryTaskId(queryVO.getRetryTaskId());
pageQueryDO.setStartId(queryVO.getStartId());
PartitionTaskUtils.process(startId -> {
// 记录下次起始时间
queryVO.setStartId(startId);
pageQueryDO.setStartId(startId);
// 拉去数据
PageResponseDO<RetryTaskLogMessageDO> pageResponseDO = accessTemplate.getRetryTaskLogMessageAccess()
.listPage(pageQueryDO);
List<RetryTaskLogMessageDO> rows = pageResponseDO.getRows();
return LogMessagePartitionTaskConverter.INSTANCE.toLogMessagePartitionTask(rows);
}, new Consumer<>() {
@Override
public void accept(List<? extends PartitionTask> partitionTasks) {
List<LogMessagePartitionTask> partitionTaskList = (List<LogMessagePartitionTask>) partitionTasks;
for (LogMessagePartitionTask logMessagePartitionTask : partitionTaskList) {
// 发生日志内容到前端
String message = logMessagePartitionTask.getMessage();
List<Map<String, String>> logContents = JsonUtil.parseObject(message, List.class);
logContents = logContents.stream()
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
.toList();
for (Map<String, String> logContent : logContents) {
// send发消息
WsSendEvent sendEvent = new WsSendEvent(this);
sendEvent.setSid(sid);
sendEvent.setMessage(JsonUtil.toJsonString(logContent));
SnailSpringContext.getContext().publishEvent(sendEvent);
}
}
}
}, new Predicate<>() {
@Override
public boolean apply(List<? extends PartitionTask> rows) {
// 决策是否完成
if (!CollUtil.isEmpty(rows)) {
return false;
}
RetryTask retryTask = retryTaskMapper.selectOne(
new LambdaQueryWrapper<RetryTask>().eq(RetryTask::getId, queryVO.getRetryTaskId()));
if (Objects.isNull(retryTask)
|| (RetryTaskStatusEnum.TERMINAL_STATUS_SET.contains(retryTask.getTaskStatus()) &&
retryTask.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) {
// 发生完成标识
WsSendEvent sendEvent = new WsSendEvent(this);
sendEvent.setMessage("END");
sendEvent.setSid(sid);
SnailSpringContext.getContext().publishEvent(sendEvent);
return true;
} else {
scheduleNextAttempt(queryVO, sid);
return true;
}
}
}, queryVO.getStartId());
}
/**
* 使用时间轮5秒再进行日志查询
*
* @param queryVO
* @param sid
*/
private void scheduleNextAttempt(RetryTaskLogMessageQueryVO queryVO, String sid) {
LogTimerWheel.registerWithJobTaskLog(() -> new RetryTaskLogTimerTask(queryVO, sid), Duration.ofMillis(DELAY_MILLS));
}
@Override @Override
public RetryTaskResponseVO getRetryTaskById(Long id) { public RetryTaskResponseVO getRetryTaskById(Long id) {
RetryTask retryTask = retryTaskMapper.selectById(id); RetryTask retryTask = retryTaskMapper.selectById(id);

View File

@ -0,0 +1,48 @@
package com.aizuda.snailjob.server.web.timer;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO;
import com.aizuda.snailjob.server.web.service.RetryTaskService;
import io.netty.util.Timeout;
import lombok.AllArgsConstructor;
import java.text.MessageFormat;
import java.time.LocalDateTime;
/**
* @Authorxiaochaihu
* @Packagecom.aizuda.snailjob.server.common.timer
* @Projectsnail-job
* @Date2025/4/15 22:15
* @FilenameRetryTaskLogTimerTask
* @since 1.5.0
*/
@AllArgsConstructor
public class RetryTaskLogTimerTask implements TimerTask<String> {
private static final String IDEMPOTENT_KEY_PREFIX = "retryTaskLog_{0}_{1}_{2}";
private RetryTaskLogMessageQueryVO logQueryVO;
private String sid;
@Override
public void run(final Timeout timeout) throws Exception {
SnailJobLog.LOCAL.debug("Start querying scheduled task logs. Current time:[{}] retryTaskId:[{}]", LocalDateTime.now(), logQueryVO.getRetryTaskId());
try {
LogTimerWheel.clearCache(idempotentKey());
RetryTaskService logService = SnailSpringContext.getBean(RetryTaskService.class);
logService.getRetryTaskLogMessagePageV2(logQueryVO);
} catch (Exception e) {
SnailJobLog.LOCAL.error("Scheduled task log query execution failed", e);
}
}
@Override
public String idempotentKey() {
Long taskId = logQueryVO.getRetryTaskId();
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, WebSocketSceneEnum.JOB_LOG_SCENE, taskId);
}
}