From 98b5135d2d5009127deffae9103bd8ab332b7b90 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sun, 30 Mar 2025 18:37:30 +0800 Subject: [PATCH] =?UTF-8?q?feat(1.5.0-beta1):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../snail-job-datasource-template/pom.xml | 9 ++ .../datasource/access/AccessTemplate.java | 25 ++++ .../template/datasource/access/LogAccess.java | 39 ++++++ .../access/log/AbstractLogAccess.java | 14 +++ .../access/log/JobLogMessageAccess.java | 114 ++++++++++++++++++ .../datasource/access/log/LogConverter.java | 31 +++++ .../access/log/RetryTaskLogMessageAccess.java | 83 +++++++++++++ .../template/datasource/enums/DbTypeEnum.java | 7 ++ .../datasource/enums/OperationTypeEnum.java | 4 +- .../dataobject/common/CountQueryDO.java | 12 ++ .../dataobject/common/DeleteQueryDO.java | 12 ++ .../dataobject/common/ListQueryDO.java | 12 ++ .../dataobject/common/OneQueryDO.java | 12 ++ .../dataobject/common/PageQueryDO.java | 25 ++++ .../dataobject/common/PageResponseDO.java | 23 ++++ .../dataobject/common/UpdateQueryDO.java | 12 ++ .../dataobject/log/JobLogMessageDO.java | 58 +++++++++ .../dataobject/log/LogPageQueryDO.java | 23 ++++ .../dataobject/log/RetryTaskLogMessageDO.java | 67 ++++++++++ .../server/common/service/LogService.java | 3 +- .../service/impl/DatabaseLogService.java | 36 +----- .../server/common/vo/JobLogQueryVO.java | 1 + .../web/listener/WsRequestListener.java | 23 ++-- .../model/dto/JobLogMessagePartitionTask.java | 70 +++++++++++ .../model/dto/LogMessagePartitionTask.java | 65 ++++++++++ .../web/model/event/WsRequestEvent.java} | 14 ++- .../server/web/model/event/WsSendEvent.java | 32 +++++ .../server/web/service/JobLogService.java | 3 + .../LogMessagePartitionTaskConverter.java | 29 +++++ .../web/service/impl/JobLogServiceImpl.java | 102 +++++++++++++++- .../snailjob/server/web/socket/LogServer.java | 43 +++++-- .../web}/timer/JobTaskLogTimerTask.java | 20 ++- .../server/web}/timer/LogTimerWheel.java | 2 +- 33 files changed, 959 insertions(+), 66 deletions(-) create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/LogAccess.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/AbstractLogAccess.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/JobLogMessageAccess.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/LogConverter.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/CountQueryDO.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/DeleteQueryDO.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/ListQueryDO.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/OneQueryDO.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageQueryDO.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageResponseDO.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/UpdateQueryDO.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/JobLogMessageDO.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/LogPageQueryDO.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageDO.java create mode 100644 snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/JobLogMessagePartitionTask.java create mode 100644 snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/LogMessagePartitionTask.java rename snail-job-server/{snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/WsRequestVO.java => snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsRequestEvent.java} (59%) create mode 100644 snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsSendEvent.java create mode 100644 snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/LogMessagePartitionTaskConverter.java rename snail-job-server/{snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common => snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web}/timer/JobTaskLogTimerTask.java (71%) rename snail-job-server/{snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common => snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web}/timer/LogTimerWheel.java (98%) diff --git a/snail-job-datasource/snail-job-datasource-template/pom.xml b/snail-job-datasource/snail-job-datasource-template/pom.xml index a51489f5b..36e8a440c 100644 --- a/snail-job-datasource/snail-job-datasource-template/pom.xml +++ b/snail-job-datasource/snail-job-datasource-template/pom.xml @@ -47,6 +47,15 @@ com.aizuda snail-job-common-server-api + + org.mapstruct + mapstruct + + + org.mapstruct + mapstruct-processor + + diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java index 4f84f4983..0015fbdc4 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java @@ -7,6 +7,8 @@ import com.aizuda.snailjob.template.datasource.access.task.RetryDeadLetterTaskAc import com.aizuda.snailjob.template.datasource.access.task.RetryTaskAccess; import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum; import com.aizuda.snailjob.template.datasource.exception.SnailJobDatasourceException; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO; import com.aizuda.snailjob.template.datasource.persistence.po.*; import org.springframework.stereotype.Component; @@ -103,4 +105,27 @@ public class AccessTemplate { } + /** + * 获取通知配置操作类 + * + * @return {@link NotifyConfigAccess} 获取通知配置操作类 + */ + public LogAccess getRetryTaskLogMessageAccess() { + return (LogAccess) Optional.ofNullable(REGISTER_ACCESS.get(OperationTypeEnum.RETRY_LOG.name())) + .orElseThrow(() -> new SnailJobDatasourceException("not supports operation type")); + + } + + + /** + * 获取通知配置操作类 + * + * @return {@link NotifyConfigAccess} 获取通知配置操作类 + */ + public LogAccess getJobLogMessageAccess() { + return (LogAccess) Optional.ofNullable(REGISTER_ACCESS.get(OperationTypeEnum.JOB_LOG.name())) + .orElseThrow(() -> new SnailJobDatasourceException("not supports operation type")); + + } + } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/LogAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/LogAccess.java new file mode 100644 index 000000000..10ad36666 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/LogAccess.java @@ -0,0 +1,39 @@ +package com.aizuda.snailjob.template.datasource.access; + +import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; + +import java.io.Serializable; +import java.util.List; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-29 + */ +public interface LogAccess extends Access { + + int insert(T t); + + int insertBatch(List list); + + PageResponseDO listPage(PageQueryDO queryDO); + + List list(ListQueryDO queryDO); + + T one(OneQueryDO query); + + int update(T t, UpdateQueryDO query); + + int updateById(T t); + + int deleteById(Serializable id); + + int delete(DeleteQueryDO query); + + long count(LambdaQueryWrapper query); + +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/AbstractLogAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/AbstractLogAccess.java new file mode 100644 index 000000000..de1214c85 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/AbstractLogAccess.java @@ -0,0 +1,14 @@ +package com.aizuda.snailjob.template.datasource.access.log; + +import com.aizuda.snailjob.template.datasource.access.LogAccess; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +public abstract class AbstractLogAccess implements LogAccess { +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/JobLogMessageAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/JobLogMessageAccess.java new file mode 100644 index 000000000..d121e0183 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/JobLogMessageAccess.java @@ -0,0 +1,114 @@ +package com.aizuda.snailjob.template.datasource.access.log; + +import com.aizuda.snailjob.template.datasource.access.LogAccess; +import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum; +import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.LogPageQueryDO; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.stereotype.Component; + +import java.io.Serializable; +import java.util.*; + +import static com.aizuda.snailjob.template.datasource.utils.DbUtils.getDbType; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +@Component +@Slf4j +//@ConditionalOnMissingBean(LogAccess.class) +@RequiredArgsConstructor +public class JobLogMessageAccess extends AbstractLogAccess { + private final JobLogMessageMapper jobLogMessageMapper; + private final JobTaskBatchMapper jobTaskBatchMapper; + + @Override + public boolean supports(String operationType) { + return DbTypeEnum.all().contains(getDbType()) && OperationTypeEnum.JOB_LOG.name().equals(operationType); + } + + @Override + public int insert(JobLogMessageDO jobLogMessageDO) { + JobLogMessage jobLogMessage = LogConverter.INSTANCE.toJobLogMessage(jobLogMessageDO); + return jobLogMessageMapper.insert(jobLogMessage); + } + + @Override + public int insertBatch(List list) { + List jobLogMessages = LogConverter.INSTANCE.toJobLogMessages(list); + return jobLogMessageMapper.insertBatch(jobLogMessages); + } + + @Override + public PageResponseDO listPage(PageQueryDO queryDO) { + LogPageQueryDO logPageQueryDO = (LogPageQueryDO) queryDO; + + PageDTO selectPage = jobLogMessageMapper.selectPage( + new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize()), + new LambdaQueryWrapper() + .ge(JobLogMessage::getId, logPageQueryDO.getStartId()) + .eq(JobLogMessage::getTaskBatchId, logPageQueryDO.getTaskBatchId()) + .eq(JobLogMessage::getTaskId, logPageQueryDO.getTaskId()) + .orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime)); + List records = selectPage.getRecords(); + + PageResponseDO responseDO = new PageResponseDO<>(); + responseDO.setPage(selectPage.getCurrent()); + responseDO.setSize(selectPage.getSize()); + responseDO.setTotal(selectPage.getTotal()); + responseDO.setRows(LogConverter.INSTANCE.toJobLogMessageDOList(records)); + return responseDO; + } + + @Override + public List list(ListQueryDO queryDO) { + return List.of(); + } + + @Override + public JobLogMessageDO one(OneQueryDO query) { + return null; + } + + @Override + public int update(JobLogMessageDO jobLogMessageDO, UpdateQueryDO query) { + return 0; + } + + @Override + public int updateById(JobLogMessageDO jobLogMessageDO) { + JobLogMessage jobLogMessage = LogConverter.INSTANCE.toJobLogMessage(jobLogMessageDO); + return jobLogMessageMapper.updateById(jobLogMessage); + } + + @Override + public int deleteById(Serializable id) { + return jobLogMessageMapper.deleteById(id); + } + + @Override + public int delete(DeleteQueryDO query) { + return 0; + } + + @Override + public long count(LambdaQueryWrapper query) { + return 0; + } + +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/LogConverter.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/LogConverter.java new file mode 100644 index 000000000..922d85eb5 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/LogConverter.java @@ -0,0 +1,31 @@ +package com.aizuda.snailjob.template.datasource.access.log; + +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO; +import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; +import org.mapstruct.Mapper; +import org.mapstruct.factory.Mappers; + +import java.util.List; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +@Mapper +public interface LogConverter { + + LogConverter INSTANCE = Mappers.getMapper(LogConverter.class); + + JobLogMessage toJobLogMessage(JobLogMessageDO logMessage); + + List toJobLogMessages(List logMessages); + + JobLogMessageDO toJobLogMessageDO(JobLogMessage logMessage); + + List toJobLogMessageDOList(List logMessages); + +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java new file mode 100644 index 000000000..457091a7e --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java @@ -0,0 +1,83 @@ +package com.aizuda.snailjob.template.datasource.access.log; + +import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum; +import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import org.springframework.stereotype.Component; + +import java.io.Serializable; +import java.util.List; + +import static com.aizuda.snailjob.template.datasource.utils.DbUtils.getDbType; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-29 + */ +@Component +public class RetryTaskLogMessageAccess extends AbstractLogAccess { + + + @Override + public boolean supports(String operationType) { + return DbTypeEnum.all().contains(getDbType()) && OperationTypeEnum.RETRY_LOG.name().equals(operationType); + + } + @Override + public int insert(RetryTaskLogMessageDO retryTaskLogMessageDO) { + return 0; + } + + @Override + public int insertBatch(List list) { + return 0; + } + + @Override + public PageResponseDO listPage(PageQueryDO queryDO) { + return null; + } + + @Override + public List list(ListQueryDO queryDO) { + return List.of(); + } + + @Override + public RetryTaskLogMessageDO one(OneQueryDO query) { + return null; + } + + @Override + public int update(RetryTaskLogMessageDO retryTaskLogMessageDO, UpdateQueryDO query) { + return 0; + } + + @Override + public int updateById(RetryTaskLogMessageDO retryTaskLogMessageDO) { + return 0; + } + + @Override + public int deleteById(Serializable id) { + return 0; + } + + @Override + public int delete(DeleteQueryDO query) { + return 0; + } + + @Override + public long count(LambdaQueryWrapper query) { + return 0; + } + + +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/DbTypeEnum.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/DbTypeEnum.java index eaa8b09e0..eaa9074b2 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/DbTypeEnum.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/DbTypeEnum.java @@ -5,6 +5,9 @@ import com.baomidou.mybatisplus.annotation.DbType; import lombok.AllArgsConstructor; import lombok.Getter; +import java.util.Arrays; +import java.util.List; + /** * DB数据库类型 * @@ -37,4 +40,8 @@ public enum DbTypeEnum { throw new SnailJobDatasourceException("暂不支持此数据库 [{}]", db); } + + public static List all() { + return Arrays.asList(DbTypeEnum.values()); + } } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/OperationTypeEnum.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/OperationTypeEnum.java index c6e53532a..43197b755 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/OperationTypeEnum.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/OperationTypeEnum.java @@ -13,5 +13,7 @@ public enum OperationTypeEnum { GROUP, RETRY_TASK, RETRY, - RETRY_DEAD_LETTER + RETRY_DEAD_LETTER, + RETRY_LOG, + JOB_LOG, } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/CountQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/CountQueryDO.java new file mode 100644 index 000000000..ae7c26fa7 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/CountQueryDO.java @@ -0,0 +1,12 @@ +package com.aizuda.snailjob.template.datasource.persistence.dataobject.common; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +public class CountQueryDO { +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/DeleteQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/DeleteQueryDO.java new file mode 100644 index 000000000..683fe2e36 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/DeleteQueryDO.java @@ -0,0 +1,12 @@ +package com.aizuda.snailjob.template.datasource.persistence.dataobject.common; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +public class DeleteQueryDO { +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/ListQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/ListQueryDO.java new file mode 100644 index 000000000..7d091a3b9 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/ListQueryDO.java @@ -0,0 +1,12 @@ +package com.aizuda.snailjob.template.datasource.persistence.dataobject.common; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +public class ListQueryDO { +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/OneQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/OneQueryDO.java new file mode 100644 index 000000000..a2a9ab6ee --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/OneQueryDO.java @@ -0,0 +1,12 @@ +package com.aizuda.snailjob.template.datasource.persistence.dataobject.common; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +public class OneQueryDO { +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageQueryDO.java new file mode 100644 index 000000000..ef34442df --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageQueryDO.java @@ -0,0 +1,25 @@ +package com.aizuda.snailjob.template.datasource.persistence.dataobject.common; + +import lombok.Data; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +@Data +public class PageQueryDO { + + /** + * 当前页码 + */ + private int page = 1; + + /** + * 每页条数 + */ + private int size = 10; +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageResponseDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageResponseDO.java new file mode 100644 index 000000000..e57ea6d65 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageResponseDO.java @@ -0,0 +1,23 @@ +package com.aizuda.snailjob.template.datasource.persistence.dataobject.common; + +import lombok.Data; + +import java.util.List; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +@Data +public class PageResponseDO { + + private List rows; + + private long total; + private long page; + private long size; +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/UpdateQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/UpdateQueryDO.java new file mode 100644 index 000000000..b41594373 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/UpdateQueryDO.java @@ -0,0 +1,12 @@ +package com.aizuda.snailjob.template.datasource.persistence.dataobject.common; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +public class UpdateQueryDO { +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/JobLogMessageDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/JobLogMessageDO.java new file mode 100644 index 000000000..f62775c16 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/JobLogMessageDO.java @@ -0,0 +1,58 @@ +package com.aizuda.snailjob.template.datasource.persistence.dataobject.log; + +import lombok.Data; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +@Data +public class JobLogMessageDO { + + private Long id; + + /** + * 命名空间 + */ + private String namespaceId; + + /** + * 组名称 + */ + private String groupName; + + /** + * 任务信息id + */ + private Long jobId; + + /** + * 任务实例id + */ + private Long taskBatchId; + + /** + * 调度任务id + */ + private Long taskId; + + /** + * 日志数量 + */ + private Integer logNum; + + /** + * 调度信息 + */ + private String message; + + /** + * 真实上报时间 + */ + private Long realTime; + +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/LogPageQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/LogPageQueryDO.java new file mode 100644 index 000000000..5038bae1e --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/LogPageQueryDO.java @@ -0,0 +1,23 @@ +package com.aizuda.snailjob.template.datasource.persistence.dataobject.log; + +import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageQueryDO; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-29 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class LogPageQueryDO extends PageQueryDO { + private Long startId; + private Long jobId; + private Long taskBatchId; + private Long taskId; + private Integer fromIndex; +} diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageDO.java new file mode 100644 index 000000000..4f36341c2 --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageDO.java @@ -0,0 +1,67 @@ +package com.aizuda.snailjob.template.datasource.persistence.dataobject.log; + +import lombok.Data; + +import java.time.LocalDateTime; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-29 + */ +@Data +public class RetryTaskLogMessageDO { + + /** + * 主键 + */ + private Long id; + + /** + * 命名空间 + */ + private String namespaceId; + + /** + * 组名称 + */ + private String groupName; + + /** + * 重试任务id + */ + private Long retryTaskId; + + /** + * 重试信息Id + */ + private Long retryId; + + /** + * 异常信息 + */ + private String message; + + /** + * 日志数量 + */ + private Integer logNum; + + /** + * 真实上报时间 + */ + private Long realTime; + + /** + * 创建时间 + */ + private LocalDateTime createDt; + + /** + * 创建时间 + */ + private LocalDateTime updateDt; +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java index 720a4c8c6..fac3b0a9a 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java @@ -3,7 +3,6 @@ package com.aizuda.snailjob.server.common.service; import com.aizuda.snailjob.server.common.dto.JobLogDTO; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO; -import jakarta.websocket.Session; import java.io.IOException; import java.util.List; @@ -19,5 +18,5 @@ import java.util.List; public interface LogService { void saveLog(JobLogDTO jobLogDTO); void batchSaveLogs(List jobLogTasks); - void getJobLogPage(JobLogQueryVO queryVO, Session session) throws IOException; + void getJobLogPage(JobLogQueryVO queryVO, String sid) throws IOException; } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java index 6a08dde84..54abe67bf 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java @@ -9,8 +9,6 @@ import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO; import com.aizuda.snailjob.server.common.dto.JobLogDTO; import com.aizuda.snailjob.server.common.service.LogService; import com.aizuda.snailjob.server.common.convert.JobLogMessageConverter; -import com.aizuda.snailjob.server.common.timer.JobTaskLogTimerTask; -import com.aizuda.snailjob.server.common.timer.LogTimerWheel; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO; @@ -21,13 +19,9 @@ import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.google.common.collect.Lists; -import jakarta.websocket.Session; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.transaction.support.TransactionSynchronization; -import org.springframework.transaction.support.TransactionSynchronizationManager; -import java.io.IOException; import java.time.Duration; import java.time.LocalDateTime; import java.util.*; @@ -93,8 +87,8 @@ public class DatabaseLogService implements LogService { } @Override - public void getJobLogPage(JobLogQueryVO queryVO, Session session) throws IOException { - Boolean taskBatchComplete = false; + public void getJobLogPage(JobLogQueryVO queryVO, String sid) { + boolean taskBatchComplete = false; while (!taskBatchComplete) { PageDTO pageDTO = new PageDTO<>(1, queryVO.getSize()); @@ -104,7 +98,8 @@ public class DatabaseLogService implements LogService { .ge(JobLogMessage::getId, queryVO.getStartId()) .eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId()) .eq(JobLogMessage::getTaskId, queryVO.getTaskId()) - .orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime)); + .orderByAsc(JobLogMessage::getId) + .orderByAsc(JobLogMessage::getRealTime)); List records = selectPage.getRecords(); if (CollUtil.isEmpty(records)) { @@ -122,10 +117,9 @@ public class DatabaseLogService implements LogService { jobLogResponseVO.setFinished(Boolean.TRUE); jobLogResponseVO.setNextStartId(queryVO.getStartId()); jobLogResponseVO.setFromIndex(0); - session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO)); return; } else { - scheduleNextAttempt(queryVO, session); +// scheduleNextAttempt(queryVO, sid); return; } } @@ -180,7 +174,7 @@ public class DatabaseLogService implements LogService { jobLogResponseVO.setMessage(messages); jobLogResponseVO.setNextStartId(nextStartId); jobLogResponseVO.setFromIndex(fromIndex); - session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO)); +// session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO)); queryVO.setFromIndex(fromIndex); queryVO.setStartId(nextStartId); @@ -188,22 +182,4 @@ public class DatabaseLogService implements LogService { } - /** - * 使用时间轮5秒再进行日志查询 - * - * @param queryVO - * @param session - */ - private void scheduleNextAttempt(JobLogQueryVO queryVO, Session session) { - if (TransactionSynchronizationManager.isActualTransactionActive()) { - TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { - @Override - public void afterCompletion(int status) { - LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, session), Duration.ofMillis(DELAY_MILLS)); - } - }); - } else { - LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, session), Duration.ofMillis(DELAY_MILLS)); - } - } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java index 3e2bae672..af7376309 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java @@ -17,4 +17,5 @@ public class JobLogQueryVO extends BaseQueryVO { private Long taskBatchId; private Long taskId; private Integer fromIndex; + private String sid; } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java index 4ef1b3cfe..bc6825483 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java @@ -4,9 +4,11 @@ import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; import com.aizuda.snailjob.server.common.service.LogService; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; -import com.aizuda.snailjob.server.common.vo.WsRequestVO; +import com.aizuda.snailjob.server.web.model.event.WsRequestEvent; +import com.aizuda.snailjob.server.web.service.JobLogService; import jakarta.websocket.Session; import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @@ -25,23 +27,28 @@ import static com.aizuda.snailjob.server.web.socket.LogServer.USER_SESSION; */ @Component @AllArgsConstructor +@Slf4j public class WsRequestListener { - private final LogService logService; + private final JobLogService jobLogService; @Async - @EventListener(classes = WsRequestVO.class) - public void getJobLogs(WsRequestVO requestVO) { + @EventListener(classes = WsRequestEvent.class) + public void getJobLogs(WsRequestEvent requestVO) { if (!WebSocketSceneEnum.JOB_LOG_SCENE.equals(requestVO.getSceneEnum())) { return; } + + log.info("getJobLogs {}", requestVO.getSid()); String message = requestVO.getMessage(); JobLogQueryVO jobLogQueryVO = JsonUtil.parseObject(message, JobLogQueryVO.class); - Session session = USER_SESSION.get(requestVO.getSid()); + jobLogQueryVO.setSid(requestVO.getSid()); + jobLogQueryVO.setStartId(0L); try { - logService.getJobLogPage(jobLogQueryVO, session); - } catch (IOException e) { - throw new RuntimeException(e); + jobLogService.getJobLogPageV2(jobLogQueryVO); + } catch (Exception e) { + log.warn("send log error", e); } + } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/JobLogMessagePartitionTask.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/JobLogMessagePartitionTask.java new file mode 100644 index 000000000..fc204eb28 --- /dev/null +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/JobLogMessagePartitionTask.java @@ -0,0 +1,70 @@ +package com.aizuda.snailjob.server.web.model.dto; + +import com.aizuda.snailjob.server.common.dto.PartitionTask; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.time.LocalDateTime; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class JobLogMessagePartitionTask extends PartitionTask { + + /** + * 命名空间 + */ + private String namespaceId; + + /** + * 组名称 + */ + private String groupName; + + /** + * 任务信息id + */ + private Long jobId; + + /** + * 任务实例id + */ + private Long taskBatchId; + + /** + * 调度任务id + */ + private Long taskId; + + /** + * 日志数量 + */ + private Integer logNum; + + /** + * 调度信息 + */ + private String message; + + /** + * 真实上报时间 + */ + private Long realTime; + + /** + * 创建时间 + */ + private LocalDateTime createDt; + + /** + * 创建时间 + */ + private LocalDateTime updateDt; +} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/LogMessagePartitionTask.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/LogMessagePartitionTask.java new file mode 100644 index 000000000..367b0922f --- /dev/null +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/LogMessagePartitionTask.java @@ -0,0 +1,65 @@ +package com.aizuda.snailjob.server.web.model.dto; + +import com.aizuda.snailjob.server.common.dto.PartitionTask; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.time.LocalDateTime; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class LogMessagePartitionTask extends PartitionTask { + + /** + * 命名空间 + */ + private String namespaceId; + + /** + * 组名称 + */ + private String groupName; + + /** + * 重试任务id + */ + private Long retryTaskId; + + /** + * 重试信息Id + */ + private Long retryId; + + /** + * 异常信息 + */ + private String message; + + /** + * 日志数量 + */ + private Integer logNum; + + /** + * 真实上报时间 + */ + private Long realTime; + + /** + * 创建时间 + */ + private LocalDateTime createDt; + + /** + * 创建时间 + */ + private LocalDateTime updateDt; +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/WsRequestVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsRequestEvent.java similarity index 59% rename from snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/WsRequestVO.java rename to snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsRequestEvent.java index 61830457f..807e9f21c 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/WsRequestVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsRequestEvent.java @@ -1,7 +1,10 @@ -package com.aizuda.snailjob.server.common.vo; +package com.aizuda.snailjob.server.web.model.event; import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; import lombok.Data; +import lombok.Getter; +import lombok.Setter; +import org.springframework.context.ApplicationEvent; /** * @Author:srzou @@ -11,8 +14,9 @@ import lombok.Data; * @Filename:WebSocketRequestVO * @since 1.5.0 */ -@Data -public class WsRequestVO { +@Setter +@Getter +public class WsRequestEvent extends ApplicationEvent { /** * wb类型 */ @@ -24,4 +28,8 @@ public class WsRequestVO { private String message; private WebSocketSceneEnum sceneEnum; + + public WsRequestEvent(Object source) { + super(source); + } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsSendEvent.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsSendEvent.java new file mode 100644 index 000000000..619029359 --- /dev/null +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsSendEvent.java @@ -0,0 +1,32 @@ +package com.aizuda.snailjob.server.web.model.event; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.context.ApplicationEvent; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-01-18 + */ +@Setter +@Getter +public class WsSendEvent extends ApplicationEvent { + + /** + * 会话id + */ + private String sid; + + /** + * 需要发送的消息 + */ + private String message; + + public WsSendEvent(Object source) { + super(source); + } +} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java index 0be6e6a18..be60b5b15 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java @@ -11,4 +11,7 @@ import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; public interface JobLogService { JobLogResponseVO getJobLogPage(JobLogQueryVO jobQueryVO); + + void getJobLogPageV2(JobLogQueryVO jobQueryVO); + } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/LogMessagePartitionTaskConverter.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/LogMessagePartitionTaskConverter.java new file mode 100644 index 000000000..74ab0ef18 --- /dev/null +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/LogMessagePartitionTaskConverter.java @@ -0,0 +1,29 @@ +package com.aizuda.snailjob.server.web.service.convert; + +import com.aizuda.snailjob.server.web.model.dto.JobLogMessagePartitionTask; +import com.aizuda.snailjob.server.web.model.dto.LogMessagePartitionTask; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.factory.Mappers; + +import java.util.List; + +/** + *

+ * + *

+ * + * @author opensnail + * @date 2025-03-30 + */ +@Mapper +public interface LogMessagePartitionTaskConverter { + LogMessagePartitionTaskConverter INSTANCE = Mappers.getMapper(LogMessagePartitionTaskConverter.class); + + List toLogMessagePartitionTask(List retryTaskLogMessageList); + + + List toJobLogMessagePartitionTasks(List jobLogMessageDOS); +} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java index c5516841f..9f388c4b5 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java @@ -1,23 +1,38 @@ package com.aizuda.snailjob.server.web.service.impl; import cn.hutool.core.collection.CollUtil; +import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.constant.LogFieldConstants; +import com.aizuda.snailjob.server.common.dto.PartitionTask; +import com.aizuda.snailjob.server.web.timer.JobTaskLogTimerTask; +import com.aizuda.snailjob.server.web.timer.LogTimerWheel; +import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; +import com.aizuda.snailjob.server.web.model.dto.JobLogMessagePartitionTask; +import com.aizuda.snailjob.server.web.model.event.WsSendEvent; import com.aizuda.snailjob.server.web.service.JobLogService; +import com.aizuda.snailjob.server.web.service.convert.LogMessagePartitionTaskConverter; +import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageResponseDO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.LogPageQueryDO; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import com.google.common.base.Predicate; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; +import java.time.Duration; import java.time.LocalDateTime; import java.util.*; +import java.util.function.Consumer; import java.util.stream.Collectors; import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED; @@ -30,8 +45,10 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPL @Service @RequiredArgsConstructor public class JobLogServiceImpl implements JobLogService { + private static final Long DELAY_MILLS = 5000L; private final JobLogMessageMapper jobLogMessageMapper; private final JobTaskBatchMapper jobTaskBatchMapper; + private final AccessTemplate accessTemplate; @Override public JobLogResponseVO getJobLogPage(final JobLogQueryVO queryVO) { @@ -57,8 +74,8 @@ public class JobLogServiceImpl implements JobLogService { JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); if (Objects.isNull(jobTaskBatch) - || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) && - jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now())) + || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) && + jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now())) ) { jobLogResponseVO.setFinished(Boolean.TRUE); @@ -121,4 +138,85 @@ public class JobLogServiceImpl implements JobLogService { jobLogResponseVO.setFromIndex(fromIndex); return jobLogResponseVO; } + + @Override + public void getJobLogPageV2(JobLogQueryVO queryVO) { + String sid = queryVO.getSid(); + LogPageQueryDO pageQueryDO = new LogPageQueryDO(); + pageQueryDO.setPage(1); + pageQueryDO.setSize(queryVO.getSize()); + pageQueryDO.setTaskBatchId(queryVO.getTaskBatchId()); + pageQueryDO.setTaskId(queryVO.getTaskId()); + pageQueryDO.setStartId(queryVO.getStartId()); + PartitionTaskUtils.process(startId -> { + // 记录下次起始时间 + queryVO.setStartId(startId); + pageQueryDO.setStartId(startId); + // 拉去数据 + PageResponseDO pageResponseDO = accessTemplate.getJobLogMessageAccess() + .listPage(pageQueryDO); + List rows = pageResponseDO.getRows(); + return LogMessagePartitionTaskConverter.INSTANCE.toJobLogMessagePartitionTasks(rows); + }, new Consumer<>() { + @Override + public void accept(List partitionTasks) { + + List partitionTaskList = (List) partitionTasks; + + for (JobLogMessagePartitionTask logMessagePartitionTask : partitionTaskList) { + // 发生日志内容到前端 + String message = logMessagePartitionTask.getMessage(); + List> logContents = JsonUtil.parseObject(message, List.class); + logContents = logContents.stream() + .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) + .toList(); + for (Map logContent : logContents) { + // send发消息 + WsSendEvent sendEvent = new WsSendEvent(this); + sendEvent.setSid(sid); + sendEvent.setMessage(JsonUtil.toJsonString(logContent)); + SnailSpringContext.getContext().publishEvent(sendEvent); + } + } + + } + }, new Predicate<>() { + @Override + public boolean apply(List rows) { + + // 决策是否完成 + if (!CollUtil.isEmpty(rows)) { + return false; + } + + JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne( + new LambdaQueryWrapper().eq(JobTaskBatch::getId, queryVO.getTaskBatchId())); + + if (Objects.isNull(jobTaskBatch) + || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) && + jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))) { + // 发生完成标识 + WsSendEvent sendEvent = new WsSendEvent(this); + sendEvent.setMessage("END"); + sendEvent.setSid(sid); + SnailSpringContext.getContext().publishEvent(sendEvent); + return true; + } else { + scheduleNextAttempt(queryVO, sid); + return true; + } + } + }, queryVO.getStartId()); + + } + + /** + * 使用时间轮5秒再进行日志查询 + * + * @param queryVO + * @param sid + */ + private void scheduleNextAttempt(JobLogQueryVO queryVO, String sid) { + LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, sid), Duration.ofMillis(DELAY_MILLS)); + } } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/socket/LogServer.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/socket/LogServer.java index e059f84c7..1c2e91282 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/socket/LogServer.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/socket/LogServer.java @@ -1,13 +1,16 @@ package com.aizuda.snailjob.server.web.socket; +import cn.hutool.core.lang.Assert; import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; -import com.aizuda.snailjob.server.common.vo.WsRequestVO; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.web.model.event.WsSendEvent; +import com.aizuda.snailjob.server.web.model.event.WsRequestEvent; import com.aizuda.snailjob.server.web.config.WebSocketConfigurator; import jakarta.websocket.*; -import jakarta.websocket.server.PathParam; import jakarta.websocket.server.ServerEndpoint; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import java.io.IOException; @@ -24,33 +27,59 @@ import java.util.concurrent.ConcurrentHashMap; */ @Slf4j @Component -@ServerEndpoint(value = "/webSocket", configurator = WebSocketConfigurator.class) +@ServerEndpoint(value = "/websocket", configurator = WebSocketConfigurator.class) public class LogServer { // 缓存session public static final ConcurrentHashMap USER_SESSION = new ConcurrentHashMap<>(); + @EventListener + public void sendMessage(WsSendEvent message) throws IOException { + Session session = USER_SESSION.get(message.getSid()); + Assert.notNull(session, () -> new SnailJobServerException("ws session not exist")); + if (session.isOpen()) { + synchronized (session) { + session.getBasicRemote().sendText(message.getMessage()); + } + } + + } @OnOpen public void onOpen(Session session) { Map userProperties = session.getUserProperties(); - USER_SESSION.put((String) userProperties.get(WebSocketConfigurator.SID), session); + String sid = (String) userProperties.get(WebSocketConfigurator.SID); + USER_SESSION.put(sid, session); + log.info("sid:[{}] websocket started", sid); } - @OnMessage - public void onMessage(String message, Session session) throws IOException, EncodeException { + public void onMessage(String message, Session session) { // 接收请求 Map userProperties = session.getUserProperties(); String sid = (String) userProperties.get(WebSocketConfigurator.SID); String scene = (String) userProperties.get(WebSocketConfigurator.SCENE); - WsRequestVO requestVO = new WsRequestVO(); + WsRequestEvent requestVO = new WsRequestEvent(this); requestVO.setSceneEnum(WebSocketSceneEnum.valueOf(scene)); requestVO.setMessage(message); requestVO.setSid(sid); SnailSpringContext.getContext().publishEvent(requestVO); } + + /** + * 连接关闭时触发 + */ + @OnClose + public void onClose(Session session) { + + Map userProperties = session.getUserProperties(); + String sid = (String) userProperties.get(WebSocketConfigurator.SID); + + log.info("sid:[{}] websocket closed", sid); + USER_SESSION.remove(sid); + } + //错误时调用 @OnError public void onError(Session session, Throwable throwable) { diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/JobTaskLogTimerTask.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/JobTaskLogTimerTask.java similarity index 71% rename from snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/JobTaskLogTimerTask.java rename to snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/JobTaskLogTimerTask.java index 2072deb9a..2fe4239a7 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/JobTaskLogTimerTask.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/JobTaskLogTimerTask.java @@ -1,17 +1,17 @@ -package com.aizuda.snailjob.server.common.timer; +package com.aizuda.snailjob.server.web.timer; import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.TimerTask; +import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; import com.aizuda.snailjob.server.common.service.LogService; import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; +import com.aizuda.snailjob.server.web.service.JobLogService; import io.netty.util.Timeout; -import jakarta.websocket.Session; import lombok.AllArgsConstructor; import java.text.MessageFormat; import java.time.LocalDateTime; -import java.util.Map; /** * @Author:srzou @@ -23,11 +23,9 @@ import java.util.Map; */ @AllArgsConstructor public class JobTaskLogTimerTask implements TimerTask { - private static final String SID = "sid"; - private static final String SCENE = "scene"; private static final String IDEMPOTENT_KEY_PREFIX = "jobTaskLog_{0}_{1}_{2}"; private JobLogQueryVO logQueryVO; - private Session session; + private String sid; @Override public void run(final Timeout timeout) throws Exception { @@ -35,8 +33,8 @@ public class JobTaskLogTimerTask implements TimerTask { try { LogTimerWheel.clearCache(idempotentKey()); - LogService logService = SnailSpringContext.getBean(LogService.class); - logService.getJobLogPage(logQueryVO, session); + JobLogService logService = SnailSpringContext.getBean(JobLogService.class); + logService.getJobLogPageV2(logQueryVO); } catch (Exception e) { SnailJobLog.LOCAL.error("定时任务日志查询执行失败", e); @@ -45,10 +43,8 @@ public class JobTaskLogTimerTask implements TimerTask { @Override public String idempotentKey() { - Map userProperties = session.getUserProperties(); - String sid = (String) userProperties.get(SID); - String scene = (String) userProperties.get(SCENE); + Long jobTaskId = logQueryVO.getTaskBatchId(); - return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, scene, jobTaskId); + return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, sid, WebSocketSceneEnum.JOB_LOG_SCENE, jobTaskId); } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/LogTimerWheel.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/LogTimerWheel.java similarity index 98% rename from snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/LogTimerWheel.java rename to snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/LogTimerWheel.java index f8518e19f..7e0d32197 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/timer/LogTimerWheel.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/timer/LogTimerWheel.java @@ -1,4 +1,4 @@ -package com.aizuda.snailjob.server.common.timer; +package com.aizuda.snailjob.server.web.timer; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.TimerTask;