diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java index 6704bc6d0..232032143 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java @@ -1,10 +1,5 @@ package com.aizuda.snailjob.server.common.config; -import com.aizuda.snailjob.server.common.service.LogService; -import com.aizuda.snailjob.server.common.service.impl.DatabaseLogService; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @@ -38,9 +33,4 @@ public class SnailJobServerCommonAutoConfiguration { return scheduler; } - @Bean - @ConditionalOnMissingBean(LogService.class) - public LogService logService(JobLogMessageMapper jobLogMessageMapper, JobTaskBatchMapper jobTaskBatchMapper){ - return new DatabaseLogService(jobLogMessageMapper, jobTaskBatchMapper); - } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobLogMessageConverter.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobLogMessageConverter.java index ec08e6c14..171d531c4 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobLogMessageConverter.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobLogMessageConverter.java @@ -1,11 +1,15 @@ package com.aizuda.snailjob.server.common.convert; import com.aizuda.snailjob.server.common.dto.JobLogDTO; +import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO; import com.aizuda.snailjob.server.model.dto.LogTaskDTO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO; import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; +import java.util.List; + /** * @Author:srzou * @Package:com.aizuda.snailjob.server.common.support @@ -19,7 +23,7 @@ public interface JobLogMessageConverter { JobLogMessageConverter INSTANCE = Mappers.getMapper(JobLogMessageConverter.class); - JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO); - JobLogMessage toJobLogMessage(LogTaskDTO logTaskDTO); + List toJobLogMessages(List jobLogDTOs); + } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java deleted file mode 100644 index fac3b0a9a..000000000 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.aizuda.snailjob.server.common.service; - -import com.aizuda.snailjob.server.common.dto.JobLogDTO; -import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; -import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO; - -import java.io.IOException; -import java.util.List; - -/** - * @Author:srzou - * @Package:com.aizuda.snailjob.server.common.service - * @Project:snail-job - * @Date:2025/3/10 20:57 - * @Filename:LogService - * @since 1.5.0 - */ -public interface LogService { - void saveLog(JobLogDTO jobLogDTO); - void batchSaveLogs(List jobLogTasks); - void getJobLogPage(JobLogQueryVO queryVO, String sid) throws IOException; -} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java deleted file mode 100644 index 54abe67bf..000000000 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java +++ /dev/null @@ -1,185 +0,0 @@ -package com.aizuda.snailjob.server.common.service.impl; - -import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.StrUtil; -import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.aizuda.snailjob.common.core.util.StreamUtils; -import com.aizuda.snailjob.common.log.constant.LogFieldConstants; -import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO; -import com.aizuda.snailjob.server.common.dto.JobLogDTO; -import com.aizuda.snailjob.server.common.service.LogService; -import com.aizuda.snailjob.server.common.convert.JobLogMessageConverter; -import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; -import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; -import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; -import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; -import com.google.common.collect.Lists; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.*; -import java.util.stream.Collectors; - -import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED; - -/** - * @Author:srzou - * @Package:com.aizuda.snailjob.server.common.service.impl - * @Project:snail-job - * @Date:2025/3/10 21:12 - * @Filename:DatabaseLogService - * @since 1.5.0 - */ -@Slf4j -@RequiredArgsConstructor -public class DatabaseLogService implements LogService { - private static final Long DELAY_MILLS = 5000L; - private final JobLogMessageMapper jobLogMessageMapper; - private final JobTaskBatchMapper jobTaskBatchMapper; - - /** - * 保存单调日志 - * - * @param jobLogDTO - */ - @Override - public void saveLog(JobLogDTO jobLogDTO) { - JobLogMessage jobLogMessage = JobLogMessageConverter.INSTANCE.toJobLogMessage(jobLogDTO); - jobLogMessage.setCreateDt(LocalDateTime.now()); - jobLogMessage.setLogNum(1); - jobLogMessage.setMessage(Optional.ofNullable(jobLogDTO.getMessage()).orElse(StrUtil.EMPTY)); - jobLogMessage.setTaskId(Optional.ofNullable(jobLogMessage.getTaskId()).orElse(0L)); - jobLogMessageMapper.insert(jobLogMessage); - } - - /** - * 批量保存日志 - * - * @param jobLogTasks - */ - @Override - public void batchSaveLogs(List jobLogTasks) { - Map> logTaskDTOMap = jobLogTasks. - stream().collect(Collectors.groupingBy(JobLogTaskDTO::getTaskId, Collectors.toList())); - - List jobLogMessageList = new ArrayList<>(); - for (List logTaskDTOList : logTaskDTOMap.values()) { - JobLogMessage jobLogMessage = JobLogMessageConverter.INSTANCE.toJobLogMessage(logTaskDTOList.get(0)); - jobLogMessage.setCreateDt(LocalDateTime.now()); - jobLogMessage.setLogNum(logTaskDTOList.size()); - List> messageMapList = StreamUtils.toList(logTaskDTOList, - taskDTO -> taskDTO.getFieldList() - .stream().filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue())) - .collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue))); - jobLogMessage.setMessage(JsonUtil.toJsonString(messageMapList)); - - jobLogMessageList.add(jobLogMessage); - } - - jobLogMessageMapper.insertBatch(jobLogMessageList); - } - - @Override - public void getJobLogPage(JobLogQueryVO queryVO, String sid) { - boolean taskBatchComplete = false; - while (!taskBatchComplete) { - PageDTO pageDTO = new PageDTO<>(1, queryVO.getSize()); - - PageDTO selectPage = jobLogMessageMapper.selectPage(pageDTO, - new LambdaQueryWrapper() - .select(JobLogMessage::getId, JobLogMessage::getLogNum) - .ge(JobLogMessage::getId, queryVO.getStartId()) - .eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId()) - .eq(JobLogMessage::getTaskId, queryVO.getTaskId()) - .orderByAsc(JobLogMessage::getId) - .orderByAsc(JobLogMessage::getRealTime)); - List records = selectPage.getRecords(); - if (CollUtil.isEmpty(records)) { - - JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne( - new LambdaQueryWrapper() - .eq(JobTaskBatch::getId, queryVO.getTaskBatchId()) - ); - - JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); - - if (Objects.isNull(jobTaskBatch) - || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) && - jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now())) - ) { - jobLogResponseVO.setFinished(Boolean.TRUE); - jobLogResponseVO.setNextStartId(queryVO.getStartId()); - jobLogResponseVO.setFromIndex(0); - return; - } else { -// scheduleNextAttempt(queryVO, sid); - return; - } - } - - Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0); - JobLogMessage firstRecord = records.get(0); - List ids = Lists.newArrayList(firstRecord.getId()); - int total = firstRecord.getLogNum() - fromIndex; - for (int i = 1; i < records.size(); i++) { - JobLogMessage record = records.get(i); - if (total + record.getLogNum() > queryVO.getSize()) { - break; - } - - total += record.getLogNum(); - ids.add(record.getId()); - } - - long nextStartId = 0; - List> messages = Lists.newArrayList(); - List jobLogMessages = jobLogMessageMapper.selectList( - new LambdaQueryWrapper() - .in(JobLogMessage::getId, ids) - .orderByAsc(JobLogMessage::getId) - .orderByAsc(JobLogMessage::getRealTime) - ); - - for (final JobLogMessage jobLogMessage : jobLogMessages) { - - List> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class); - int size = originalList.size() - fromIndex; - List> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize()) - .collect(Collectors.toList()); - - if (messages.size() + size >= queryVO.getSize()) { - messages.addAll(pageList); - nextStartId = jobLogMessage.getId(); - fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1; - break; - } - - messages.addAll(pageList); - nextStartId = jobLogMessage.getId() + 1; - fromIndex = 0; - } - - messages = messages.stream() - .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) - .collect(Collectors.toList()); - - JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); - jobLogResponseVO.setMessage(messages); - jobLogResponseVO.setNextStartId(nextStartId); - jobLogResponseVO.setFromIndex(fromIndex); -// session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO)); - - queryVO.setFromIndex(fromIndex); - queryVO.setStartId(nextStartId); - } - - } - -} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java index e151fc10d..b1324a394 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java @@ -16,6 +16,7 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat import com.aizuda.snailjob.server.job.task.support.result.job.JobExecutorResultContext; import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.snailjob.server.model.dto.LogTaskDTO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; @@ -80,9 +81,9 @@ public interface JobTaskConverter { TaskStopJobContext toStopJobContext(JobTaskPrepareDTO context); - JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO); + JobLogMessageDO toJobLogMessage(JobLogDTO jobLogDTO); - JobLogMessage toJobLogMessage(LogTaskDTO logTaskDTO); + JobLogMessageDO toJobLogMessage(LogTaskDTO logTaskDTO); JobLogMetaDTO toJobLogDTO(BaseDTO baseDTO); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobLogActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobLogActor.java index 15312744e..686ec1051 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobLogActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobLogActor.java @@ -1,10 +1,16 @@ package com.aizuda.snailjob.server.job.task.support.dispatch; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.core.util.StreamUtils; +import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO; import com.aizuda.snailjob.server.common.dto.JobLogDTO; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO; import org.apache.pekko.actor.AbstractActor; import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.server.common.pekko.ActorGenerator; -import com.aizuda.snailjob.server.common.service.LogService; import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -13,6 +19,7 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.*; +import java.util.stream.Collectors; /** * @author opensnail @@ -24,7 +31,7 @@ import java.util.*; @Slf4j @RequiredArgsConstructor public class JobLogActor extends AbstractActor { - private final LogService logService; + private final AccessTemplate accessTemplate; @Override public Receive createReceive() { @@ -36,7 +43,21 @@ public class JobLogActor extends AbstractActor { } List jobLogTasks = (List) list; - logService.batchSaveLogs(jobLogTasks); + Map> logTaskDTOMap = jobLogTasks. + stream().collect(Collectors.groupingBy(JobLogTaskDTO::getTaskId, Collectors.toList())); + List jobLogMessageList = new ArrayList<>(); + for (List logTaskDTOList : logTaskDTOMap.values()) { + JobLogMessageDO jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(logTaskDTOList.get(0)); + jobLogMessage.setLogNum(logTaskDTOList.size()); + List> messageMapList = StreamUtils.toList(logTaskDTOList, + taskDTO -> taskDTO.getFieldList() + .stream().filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue())) + .collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue))); + jobLogMessage.setMessage(JsonUtil.toJsonString(messageMapList)); + jobLogMessageList.add(jobLogMessage); + } + + accessTemplate.getJobLogMessageAccess().insertBatch(jobLogMessageList); } catch (Exception e) { log.error("保存客户端日志异常.", e); } finally { @@ -56,6 +77,10 @@ public class JobLogActor extends AbstractActor { } private void saveLogMessage(JobLogDTO jobLogDTO) { - logService.saveLog(jobLogDTO); + JobLogMessageDO jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(jobLogDTO); + jobLogMessage.setLogNum(1); + jobLogMessage.setMessage(Optional.ofNullable(jobLogDTO.getMessage()).orElse(StrUtil.EMPTY)); + jobLogMessage.setTaskId(Optional.ofNullable(jobLogMessage.getTaskId()).orElse(0L)); + accessTemplate.getJobLogMessageAccess().insert(jobLogMessage); } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java index bc6825483..3a39dbe49 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java @@ -2,7 +2,6 @@ package com.aizuda.snailjob.server.web.listener; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; -import com.aizuda.snailjob.server.common.service.LogService; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; import com.aizuda.snailjob.server.web.model.event.WsRequestEvent; import com.aizuda.snailjob.server.web.service.JobLogService; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/JobTaskLogTimerTask.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/JobTaskLogTimerTask.java index 2fe4239a7..06f34ef17 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/JobTaskLogTimerTask.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/JobTaskLogTimerTask.java @@ -4,7 +4,6 @@ import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.TimerTask; import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; -import com.aizuda.snailjob.server.common.service.LogService; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; import com.aizuda.snailjob.server.web.service.JobLogService; import io.netty.util.Timeout; @@ -35,7 +34,6 @@ public class JobTaskLogTimerTask implements TimerTask { LogTimerWheel.clearCache(idempotentKey()); JobLogService logService = SnailSpringContext.getBean(JobLogService.class); logService.getJobLogPageV2(logQueryVO); - } catch (Exception e) { SnailJobLog.LOCAL.error("定时任务日志查询执行失败", e); }