feat(1.5.0-beta1): 采用datasource的形式替换日志操作

This commit is contained in:
opensnail 2025-04-01 23:13:56 +08:00
parent 98b5135d2d
commit 520c540f74
8 changed files with 38 additions and 228 deletions

View File

@ -1,10 +1,5 @@
package com.aizuda.snailjob.server.common.config;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.common.service.impl.DatabaseLogService;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@ -38,9 +33,4 @@ public class SnailJobServerCommonAutoConfiguration {
return scheduler;
}
@Bean
@ConditionalOnMissingBean(LogService.class)
public LogService logService(JobLogMessageMapper jobLogMessageMapper, JobTaskBatchMapper jobTaskBatchMapper){
return new DatabaseLogService(jobLogMessageMapper, jobTaskBatchMapper);
}
}

View File

@ -1,11 +1,15 @@
package com.aizuda.snailjob.server.common.convert;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
import com.aizuda.snailjob.server.model.dto.LogTaskDTO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.common.support
@ -19,7 +23,7 @@ public interface JobLogMessageConverter {
JobLogMessageConverter INSTANCE = Mappers.getMapper(JobLogMessageConverter.class);
JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO);
JobLogMessage toJobLogMessage(LogTaskDTO logTaskDTO);
List<JobLogMessageDO> toJobLogMessages(List<JobLogTaskDTO> jobLogDTOs);
}

View File

@ -1,22 +0,0 @@
package com.aizuda.snailjob.server.common.service;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
import java.io.IOException;
import java.util.List;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.common.service
* @Projectsnail-job
* @Date2025/3/10 20:57
* @FilenameLogService
* @since 1.5.0
*/
public interface LogService {
void saveLog(JobLogDTO jobLogDTO);
void batchSaveLogs(List<JobLogTaskDTO> jobLogTasks);
void getJobLogPage(JobLogQueryVO queryVO, String sid) throws IOException;
}

View File

@ -1,185 +0,0 @@
package com.aizuda.snailjob.server.common.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.common.convert.JobLogMessageConverter;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.common.service.impl
* @Projectsnail-job
* @Date2025/3/10 21:12
* @FilenameDatabaseLogService
* @since 1.5.0
*/
@Slf4j
@RequiredArgsConstructor
public class DatabaseLogService implements LogService {
private static final Long DELAY_MILLS = 5000L;
private final JobLogMessageMapper jobLogMessageMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
/**
* 保存单调日志
*
* @param jobLogDTO
*/
@Override
public void saveLog(JobLogDTO jobLogDTO) {
JobLogMessage jobLogMessage = JobLogMessageConverter.INSTANCE.toJobLogMessage(jobLogDTO);
jobLogMessage.setCreateDt(LocalDateTime.now());
jobLogMessage.setLogNum(1);
jobLogMessage.setMessage(Optional.ofNullable(jobLogDTO.getMessage()).orElse(StrUtil.EMPTY));
jobLogMessage.setTaskId(Optional.ofNullable(jobLogMessage.getTaskId()).orElse(0L));
jobLogMessageMapper.insert(jobLogMessage);
}
/**
* 批量保存日志
*
* @param jobLogTasks
*/
@Override
public void batchSaveLogs(List<JobLogTaskDTO> jobLogTasks) {
Map<Long, List<JobLogTaskDTO>> logTaskDTOMap = jobLogTasks.
stream().collect(Collectors.groupingBy(JobLogTaskDTO::getTaskId, Collectors.toList()));
List<JobLogMessage> jobLogMessageList = new ArrayList<>();
for (List<JobLogTaskDTO> logTaskDTOList : logTaskDTOMap.values()) {
JobLogMessage jobLogMessage = JobLogMessageConverter.INSTANCE.toJobLogMessage(logTaskDTOList.get(0));
jobLogMessage.setCreateDt(LocalDateTime.now());
jobLogMessage.setLogNum(logTaskDTOList.size());
List<Map<String, String>> messageMapList = StreamUtils.toList(logTaskDTOList,
taskDTO -> taskDTO.getFieldList()
.stream().filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue)));
jobLogMessage.setMessage(JsonUtil.toJsonString(messageMapList));
jobLogMessageList.add(jobLogMessage);
}
jobLogMessageMapper.insertBatch(jobLogMessageList);
}
@Override
public void getJobLogPage(JobLogQueryVO queryVO, String sid) {
boolean taskBatchComplete = false;
while (!taskBatchComplete) {
PageDTO<JobLogMessage> pageDTO = new PageDTO<>(1, queryVO.getSize());
PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(pageDTO,
new LambdaQueryWrapper<JobLogMessage>()
.select(JobLogMessage::getId, JobLogMessage::getLogNum)
.ge(JobLogMessage::getId, queryVO.getStartId())
.eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId())
.eq(JobLogMessage::getTaskId, queryVO.getTaskId())
.orderByAsc(JobLogMessage::getId)
.orderByAsc(JobLogMessage::getRealTime));
List<JobLogMessage> records = selectPage.getRecords();
if (CollUtil.isEmpty(records)) {
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(
new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, queryVO.getTaskBatchId())
);
JobLogResponseVO jobLogResponseVO = new JobLogResponseVO();
if (Objects.isNull(jobTaskBatch)
|| (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) &&
jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))
) {
jobLogResponseVO.setFinished(Boolean.TRUE);
jobLogResponseVO.setNextStartId(queryVO.getStartId());
jobLogResponseVO.setFromIndex(0);
return;
} else {
// scheduleNextAttempt(queryVO, sid);
return;
}
}
Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0);
JobLogMessage firstRecord = records.get(0);
List<Long> ids = Lists.newArrayList(firstRecord.getId());
int total = firstRecord.getLogNum() - fromIndex;
for (int i = 1; i < records.size(); i++) {
JobLogMessage record = records.get(i);
if (total + record.getLogNum() > queryVO.getSize()) {
break;
}
total += record.getLogNum();
ids.add(record.getId());
}
long nextStartId = 0;
List<Map<String, String>> messages = Lists.newArrayList();
List<JobLogMessage> jobLogMessages = jobLogMessageMapper.selectList(
new LambdaQueryWrapper<JobLogMessage>()
.in(JobLogMessage::getId, ids)
.orderByAsc(JobLogMessage::getId)
.orderByAsc(JobLogMessage::getRealTime)
);
for (final JobLogMessage jobLogMessage : jobLogMessages) {
List<Map<String, String>> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class);
int size = originalList.size() - fromIndex;
List<Map<String, String>> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize())
.collect(Collectors.toList());
if (messages.size() + size >= queryVO.getSize()) {
messages.addAll(pageList);
nextStartId = jobLogMessage.getId();
fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1;
break;
}
messages.addAll(pageList);
nextStartId = jobLogMessage.getId() + 1;
fromIndex = 0;
}
messages = messages.stream()
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
.collect(Collectors.toList());
JobLogResponseVO jobLogResponseVO = new JobLogResponseVO();
jobLogResponseVO.setMessage(messages);
jobLogResponseVO.setNextStartId(nextStartId);
jobLogResponseVO.setFromIndex(fromIndex);
// session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO));
queryVO.setFromIndex(fromIndex);
queryVO.setStartId(nextStartId);
}
}
}

