feat(1.5.0):增加日志自定义存储及获取接口;增加websocket获取定时任务日志

This commit is contained in:
srzou 2025-03-24 10:16:17 +08:00 committed by opensnail
parent f0d8ee92fd
commit 319275b9b8
38 changed files with 547 additions and 68 deletions

View File

@ -1,5 +1,10 @@
package com.aizuda.snailjob.server.common.config; package com.aizuda.snailjob.server.common.config;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.common.service.impl.DatabaseLogService;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -32,4 +37,10 @@ public class SnailJobServerCommonAutoConfiguration {
scheduler.setThreadNamePrefix("snail-job-alarm-thread-"); scheduler.setThreadNamePrefix("snail-job-alarm-thread-");
return scheduler; return scheduler;
} }
@Bean
@ConditionalOnMissingBean(LogService.class)
public LogService logService(JobLogMessageMapper jobLogMessageMapper, JobTaskBatchMapper jobTaskBatchMapper){
return new DatabaseLogService(jobLogMessageMapper, jobTaskBatchMapper);
}
} }

View File

@ -0,0 +1,25 @@
package com.aizuda.snailjob.server.common.convert;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.aizuda.snailjob.server.model.dto.LogTaskDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.common.support
* @Projectsnail-job
* @Date2025/3/10 21:16
* @FilenameJobLogMessageConverter
* @since 1.5.0
*/
@Mapper
public interface JobLogMessageConverter {
JobLogMessageConverter INSTANCE = Mappers.getMapper(JobLogMessageConverter.class);
JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO);
JobLogMessage toJobLogMessage(LogTaskDTO logTaskDTO);
}

View File

@ -1,4 +1,4 @@
package com.aizuda.snailjob.server.job.task.dto; package com.aizuda.snailjob.server.common.dto;
import lombok.Data; import lombok.Data;

View File

@ -0,0 +1,28 @@
package com.aizuda.snailjob.server.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @since 1.5.0
*/
@Getter
@AllArgsConstructor
public enum WebSocketSceneEnum {
JOB_LOG_SCENE(1, "JOB_LOG_SCENE"),
WORKFLOW_LOG_SCENE(2, "WORKFLOW_LOG_SCENE"),
RETRY_LOG_SCENE(3,"RETRY_LOG_SCENE");
private final Integer type;
private final String scene;
// public static WebSocketSceneEnum valueOf(String scene) {
// for (WebSocketSceneEnum expressionTypeEnum : WebSocketSceneEnum.values()) {
// if (expressionTypeEnum.getScene().equals(scene)) {
// return expressionTypeEnum;
// }
// }
//
// return null;
// }
}

View File

@ -0,0 +1,23 @@
package com.aizuda.snailjob.server.common.service;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
import jakarta.websocket.Session;
import java.io.IOException;
import java.util.List;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.common.service
* @Projectsnail-job
* @Date2025/3/10 20:57
* @FilenameLogService
* @since 1.5.0
*/
public interface LogService {
void saveLog(JobLogDTO jobLogDTO);
void batchSaveLogs(List<JobLogTaskDTO> jobLogTasks);
void getJobLogPage(JobLogQueryVO queryVO, Session session) throws IOException;
}

View File

