From 33627a08e04a5a0753ba168ee69a947025b03b21 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Mon, 25 Mar 2024 18:44:05 +0800 Subject: [PATCH] =?UTF-8?q?feat:=203.2.0=20=E4=BC=98=E5=8C=96=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E5=90=88=E5=B9=B6=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/schedule/JobLogMergeSchedule.java | 101 ++++++++---------- 1 file changed, 47 insertions(+), 54 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java index da8918eaa..d2fc63cc0 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.server.job.task.support.schedule; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.collection.Partition; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.log.EasyRetryLog; @@ -20,6 +21,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -30,10 +32,15 @@ import org.springframework.transaction.support.TransactionTemplate; import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static java.util.stream.Collectors.*; + /** * jogLogMessage 日志合并归档 * @@ -125,73 +132,59 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle { protected void doInTransactionWithoutResult(final TransactionStatus status) { // Waiting for merge JobTaskBatchList - List ids = partitionTasks.stream().map(i -> i.getId()).collect(Collectors.toList()); - if (ids == null || ids.size() == 0) { - return; - } - - // Waiting for deletion JobTaskList - List jobTaskList = jobTaskMapper.selectList(new LambdaQueryWrapper().in(JobTask::getTaskBatchId, ids)); - if (jobTaskList == null || jobTaskList.size() == 0) { + List ids = partitionTasks.stream().map(PartitionTask::getId).collect(Collectors.toList()); + if (ids == null || ids.isEmpty()) { return; } // Waiting for deletion JobLogMessageList List jobLogMessageList = jobLogMessageMapper.selectList(new LambdaQueryWrapper().in(JobLogMessage::getTaskBatchId, ids)); - if (jobLogMessageList == null || jobLogMessageList.size() == 0) { + if (jobLogMessageList == null || jobLogMessageList.isEmpty()) { return; } - List>> jobLogMessageGroupList = jobLogMessageList.stream().collect(Collectors.groupingBy(JobLogMessage::getTaskId)).entrySet().stream() - .filter(entry -> entry.getValue().size() >= 2).collect(Collectors.toList()); + List>> jobLogMessageGroupList = jobLogMessageList.stream().collect( + groupingBy(JobLogMessage::getTaskId)).entrySet().stream() + .filter(entry -> entry.getValue().size() >= 2).collect(toList()); - List jobLogMessageDeleteBatchIds = new ArrayList<>(); - List jobLogMessageUpdateList = new ArrayList<>(); - for (Map.Entry> jobLogMessage : jobLogMessageGroupList) { - Integer sumLogNum = 0, jobLogMessageListNum = 0; - List mergeJobLogMessageList = new ArrayList(); - for (int i = 0; i < jobLogMessage.getValue().size(); i++) { + for (Map.Entry> jobLogMessageMap : jobLogMessageGroupList) { + List jobLogMessageDeleteBatchIds = new ArrayList<>(); + List jobLogMessageUpdateList = new ArrayList<>(); - // 累加合并数 - JobLogMessage logMessage = jobLogMessage.getValue().get(i); - sumLogNum = sumLogNum + logMessage.getLogNum(); + List mergeMessages = jobLogMessageMap.getValue().stream().map(k -> { + jobLogMessageDeleteBatchIds.add(k.getId()); + return JsonUtil.parseObject(k.getMessage(), List.class); + }) + .reduce((a, b) -> { + // 合并日志信息 + List list = new ArrayList<>(); + list.addAll(a); + list.addAll(b); + return list; + }).get(); - // 最后一次合并小于默认日志数量 - if (jobLogMessageListNum == 0 && i == jobLogMessage.getValue().size() - 1) { - break; - } - // 需要合并日志数大于日志默认合并的条数,返回 - if (jobLogMessageListNum == 0 && sumLogNum > systemProperties.getMergeLogNum()) { - sumLogNum = 0; - continue; - } - // 合并更新日志 - logMessage.setLogNum(sumLogNum); - mergeJobLogMessageList.addAll(JsonUtil.parseObject(logMessage.getMessage(), List.class)); - logMessage.setMessage(JsonUtil.toJsonString(mergeJobLogMessageList)); - if (jobLogMessageListNum > 0 && sumLogNum > systemProperties.getMergeLogNum()) { - jobLogMessageUpdateList.add(logMessage); - mergeJobLogMessageList.clear(); - sumLogNum = 0; - jobLogMessageListNum = 0; - } else if (jobLogMessageListNum > 0 && i == jobLogMessage.getValue().size() - 1) { - jobLogMessageUpdateList.add(logMessage); - mergeJobLogMessageList.clear(); - } else { - jobLogMessageDeleteBatchIds.add(logMessage.getId()); - jobLogMessageListNum++; - } + // 500条数据为一个批次 + List> partitionMessages = Lists.partition(mergeMessages, + systemProperties.getMergeLogNum()); + + for (int i = 0; i < partitionMessages.size(); i++) { + JobLogMessage jobLogMessage = jobLogMessageMap.getValue().get(i); + // 剔除不需要删除的数据 + jobLogMessageDeleteBatchIds.remove(jobLogMessage.getId()); + + jobLogMessage.setMessage(JsonUtil.toJsonString(partitionMessages.get(0))); + jobLogMessageUpdateList.add(jobLogMessage); + } + + // 批量删除、更新日志 + if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) { + jobLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds); + } + if (CollectionUtil.isNotEmpty(jobLogMessageUpdateList)) { + jobLogMessageMapper.batchUpdate(jobLogMessageUpdateList); } - // GC - mergeJobLogMessageList.clear(); - } - // 批量删除、更新日志 - if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) { - jobLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds); - } - if (CollectionUtil.isNotEmpty(jobLogMessageUpdateList)) { - jobLogMessageMapper.batchUpdate(jobLogMessageUpdateList); } + } }); }