View File

@ -16,6 +16,7 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat
import com.aizuda.snailjob.server.job.task.support.result.job.JobExecutorResultContext;
import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.snailjob.server.model.dto.LogTaskDTO;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
@ -80,9 +81,9 @@ public interface JobTaskConverter {
TaskStopJobContext toStopJobContext(JobTaskPrepareDTO context);
JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO);
JobLogMessageDO toJobLogMessage(JobLogDTO jobLogDTO);
JobLogMessage toJobLogMessage(LogTaskDTO logTaskDTO);
JobLogMessageDO toJobLogMessage(LogTaskDTO logTaskDTO);
JobLogMetaDTO toJobLogDTO(BaseDTO baseDTO);

View File

@ -1,10 +1,16 @@
package com.aizuda.snailjob.server.job.task.support.dispatch;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
import org.apache.pekko.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.server.common.pekko.ActorGenerator;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -13,6 +19,7 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author opensnail
@ -24,7 +31,7 @@ import java.util.*;
@Slf4j
@RequiredArgsConstructor
public class JobLogActor extends AbstractActor {
private final LogService logService;
private final AccessTemplate accessTemplate;
@Override
public Receive createReceive() {
@ -36,7 +43,21 @@ public class JobLogActor extends AbstractActor {
}
List<JobLogTaskDTO> jobLogTasks = (List<JobLogTaskDTO>) list;
logService.batchSaveLogs(jobLogTasks);
Map<Long, List<JobLogTaskDTO>> logTaskDTOMap = jobLogTasks.
stream().collect(Collectors.groupingBy(JobLogTaskDTO::getTaskId, Collectors.toList()));
List<JobLogMessageDO> jobLogMessageList = new ArrayList<>();
for (List<JobLogTaskDTO> logTaskDTOList : logTaskDTOMap.values()) {
JobLogMessageDO jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(logTaskDTOList.get(0));
jobLogMessage.setLogNum(logTaskDTOList.size());
List<Map<String, String>> messageMapList = StreamUtils.toList(logTaskDTOList,
taskDTO -> taskDTO.getFieldList()
.stream().filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue)));
jobLogMessage.setMessage(JsonUtil.toJsonString(messageMapList));
jobLogMessageList.add(jobLogMessage);
}
accessTemplate.getJobLogMessageAccess().insertBatch(jobLogMessageList);
} catch (Exception e) {
log.error("保存客户端日志异常.", e);
} finally {
@ -56,6 +77,10 @@ public class JobLogActor extends AbstractActor {
}
private void saveLogMessage(JobLogDTO jobLogDTO) {
logService.saveLog(jobLogDTO);
JobLogMessageDO jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(jobLogDTO);
jobLogMessage.setLogNum(1);
jobLogMessage.setMessage(Optional.ofNullable(jobLogDTO.getMessage()).orElse(StrUtil.EMPTY));
jobLogMessage.setTaskId(Optional.ofNullable(jobLogMessage.getTaskId()).orElse(0L));
accessTemplate.getJobLogMessageAccess().insert(jobLogMessage);
}
}

View File

@ -2,7 +2,6 @@ package com.aizuda.snailjob.server.web.listener;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.web.model.event.WsRequestEvent;
import com.aizuda.snailjob.server.web.service.JobLogService;

View File

@ -4,7 +4,6 @@ import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.web.service.JobLogService;
import io.netty.util.Timeout;
@ -35,7 +34,6 @@ public class JobTaskLogTimerTask implements TimerTask<String> {
LogTimerWheel.clearCache(idempotentKey());
JobLogService logService = SnailSpringContext.getBean(JobLogService.class);
logService.getJobLogPageV2(logQueryVO);
} catch (Exception e) {
SnailJobLog.LOCAL.error("定时任务日志查询执行失败", e);
}