diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/LogConverter.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/LogConverter.java index 922d85eb5..fd4cfb2ed 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/LogConverter.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/LogConverter.java @@ -1,7 +1,9 @@ package com.aizuda.snailjob.template.datasource.access.log; import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO; import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; @@ -28,4 +30,12 @@ public interface LogConverter { List toJobLogMessageDOList(List logMessages); + RetryTaskLogMessage toRetryTaskLogMessage(RetryTaskLogMessageDO logMessage); + + List toRetryTaskMessages(List logMessages); + + RetryTaskLogMessageDO toRetryTaskLogMessageDO(RetryTaskLogMessage logMessage); + + List toRetryTaskLogMessageDOList(List logMessages); + } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java index 709d56033..d4e2d2ac7 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java @@ -5,7 +5,13 @@ import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum; import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum; import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*; import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageQueryDO; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.io.Serializable; @@ -21,8 +27,11 @@ import static com.aizuda.snailjob.template.datasource.utils.DbUtils.getDbType; * @author opensnail * @date 2025-03-29 */ +@Slf4j +@RequiredArgsConstructor public class RetryTaskLogMessageAccess implements RetryLogAccess { + private final RetryTaskLogMessageMapper retryTaskLogMessageMapper; @Override public boolean supports(String operationType) { @@ -31,17 +40,33 @@ public class RetryTaskLogMessageAccess implements RetryLogAccess list) { - return 0; + List retryTaskMessages = LogConverter.INSTANCE.toRetryTaskMessages(list); + return retryTaskLogMessageMapper.insertBatch(retryTaskMessages); } @Override public PageResponseDO listPage(PageQueryDO queryDO) { - return null; + RetryTaskLogMessageQueryDO logPageQueryDO = (RetryTaskLogMessageQueryDO) queryDO; + PageDTO selectPage = retryTaskLogMessageMapper.selectPage( + new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize()), + new LambdaQueryWrapper() + .ge(RetryTaskLogMessage::getId, logPageQueryDO.getStartId()) + .eq(RetryTaskLogMessage::getRetryTaskId, logPageQueryDO.getRetryTaskId()) + .orderByAsc(RetryTaskLogMessage::getId).orderByAsc(RetryTaskLogMessage::getRealTime)); + List records = selectPage.getRecords(); + + PageResponseDO responseDO = new PageResponseDO<>(); + responseDO.setPage(selectPage.getCurrent()); + responseDO.setSize(selectPage.getSize()); + responseDO.setTotal(selectPage.getTotal()); + responseDO.setRows(LogConverter.INSTANCE.toRetryTaskLogMessageDOList(records)); + return responseDO; } @Override diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/config/LogConfig.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/config/LogConfig.java index 3c93f620a..d88b9ba4a 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/config/LogConfig.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/config/LogConfig.java @@ -8,6 +8,7 @@ import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLog import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -23,7 +24,7 @@ public class LogConfig { @ConditionalOnMissingBean @Bean - public RetryLogAccess defaultRetryLogAccess() { - return new RetryTaskLogMessageAccess(); + public RetryLogAccess defaultRetryLogAccess(RetryTaskLogMessageMapper retryTaskLogMessageMapper) { + return new RetryTaskLogMessageAccess(retryTaskLogMessageMapper); } } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageQueryDO.java new file mode 100644 index 000000000..de6591535 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageQueryDO.java @@ -0,0 +1,19 @@ +package com.aizuda.snailjob.template.datasource.persistence.dataobject.log; + +import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageQueryDO; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class RetryTaskLogMessageQueryDO extends PageQueryDO { + private String groupName; + + private Long retryTaskId; + + private Long startId; + + private Integer fromIndex; + + private String sid; +} \ No newline at end of file diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java index b2893d773..395b906e8 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java @@ -4,7 +4,9 @@ import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; import com.aizuda.snailjob.server.web.model.event.WsRequestEvent; +import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; import com.aizuda.snailjob.server.web.service.JobLogService; +import com.aizuda.snailjob.server.web.service.RetryTaskService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; @@ -25,6 +27,8 @@ import org.springframework.stereotype.Component; public class WsRequestListener { private final JobLogService jobLogService; + private final RetryTaskService retryTaskService; + @Async("logQueryExecutor") @EventListener(classes = WsRequestEvent.class) public void getJobLogs(WsRequestEvent requestVO) { @@ -45,4 +49,24 @@ public class WsRequestListener { } + + @Async("logQueryExecutor") + @EventListener(classes = WsRequestEvent.class) + public void getRetryLogs(WsRequestEvent requestVO) { + if (!WebSocketSceneEnum.RETRY_LOG_SCENE.equals(requestVO.getSceneEnum())) { + return; + } + + log.info("getRetryLogs {}", requestVO.getSid()); + String message = requestVO.getMessage(); + RetryTaskLogMessageQueryVO retryTaskLogMessageQueryVO = JsonUtil.parseObject(message, RetryTaskLogMessageQueryVO.class); + retryTaskLogMessageQueryVO.setSid(requestVO.getSid()); + retryTaskLogMessageQueryVO.setStartId(0L); + try { + retryTaskService.getRetryTaskLogMessagePageV2(retryTaskLogMessageQueryVO); + } catch (Exception e) { + log.warn("send log error", e); + } + + } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java index cbe7d99f3..218d75def 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java @@ -19,4 +19,6 @@ public class RetryTaskLogMessageQueryVO extends BaseQueryVO { private Long startId; private Integer fromIndex; + + private String sid; } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java index 9ab60e5ed..412724960 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/RetryTaskService.java @@ -20,6 +20,8 @@ public interface RetryTaskService { RetryTaskLogMessageResponseVO getRetryTaskLogMessagePage(RetryTaskLogMessageQueryVO queryVO); + void getRetryTaskLogMessagePageV2(RetryTaskLogMessageQueryVO queryVO); + RetryTaskResponseVO getRetryTaskById(Long id); boolean deleteById(Long id); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java index 837d02c5a..6bf81449a 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/RetryTaskServiceImpl.java @@ -4,14 +4,19 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.core.enums.RetryOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.RetryTaskStatusEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.constant.LogFieldConstants; +import com.aizuda.snailjob.server.common.dto.PartitionTask; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; import com.aizuda.snailjob.server.retry.task.dto.TaskStopJobDTO; import com.aizuda.snailjob.server.retry.task.support.handler.RetryTaskStopHandler; import com.aizuda.snailjob.server.web.model.base.PageResult; +import com.aizuda.snailjob.server.web.model.dto.LogMessagePartitionTask; +import com.aizuda.snailjob.server.web.model.event.WsSendEvent; import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; import com.aizuda.snailjob.server.web.model.request.RetryTaskQueryVO; import com.aizuda.snailjob.server.web.model.request.UserSessionVO; @@ -19,9 +24,16 @@ import com.aizuda.snailjob.server.web.model.response.RetryTaskLogMessageResponse import com.aizuda.snailjob.server.web.model.response.RetryTaskResponseVO; import com.aizuda.snailjob.server.web.service.RetryTaskService; import com.aizuda.snailjob.server.retry.task.convert.RetryConverter; +import com.aizuda.snailjob.server.web.service.convert.LogMessagePartitionTaskConverter; import com.aizuda.snailjob.server.web.service.convert.RetryTaskLogResponseVOConverter; import com.aizuda.snailjob.server.web.service.convert.RetryTaskResponseVOConverter; +import com.aizuda.snailjob.server.web.timer.LogTimerWheel; +import com.aizuda.snailjob.server.web.timer.RetryTaskLogTimerTask; import com.aizuda.snailjob.server.web.util.UserSessionUtils; +import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageResponseDO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageQueryDO; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; @@ -30,12 +42,16 @@ import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLogMessage; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import com.google.common.base.Predicate; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.*; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -45,11 +61,12 @@ import java.util.stream.Collectors; @Service @RequiredArgsConstructor public class RetryTaskServiceImpl implements RetryTaskService { - + private static final Long DELAY_MILLS = 5000L; private final RetryTaskMapper retryTaskMapper; private final RetryMapper retryMapper; private final RetryTaskLogMessageMapper retryTaskLogMessageMapper; private final RetryTaskStopHandler retryTaskStopHandler; + private final AccessTemplate accessTemplate; @Override public PageResult> getRetryTaskLogPage(RetryTaskQueryVO queryVO) { @@ -163,6 +180,86 @@ public class RetryTaskServiceImpl implements RetryTaskService { return responseVO; } + @Override + public void getRetryTaskLogMessagePageV2(RetryTaskLogMessageQueryVO queryVO) { + String sid = queryVO.getSid(); + RetryTaskLogMessageQueryDO pageQueryDO = new RetryTaskLogMessageQueryDO(); + pageQueryDO.setPage(1); + pageQueryDO.setSize(queryVO.getSize()); + pageQueryDO.setRetryTaskId(queryVO.getRetryTaskId()); + pageQueryDO.setStartId(queryVO.getStartId()); + PartitionTaskUtils.process(startId -> { + // 记录下次起始时间 + queryVO.setStartId(startId); + pageQueryDO.setStartId(startId); + // 拉去数据 + PageResponseDO pageResponseDO = accessTemplate.getRetryTaskLogMessageAccess() + .listPage(pageQueryDO); + List rows = pageResponseDO.getRows(); + return LogMessagePartitionTaskConverter.INSTANCE.toLogMessagePartitionTask(rows); + }, new Consumer<>() { + @Override + public void accept(List partitionTasks) { + + List partitionTaskList = (List) partitionTasks; + + for (LogMessagePartitionTask logMessagePartitionTask : partitionTaskList) { + // 发生日志内容到前端 + String message = logMessagePartitionTask.getMessage(); + List> logContents = JsonUtil.parseObject(message, List.class); + logContents = logContents.stream() + .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) + .toList(); + for (Map logContent : logContents) { + // send发消息 + WsSendEvent sendEvent = new WsSendEvent(this); + sendEvent.setSid(sid); + sendEvent.setMessage(JsonUtil.toJsonString(logContent)); + SnailSpringContext.getContext().publishEvent(sendEvent); + } + } + + } + }, new Predicate<>() { + @Override + public boolean apply(List rows) { + + // 决策是否完成 + if (!CollUtil.isEmpty(rows)) { + return false; + } + + RetryTask retryTask = retryTaskMapper.selectOne( + new LambdaQueryWrapper().eq(RetryTask::getId, queryVO.getRetryTaskId())); + + if (Objects.isNull(retryTask) + || (RetryTaskStatusEnum.TERMINAL_STATUS_SET.contains(retryTask.getTaskStatus()) && + retryTask.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) { + // 发生完成标识 + WsSendEvent sendEvent = new WsSendEvent(this); + sendEvent.setMessage("END"); + sendEvent.setSid(sid); + SnailSpringContext.getContext().publishEvent(sendEvent); + return true; + } else { + scheduleNextAttempt(queryVO, sid); + return true; + } + } + }, queryVO.getStartId()); + + } + + /** + * 使用时间轮5秒再进行日志查询 + * + * @param queryVO + * @param sid + */ + private void scheduleNextAttempt(RetryTaskLogMessageQueryVO queryVO, String sid) { + LogTimerWheel.registerWithJobTaskLog(() -> new RetryTaskLogTimerTask(queryVO, sid), Duration.ofMillis(DELAY_MILLS)); + } + @Override public RetryTaskResponseVO getRetryTaskById(Long id) { RetryTask retryTask = retryTaskMapper.selectById(id); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/RetryTaskLogTimerTask.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/RetryTaskLogTimerTask.java new file mode 100644 index 000000000..6264d2b1c --- /dev/null +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/RetryTaskLogTimerTask.java @@ -0,0 +1,48 @@ +package com.aizuda.snailjob.server.web.timer; + +import com.aizuda.snailjob.common.core.context.SnailSpringContext; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.TimerTask; +import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; +import com.aizuda.snailjob.server.web.model.request.RetryTaskLogMessageQueryVO; +import com.aizuda.snailjob.server.web.service.RetryTaskService; +import io.netty.util.Timeout; +import lombok.AllArgsConstructor; + +import java.text.MessageFormat; +import java.time.LocalDateTime; + +/** + * @Author:xiaochaihu + * @Package:com.aizuda.snailjob.server.common.timer + * @Project:snail-job + * @Date:2025/4/15 22:15 + * @Filename:RetryTaskLogTimerTask + * @since 1.5.0 + */ +@AllArgsConstructor +public class RetryTaskLogTimerTask implements TimerTask { + private static final String IDEMPOTENT_KEY_PREFIX = "retryTaskLog_{0}_{1}_{2}"; + private RetryTaskLogMessageQueryVO logQueryVO; + private String sid; + + @Override + public void run(final Timeout timeout) throws Exception { + SnailJobLog.LOCAL.debug("Start querying scheduled task logs. Current time:[{}] retryTaskId:[{}]", LocalDateTime.now(), logQueryVO.getRetryTaskId()); + + try { + LogTimerWheel.clearCache(idempotentKey()); + RetryTaskService logService = SnailSpringContext.getBean(RetryTaskService.class); + logService.getRetryTaskLogMessagePageV2(logQueryVO); + } catch (Exception e) { + SnailJobLog.LOCAL.error("Scheduled task log query execution failed", e); + } + } + + @Override + public String idempotentKey() { + + Long taskId = logQueryVO.getRetryTaskId(); + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, WebSocketSceneEnum.JOB_LOG_SCENE, taskId); + } +}