@ -0,0 +1,205 @@
package com.aizuda.snailjob.server.common.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.common.convert.JobLogMessageConverter;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import jakarta.websocket.Session;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.common.service.impl
* @Projectsnail-job
* @Date2025/3/10 21:12
* @FilenameDatabaseLogService
* @since 1.5.0
*/
@Slf4j
@RequiredArgsConstructor
public class DatabaseLogService implements LogService {
private final JobLogMessageMapper jobLogMessageMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
// 创建一个调度线程池
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/**
* 保存单调日志
*
* @param jobLogDTO
*/
@Override
public void saveLog(JobLogDTO jobLogDTO) {
JobLogMessage jobLogMessage = JobLogMessageConverter.INSTANCE.toJobLogMessage(jobLogDTO);
jobLogMessage.setCreateDt(LocalDateTime.now());
jobLogMessage.setLogNum(1);
jobLogMessage.setMessage(Optional.ofNullable(jobLogDTO.getMessage()).orElse(StrUtil.EMPTY));
jobLogMessage.setTaskId(Optional.ofNullable(jobLogMessage.getTaskId()).orElse(0L));
jobLogMessageMapper.insert(jobLogMessage);
}
/**
* 批量保存日志
*
* @param jobLogTasks
*/
@Override
public void batchSaveLogs(List<JobLogTaskDTO> jobLogTasks) {
Map<Long, List<JobLogTaskDTO>> logTaskDTOMap = jobLogTasks.
stream().collect(Collectors.groupingBy(JobLogTaskDTO::getTaskId, Collectors.toList()));
List<JobLogMessage> jobLogMessageList = new ArrayList<>();
for (List<JobLogTaskDTO> logTaskDTOList : logTaskDTOMap.values()) {
JobLogMessage jobLogMessage = JobLogMessageConverter.INSTANCE.toJobLogMessage(logTaskDTOList.get(0));
jobLogMessage.setCreateDt(LocalDateTime.now());
jobLogMessage.setLogNum(logTaskDTOList.size());
List<Map<String, String>> messageMapList = StreamUtils.toList(logTaskDTOList,
taskDTO -> taskDTO.getFieldList()
.stream().filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue)));
jobLogMessage.setMessage(JsonUtil.toJsonString(messageMapList));
jobLogMessageList.add(jobLogMessage);
}
jobLogMessageMapper.insertBatch(jobLogMessageList);
}
@Override
public void getJobLogPage(JobLogQueryVO queryVO, Session session) throws IOException {
Boolean taskBatchComplete = false;
while (!taskBatchComplete){
PageDTO<JobLogMessage> pageDTO = new PageDTO<>(1, queryVO.getSize());
PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(pageDTO,
new LambdaQueryWrapper<JobLogMessage>()
.select(JobLogMessage::getId, JobLogMessage::getLogNum)
.ge(JobLogMessage::getId, queryVO.getStartId())
.eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId())
.eq(JobLogMessage::getTaskId, queryVO.getTaskId())
.orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime));
List<JobLogMessage> records = selectPage.getRecords();
if (CollUtil.isEmpty(records)) {
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(
new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, queryVO.getTaskBatchId())
);
JobLogResponseVO jobLogResponseVO = new JobLogResponseVO();
if (Objects.isNull(jobTaskBatch)
|| (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) &&
jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))
) {
jobLogResponseVO.setFinished(Boolean.TRUE);
jobLogResponseVO.setNextStartId(queryVO.getStartId());
jobLogResponseVO.setFromIndex(0);
session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO));
System.out.println("结束了");
return;
}else {
// 如果没有完成就等五秒执行
System.out.println("异步执行");
scheduleNextAttempt(queryVO, session);
return;
}
}
Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0);
JobLogMessage firstRecord = records.get(0);
List<Long> ids = Lists.newArrayList(firstRecord.getId());
int total = firstRecord.getLogNum() - fromIndex;
for (int i = 1; i < records.size(); i++) {
JobLogMessage record = records.get(i);
if (total + record.getLogNum() > queryVO.getSize()) {
break;
}
total += record.getLogNum();
ids.add(record.getId());
}
long nextStartId = 0;
List<Map<String, String>> messages = Lists.newArrayList();
List<JobLogMessage> jobLogMessages = jobLogMessageMapper.selectList(
new LambdaQueryWrapper<JobLogMessage>()
.in(JobLogMessage::getId, ids)
.orderByAsc(JobLogMessage::getId)
.orderByAsc(JobLogMessage::getRealTime)
);
for (final JobLogMessage jobLogMessage : jobLogMessages) {
List<Map<String, String>> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class);
int size = originalList.size() - fromIndex;
List<Map<String, String>> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize())
.collect(Collectors.toList());
if (messages.size() + size >= queryVO.getSize()) {
messages.addAll(pageList);
nextStartId = jobLogMessage.getId();
fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1;
break;
}
messages.addAll(pageList);
nextStartId = jobLogMessage.getId() + 1;
fromIndex = 0;
}
messages = messages.stream()
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
.collect(Collectors.toList());
JobLogResponseVO jobLogResponseVO = new JobLogResponseVO();
jobLogResponseVO.setMessage(messages);
jobLogResponseVO.setNextStartId(nextStartId);
jobLogResponseVO.setFromIndex(fromIndex);
session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO));
queryVO.setFromIndex(fromIndex);
queryVO.setStartId(nextStartId);
}
}
private void scheduleNextAttempt(JobLogQueryVO queryVO, Session session) {
scheduler.schedule(() -> {
try {
// 再次调用查询
getJobLogPage(queryVO, session);
} catch (IOException e) {
e.printStackTrace();
}
// 5秒后执行
}, 5, TimeUnit.SECONDS);
}
}

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.common.vo;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,4 +1,4 @@
package com.aizuda.snailjob.server.web.model.response; package com.aizuda.snailjob.server.common.vo;
import lombok.Data; import lombok.Data;

