feat: 3.2.0
优化日志合并逻辑
This commit is contained in:
parent
2354573c29
commit
33627a08e0
@ -1,6 +1,7 @@
|
||||
package com.aizuda.easy.retry.server.job.task.support.schedule;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.collection.Partition;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
||||
import com.aizuda.easy.retry.common.log.EasyRetryLog;
|
||||
@ -20,6 +21,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -30,10 +32,15 @@ import org.springframework.transaction.support.TransactionTemplate;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.stream.Collectors.*;
|
||||
|
||||
/**
|
||||
* jogLogMessage 日志合并归档
|
||||
*
|
||||
@ -125,73 +132,59 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle {
|
||||
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
||||
|
||||
// Waiting for merge JobTaskBatchList
|
||||
List<Long> ids = partitionTasks.stream().map(i -> i.getId()).collect(Collectors.toList());
|
||||
if (ids == null || ids.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Waiting for deletion JobTaskList
|
||||
List<JobTask> jobTaskList = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>().in(JobTask::getTaskBatchId, ids));
|
||||
if (jobTaskList == null || jobTaskList.size() == 0) {
|
||||
List<Long> ids = partitionTasks.stream().map(PartitionTask::getId).collect(Collectors.toList());
|
||||
if (ids == null || ids.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Waiting for deletion JobLogMessageList
|
||||
List<JobLogMessage> jobLogMessageList = jobLogMessageMapper.selectList(new LambdaQueryWrapper<JobLogMessage>().in(JobLogMessage::getTaskBatchId, ids));
|
||||
if (jobLogMessageList == null || jobLogMessageList.size() == 0) {
|
||||
if (jobLogMessageList == null || jobLogMessageList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<Map.Entry<Long, List<JobLogMessage>>> jobLogMessageGroupList = jobLogMessageList.stream().collect(Collectors.groupingBy(JobLogMessage::getTaskId)).entrySet().stream()
|
||||
.filter(entry -> entry.getValue().size() >= 2).collect(Collectors.toList());
|
||||
List<Map.Entry<Long, List<JobLogMessage>>> jobLogMessageGroupList = jobLogMessageList.stream().collect(
|
||||
groupingBy(JobLogMessage::getTaskId)).entrySet().stream()
|
||||
.filter(entry -> entry.getValue().size() >= 2).collect(toList());
|
||||
|
||||
List<Long> jobLogMessageDeleteBatchIds = new ArrayList<>();
|
||||
List<JobLogMessage> jobLogMessageUpdateList = new ArrayList<>();
|
||||
for (Map.Entry<Long, List<JobLogMessage>> jobLogMessage : jobLogMessageGroupList) {
|
||||
Integer sumLogNum = 0, jobLogMessageListNum = 0;
|
||||
List mergeJobLogMessageList = new ArrayList();
|
||||
for (int i = 0; i < jobLogMessage.getValue().size(); i++) {
|
||||
for (Map.Entry<Long/*taskId*/, List<JobLogMessage>> jobLogMessageMap : jobLogMessageGroupList) {
|
||||
List<Long> jobLogMessageDeleteBatchIds = new ArrayList<>();
|
||||
List<JobLogMessage> jobLogMessageUpdateList = new ArrayList<>();
|
||||
|
||||
// 累加合并数
|
||||
JobLogMessage logMessage = jobLogMessage.getValue().get(i);
|
||||
sumLogNum = sumLogNum + logMessage.getLogNum();
|
||||
List<String> mergeMessages = jobLogMessageMap.getValue().stream().map(k -> {
|
||||
jobLogMessageDeleteBatchIds.add(k.getId());
|
||||
return JsonUtil.parseObject(k.getMessage(), List.class);
|
||||
})
|
||||
.reduce((a, b) -> {
|
||||
// 合并日志信息
|
||||
List<String> list = new ArrayList<>();
|
||||
list.addAll(a);
|
||||
list.addAll(b);
|
||||
return list;
|
||||
}).get();
|
||||
|
||||
// 最后一次合并小于默认日志数量
|
||||
if (jobLogMessageListNum == 0 && i == jobLogMessage.getValue().size() - 1) {
|
||||
break;
|
||||
}
|
||||
// 需要合并日志数大于日志默认合并的条数,返回
|
||||
if (jobLogMessageListNum == 0 && sumLogNum > systemProperties.getMergeLogNum()) {
|
||||
sumLogNum = 0;
|
||||
continue;
|
||||
}
|
||||
// 合并更新日志
|
||||
logMessage.setLogNum(sumLogNum);
|
||||
mergeJobLogMessageList.addAll(JsonUtil.parseObject(logMessage.getMessage(), List.class));
|
||||
logMessage.setMessage(JsonUtil.toJsonString(mergeJobLogMessageList));
|
||||
if (jobLogMessageListNum > 0 && sumLogNum > systemProperties.getMergeLogNum()) {
|
||||
jobLogMessageUpdateList.add(logMessage);
|
||||
mergeJobLogMessageList.clear();
|
||||
sumLogNum = 0;
|
||||
jobLogMessageListNum = 0;
|
||||
} else if (jobLogMessageListNum > 0 && i == jobLogMessage.getValue().size() - 1) {
|
||||
jobLogMessageUpdateList.add(logMessage);
|
||||
mergeJobLogMessageList.clear();
|
||||
} else {
|
||||
jobLogMessageDeleteBatchIds.add(logMessage.getId());
|
||||
jobLogMessageListNum++;
|
||||
}
|
||||
// 500条数据为一个批次
|
||||
List<List<String>> partitionMessages = Lists.partition(mergeMessages,
|
||||
systemProperties.getMergeLogNum());
|
||||
|
||||
for (int i = 0; i < partitionMessages.size(); i++) {
|
||||
JobLogMessage jobLogMessage = jobLogMessageMap.getValue().get(i);
|
||||
// 剔除不需要删除的数据
|
||||
jobLogMessageDeleteBatchIds.remove(jobLogMessage.getId());
|
||||
|
||||
jobLogMessage.setMessage(JsonUtil.toJsonString(partitionMessages.get(0)));
|
||||
jobLogMessageUpdateList.add(jobLogMessage);
|
||||
}
|
||||
|
||||
// 批量删除、更新日志
|
||||
if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) {
|
||||
jobLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds);
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(jobLogMessageUpdateList)) {
|
||||
jobLogMessageMapper.batchUpdate(jobLogMessageUpdateList);
|
||||
}
|
||||
// GC
|
||||
mergeJobLogMessageList.clear();
|
||||
}
|
||||
// 批量删除、更新日志
|
||||
if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) {
|
||||
jobLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds);
|
||||
}
|
||||
if (CollectionUtil.isNotEmpty(jobLogMessageUpdateList)) {
|
||||
jobLogMessageMapper.batchUpdate(jobLogMessageUpdateList);
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user