diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java index b96402313..6704bc6d0 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SnailJobServerCommonAutoConfiguration.java @@ -1,5 +1,10 @@ package com.aizuda.snailjob.server.common.config; +import com.aizuda.snailjob.server.common.service.LogService; +import com.aizuda.snailjob.server.common.service.impl.DatabaseLogService; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @@ -32,4 +37,10 @@ public class SnailJobServerCommonAutoConfiguration { scheduler.setThreadNamePrefix("snail-job-alarm-thread-"); return scheduler; } + + @Bean + @ConditionalOnMissingBean(LogService.class) + public LogService logService(JobLogMessageMapper jobLogMessageMapper, JobTaskBatchMapper jobTaskBatchMapper){ + return new DatabaseLogService(jobLogMessageMapper, jobTaskBatchMapper); + } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobLogMessageConverter.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobLogMessageConverter.java new file mode 100644 index 000000000..ec08e6c14 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/JobLogMessageConverter.java @@ -0,0 +1,25 @@ +package com.aizuda.snailjob.server.common.convert; + +import com.aizuda.snailjob.server.common.dto.JobLogDTO; +import com.aizuda.snailjob.server.model.dto.LogTaskDTO; +import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; +import org.mapstruct.Mapper; +import org.mapstruct.factory.Mappers; + +/** + * @Author:srzou + * @Package:com.aizuda.snailjob.server.common.support + * @Project:snail-job + * @Date:2025/3/10 21:16 + * @Filename:JobLogMessageConverter + * @since 1.5.0 + */ +@Mapper +public interface JobLogMessageConverter { + + JobLogMessageConverter INSTANCE = Mappers.getMapper(JobLogMessageConverter.class); + + JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO); + + JobLogMessage toJobLogMessage(LogTaskDTO logTaskDTO); +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobLogDTO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/JobLogDTO.java similarity index 92% rename from snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobLogDTO.java rename to snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/JobLogDTO.java index f2aaaf0b2..e482df301 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobLogDTO.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/JobLogDTO.java @@ -1,4 +1,4 @@ -package com.aizuda.snailjob.server.job.task.dto; +package com.aizuda.snailjob.server.common.dto; import lombok.Data; diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/enums/WebSocketSceneEnum.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/enums/WebSocketSceneEnum.java new file mode 100644 index 000000000..660e281bf --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/enums/WebSocketSceneEnum.java @@ -0,0 +1,28 @@ +package com.aizuda.snailjob.server.common.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @since 1.5.0 + */ +@Getter +@AllArgsConstructor +public enum WebSocketSceneEnum { + JOB_LOG_SCENE(1, "JOB_LOG_SCENE"), + WORKFLOW_LOG_SCENE(2, "WORKFLOW_LOG_SCENE"), + RETRY_LOG_SCENE(3,"RETRY_LOG_SCENE"); + + private final Integer type; + private final String scene; + +// public static WebSocketSceneEnum valueOf(String scene) { +// for (WebSocketSceneEnum expressionTypeEnum : WebSocketSceneEnum.values()) { +// if (expressionTypeEnum.getScene().equals(scene)) { +// return expressionTypeEnum; +// } +// } +// +// return null; +// } +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java new file mode 100644 index 000000000..720a4c8c6 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/LogService.java @@ -0,0 +1,23 @@ +package com.aizuda.snailjob.server.common.service; + +import com.aizuda.snailjob.server.common.dto.JobLogDTO; +import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; +import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO; +import jakarta.websocket.Session; + +import java.io.IOException; +import java.util.List; + +/** + * @Author:srzou + * @Package:com.aizuda.snailjob.server.common.service + * @Project:snail-job + * @Date:2025/3/10 20:57 + * @Filename:LogService + * @since 1.5.0 + */ +public interface LogService { + void saveLog(JobLogDTO jobLogDTO); + void batchSaveLogs(List jobLogTasks); + void getJobLogPage(JobLogQueryVO queryVO, Session session) throws IOException; +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java new file mode 100644 index 000000000..8639d9941 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/service/impl/DatabaseLogService.java @@ -0,0 +1,205 @@ +package com.aizuda.snailjob.server.common.service.impl; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.core.util.StreamUtils; +import com.aizuda.snailjob.common.log.constant.LogFieldConstants; +import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO; +import com.aizuda.snailjob.server.common.dto.JobLogDTO; +import com.aizuda.snailjob.server.common.service.LogService; +import com.aizuda.snailjob.server.common.convert.JobLogMessageConverter; +import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; +import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; +import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import com.google.common.collect.Lists; +import jakarta.websocket.Session; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED; + +/** + * @Author:srzou + * @Package:com.aizuda.snailjob.server.common.service.impl + * @Project:snail-job + * @Date:2025/3/10 21:12 + * @Filename:DatabaseLogService + * @since 1.5.0 + */ + +@Slf4j +@RequiredArgsConstructor +public class DatabaseLogService implements LogService { + private final JobLogMessageMapper jobLogMessageMapper; + private final JobTaskBatchMapper jobTaskBatchMapper; + // 创建一个调度线程池 + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + /** + * 保存单调日志 + * + * @param jobLogDTO + */ + @Override + public void saveLog(JobLogDTO jobLogDTO) { + JobLogMessage jobLogMessage = JobLogMessageConverter.INSTANCE.toJobLogMessage(jobLogDTO); + jobLogMessage.setCreateDt(LocalDateTime.now()); + jobLogMessage.setLogNum(1); + jobLogMessage.setMessage(Optional.ofNullable(jobLogDTO.getMessage()).orElse(StrUtil.EMPTY)); + jobLogMessage.setTaskId(Optional.ofNullable(jobLogMessage.getTaskId()).orElse(0L)); + jobLogMessageMapper.insert(jobLogMessage); + } + + /** + * 批量保存日志 + * + * @param jobLogTasks + */ + @Override + public void batchSaveLogs(List jobLogTasks) { + Map> logTaskDTOMap = jobLogTasks. + stream().collect(Collectors.groupingBy(JobLogTaskDTO::getTaskId, Collectors.toList())); + + List jobLogMessageList = new ArrayList<>(); + for (List logTaskDTOList : logTaskDTOMap.values()) { + JobLogMessage jobLogMessage = JobLogMessageConverter.INSTANCE.toJobLogMessage(logTaskDTOList.get(0)); + jobLogMessage.setCreateDt(LocalDateTime.now()); + jobLogMessage.setLogNum(logTaskDTOList.size()); + List> messageMapList = StreamUtils.toList(logTaskDTOList, + taskDTO -> taskDTO.getFieldList() + .stream().filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue())) + .collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue))); + jobLogMessage.setMessage(JsonUtil.toJsonString(messageMapList)); + + jobLogMessageList.add(jobLogMessage); + } + + jobLogMessageMapper.insertBatch(jobLogMessageList); + } + + @Override + public void getJobLogPage(JobLogQueryVO queryVO, Session session) throws IOException { + Boolean taskBatchComplete = false; + while (!taskBatchComplete){ + PageDTO pageDTO = new PageDTO<>(1, queryVO.getSize()); + + PageDTO selectPage = jobLogMessageMapper.selectPage(pageDTO, + new LambdaQueryWrapper() + .select(JobLogMessage::getId, JobLogMessage::getLogNum) + .ge(JobLogMessage::getId, queryVO.getStartId()) + .eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId()) + .eq(JobLogMessage::getTaskId, queryVO.getTaskId()) + .orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime)); + List records = selectPage.getRecords(); + if (CollUtil.isEmpty(records)) { + + JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne( + new LambdaQueryWrapper() + .eq(JobTaskBatch::getId, queryVO.getTaskBatchId()) + ); + + JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); + + if (Objects.isNull(jobTaskBatch) + || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) && + jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now())) + ) { + jobLogResponseVO.setFinished(Boolean.TRUE); + jobLogResponseVO.setNextStartId(queryVO.getStartId()); + jobLogResponseVO.setFromIndex(0); + session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO)); + System.out.println("结束了"); + return; + }else { + // 如果没有完成,就等五秒执行 + System.out.println("异步执行"); + scheduleNextAttempt(queryVO, session); + return; + } + } + + Integer fromIndex = Optional.ofNullable(queryVO.getFromIndex()).orElse(0); + JobLogMessage firstRecord = records.get(0); + List ids = Lists.newArrayList(firstRecord.getId()); + int total = firstRecord.getLogNum() - fromIndex; + for (int i = 1; i < records.size(); i++) { + JobLogMessage record = records.get(i); + if (total + record.getLogNum() > queryVO.getSize()) { + break; + } + + total += record.getLogNum(); + ids.add(record.getId()); + } + + long nextStartId = 0; + List> messages = Lists.newArrayList(); + List jobLogMessages = jobLogMessageMapper.selectList( + new LambdaQueryWrapper() + .in(JobLogMessage::getId, ids) + .orderByAsc(JobLogMessage::getId) + .orderByAsc(JobLogMessage::getRealTime) + ); + + for (final JobLogMessage jobLogMessage : jobLogMessages) { + + List> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class); + int size = originalList.size() - fromIndex; + List> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize()) + .collect(Collectors.toList()); + + if (messages.size() + size >= queryVO.getSize()) { + messages.addAll(pageList); + nextStartId = jobLogMessage.getId(); + fromIndex = Math.min(fromIndex + queryVO.getSize(), originalList.size() - 1) + 1; + break; + } + + messages.addAll(pageList); + nextStartId = jobLogMessage.getId() + 1; + fromIndex = 0; + } + + messages = messages.stream() + .sorted(Comparator.comparingLong(o -> Long.parseLong(o.get(LogFieldConstants.TIME_STAMP)))) + .collect(Collectors.toList()); + + JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); + jobLogResponseVO.setMessage(messages); + jobLogResponseVO.setNextStartId(nextStartId); + jobLogResponseVO.setFromIndex(fromIndex); + session.getBasicRemote().sendText(JsonUtil.toJsonString(jobLogResponseVO)); + + queryVO.setFromIndex(fromIndex); + queryVO.setStartId(nextStartId); + } + + } + + private void scheduleNextAttempt(JobLogQueryVO queryVO, Session session) { + scheduler.schedule(() -> { + try { + // 再次调用查询 + getJobLogPage(queryVO, session); + } catch (IOException e) { + e.printStackTrace(); + } + // 5秒后执行 + }, 5, TimeUnit.SECONDS); + } +} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobLogQueryVO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java similarity index 75% rename from snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobLogQueryVO.java rename to snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java index a6b53160b..3e2bae672 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobLogQueryVO.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogQueryVO.java @@ -1,6 +1,6 @@ -package com.aizuda.snailjob.server.web.model.request; +package com.aizuda.snailjob.server.common.vo; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/JobLogResponseVO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogResponseVO.java similarity index 84% rename from snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/JobLogResponseVO.java rename to snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogResponseVO.java index f1b9148af..28571faa6 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/response/JobLogResponseVO.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/JobLogResponseVO.java @@ -1,4 +1,4 @@ -package com.aizuda.snailjob.server.web.model.response; +package com.aizuda.snailjob.server.common.vo; import lombok.Data; diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/WsRequestVO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/WsRequestVO.java new file mode 100644 index 000000000..61830457f --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/WsRequestVO.java @@ -0,0 +1,27 @@ +package com.aizuda.snailjob.server.common.vo; + +import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; +import lombok.Data; + +/** + * @Author:srzou + * @Package:com.aizuda.snailjob.server.web.model.request + * @Project:snail-job + * @Date:2025/3/5 16:54 + * @Filename:WebSocketRequestVO + * @since 1.5.0 + */ +@Data +public class WsRequestVO { + /** + * wb类型 + */ + private String sid; + + /** + * context + */ + private String message; + + private WebSocketSceneEnum sceneEnum; +} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/base/BaseQueryVO.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/base/BaseQueryVO.java similarity index 94% rename from snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/base/BaseQueryVO.java rename to snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/base/BaseQueryVO.java index 9f6f96670..39a5ea9cf 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/base/BaseQueryVO.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/vo/base/BaseQueryVO.java @@ -1,4 +1,4 @@ -package com.aizuda.snailjob.server.web.model.base; +package com.aizuda.snailjob.server.common.vo.base; import cn.hutool.core.util.ObjUtil; import lombok.Data; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java index ab79d718d..e151fc10d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java @@ -4,6 +4,7 @@ import com.aizuda.snailjob.client.model.request.DispatchJobRequest; import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest; import com.aizuda.snailjob.client.model.request.MapTaskRequest; import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; +import com.aizuda.snailjob.server.common.dto.JobLogDTO; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.job.task.dto.*; import com.aizuda.snailjob.server.job.task.support.block.job.BlockStrategyContext; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobLogActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobLogActor.java index 03dbbce7b..15312744e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobLogActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobLogActor.java @@ -1,26 +1,18 @@ package com.aizuda.snailjob.server.job.task.support.dispatch; +import com.aizuda.snailjob.server.common.dto.JobLogDTO; import org.apache.pekko.actor.AbstractActor; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.StrUtil; -import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.aizuda.snailjob.common.core.util.StreamUtils; -import com.aizuda.snailjob.common.log.dto.TaskLogFieldDTO; import com.aizuda.snailjob.server.common.pekko.ActorGenerator; -import com.aizuda.snailjob.server.job.task.dto.JobLogDTO; -import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.server.common.service.LogService; import com.aizuda.snailjob.server.model.dto.JobLogTaskDTO; -import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; -import java.time.LocalDateTime; import java.util.*; -import java.util.stream.Collectors; /** * @author opensnail @@ -32,7 +24,7 @@ import java.util.stream.Collectors; @Slf4j @RequiredArgsConstructor public class JobLogActor extends AbstractActor { - private final JobLogMessageMapper jobLogMessageMapper; + private final LogService logService; @Override public Receive createReceive() { @@ -44,24 +36,7 @@ public class JobLogActor extends AbstractActor { } List jobLogTasks = (List) list; - Map> logTaskDTOMap = jobLogTasks. - stream().collect(Collectors.groupingBy(JobLogTaskDTO::getTaskId, Collectors.toList())); - - List jobLogMessageList = new ArrayList<>(); - for (List logTaskDTOList : logTaskDTOMap.values()) { - JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(logTaskDTOList.get(0)); - jobLogMessage.setCreateDt(LocalDateTime.now()); - jobLogMessage.setLogNum(logTaskDTOList.size()); - List> messageMapList = StreamUtils.toList(logTaskDTOList, - taskDTO -> taskDTO.getFieldList() - .stream().filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue())) - .collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue))); - jobLogMessage.setMessage(JsonUtil.toJsonString(messageMapList)); - - jobLogMessageList.add(jobLogMessage); - } - - jobLogMessageMapper.insertBatch(jobLogMessageList); + logService.batchSaveLogs(jobLogTasks); } catch (Exception e) { log.error("保存客户端日志异常.", e); } finally { @@ -81,11 +56,6 @@ public class JobLogActor extends AbstractActor { } private void saveLogMessage(JobLogDTO jobLogDTO) { - JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(jobLogDTO); - jobLogMessage.setCreateDt(LocalDateTime.now()); - jobLogMessage.setLogNum(1); - jobLogMessage.setMessage(Optional.ofNullable(jobLogDTO.getMessage()).orElse(StrUtil.EMPTY)); - jobLogMessage.setTaskId(Optional.ofNullable(jobLogMessage.getTaskId()).orElse(0L)); - jobLogMessageMapper.insert(jobLogMessage); + logService.saveLog(jobLogDTO); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/log/JobLogStorage.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/log/JobLogStorage.java index 177f38c27..86e234f1b 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/log/JobLogStorage.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/log/JobLogStorage.java @@ -10,7 +10,7 @@ import com.aizuda.snailjob.server.common.pekko.ActorGenerator; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.common.dto.LogMetaDTO; import com.aizuda.snailjob.server.common.log.LogStorageFactory; -import com.aizuda.snailjob.server.job.task.dto.JobLogDTO; +import com.aizuda.snailjob.server.common.dto.JobLogDTO; import com.google.common.collect.Lists; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; diff --git a/snail-job-server/snail-job-server-web/pom.xml b/snail-job-server/snail-job-server-web/pom.xml index 7882ef3c3..c45bb5026 100644 --- a/snail-job-server/snail-job-server-web/pom.xml +++ b/snail-job-server/snail-job-server-web/pom.xml @@ -39,6 +39,10 @@ org.springframework.boot spring-boot-starter-validation + + org.springframework.boot + spring-boot-starter-websocket + diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/config/WebSocketConfigurator.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/config/WebSocketConfigurator.java new file mode 100644 index 000000000..3651bd501 --- /dev/null +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/config/WebSocketConfigurator.java @@ -0,0 +1,74 @@ +package com.aizuda.snailjob.server.web.config; + +import com.aizuda.snailjob.common.core.exception.SnailJobAuthenticationException; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.template.datasource.persistence.po.SystemUser; +import com.auth0.jwt.JWT; +import com.auth0.jwt.exceptions.JWTDecodeException; +import jakarta.websocket.HandshakeResponse; +import jakarta.websocket.server.HandshakeRequest; +import jakarta.websocket.server.ServerEndpointConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +import java.util.List; +import java.util.Map; + +/** + * WebScoket配置处理器 + * @since 1.5.0 + */ +@Configuration +public class WebSocketConfigurator extends ServerEndpointConfig.Configurator { + public static final String AUTHENTICATION = "Snail-Job-Auth"; + public static final String SID = "sid"; + public static final String SCENE = "scene"; + public static final String USER_INFO = "user-info"; + + /** + * ServerEndpointExporter 作用 + * + * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint + * + * @return + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } + + @Override + public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { + Map> parameterMap = request.getParameterMap(); + String queryString = request.getQueryString(); + + // 获取 token 中的 user id + SystemUser systemUser; + try { + String token = parameterMap.get(AUTHENTICATION).get(0); + systemUser = JsonUtil.parseObject(JWT.decode(token).getAudience().get(0), SystemUser.class); + } catch (JWTDecodeException j) { + throw new SnailJobAuthenticationException("登陆过期,请重新登陆"); + } + + String sid = parameterMap.get(SID).get(0); + String scene = parameterMap.get(SCENE).get(0); + Map userProperties = sec.getUserProperties(); + + userProperties.put(SID, sid); + userProperties.put(SCENE, scene); + userProperties.put(USER_INFO, systemUser); + + } + + /** + * 初始化端点对象,也就是被@ServerEndpoint所标注的对象 + */ + @Override + public T getEndpointInstance(Class clazz) throws InstantiationException { + return super.getEndpointInstance(clazz); + } + + +} \ No newline at end of file diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/JobLogController.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/JobLogController.java index 7db88fb95..2389d5178 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/JobLogController.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/controller/JobLogController.java @@ -1,8 +1,8 @@ package com.aizuda.snailjob.server.web.controller; import com.aizuda.snailjob.server.web.annotation.LoginRequired; -import com.aizuda.snailjob.server.web.model.request.JobLogQueryVO; -import com.aizuda.snailjob.server.web.model.response.JobLogResponseVO; +import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; +import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; import com.aizuda.snailjob.server.web.service.JobLogService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java new file mode 100644 index 000000000..4ef1b3cfe --- /dev/null +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/listener/WsRequestListener.java @@ -0,0 +1,47 @@ +package com.aizuda.snailjob.server.web.listener; + +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; +import com.aizuda.snailjob.server.common.service.LogService; +import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; +import com.aizuda.snailjob.server.common.vo.WsRequestVO; +import jakarta.websocket.Session; +import lombok.AllArgsConstructor; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +import static com.aizuda.snailjob.server.web.socket.LogServer.USER_SESSION; + +/** + * @Author:srzou + * @Package:com.aizuda.snailjob.server.web.listener + * @Project:snail-job + * @Date:2025/3/18 10:56 + * @Filename:WsRequestListener + * @since 1.5.0 + */ +@Component +@AllArgsConstructor +public class WsRequestListener { + private final LogService logService; + + @Async + @EventListener(classes = WsRequestVO.class) + public void getJobLogs(WsRequestVO requestVO) { + if (!WebSocketSceneEnum.JOB_LOG_SCENE.equals(requestVO.getSceneEnum())) { + return; + } + String message = requestVO.getMessage(); + JobLogQueryVO jobLogQueryVO = JsonUtil.parseObject(message, JobLogQueryVO.class); + Session session = USER_SESSION.get(requestVO.getSid()); + try { + logService.getJobLogPage(jobLogQueryVO, session); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/GroupConfigQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/GroupConfigQueryVO.java index acb72dc5d..d2fb29375 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/GroupConfigQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/GroupConfigQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobBatchQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobBatchQueryVO.java index 0e68e5fca..067bb5f6d 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobBatchQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobBatchQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobNotifyConfigQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobNotifyConfigQueryVO.java index e4042b684..4f89c8167 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobNotifyConfigQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobNotifyConfigQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobQueryVO.java index fe6ee68c2..4fa7444fe 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobTaskQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobTaskQueryVO.java index db4b53891..f89524c36 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobTaskQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/JobTaskQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/LineQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/LineQueryVO.java index 47c3d3ab4..f334c72b0 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/LineQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/LineQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NamespaceQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NamespaceQueryVO.java index db2aff5c9..c42cb9247 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NamespaceQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NamespaceQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigQueryVO.java index 0f9de7e76..1692d306c 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyConfigQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyRecipientQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyRecipientQueryVO.java index 5fde4f4bb..7275a0ec2 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyRecipientQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/NotifyRecipientQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryDeadLetterQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryDeadLetterQueryVO.java index f99648c69..dff0dbbe7 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryDeadLetterQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryDeadLetterQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryQueryVO.java index 695bbc46c..057c09fc4 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java index b3f2e1179..cbe7d99f3 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskLogMessageQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskQueryVO.java index cf1bf5d3e..8290fa87b 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/RetryTaskQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SceneConfigQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SceneConfigQueryVO.java index 11240e136..c198f6be2 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SceneConfigQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SceneConfigQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/ServerNodeQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/ServerNodeQueryVO.java index 878a751ae..e3b6756f1 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/ServerNodeQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/ServerNodeQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SystemUserQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SystemUserQueryVO.java index 115a6c0b5..93c893117 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SystemUserQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/SystemUserQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/WorkflowBatchQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/WorkflowBatchQueryVO.java index 468ac0b4a..45123c935 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/WorkflowBatchQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/WorkflowBatchQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/WorkflowQueryVO.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/WorkflowQueryVO.java index e66e3f6ed..13deeea2f 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/WorkflowQueryVO.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/model/request/WorkflowQueryVO.java @@ -1,6 +1,6 @@ package com.aizuda.snailjob.server.web.model.request; -import com.aizuda.snailjob.server.web.model.base.BaseQueryVO; +import com.aizuda.snailjob.server.common.vo.base.BaseQueryVO; import lombok.Data; import lombok.EqualsAndHashCode; diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java index 6c9f8703d..0be6e6a18 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/JobLogService.java @@ -1,7 +1,7 @@ package com.aizuda.snailjob.server.web.service; -import com.aizuda.snailjob.server.web.model.request.JobLogQueryVO; -import com.aizuda.snailjob.server.web.model.response.JobLogResponseVO; +import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; +import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; /** * @author: opensnail diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java index fcccda624..c5516841f 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobLogServiceImpl.java @@ -3,8 +3,8 @@ package com.aizuda.snailjob.server.web.service.impl; import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.constant.LogFieldConstants; -import com.aizuda.snailjob.server.web.model.request.JobLogQueryVO; -import com.aizuda.snailjob.server.web.model.response.JobLogResponseVO; +import com.aizuda.snailjob.server.common.vo.JobLogQueryVO; +import com.aizuda.snailjob.server.common.vo.JobLogResponseVO; import com.aizuda.snailjob.server.web.service.JobLogService; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; @@ -20,6 +20,8 @@ import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; +import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED; + /** * @author: opensnail * @date : 2023-10-12 09:55 @@ -40,8 +42,8 @@ public class JobLogServiceImpl implements JobLogService { new LambdaQueryWrapper() .select(JobLogMessage::getId, JobLogMessage::getLogNum) .ge(JobLogMessage::getId, queryVO.getStartId()) - .ge(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId()) - .ge(JobLogMessage::getJobId, queryVO.getJobId()) + .eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId()) +// .ge(JobLogMessage::getJobId, queryVO.getJobId()) .eq(JobLogMessage::getTaskId, queryVO.getTaskId()) .orderByAsc(JobLogMessage::getId).orderByAsc(JobLogMessage::getRealTime)); List records = selectPage.getRecords(); @@ -55,7 +57,9 @@ public class JobLogServiceImpl implements JobLogService { JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); if (Objects.isNull(jobTaskBatch) - || jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now()) + || (COMPLETED.contains(jobTaskBatch.getTaskBatchStatus()) && + jobTaskBatch.getUpdateDt().plusSeconds(15).isBefore(LocalDateTime.now())) + ) { jobLogResponseVO.setFinished(Boolean.TRUE); } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/socket/LogServer.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/socket/LogServer.java new file mode 100644 index 000000000..e059f84c7 --- /dev/null +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/socket/LogServer.java @@ -0,0 +1,60 @@ +package com.aizuda.snailjob.server.web.socket; + +import com.aizuda.snailjob.common.core.context.SnailSpringContext; +import com.aizuda.snailjob.server.common.enums.WebSocketSceneEnum; +import com.aizuda.snailjob.server.common.vo.WsRequestVO; +import com.aizuda.snailjob.server.web.config.WebSocketConfigurator; +import jakarta.websocket.*; +import jakarta.websocket.server.PathParam; +import jakarta.websocket.server.ServerEndpoint; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @Author:srzou + * @Package:com.aizuda.snailjob.server.web.socket + * @Project:snail-job + * @Date:2025/3/4 16:31 + * @Filename:LogServer + * @since 1.5.0 + */ +@Slf4j +@Component +@ServerEndpoint(value = "/webSocket", configurator = WebSocketConfigurator.class) +public class LogServer { + + // 缓存session + public static final ConcurrentHashMap USER_SESSION = new ConcurrentHashMap<>(); + + + @OnOpen + public void onOpen(Session session) { + Map userProperties = session.getUserProperties(); + USER_SESSION.put((String) userProperties.get(WebSocketConfigurator.SID), session); + } + + + @OnMessage + public void onMessage(String message, Session session) throws IOException, EncodeException { + // 接收请求 + Map userProperties = session.getUserProperties(); + String sid = (String) userProperties.get(WebSocketConfigurator.SID); + String scene = (String) userProperties.get(WebSocketConfigurator.SCENE); + WsRequestVO requestVO = new WsRequestVO(); + requestVO.setSceneEnum(WebSocketSceneEnum.valueOf(scene)); + requestVO.setMessage(message); + requestVO.setSid(sid); + SnailSpringContext.getContext().publishEvent(requestVO); + } + + //错误时调用 + @OnError + public void onError(Session session, Throwable throwable) { + log.error("发生错误{}", throwable); + } + +}