View File

@ -0,0 +1,27 @@
package com.aizuda.snailjob.server.common.vo;
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
import lombok.Data;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.web.model.request
* @Projectsnail-job
* @Date2025/3/5 16:54
* @FilenameWebSocketRequestVO
* @since 1.5.0
*/
@Data
public class WsRequestVO {
/**
* wb类型
*/
private String sid;
/**
* context
*/
private String message;
private WebSocketSceneEnum sceneEnum;
}

View File

@ -1,4 +1,4 @@
package com.aizuda.snailjob.server.web.model.base; package com.aizuda.snailjob.server.common.vo.base;
import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.ObjUtil;
import lombok.Data; import lombok.Data;

View File

@ -4,6 +4,7 @@ import com.aizuda.snailjob.client.model.request.DispatchJobRequest;
import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest; import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest;
import com.aizuda.snailjob.client.model.request.MapTaskRequest; import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.job.task.dto.*; import com.aizuda.snailjob.server.job.task.dto.*;
import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext; import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext;

View File

@ -1,26 +1,18 @@
package com.aizuda.snailjob.server.job.task.support.dispatch; package com.aizuda.snailjob.server.job.task.support.dispatch;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO;
import com.aizuda.snailjob.server.common.pekko.ActorGenerator; import com.aizuda.snailjob.server.common.pekko.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.JobLogDTO; import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO; import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
/** /**
* @author opensnail * @author opensnail
@ -32,7 +24,7 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class JobLogActor extends AbstractActor { public class JobLogActor extends AbstractActor {
private final JobLogMessageMapper jobLogMessageMapper; private final LogService logService;
@Override @Override
public Receive createReceive() { public Receive createReceive() {
@ -44,24 +36,7 @@ public class JobLogActor extends AbstractActor {
} }
List<JobLogTaskDTO> jobLogTasks = (List<JobLogTaskDTO>) list; List<JobLogTaskDTO> jobLogTasks = (List<JobLogTaskDTO>) list;
Map<Long, List<JobLogTaskDTO>> logTaskDTOMap = jobLogTasks. logService.batchSaveLogs(jobLogTasks);
stream().collect(Collectors.groupingBy(JobLogTaskDTO::getTaskId, Collectors.toList()));
List<JobLogMessage> jobLogMessageList = new ArrayList<>();
for (List<JobLogTaskDTO> logTaskDTOList : logTaskDTOMap.values()) {
JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(logTaskDTOList.get(0));
jobLogMessage.setCreateDt(LocalDateTime.now());
jobLogMessage.setLogNum(logTaskDTOList.size());
List<Map<String, String>> messageMapList = StreamUtils.toList(logTaskDTOList,
taskDTO -> taskDTO.getFieldList()
.stream().filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue)));
jobLogMessage.setMessage(JsonUtil.toJsonString(messageMapList));
jobLogMessageList.add(jobLogMessage);
}
jobLogMessageMapper.insertBatch(jobLogMessageList);
} catch (Exception e) { } catch (Exception e) {
log.error("保存客户端日志异常.", e); log.error("保存客户端日志异常.", e);
} finally { } finally {
@ -81,11 +56,6 @@ public class JobLogActor extends AbstractActor {
} }
private void saveLogMessage(JobLogDTO jobLogDTO) { private void saveLogMessage(JobLogDTO jobLogDTO) {
JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(jobLogDTO); logService.saveLog(jobLogDTO);
jobLogMessage.setCreateDt(LocalDateTime.now());
jobLogMessage.setLogNum(1);
jobLogMessage.setMessage(Optional.ofNullable(jobLogDTO.getMessage()).orElse(StrUtil.EMPTY));
jobLogMessage.setTaskId(Optional.ofNullable(jobLogMessage.getTaskId()).orElse(0L));
jobLogMessageMapper.insert(jobLogMessage);
} }
} }

View File

@ -10,7 +10,7 @@ import com.aizuda.snailjob.server.common.pekko.ActorGenerator;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.common.dto.LogMetaDTO; import com.aizuda.snailjob.server.common.dto.LogMetaDTO;
import com.aizuda.snailjob.server.common.log.LogStorageFactory; import com.aizuda.snailjob.server.common.log.LogStorageFactory;
import com.aizuda.snailjob.server.job.task.dto.JobLogDTO; import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;

View File

@ -39,6 +39,10 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId> <artifactId>spring-boot-starter-validation</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- libs --> <!-- libs -->
<dependency> <dependency>

View File

@ -0,0 +1,74 @@
package com.aizuda.snailjob.server.web.config;
import com.aizuda.snailjob.common.core.exception.SnailJobAuthenticationException;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.template.datasource.persistence.po.SystemUser;
import com.auth0.jwt.JWT;
import com.auth0.jwt.exceptions.JWTDecodeException;
import jakarta.websocket.HandshakeResponse;
import jakarta.websocket.server.HandshakeRequest;
import jakarta.websocket.server.ServerEndpointConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import java.util.List;
import java.util.Map;
/**
* WebScoket配置处理器
* @since 1.5.0
*/
@Configuration
public class WebSocketConfigurator extends ServerEndpointConfig.Configurator {
public static final String AUTHENTICATION = "Snail-Job-Auth";
public static final String SID = "sid";
public static final String SCENE = "scene";
public static final String USER_INFO = "user-info";
/**
* ServerEndpointExporter 作用
*
* 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
Map<String, List<String>> parameterMap = request.getParameterMap();
String queryString = request.getQueryString();
// 获取 token 中的 user id
SystemUser systemUser;
try {
String token = parameterMap.get(AUTHENTICATION).get(0);
systemUser = JsonUtil.parseObject(JWT.decode(token).getAudience().get(0), SystemUser.class);
} catch (JWTDecodeException j) {
throw new SnailJobAuthenticationException("登陆过期,请重新登陆");
}
String sid = parameterMap.get(SID).get(0);
String scene = parameterMap.get(SCENE).get(0);
Map<String, Object> userProperties = sec.getUserProperties();
userProperties.put(SID, sid);
userProperties.put(SCENE, scene);
userProperties.put(USER_INFO, systemUser);
}
/**
* 初始化端点对象,也就是被@ServerEndpoint所标注的对象
*/
@Override
public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
return super.getEndpointInstance(clazz);
}
}

