diff --git a/snail-job-datasource/snail-job-datasource-template/pom.xml b/snail-job-datasource/snail-job-datasource-template/pom.xml
index a51489f5b..36e8a440c 100644
--- a/snail-job-datasource/snail-job-datasource-template/pom.xml
+++ b/snail-job-datasource/snail-job-datasource-template/pom.xml
@@ -47,6 +47,15 @@
com.aizuda
snail-job-common-server-api
+
+ org.mapstruct
+ mapstruct
+
+
+ org.mapstruct
+ mapstruct-processor
+
+
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java
index 4f84f4983..0015fbdc4 100644
--- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/AccessTemplate.java
@@ -7,6 +7,8 @@ import com.aizuda.snailjob.template.datasource.access.task.RetryDeadLetterTaskAc
import com.aizuda.snailjob.template.datasource.access.task.RetryTaskAccess;
import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum;
import com.aizuda.snailjob.template.datasource.exception.SnailJobDatasourceException;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
import org.springframework.stereotype.Component;
@@ -103,4 +105,27 @@ public class AccessTemplate {
}
+ /**
+ * 获取通知配置操作类
+ *
+ * @return {@link NotifyConfigAccess} 获取通知配置操作类
+ */
+ public LogAccess getRetryTaskLogMessageAccess() {
+ return (LogAccess) Optional.ofNullable(REGISTER_ACCESS.get(OperationTypeEnum.RETRY_LOG.name()))
+ .orElseThrow(() -> new SnailJobDatasourceException("not supports operation type"));
+
+ }
+
+
+ /**
+ * 获取通知配置操作类
+ *
+ * @return {@link NotifyConfigAccess} 获取通知配置操作类
+ */
+ public LogAccess getJobLogMessageAccess() {
+ return (LogAccess) Optional.ofNullable(REGISTER_ACCESS.get(OperationTypeEnum.JOB_LOG.name()))
+ .orElseThrow(() -> new SnailJobDatasourceException("not supports operation type"));
+
+ }
+
}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/LogAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/LogAccess.java
new file mode 100644
index 000000000..10ad36666
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/LogAccess.java
@@ -0,0 +1,39 @@
+package com.aizuda.snailjob.template.datasource.access;
+
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-29
+ */
+public interface LogAccess extends Access {
+
+ int insert(T t);
+
+ int insertBatch(List list);
+
+ PageResponseDO listPage(PageQueryDO queryDO);
+
+ List list(ListQueryDO queryDO);
+
+ T one(OneQueryDO query);
+
+ int update(T t, UpdateQueryDO query);
+
+ int updateById(T t);
+
+ int deleteById(Serializable id);
+
+ int delete(DeleteQueryDO query);
+
+ long count(LambdaQueryWrapper query);
+
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/AbstractLogAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/AbstractLogAccess.java
new file mode 100644
index 000000000..de1214c85
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/AbstractLogAccess.java
@@ -0,0 +1,14 @@
+package com.aizuda.snailjob.template.datasource.access.log;
+
+import com.aizuda.snailjob.template.datasource.access.LogAccess;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+public abstract class AbstractLogAccess implements LogAccess {
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/JobLogMessageAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/JobLogMessageAccess.java
new file mode 100644
index 000000000..d121e0183
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/JobLogMessageAccess.java
@@ -0,0 +1,114 @@
+package com.aizuda.snailjob.template.datasource.access.log;
+
+import com.aizuda.snailjob.template.datasource.access.LogAccess;
+import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
+import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.LogPageQueryDO;
+import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
+import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
+import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.stereotype.Component;
+
+import java.io.Serializable;
+import java.util.*;
+
+import static com.aizuda.snailjob.template.datasource.utils.DbUtils.getDbType;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+@Component
+@Slf4j
+//@ConditionalOnMissingBean(LogAccess.class)
+@RequiredArgsConstructor
+public class JobLogMessageAccess extends AbstractLogAccess {
+ private final JobLogMessageMapper jobLogMessageMapper;
+ private final JobTaskBatchMapper jobTaskBatchMapper;
+
+ @Override
+ public boolean supports(String operationType) {
+ return DbTypeEnum.all().contains(getDbType()) && OperationTypeEnum.JOB_LOG.name().equals(operationType);
+ }
+
+ @Override
+ public int insert(JobLogMessageDO jobLogMessageDO) {
+ JobLogMessage jobLogMessage = LogConverter.INSTANCE.toJobLogMessage(jobLogMessageDO);
+ return jobLogMessageMapper.insert(jobLogMessage);
+ }
+
+ @Override
+ public int insertBatch(List list) {
+ List jobLogMessages = LogConverter.INSTANCE.toJobLogMessages(list);
+ return jobLogMessageMapper.insertBatch(jobLogMessages);
+ }
+
+ @Override
+ public PageResponseDO listPage(PageQueryDO queryDO) {
+ LogPageQueryDO logPageQueryDO = (LogPageQueryDO) queryDO;
+
+ PageDTO selectPage = jobLogMessageMapper.selectPage(
+ new PageDTO<>(queryDO.getPage(), logPageQueryDO.getSize()),
+ new LambdaQueryWrapper()
+ .ge(JobLogMessage::getId, logPageQueryDO.getStartId())
+ .eq(JobLogMessage::getTaskBatchId, logPageQueryDO.getTaskBatchId())
+ .eq(JobLogMessage::getTaskId, logPageQueryDO.getTaskId())
+ .orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime));
+ List records = selectPage.getRecords();
+
+ PageResponseDO responseDO = new PageResponseDO<>();
+ responseDO.setPage(selectPage.getCurrent());
+ responseDO.setSize(selectPage.getSize());
+ responseDO.setTotal(selectPage.getTotal());
+ responseDO.setRows(LogConverter.INSTANCE.toJobLogMessageDOList(records));
+ return responseDO;
+ }
+
+ @Override
+ public List list(ListQueryDO queryDO) {
+ return List.of();
+ }
+
+ @Override
+ public JobLogMessageDO one(OneQueryDO query) {
+ return null;
+ }
+
+ @Override
+ public int update(JobLogMessageDO jobLogMessageDO, UpdateQueryDO query) {
+ return 0;
+ }
+
+ @Override
+ public int updateById(JobLogMessageDO jobLogMessageDO) {
+ JobLogMessage jobLogMessage = LogConverter.INSTANCE.toJobLogMessage(jobLogMessageDO);
+ return jobLogMessageMapper.updateById(jobLogMessage);
+ }
+
+ @Override
+ public int deleteById(Serializable id) {
+ return jobLogMessageMapper.deleteById(id);
+ }
+
+ @Override
+ public int delete(DeleteQueryDO query) {
+ return 0;
+ }
+
+ @Override
+ public long count(LambdaQueryWrapper query) {
+ return 0;
+ }
+
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/LogConverter.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/LogConverter.java
new file mode 100644
index 000000000..922d85eb5
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/LogConverter.java
@@ -0,0 +1,31 @@
+package com.aizuda.snailjob.template.datasource.access.log;
+
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
+import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
+import org.mapstruct.Mapper;
+import org.mapstruct.factory.Mappers;
+
+import java.util.List;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+@Mapper
+public interface LogConverter {
+
+ LogConverter INSTANCE = Mappers.getMapper(LogConverter.class);
+
+ JobLogMessage toJobLogMessage(JobLogMessageDO logMessage);
+
+ List toJobLogMessages(List logMessages);
+
+ JobLogMessageDO toJobLogMessageDO(JobLogMessage logMessage);
+
+ List toJobLogMessageDOList(List logMessages);
+
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java
new file mode 100644
index 000000000..457091a7e
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/log/RetryTaskLogMessageAccess.java
@@ -0,0 +1,83 @@
+package com.aizuda.snailjob.template.datasource.access.log;
+
+import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
+import com.aizuda.snailjob.template.datasource.enums.OperationTypeEnum;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.*;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import org.springframework.stereotype.Component;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static com.aizuda.snailjob.template.datasource.utils.DbUtils.getDbType;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-29
+ */
+@Component
+public class RetryTaskLogMessageAccess extends AbstractLogAccess {
+
+
+ @Override
+ public boolean supports(String operationType) {
+ return DbTypeEnum.all().contains(getDbType()) && OperationTypeEnum.RETRY_LOG.name().equals(operationType);
+
+ }
+ @Override
+ public int insert(RetryTaskLogMessageDO retryTaskLogMessageDO) {
+ return 0;
+ }
+
+ @Override
+ public int insertBatch(List list) {
+ return 0;
+ }
+
+ @Override
+ public PageResponseDO listPage(PageQueryDO queryDO) {
+ return null;
+ }
+
+ @Override
+ public List list(ListQueryDO queryDO) {
+ return List.of();
+ }
+
+ @Override
+ public RetryTaskLogMessageDO one(OneQueryDO query) {
+ return null;
+ }
+
+ @Override
+ public int update(RetryTaskLogMessageDO retryTaskLogMessageDO, UpdateQueryDO query) {
+ return 0;
+ }
+
+ @Override
+ public int updateById(RetryTaskLogMessageDO retryTaskLogMessageDO) {
+ return 0;
+ }
+
+ @Override
+ public int deleteById(Serializable id) {
+ return 0;
+ }
+
+ @Override
+ public int delete(DeleteQueryDO query) {
+ return 0;
+ }
+
+ @Override
+ public long count(LambdaQueryWrapper query) {
+ return 0;
+ }
+
+
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/DbTypeEnum.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/DbTypeEnum.java
index eaa8b09e0..eaa9074b2 100644
--- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/DbTypeEnum.java
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/DbTypeEnum.java
@@ -5,6 +5,9 @@ import com.baomidou.mybatisplus.annotation.DbType;
import lombok.AllArgsConstructor;
import lombok.Getter;
+import java.util.Arrays;
+import java.util.List;
+
/**
* DB数据库类型
*
@@ -37,4 +40,8 @@ public enum DbTypeEnum {
throw new SnailJobDatasourceException("暂不支持此数据库 [{}]", db);
}
+
+ public static List all() {
+ return Arrays.asList(DbTypeEnum.values());
+ }
}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/OperationTypeEnum.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/OperationTypeEnum.java
index c6e53532a..43197b755 100644
--- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/OperationTypeEnum.java
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/enums/OperationTypeEnum.java
@@ -13,5 +13,7 @@ public enum OperationTypeEnum {
GROUP,
RETRY_TASK,
RETRY,
- RETRY_DEAD_LETTER
+ RETRY_DEAD_LETTER,
+ RETRY_LOG,
+ JOB_LOG,
}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/CountQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/CountQueryDO.java
new file mode 100644
index 000000000..ae7c26fa7
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/CountQueryDO.java
@@ -0,0 +1,12 @@
+package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+public class CountQueryDO {
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/DeleteQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/DeleteQueryDO.java
new file mode 100644
index 000000000..683fe2e36
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/DeleteQueryDO.java
@@ -0,0 +1,12 @@
+package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+public class DeleteQueryDO {
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/ListQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/ListQueryDO.java
new file mode 100644
index 000000000..7d091a3b9
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/ListQueryDO.java
@@ -0,0 +1,12 @@
+package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+public class ListQueryDO {
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/OneQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/OneQueryDO.java
new file mode 100644
index 000000000..a2a9ab6ee
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/OneQueryDO.java
@@ -0,0 +1,12 @@
+package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+public class OneQueryDO {
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageQueryDO.java
new file mode 100644
index 000000000..ef34442df
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageQueryDO.java
@@ -0,0 +1,25 @@
+package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
+
+import lombok.Data;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+@Data
+public class PageQueryDO {
+
+ /**
+ * 当前页码
+ */
+ private int page = 1;
+
+ /**
+ * 每页条数
+ */
+ private int size = 10;
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageResponseDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageResponseDO.java
new file mode 100644
index 000000000..e57ea6d65
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/PageResponseDO.java
@@ -0,0 +1,23 @@
+package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+@Data
+public class PageResponseDO {
+
+ private List rows;
+
+ private long total;
+ private long page;
+ private long size;
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/UpdateQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/UpdateQueryDO.java
new file mode 100644
index 000000000..b41594373
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/common/UpdateQueryDO.java
@@ -0,0 +1,12 @@
+package com.aizuda.snailjob.template.datasource.persistence.dataobject.common;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+public class UpdateQueryDO {
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/JobLogMessageDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/JobLogMessageDO.java
new file mode 100644
index 000000000..f62775c16
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/JobLogMessageDO.java
@@ -0,0 +1,58 @@
+package com.aizuda.snailjob.template.datasource.persistence.dataobject.log;
+
+import lombok.Data;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+@Data
+public class JobLogMessageDO {
+
+ private Long id;
+
+ /**
+ * 命名空间
+ */
+ private String namespaceId;
+
+ /**
+ * 组名称
+ */
+ private String groupName;
+
+ /**
+ * 任务信息id
+ */
+ private Long jobId;
+
+ /**
+ * 任务实例id
+ */
+ private Long taskBatchId;
+
+ /**
+ * 调度任务id
+ */
+ private Long taskId;
+
+ /**
+ * 日志数量
+ */
+ private Integer logNum;
+
+ /**
+ * 调度信息
+ */
+ private String message;
+
+ /**
+ * 真实上报时间
+ */
+ private Long realTime;
+
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/LogPageQueryDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/LogPageQueryDO.java
new file mode 100644
index 000000000..5038bae1e
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/LogPageQueryDO.java
@@ -0,0 +1,23 @@
+package com.aizuda.snailjob.template.datasource.persistence.dataobject.log;
+
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageQueryDO;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-29
+ */
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class LogPageQueryDO extends PageQueryDO {
+ private Long startId;
+ private Long jobId;
+ private Long taskBatchId;
+ private Long taskId;
+ private Integer fromIndex;
+}
diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageDO.java
new file mode 100644
index 000000000..4f36341c2
--- /dev/null
+++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/log/RetryTaskLogMessageDO.java
@@ -0,0 +1,67 @@
+package com.aizuda.snailjob.template.datasource.persistence.dataobject.log;
+
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-29
+ */
+@Data
+public class RetryTaskLogMessageDO {
+
+ /**
+ * 主键
+ */
+ private Long id;
+
+ /**
+ * 命名空间
+ */
+ private String namespaceId;
+
+ /**
+ * 组名称
+ */
+ private String groupName;
+
+ /**
+ * 重试任务id
+ */
+ private Long retryTaskId;
+
+ /**
+ * 重试信息Id
+ */
+ private Long retryId;
+
+ /**
+ * 异常信息
+ */
+ private String message;
+
+ /**
+ * 日志数量
+ */
+ private Integer logNum;
+
+ /**
+ * 真实上报时间
+ */
+ private Long realTime;
+
+ /**
+ * 创建时间
+ */
+ private LocalDateTime createDt;
+
+ /**
+ * 创建时间
+ */
+ private LocalDateTime updateDt;
+}
diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java
index 720a4c8c6..fac3b0a9a 100644
--- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java
+++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java
@@ -3,7 +3,6 @@ package com.aizuda.snailjob.server.common.service;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
-import jakarta.websocket.Session;
import java.io.IOException;
import java.util.List;
@@ -19,5 +18,5 @@ import java.util.List;
public interface LogService {
void saveLog(JobLogDTO jobLogDTO);
void batchSaveLogs(List jobLogTasks);
- void getJobLogPage(JobLogQueryVO queryVO, Session session) throws IOException;
+ void getJobLogPage(JobLogQueryVO queryVO, String sid) throws IOException;
}
diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java
index 6a08dde84..54abe67bf 100644
--- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java
+++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java
@@ -9,8 +9,6 @@ import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO;
import com.aizuda.snailjob.server.common.dto.JobLogDTO;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.common.convert.JobLogMessageConverter;
-import com.aizuda.snailjob.server.common.timer.JobTaskLogTimerTask;
-import com.aizuda.snailjob.server.common.timer.LogTimerWheel;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO;
@@ -21,13 +19,9 @@ import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
-import jakarta.websocket.Session;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.transaction.support.TransactionSynchronization;
-import org.springframework.transaction.support.TransactionSynchronizationManager;
-import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
@@ -93,8 +87,8 @@ public class DatabaseLogService implements LogService {
}
@Override
- public void getJobLogPage(JobLogQueryVO queryVO, Session session) throws IOException {
- Boolean taskBatchComplete = false;
+ public void getJobLogPage(JobLogQueryVO queryVO, String sid) {
+ boolean taskBatchComplete = false;
while (!taskBatchComplete) {
PageDTO pageDTO = new PageDTO<>(1, queryVO.getSize());
@@ -104,7 +98,8 @@ public class DatabaseLogService implements LogService {
.ge(JobLogMessage::getId, queryVO.getStartId())
.eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId())
.eq(JobLogMessage::getTaskId, queryVO.getTaskId())
- .orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime));
+ .orderByAsc(JobLogMessage::getId)
+ .orderByAsc(JobLogMessage::getRealTime));
List records = selectPage.getRecords();
if (CollUtil.isEmpty(records)) {
@@ -122,10 +117,9 @@ public class DatabaseLogService implements LogService {
jobLogResponseVO.setFinished(Boolean.TRUE);
jobLogResponseVO.setNextStartId(queryVO.getStartId());
jobLogResponseVO.setFromIndex(0);
- session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO));
return;
} else {
- scheduleNextAttempt(queryVO, session);
+// scheduleNextAttempt(queryVO, sid);
return;
}
}
@@ -180,7 +174,7 @@ public class DatabaseLogService implements LogService {
jobLogResponseVO.setMessage(messages);
jobLogResponseVO.setNextStartId(nextStartId);
jobLogResponseVO.setFromIndex(fromIndex);
- session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO));
+// session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO));
queryVO.setFromIndex(fromIndex);
queryVO.setStartId(nextStartId);
@@ -188,22 +182,4 @@ public class DatabaseLogService implements LogService {
}
- /**
- * 使用时间轮5秒再进行日志查询
- *
- * @param queryVO
- * @param session
- */
- private void scheduleNextAttempt(JobLogQueryVO queryVO, Session session) {
- if (TransactionSynchronizationManager.isActualTransactionActive()) {
- TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
- @Override
- public void afterCompletion(int status) {
- LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, session), Duration.ofMillis(DELAY_MILLS));
- }
- });
- } else {
- LogTimerWheel.registerWithJobTaskLog(() -> new JobTaskLogTimerTask(queryVO, session), Duration.ofMillis(DELAY_MILLS));
- }
- }
}
diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java
index 3e2bae672..af7376309 100644
--- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java
+++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java
@@ -17,4 +17,5 @@ public class JobLogQueryVO extends BaseQueryVO {
private Long taskBatchId;
private Long taskId;
private Integer fromIndex;
+ private String sid;
}
diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java
index 4ef1b3cfe..bc6825483 100644
--- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java
+++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java
@@ -4,9 +4,11 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
import com.aizuda.snailjob.server.common.service.LogService;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
-import com.aizuda.snailjob.server.common.vo.WsRequestVO;
+import com.aizuda.snailjob.server.web.model.event.WsRequestEvent;
+import com.aizuda.snailjob.server.web.service.JobLogService;
import jakarta.websocket.Session;
import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@@ -25,23 +27,28 @@ import static com.aizuda.snailjob.server.web.socket.LogServer.USER_SESSION;
*/
@Component
@AllArgsConstructor
+@Slf4j
public class WsRequestListener {
- private final LogService logService;
+ private final JobLogService jobLogService;
@Async
- @EventListener(classes = WsRequestVO.class)
- public void getJobLogs(WsRequestVO requestVO) {
+ @EventListener(classes = WsRequestEvent.class)
+ public void getJobLogs(WsRequestEvent requestVO) {
if (!WebSocketSceneEnum.JOB_LOG_SCENE.equals(requestVO.getSceneEnum())) {
return;
}
+
+ log.info("getJobLogs {}", requestVO.getSid());
String message = requestVO.getMessage();
JobLogQueryVO jobLogQueryVO = JsonUtil.parseObject(message, JobLogQueryVO.class);
- Session session = USER_SESSION.get(requestVO.getSid());
+ jobLogQueryVO.setSid(requestVO.getSid());
+ jobLogQueryVO.setStartId(0L);
try {
- logService.getJobLogPage(jobLogQueryVO, session);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ jobLogService.getJobLogPageV2(jobLogQueryVO);
+ } catch (Exception e) {
+ log.warn("send log error", e);
}
+
}
}
diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/JobLogMessagePartitionTask.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/JobLogMessagePartitionTask.java
new file mode 100644
index 000000000..fc204eb28
--- /dev/null
+++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/JobLogMessagePartitionTask.java
@@ -0,0 +1,70 @@
+package com.aizuda.snailjob.server.web.model.dto;
+
+import com.aizuda.snailjob.server.common.dto.PartitionTask;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.time.LocalDateTime;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class JobLogMessagePartitionTask extends PartitionTask {
+
+ /**
+ * 命名空间
+ */
+ private String namespaceId;
+
+ /**
+ * 组名称
+ */
+ private String groupName;
+
+ /**
+ * 任务信息id
+ */
+ private Long jobId;
+
+ /**
+ * 任务实例id
+ */
+ private Long taskBatchId;
+
+ /**
+ * 调度任务id
+ */
+ private Long taskId;
+
+ /**
+ * 日志数量
+ */
+ private Integer logNum;
+
+ /**
+ * 调度信息
+ */
+ private String message;
+
+ /**
+ * 真实上报时间
+ */
+ private Long realTime;
+
+ /**
+ * 创建时间
+ */
+ private LocalDateTime createDt;
+
+ /**
+ * 创建时间
+ */
+ private LocalDateTime updateDt;
+}
diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/LogMessagePartitionTask.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/LogMessagePartitionTask.java
new file mode 100644
index 000000000..367b0922f
--- /dev/null
+++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/dto/LogMessagePartitionTask.java
@@ -0,0 +1,65 @@
+package com.aizuda.snailjob.server.web.model.dto;
+
+import com.aizuda.snailjob.server.common.dto.PartitionTask;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.time.LocalDateTime;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class LogMessagePartitionTask extends PartitionTask {
+
+ /**
+ * 命名空间
+ */
+ private String namespaceId;
+
+ /**
+ * 组名称
+ */
+ private String groupName;
+
+ /**
+ * 重试任务id
+ */
+ private Long retryTaskId;
+
+ /**
+ * 重试信息Id
+ */
+ private Long retryId;
+
+ /**
+ * 异常信息
+ */
+ private String message;
+
+ /**
+ * 日志数量
+ */
+ private Integer logNum;
+
+ /**
+ * 真实上报时间
+ */
+ private Long realTime;
+
+ /**
+ * 创建时间
+ */
+ private LocalDateTime createDt;
+
+ /**
+ * 创建时间
+ */
+ private LocalDateTime updateDt;
+}
diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/WsRequestVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsRequestEvent.java
similarity index 59%
rename from snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/WsRequestVO.java
rename to snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsRequestEvent.java
index 61830457f..807e9f21c 100644
--- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/WsRequestVO.java
+++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsRequestEvent.java
@@ -1,7 +1,10 @@
-package com.aizuda.snailjob.server.common.vo;
+package com.aizuda.snailjob.server.web.model.event;
import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum;
import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.context.ApplicationEvent;
/**
* @Author:srzou
@@ -11,8 +14,9 @@ import lombok.Data;
* @Filename:WebSocketRequestVO
* @since 1.5.0
*/
-@Data
-public class WsRequestVO {
+@Setter
+@Getter
+public class WsRequestEvent extends ApplicationEvent {
/**
* wb类型
*/
@@ -24,4 +28,8 @@ public class WsRequestVO {
private String message;
private WebSocketSceneEnum sceneEnum;
+
+ public WsRequestEvent(Object source) {
+ super(source);
+ }
}
diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsSendEvent.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsSendEvent.java
new file mode 100644
index 000000000..619029359
--- /dev/null
+++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/event/WsSendEvent.java
@@ -0,0 +1,32 @@
+package com.aizuda.snailjob.server.web.model.event;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-01-18
+ */
+@Setter
+@Getter
+public class WsSendEvent extends ApplicationEvent {
+
+ /**
+ * 会话id
+ */
+ private String sid;
+
+ /**
+ * 需要发送的消息
+ */
+ private String message;
+
+ public WsSendEvent(Object source) {
+ super(source);
+ }
+}
diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java
index 0be6e6a18..be60b5b15 100644
--- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java
+++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java
@@ -11,4 +11,7 @@ import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
public interface JobLogService {
JobLogResponseVO getJobLogPage(JobLogQueryVO jobQueryVO);
+
+ void getJobLogPageV2(JobLogQueryVO jobQueryVO);
+
}
diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/LogMessagePartitionTaskConverter.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/LogMessagePartitionTaskConverter.java
new file mode 100644
index 000000000..74ab0ef18
--- /dev/null
+++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/convert/LogMessagePartitionTaskConverter.java
@@ -0,0 +1,29 @@
+package com.aizuda.snailjob.server.web.service.convert;
+
+import com.aizuda.snailjob.server.web.model.dto.JobLogMessagePartitionTask;
+import com.aizuda.snailjob.server.web.model.dto.LogMessagePartitionTask;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.RetryTaskLogMessageDO;
+import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.mapstruct.factory.Mappers;
+
+import java.util.List;
+
+/**
+ *
+ *
+ *
+ *
+ * @author opensnail
+ * @date 2025-03-30
+ */
+@Mapper
+public interface LogMessagePartitionTaskConverter {
+ LogMessagePartitionTaskConverter INSTANCE = Mappers.getMapper(LogMessagePartitionTaskConverter.class);
+
+ List toLogMessagePartitionTask(List retryTaskLogMessageList);
+
+
+ List toJobLogMessagePartitionTasks(List jobLogMessageDOS);
+}
diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java
index c5516841f..9f388c4b5 100644
--- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java
+++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java
@@ -1,23 +1,38 @@
package com.aizuda.snailjob.server.web.service.impl;
import cn.hutool.core.collection.CollUtil;
+import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.constant.LogFieldConstants;
+import com.aizuda.snailjob.server.common.dto.PartitionTask;
+import com.aizuda.snailjob.server.web.timer.JobTaskLogTimerTask;
+import com.aizuda.snailjob.server.web.timer.LogTimerWheel;
+import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.common.vo.JobLogQueryVO;
import com.aizuda.snailjob.server.common.vo.JobLogResponseVO;
+import com.aizuda.snailjob.server.web.model.dto.JobLogMessagePartitionTask;
+import com.aizuda.snailjob.server.web.model.event.WsSendEvent;
import com.aizuda.snailjob.server.web.service.JobLogService;
+import com.aizuda.snailjob.server.web.service.convert.LogMessagePartitionTaskConverter;
+import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.common.PageResponseDO;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.JobLogMessageDO;
+import com.aizuda.snailjob.template.datasource.persistence.dataobject.log.LogPageQueryDO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
+import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
+import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
@@ -30,8 +45,10 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPL
@Service
@RequiredArgsConstructor
public class JobLogServiceImpl implements JobLogService {
+ private static final Long DELAY_MILLS = 5000L;
private final JobLogMessageMapper jobLogMessageMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
+ private final AccessTemplate accessTemplate;
@Override
public JobLogResponseVO getJobLogPage(final JobLogQueryVO queryVO) {
@@ -57,8 +74,8 @@ public class JobLogServiceImpl implements JobLogService {
JobLogResponseVO jobLogResponseVO = new JobLogResponseVO();
if (Objects.isNull(jobTaskBatch)
- || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) &&
- jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))
+ || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) &&
+ jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()))
) {
jobLogResponseVO.setFinished(Boolean.TRUE);
@@ -121,4 +138,85 @@ public class JobLogServiceImpl implements JobLogService {
jobLogResponseVO.setFromIndex(fromIndex);
return jobLogResponseVO;
}
+
+ @Override
+ public void getJobLogPageV2(JobLogQueryVO queryVO) {
+ String sid = queryVO.getSid();
+ LogPageQueryDO pageQueryDO = new LogPageQueryDO();
+ pageQueryDO.setPage(1);
+ pageQueryDO.setSize(queryVO.getSize());
+ pageQueryDO.setTaskBatchId(queryVO.getTaskBatchId());
+ pageQueryDO.setTaskId(queryVO.getTaskId());
+ pageQueryDO.setStartId(queryVO.getStartId());
+ PartitionTaskUtils.process(startId -> {
+ // 记录下次起始时间
+ queryVO.setStartId(startId);
+ pageQueryDO.setStartId(startId);
+ // 拉去数据
+ PageResponseDO pageResponseDO = accessTemplate.getJobLogMessageAccess()
+ .listPage(pageQueryDO);
+ List rows = pageResponseDO.getRows();
+ return LogMessagePartitionTaskConverter.INSTANCE.toJobLogMessagePartitionTasks(rows);
+ }, new Consumer<>() {
+ @Override
+ public void accept(List extends PartitionTask> partitionTasks) {
+
+ List partitionTaskList = (List) partitionTasks;
+
+ for (JobLogMessagePartitionTask logMessagePartitionTask : partitionTaskList) {
+ // 发生日志内容到前端
+ String message = logMessagePartitionTask.getMessage();
+ List