feat(1.5.0-beta1): 优化日志组件
This commit is contained in:
parent
5a5fd8e6ff
commit
98b5135d2d
@ -47,6 +47,15 @@
|
||||
<groupId>com.aizuda</groupId>
|
||||
<artifactId>snail-job-common-server-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mapstruct</groupId>
|
||||
<artifactId>mapstruct</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mapstruct</groupId>
|
||||
<artifactId>mapstruct-processor</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -7,6 +7,8 @@ import com.aizuda.snailjob.template.datasource.access.task.RetryDeadLetterTaskAc
|
||||
import com.aizuda.snailjob.template.datasource.access.task.RetryTaskAccess;
|
||||
import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum;
|
||||
import com.aizuda.snailjob.template.datasource.exception.SnailJobDatasourceException;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.*;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@ -103,4 +105,27 @@ public class AccessTemplate {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取通知配置操作类
|
||||
*
|
||||
* @return {@link NotifyConfigAccess} 获取通知配置操作类
|
||||
*/
|
||||
public LogAccess<RetryTaskLogMessageDO> getRetryTaskLogMessageAccess() {
|
||||
return (LogAccess<RetryTaskLogMessageDO>) Optional.ofNullable(REGISTER_ACCESS.get(OperationTypeEnum.RETRY_LOG.name()))
|
||||
.orElseThrow(() -> new SnailJobDatasourceException("not supports operation type"));
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 获取通知配置操作类
|
||||
*
|
||||
* @return {@link NotifyConfigAccess} 获取通知配置操作类
|
||||
*/
|
||||
public LogAccess<JobLogMessageDO> getJobLogMessageAccess() {
|
||||
return (LogAccess<JobLogMessageDO>) Optional.ofNullable(REGISTER_ACCESS.get(OperationTypeEnum.JOB_LOG.name()))
|
||||
.orElseThrow(() -> new SnailJobDatasourceException("not supports operation type"));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,39 @@
|
||||
package com.aizuda.snailjob.template.datasource.access;
|
||||
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-29
|
||||
*/
|
||||
public interface LogAccess<T> extends Access<T> {
|
||||
|
||||
int insert(T t);
|
||||
|
||||
int insertBatch(List<T> list);
|
||||
|
||||
PageResponseDO<T> listPage(PageQueryDO queryDO);
|
||||
|
||||
List<T> list(ListQueryDO queryDO);
|
||||
|
||||
T one(OneQueryDO query);
|
||||
|
||||
int update(T t, UpdateQueryDO query);
|
||||
|
||||
int updateById(T t);
|
||||
|
||||
int deleteById(Serializable id);
|
||||
|
||||
int delete(DeleteQueryDO query);
|
||||
|
||||
long count(LambdaQueryWrapper<T> query);
|
||||
|
||||
}
|
@ -0,0 +1,14 @@
|
||||
package com.aizuda.snailjob.template.datasource.access.log;
|
||||
|
||||
import com.aizuda.snailjob.template.datasource.access.LogAccess;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
public abstract class AbstractLogAccess<T> implements LogAccess<T> {
|
||||
}
|
@ -0,0 +1,114 @@
|
||||
package com.aizuda.snailjob.template.datasource.access.log;
|
||||
|
||||
import com.aizuda.snailjob.template.datasource.access.LogAccess;
|
||||
import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
|
||||
import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.LogPageQueryDO;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
|
||||
import static com.aizuda.snailjob.template.datasource.utils.DbUtils.getDbType;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
//@ConditionalOnMissingBean(LogAccess.class)
|
||||
@RequiredArgsConstructor
|
||||
public class JobLogMessageAccess extends AbstractLogAccess<JobLogMessageDO> {
|
||||
private final JobLogMessageMapper jobLogMessageMapper;
|
||||
private final JobTaskBatchMapper jobTaskBatchMapper;
|
||||
|
||||
@Override
|
||||
public boolean supports(String operationType) {
|
||||
return DbTypeEnum.all().contains(getDbType()) && OperationTypeEnum.JOB_LOG.name().equals(operationType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int insert(JobLogMessageDO jobLogMessageDO) {
|
||||
JobLogMessage jobLogMessage = LogConverter.INSTANCE.toJobLogMessage(jobLogMessageDO);
|
||||
return jobLogMessageMapper.insert(jobLogMessage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int insertBatch(List<JobLogMessageDO> list) {
|
||||
List<JobLogMessage> jobLogMessages = LogConverter.INSTANCE.toJobLogMessages(list);
|
||||
return jobLogMessageMapper.insertBatch(jobLogMessages);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageResponseDO<JobLogMessageDO> listPage(PageQueryDO queryDO) {
|
||||
LogPageQueryDO logPageQueryDO = (LogPageQueryDO) queryDO;
|
||||
|
||||
PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(
|
||||
new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize()),
|
||||
new LambdaQueryWrapper<JobLogMessage>()
|
||||
.ge(JobLogMessage::getId, logPageQueryDO.getStartId())
|
||||
.eq(JobLogMessage::getTaskBatchId, logPageQueryDO.getTaskBatchId())
|
||||
.eq(JobLogMessage::getTaskId, logPageQueryDO.getTaskId())
|
||||
.orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime));
|
||||
List<JobLogMessage> records = selectPage.getRecords();
|
||||
|
||||
PageResponseDO<JobLogMessageDO> responseDO = new PageResponseDO<>();
|
||||
responseDO.setPage(selectPage.getCurrent());
|
||||
responseDO.setSize(selectPage.getSize());
|
||||
responseDO.setTotal(selectPage.getTotal());
|
||||
responseDO.setRows(LogConverter.INSTANCE.toJobLogMessageDOList(records));
|
||||
return responseDO;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<JobLogMessageDO> list(ListQueryDO queryDO) {
|
||||
return List.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JobLogMessageDO one(OneQueryDO query) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int update(JobLogMessageDO jobLogMessageDO, UpdateQueryDO query) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int updateById(JobLogMessageDO jobLogMessageDO) {
|
||||
JobLogMessage jobLogMessage = LogConverter.INSTANCE.toJobLogMessage(jobLogMessageDO);
|
||||
return jobLogMessageMapper.updateById(jobLogMessage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteById(Serializable id) {
|
||||
return jobLogMessageMapper.deleteById(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int delete(DeleteQueryDO query) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long count(LambdaQueryWrapper<JobLogMessageDO> query) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
package com.aizuda.snailjob.template.datasource.access.log;
|
||||
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
|
||||
import org.mapstruct.Mapper;
|
||||
import org.mapstruct.factory.Mappers;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
@Mapper
|
||||
public interface LogConverter {
|
||||
|
||||
LogConverter INSTANCE = Mappers.getMapper(LogConverter.class);
|
||||
|
||||
JobLogMessage toJobLogMessage(JobLogMessageDO logMessage);
|
||||
|
||||
List<JobLogMessage> toJobLogMessages(List<JobLogMessageDO> logMessages);
|
||||
|
||||
JobLogMessageDO toJobLogMessageDO(JobLogMessage logMessage);
|
||||
|
||||
List<JobLogMessageDO> toJobLogMessageDOList(List<JobLogMessage> logMessages);
|
||||
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
package com.aizuda.snailjob.template.datasource.access.log;
|
||||
|
||||
import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
|
||||
import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
import static com.aizuda.snailjob.template.datasource.utils.DbUtils.getDbType;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-29
|
||||
*/
|
||||
@Component
|
||||
public class RetryTaskLogMessageAccess extends AbstractLogAccess<RetryTaskLogMessageDO> {
|
||||
|
||||
|
||||
@Override
|
||||
public boolean supports(String operationType) {
|
||||
return DbTypeEnum.all().contains(getDbType()) && OperationTypeEnum.RETRY_LOG.name().equals(operationType);
|
||||
|
||||
}
|
||||
@Override
|
||||
public int insert(RetryTaskLogMessageDO retryTaskLogMessageDO) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int insertBatch(List<RetryTaskLogMessageDO> list) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageResponseDO listPage(PageQueryDO queryDO) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RetryTaskLogMessageDO> list(ListQueryDO queryDO) {
|
||||
return List.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RetryTaskLogMessageDO one(OneQueryDO query) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int update(RetryTaskLogMessageDO retryTaskLogMessageDO, UpdateQueryDO query) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int updateById(RetryTaskLogMessageDO retryTaskLogMessageDO) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteById(Serializable id) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int delete(DeleteQueryDO query) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long count(LambdaQueryWrapper<RetryTaskLogMessageDO> query) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -5,6 +5,9 @@ import com.baomidou.mybatisplus.annotation.DbType;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* DB数据库类型
|
||||
*
|
||||
@ -37,4 +40,8 @@ public enum DbTypeEnum {
|
||||
|
||||
throw new SnailJobDatasourceException("暂不支持此数据库 [{}]", db);
|
||||
}
|
||||
|
||||
public static List<DbTypeEnum> all() {
|
||||
return Arrays.asList(DbTypeEnum.values());
|
||||
}
|
||||
}
|
||||
|
@ -13,5 +13,7 @@ public enum OperationTypeEnum {
|
||||
GROUP,
|
||||
RETRY_TASK,
|
||||
RETRY,
|
||||
RETRY_DEAD_LETTER
|
||||
RETRY_DEAD_LETTER,
|
||||
RETRY_LOG,
|
||||
JOB_LOG,
|
||||
}
|
||||
|
@ -0,0 +1,12 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
public class CountQueryDO {
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
public class DeleteQueryDO {
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
public class ListQueryDO {
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
public class OneQueryDO {
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
@Data
|
||||
public class PageQueryDO {
|
||||
|
||||
/**
|
||||
* 当前页码
|
||||
*/
|
||||
private int page = 1;
|
||||
|
||||
/**
|
||||
* 每页条数
|
||||
*/
|
||||
private int size = 10;
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
@Data
|
||||
public class PageResponseDO<T> {
|
||||
|
||||
private List<T> rows;
|
||||
|
||||
private long total;
|
||||
private long page;
|
||||
private long size;
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
public class UpdateQueryDO {
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.dataobject.log;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
@Data
|
||||
public class JobLogMessageDO {
|
||||
|
||||
private Long id;
|
||||
|
||||
/**
|
||||
* 命名空间
|
||||
*/
|
||||
private String namespaceId;
|
||||
|
||||
/**
|
||||
* 组名称
|
||||
*/
|
||||
private String groupName;
|
||||
|
||||
/**
|
||||
* 任务信息id
|
||||
*/
|
||||
private Long jobId;
|
||||
|
||||
/**
|
||||
* 任务实例id
|
||||
*/
|
||||
private Long taskBatchId;
|
||||
|
||||
/**
|
||||
* 调度任务id
|
||||
*/
|
||||
private Long taskId;
|
||||
|
||||
/**
|
||||
* 日志数量
|
||||
*/
|
||||
private Integer logNum;
|
||||
|
||||
/**
|
||||
* 调度信息
|
||||
*/
|
||||
private String message;
|
||||
|
||||
/**
|
||||
* 真实上报时间
|
||||
*/
|
||||
private Long realTime;
|
||||
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.dataobject.log;
|
||||
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageQueryDO;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-29
|
||||
*/
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class LogPageQueryDO extends PageQueryDO {
|
||||
private Long startId;
|
||||
private Long jobId;
|
||||
private Long taskBatchId;
|
||||
private Long taskId;
|
||||
private Integer fromIndex;
|
||||
}
|
@ -0,0 +1,67 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.dataobject.log;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-29
|
||||
*/
|
||||
@Data
|
||||
public class RetryTaskLogMessageDO {
|
||||
|
||||
/**
|
||||
* 主键
|
||||
*/
|
||||
private Long id;
|
||||
|
||||
/**
|
||||
* 命名空间
|
||||
*/
|
||||
private String namespaceId;
|
||||
|
||||
/**
|
||||
* 组名称
|
||||
*/
|
||||
private String groupName;
|
||||
|
||||
/**
|
||||
* 重试任务id
|
||||
*/
|
||||
private Long retryTaskId;
|
||||
|
||||
/**
|
||||
* 重试信息Id
|
||||
*/
|
||||
private Long retryId;
|
||||
|
||||
/**
|
||||
* 异常信息
|
||||
*/
|
||||
private String message;
|
||||
|
||||
/**
|
||||
* 日志数量
|
||||
*/
|
||||
private Integer logNum;
|
||||
|
||||
/**
|
||||
* 真实上报时间
|
||||
*/
|
||||
private Long realTime;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
private LocalDateTime createDt;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
private LocalDateTime updateDt;
|
||||
}
|
@ -3,7 +3,6 @@ package com.aizuda.snailjob.server.common.service;
|
||||
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
|
||||
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
|
||||
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
|
||||
import jakarta.websocket.Session;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
@ -19,5 +18,5 @@ import java.util.List;
|
||||
public interface LogService {
|
||||
void saveLog(JobLogDTO jobLogDTO);
|
||||
void batchSaveLogs(List<JobLogTaskDTO> jobLogTasks);
|
||||
void getJobLogPage(JobLogQueryVO queryVO, Session session) throws IOException;
|
||||
void getJobLogPage(JobLogQueryVO queryVO, String sid) throws IOException;
|
||||
}
|
||||
|
@ -9,8 +9,6 @@ import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO;
|
||||
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
|
||||
import com.aizuda.snailjob.server.common.service.LogService;
|
||||
import com.aizuda.snailjob.server.common.convert.JobLogMessageConverter;
|
||||
import com.aizuda.snailjob.server.common.timer.JobTaskLogTimerTask;
|
||||
import com.aizuda.snailjob.server.common.timer.LogTimerWheel;
|
||||
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
|
||||
import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
|
||||
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
|
||||
@ -21,13 +19,9 @@ import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||
import com.google.common.collect.Lists;
|
||||
import jakarta.websocket.Session;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
@ -93,8 +87,8 @@ public class DatabaseLogService implements LogService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getJobLogPage(JobLogQueryVO queryVO, Session session) throws IOException {
|
||||
Boolean taskBatchComplete = false;
|
||||
public void getJobLogPage(JobLogQueryVO queryVO, String sid) {
|
||||
boolean taskBatchComplete = false;
|
||||
while (!taskBatchComplete) {
|
||||
PageDTO<JobLogMessage> pageDTO = new PageDTO<>(1, queryVO.getSize());
|
||||
|
||||
@ -104,7 +98,8 @@ public class DatabaseLogService implements LogService {
|
||||
.ge(JobLogMessage::getId, queryVO.getStartId())
|
||||
.eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId())
|
||||
.eq(JobLogMessage::getTaskId, queryVO.getTaskId())
|
||||
.orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime));
|
||||
.orderByAsc(JobLogMessage::getId)
|
||||
.orderByAsc(JobLogMessage::getRealTime));
|
||||
List<JobLogMessage> records = selectPage.getRecords();
|
||||
if (CollUtil.isEmpty(records)) {
|
||||
|
||||
@ -122,10 +117,9 @@ public class DatabaseLogService implements LogService {
|
||||
jobLogResponseVO.setFinished(Boolean.TRUE);
|
||||
jobLogResponseVO.setNextStartId(queryVO.getStartId());
|
||||
jobLogResponseVO.setFromIndex(0);
|
||||
session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO));
|
||||
return;
|
||||
} else {
|
||||
scheduleNextAttempt(queryVO, session);
|
||||
// scheduleNextAttempt(queryVO, sid);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -180,7 +174,7 @@ public class DatabaseLogService implements LogService {
|
||||
jobLogResponseVO.setMessage(messages);
|
||||
jobLogResponseVO.setNextStartId(nextStartId);
|
||||
jobLogResponseVO.setFromIndex(fromIndex);
|
||||
session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO));
|
||||
// session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO));
|
||||
|
||||
queryVO.setFromIndex(fromIndex);
|
||||
queryVO.setStartId(nextStartId);
|
||||
@ -188,22 +182,4 @@ public class DatabaseLogService implements LogService {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用时间轮5秒再进行日志查询
|
||||
*
|
||||
* @param queryVO
|
||||
* @param session
|
||||
*/
|
||||
private void scheduleNextAttempt(JobLogQueryVO queryVO, Session session) {
|
||||
if (TransactionSynchronizationManager.isActualTransactionActive()) {
|
||||
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
|
||||
@Override
|
||||
public void afterCompletion(int status) {
|
||||
LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, session), Duration.ofMillis(DELAY_MILLS));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, session), Duration.ofMillis(DELAY_MILLS));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,4 +17,5 @@ public class JobLogQueryVO extends BaseQueryVO {
|
||||
private Long taskBatchId;
|
||||
private Long taskId;
|
||||
private Integer fromIndex;
|
||||
private String sid;
|
||||
}
|
||||
|
@ -4,9 +4,11 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
|
||||
import com.aizuda.snailjob.server.common.service.LogService;
|
||||
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
|
||||
import com.aizuda.snailjob.server.common.vo.WsRequestVO;
|
||||
import com.aizuda.snailjob.server.web.model.event.WsRequestEvent;
|
||||
import com.aizuda.snailjob.server.web.service.JobLogService;
|
||||
import jakarta.websocket.Session;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -25,23 +27,28 @@ import static com.aizuda.snailjob.server.web.socket.LogServer.USER_SESSION;
|
||||
*/
|
||||
@Component
|
||||
@AllArgsConstructor
|
||||
@Slf4j
|
||||
public class WsRequestListener {
|
||||
private final LogService logService;
|
||||
private final JobLogService jobLogService;
|
||||
|
||||
@Async
|
||||
@EventListener(classes = WsRequestVO.class)
|
||||
public void getJobLogs(WsRequestVO requestVO) {
|
||||
@EventListener(classes = WsRequestEvent.class)
|
||||
public void getJobLogs(WsRequestEvent requestVO) {
|
||||
if (!WebSocketSceneEnum.JOB_LOG_SCENE.equals(requestVO.getSceneEnum())) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("getJobLogs {}", requestVO.getSid());
|
||||
String message = requestVO.getMessage();
|
||||
JobLogQueryVO jobLogQueryVO = JsonUtil.parseObject(message, JobLogQueryVO.class);
|
||||
Session session = USER_SESSION.get(requestVO.getSid());
|
||||
jobLogQueryVO.setSid(requestVO.getSid());
|
||||
jobLogQueryVO.setStartId(0L);
|
||||
try {
|
||||
logService.getJobLogPage(jobLogQueryVO, session);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
jobLogService.getJobLogPageV2(jobLogQueryVO);
|
||||
} catch (Exception e) {
|
||||
log.warn("send log error", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,70 @@
|
||||
package com.aizuda.snailjob.server.web.model.dto;
|
||||
|
||||
import com.aizuda.snailjob.server.common.dto.PartitionTask;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class JobLogMessagePartitionTask extends PartitionTask {
|
||||
|
||||
/**
|
||||
* 命名空间
|
||||
*/
|
||||
private String namespaceId;
|
||||
|
||||
/**
|
||||
* 组名称
|
||||
*/
|
||||
private String groupName;
|
||||
|
||||
/**
|
||||
* 任务信息id
|
||||
*/
|
||||
private Long jobId;
|
||||
|
||||
/**
|
||||
* 任务实例id
|
||||
*/
|
||||
private Long taskBatchId;
|
||||
|
||||
/**
|
||||
* 调度任务id
|
||||
*/
|
||||
private Long taskId;
|
||||
|
||||
/**
|
||||
* 日志数量
|
||||
*/
|
||||
private Integer logNum;
|
||||
|
||||
/**
|
||||
* 调度信息
|
||||
*/
|
||||
private String message;
|
||||
|
||||
/**
|
||||
* 真实上报时间
|
||||
*/
|
||||
private Long realTime;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
private LocalDateTime createDt;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
private LocalDateTime updateDt;
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
package com.aizuda.snailjob.server.web.model.dto;
|
||||
|
||||
import com.aizuda.snailjob.server.common.dto.PartitionTask;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class LogMessagePartitionTask extends PartitionTask {
|
||||
|
||||
/**
|
||||
* 命名空间
|
||||
*/
|
||||
private String namespaceId;
|
||||
|
||||
/**
|
||||
* 组名称
|
||||
*/
|
||||
private String groupName;
|
||||
|
||||
/**
|
||||
* 重试任务id
|
||||
*/
|
||||
private Long retryTaskId;
|
||||
|
||||
/**
|
||||
* 重试信息Id
|
||||
*/
|
||||
private Long retryId;
|
||||
|
||||
/**
|
||||
* 异常信息
|
||||
*/
|
||||
private String message;
|
||||
|
||||
/**
|
||||
* 日志数量
|
||||
*/
|
||||
private Integer logNum;
|
||||
|
||||
/**
|
||||
* 真实上报时间
|
||||
*/
|
||||
private Long realTime;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
private LocalDateTime createDt;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
private LocalDateTime updateDt;
|
||||
}
|
@ -1,7 +1,10 @@
|
||||
package com.aizuda.snailjob.server.common.vo;
|
||||
package com.aizuda.snailjob.server.web.model.event;
|
||||
|
||||
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
/**
|
||||
* @Author:srzou
|
||||
@ -11,8 +14,9 @@ import lombok.Data;
|
||||
* @Filename:WebSocketRequestVO
|
||||
* @since 1.5.0
|
||||
*/
|
||||
@Data
|
||||
public class WsRequestVO {
|
||||
@Setter
|
||||
@Getter
|
||||
public class WsRequestEvent extends ApplicationEvent {
|
||||
/**
|
||||
* wb类型
|
||||
*/
|
||||
@ -24,4 +28,8 @@ public class WsRequestVO {
|
||||
private String message;
|
||||
|
||||
private WebSocketSceneEnum sceneEnum;
|
||||
|
||||
public WsRequestEvent(Object source) {
|
||||
super(source);
|
||||
}
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
package com.aizuda.snailjob.server.web.model.event;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-01-18
|
||||
*/
|
||||
@Setter
|
||||
@Getter
|
||||
public class WsSendEvent extends ApplicationEvent {
|
||||
|
||||
/**
|
||||
* 会话id
|
||||
*/
|
||||
private String sid;
|
||||
|
||||
/**
|
||||
* 需要发送的消息
|
||||
*/
|
||||
private String message;
|
||||
|
||||
public WsSendEvent(Object source) {
|
||||
super(source);
|
||||
}
|
||||
}
|
@ -11,4 +11,7 @@ import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
|
||||
public interface JobLogService {
|
||||
|
||||
JobLogResponseVO getJobLogPage(JobLogQueryVO jobQueryVO);
|
||||
|
||||
void getJobLogPageV2(JobLogQueryVO jobQueryVO);
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,29 @@
|
||||
package com.aizuda.snailjob.server.web.service.convert;
|
||||
|
||||
import com.aizuda.snailjob.server.web.model.dto.JobLogMessagePartitionTask;
|
||||
import com.aizuda.snailjob.server.web.model.dto.LogMessagePartitionTask;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
|
||||
import org.mapstruct.Mapper;
|
||||
import org.mapstruct.Mapping;
|
||||
import org.mapstruct.factory.Mappers;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
*
|
||||
* </p>
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2025-03-30
|
||||
*/
|
||||
@Mapper
|
||||
public interface LogMessagePartitionTaskConverter {
|
||||
LogMessagePartitionTaskConverter INSTANCE = Mappers.getMapper(LogMessagePartitionTaskConverter.class);
|
||||
|
||||
List<LogMessagePartitionTask> toLogMessagePartitionTask(List<RetryTaskLogMessageDO> retryTaskLogMessageList);
|
||||
|
||||
|
||||
List<JobLogMessagePartitionTask> toJobLogMessagePartitionTasks(List<JobLogMessageDO> jobLogMessageDOS);
|
||||
}
|
@ -1,23 +1,38 @@
|
||||
package com.aizuda.snailjob.server.web.service.impl;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
|
||||
import com.aizuda.snailjob.server.common.dto.PartitionTask;
|
||||
import com.aizuda.snailjob.server.web.timer.JobTaskLogTimerTask;
|
||||
import com.aizuda.snailjob.server.web.timer.LogTimerWheel;
|
||||
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
|
||||
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
|
||||
import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
|
||||
import com.aizuda.snailjob.server.web.model.dto.JobLogMessagePartitionTask;
|
||||
import com.aizuda.snailjob.server.web.model.event.WsSendEvent;
|
||||
import com.aizuda.snailjob.server.web.service.JobLogService;
|
||||
import com.aizuda.snailjob.server.web.service.convert.LogMessagePartitionTaskConverter;
|
||||
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageResponseDO;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.LogPageQueryDO;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
|
||||
@ -30,8 +45,10 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPL
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class JobLogServiceImpl implements JobLogService {
|
||||
private static final Long DELAY_MILLS = 5000L;
|
||||
private final JobLogMessageMapper jobLogMessageMapper;
|
||||
private final JobTaskBatchMapper jobTaskBatchMapper;
|
||||
private final AccessTemplate accessTemplate;
|
||||
|
||||
@Override
|
||||
public JobLogResponseVO getJobLogPage(final JobLogQueryVO queryVO) {
|
||||
@ -121,4 +138,85 @@ public class JobLogServiceImpl implements JobLogService {
|
||||
jobLogResponseVO.setFromIndex(fromIndex);
|
||||
return jobLogResponseVO;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getJobLogPageV2(JobLogQueryVO queryVO) {
|
||||
String sid = queryVO.getSid();
|
||||
LogPageQueryDO pageQueryDO = new LogPageQueryDO();
|
||||
pageQueryDO.setPage(1);
|
||||
pageQueryDO.setSize(queryVO.getSize());
|
||||
pageQueryDO.setTaskBatchId(queryVO.getTaskBatchId());
|
||||
pageQueryDO.setTaskId(queryVO.getTaskId());
|
||||
pageQueryDO.setStartId(queryVO.getStartId());
|
||||
PartitionTaskUtils.process(startId -> {
|
||||
// 记录下次起始时间
|
||||
queryVO.setStartId(startId);
|
||||
pageQueryDO.setStartId(startId);
|
||||
// 拉去数据
|
||||
PageResponseDO<JobLogMessageDO> pageResponseDO = accessTemplate.getJobLogMessageAccess()
|
||||
.listPage(pageQueryDO);
|
||||
List<JobLogMessageDO> rows = pageResponseDO.getRows();
|
||||
return LogMessagePartitionTaskConverter.INSTANCE.toJobLogMessagePartitionTasks(rows);
|
||||
}, new Consumer<>() {
|
||||
@Override
|
||||
public void accept(List<? extends PartitionTask> partitionTasks) {
|
||||
|
||||
List<JobLogMessagePartitionTask> partitionTaskList = (List<JobLogMessagePartitionTask>) partitionTasks;
|
||||
|
||||
for (JobLogMessagePartitionTask logMessagePartitionTask : partitionTaskList) {
|
||||
// 发生日志内容到前端
|
||||
String message = logMessagePartitionTask.getMessage();
|
||||
List<Map<String, String>> logContents = JsonUtil.parseObject(message, List.class);
|
||||
logContents = logContents.stream()
|
||||
.sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP))))
|
||||
.toList();
|
||||
for (Map<String, String> logContent : logContents) {
|
||||
// send发消息
|
||||
WsSendEvent sendEvent = new WsSendEvent(this);
|
||||
sendEvent.setSid(sid);
|
||||
sendEvent.setMessage(JsonUtil.toJsonString(logContent));
|
||||
SnailSpringContext.getContext().publishEvent(sendEvent);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}, new Predicate<>() {
|
||||
@Override
|
||||
public boolean apply(List<? extends PartitionTask> rows) {
|
||||
|
||||
// 决策是否完成
|
||||
if (!CollUtil.isEmpty(rows)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(
|
||||
new LambdaQueryWrapper<JobTaskBatch>().eq(JobTaskBatch::getId, queryVO.getTaskBatchId()));
|
||||
|
||||
if (Objects.isNull(jobTaskBatch)
|
||||
|| (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) &&
|
||||
jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) {
|
||||
// 发生完成标识
|
||||
WsSendEvent sendEvent = new WsSendEvent(this);
|
||||
sendEvent.setMessage("END");
|
||||
sendEvent.setSid(sid);
|
||||
SnailSpringContext.getContext().publishEvent(sendEvent);
|
||||
return true;
|
||||
} else {
|
||||
scheduleNextAttempt(queryVO, sid);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}, queryVO.getStartId());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用时间轮5秒再进行日志查询
|
||||
*
|
||||
* @param queryVO
|
||||
* @param sid
|
||||
*/
|
||||
private void scheduleNextAttempt(JobLogQueryVO queryVO, String sid) {
|
||||
LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, sid), Duration.ofMillis(DELAY_MILLS));
|
||||
}
|
||||
}
|
||||
|
@ -1,13 +1,16 @@
|
||||
package com.aizuda.snailjob.server.web.socket;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
|
||||
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
|
||||
import com.aizuda.snailjob.server.common.vo.WsRequestVO;
|
||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||
import com.aizuda.snailjob.server.web.model.event.WsSendEvent;
|
||||
import com.aizuda.snailjob.server.web.model.event.WsRequestEvent;
|
||||
import com.aizuda.snailjob.server.web.config.WebSocketConfigurator;
|
||||
import jakarta.websocket.*;
|
||||
import jakarta.websocket.server.PathParam;
|
||||
import jakarta.websocket.server.ServerEndpoint;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -24,33 +27,59 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ServerEndpoint(value = "/webSocket", configurator = WebSocketConfigurator.class)
|
||||
@ServerEndpoint(value = "/websocket", configurator = WebSocketConfigurator.class)
|
||||
public class LogServer {
|
||||
|
||||
// 缓存session
|
||||
public static final ConcurrentHashMap<String, Session> USER_SESSION = new ConcurrentHashMap<>();
|
||||
|
||||
@EventListener
|
||||
public void sendMessage(WsSendEvent message) throws IOException {
|
||||
Session session = USER_SESSION.get(message.getSid());
|
||||
Assert.notNull(session, () -> new SnailJobServerException("ws session not exist"));
|
||||
if (session.isOpen()) {
|
||||
synchronized (session) {
|
||||
session.getBasicRemote().sendText(message.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@OnOpen
|
||||
public void onOpen(Session session) {
|
||||
Map<String, Object> userProperties = session.getUserProperties();
|
||||
USER_SESSION.put((String) userProperties.get(WebSocketConfigurator.SID), session);
|
||||
String sid = (String) userProperties.get(WebSocketConfigurator.SID);
|
||||
USER_SESSION.put(sid, session);
|
||||
log.info("sid:[{}] websocket started", sid);
|
||||
}
|
||||
|
||||
|
||||
@OnMessage
|
||||
public void onMessage(String message, Session session) throws IOException, EncodeException {
|
||||
public void onMessage(String message, Session session) {
|
||||
// 接收请求
|
||||
Map<String, Object> userProperties = session.getUserProperties();
|
||||
String sid = (String) userProperties.get(WebSocketConfigurator.SID);
|
||||
String scene = (String) userProperties.get(WebSocketConfigurator.SCENE);
|
||||
WsRequestVO requestVO = new WsRequestVO();
|
||||
WsRequestEvent requestVO = new WsRequestEvent(this);
|
||||
requestVO.setSceneEnum(WebSocketSceneEnum.valueOf(scene));
|
||||
requestVO.setMessage(message);
|
||||
requestVO.setSid(sid);
|
||||
SnailSpringContext.getContext().publishEvent(requestVO);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 连接关闭时触发
|
||||
*/
|
||||
@OnClose
|
||||
public void onClose(Session session) {
|
||||
|
||||
Map<String, Object> userProperties = session.getUserProperties();
|
||||
String sid = (String) userProperties.get(WebSocketConfigurator.SID);
|
||||
|
||||
log.info("sid:[{}] websocket closed", sid);
|
||||
USER_SESSION.remove(sid);
|
||||
}
|
||||
|
||||
//错误时调用
|
||||
@OnError
|
||||
public void onError(Session session, Throwable throwable) {
|
||||
|
@ -1,17 +1,17 @@
|
||||
package com.aizuda.snailjob.server.common.timer;
|
||||
package com.aizuda.snailjob.server.web.timer;
|
||||
|
||||
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.server.common.TimerTask;
|
||||
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
|
||||
import com.aizuda.snailjob.server.common.service.LogService;
|
||||
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
|
||||
import com.aizuda.snailjob.server.web.service.JobLogService;
|
||||
import io.netty.util.Timeout;
|
||||
import jakarta.websocket.Session;
|
||||
import lombok.AllArgsConstructor;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @Author:srzou
|
||||
@ -23,11 +23,9 @@ import java.util.Map;
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class JobTaskLogTimerTask implements TimerTask<String> {
|
||||
private static final String SID = "sid";
|
||||
private static final String SCENE = "scene";
|
||||
private static final String IDEMPOTENT_KEY_PREFIX = "jobTaskLog_{0}_{1}_{2}";
|
||||
private JobLogQueryVO logQueryVO;
|
||||
private Session session;
|
||||
private String sid;
|
||||
|
||||
@Override
|
||||
public void run(final Timeout timeout) throws Exception {
|
||||
@ -35,8 +33,8 @@ public class JobTaskLogTimerTask implements TimerTask<String> {
|
||||
|
||||
try {
|
||||
LogTimerWheel.clearCache(idempotentKey());
|
||||
LogService logService = SnailSpringContext.getBean(LogService.class);
|
||||
logService.getJobLogPage(logQueryVO, session);
|
||||
JobLogService logService = SnailSpringContext.getBean(JobLogService.class);
|
||||
logService.getJobLogPageV2(logQueryVO);
|
||||
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.LOCAL.error("定时任务日志查询执行失败", e);
|
||||
@ -45,10 +43,8 @@ public class JobTaskLogTimerTask implements TimerTask<String> {
|
||||
|
||||
@Override
|
||||
public String idempotentKey() {
|
||||
Map<String, Object> userProperties = session.getUserProperties();
|
||||
String sid = (String) userProperties.get(SID);
|
||||
String scene = (String) userProperties.get(SCENE);
|
||||
|
||||
Long jobTaskId = logQueryVO.getTaskBatchId();
|
||||
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, scene, jobTaskId);
|
||||
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, WebSocketSceneEnum.JOB_LOG_SCENE, jobTaskId);
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package com.aizuda.snailjob.server.common.timer;
|
||||
package com.aizuda.snailjob.server.web.timer;
|
||||
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.server.common.TimerTask;
|
Loading…
Reference in New Issue
Block a user