View File

@ -1,8 +1,8 @@
package com.aizuda.snailjob.server.web.controller; package com.aizuda.snailjob.server.web.controller;
import com.aizuda.snailjob.server.web.annotation.LoginRequired; import com.aizuda.snailjob.server.web.annotation.LoginRequired;
import com.aizuda.snailjob.server.web.model.request.JobLogQueryVO; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.web.model.response.JobLogResponseVO; import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
import com.aizuda.snailjob.server.web.service.JobLogService; import com.aizuda.snailjob.server.web.service.JobLogService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;

View File

@ -0,0 +1,47 @@
package com.aizuda.snailjob.server.web.listener;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.common.vo.WsRequestVO;
import jakarta.websocket.Session;
import lombok.AllArgsConstructor;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.io.IOException;
import static com.aizuda.snailjob.server.web.socket.LogServer.USER_SESSION;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.web.listener
* @Projectsnail-job
* @Date2025/3/18 10:56
* @FilenameWsRequestListener
* @since 1.5.0
*/
@Component
@AllArgsConstructor
public class WsRequestListener {
private final LogService logService;
@Async
@EventListener(classes = WsRequestVO.class)
public void getJobLogs(WsRequestVO requestVO) {
if (!WebSocketSceneEnum.JOB_LOG_SCENE.equals(requestVO.getSceneEnum())) {
return;
}
String message = requestVO.getMessage();
JobLogQueryVO jobLogQueryVO = JsonUtil.parseObject(message, JobLogQueryVO.class);
Session session = USER_SESSION.get(requestVO.getSid());
try {
logService.getJobLogPage(jobLogQueryVO, session);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.web.model.request; package com.aizuda.snailjob.server.web.model.request;
import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;

View File

@ -1,7 +1,7 @@
package com.aizuda.snailjob.server.web.service; package com.aizuda.snailjob.server.web.service;
import com.aizuda.snailjob.server.web.model.request.JobLogQueryVO; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.web.model.response.JobLogResponseVO; import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
/** /**
* @author: opensnail * @author: opensnail

View File

@ -3,8 +3,8 @@ package com.aizuda.snailjob.server.web.service.impl;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.constant.LogFieldConstants; import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
import com.aizuda.snailjob.server.web.model.request.JobLogQueryVO; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.web.model.response.JobLogResponseVO; import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
import com.aizuda.snailjob.server.web.service.JobLogService; import com.aizuda.snailjob.server.web.service.JobLogService;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
@ -20,6 +20,8 @@ import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
/** /**
* @author: opensnail * @author: opensnail
* @date : 2023-10-12 09:55 * @date : 2023-10-12 09:55
@ -40,8 +42,8 @@ public class JobLogServiceImpl implements JobLogService {
new LambdaQueryWrapper<JobLogMessage>() new LambdaQueryWrapper<JobLogMessage>()
.select(JobLogMessage::getId, JobLogMessage::getLogNum) .select(JobLogMessage::getId, JobLogMessage::getLogNum)
.ge(JobLogMessage::getId, queryVO.getStartId()) .ge(JobLogMessage::getId, queryVO.getStartId())
.ge(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId()) .eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId())
.ge(JobLogMessage::getJobId, queryVO.getJobId()) // .ge(JobLogMessage::getJobId, queryVO.getJobId())
.eq(JobLogMessage::getTaskId, queryVO.getTaskId()) .eq(JobLogMessage::getTaskId, queryVO.getTaskId())
.orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime)); .orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime));
List<JobLogMessage> records = selectPage.getRecords(); List<JobLogMessage> records = selectPage.getRecords();
@ -55,7 +57,9 @@ public class JobLogServiceImpl implements JobLogService {
JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); JobLogResponseVO jobLogResponseVO = new JobLogResponseVO();
if (Objects.isNull(jobTaskBatch) if (Objects.isNull(jobTaskBatch)
|| jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()) || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) &&
jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))
) { ) {
jobLogResponseVO.setFinished(Boolean.TRUE); jobLogResponseVO.setFinished(Boolean.TRUE);
} }

View File

@ -0,0 +1,60 @@
package com.aizuda.snailjob.server.web.socket;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
import com.aizuda.snailjob.server.common.vo.WsRequestVO;
import com.aizuda.snailjob.server.web.config.WebSocketConfigurator;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.web.socket
* @Projectsnail-job
* @Date2025/3/4 16:31
* @FilenameLogServer
* @since 1.5.0
*/
@Slf4j
@Component
@ServerEndpoint(value = "/webSocket", configurator = WebSocketConfigurator.class)
public class LogServer {
// 缓存session
public static final ConcurrentHashMap<String, Session> USER_SESSION = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(Session session) {
Map<String, Object> userProperties = session.getUserProperties();
USER_SESSION.put((String) userProperties.get(WebSocketConfigurator.SID), session);
}
@OnMessage
public void onMessage(String message, Session session) throws IOException, EncodeException {
// 接收请求
Map<String, Object> userProperties = session.getUserProperties();
String sid = (String) userProperties.get(WebSocketConfigurator.SID);
String scene = (String) userProperties.get(WebSocketConfigurator.SCENE);
WsRequestVO requestVO = new WsRequestVO();
requestVO.setSceneEnum(WebSocketSceneEnum.valueOf(scene));
requestVO.setMessage(message);
requestVO.setSid(sid);
SnailSpringContext.getContext().publishEvent(requestVO);
}
//错误时调用
@OnError
public void onError(Session session, Throwable throwable) {
log.error("发生错误{}", throwable);
}
}