Merge remote-tracking branch 'origin/dev_3.2.0' into dev_3.2.0

This commit is contained in:
zhengweilin 2024-03-26 09:08:56 +08:00
commit 3fc200f993

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.job.task.support.schedule;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.Partition;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
@ -20,6 +21,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -30,10 +32,15 @@ import org.springframework.transaction.support.TransactionTemplate;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.*;
/**
* jogLogMessage 日志合并归档
*
@ -125,73 +132,59 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle {
protected void doInTransactionWithoutResult(final TransactionStatus status) {
// Waiting for merge JobTaskBatchList
List<Long> ids = partitionTasks.stream().map(i -> i.getId()).collect(Collectors.toList());
if (ids == null || ids.size() == 0) {
return;
}
// Waiting for deletion JobTaskList
List<JobTask> jobTaskList = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>().in(JobTask::getTaskBatchId, ids));
if (jobTaskList == null || jobTaskList.size() == 0) {
List<Long> ids = partitionTasks.stream().map(PartitionTask::getId).collect(Collectors.toList());
if (ids == null || ids.isEmpty()) {
return;
}
// Waiting for deletion JobLogMessageList
List<JobLogMessage> jobLogMessageList = jobLogMessageMapper.selectList(new LambdaQueryWrapper<JobLogMessage>().in(JobLogMessage::getTaskBatchId, ids));
if (jobLogMessageList == null || jobLogMessageList.size() == 0) {
if (jobLogMessageList == null || jobLogMessageList.isEmpty()) {
return;
}
List<Map.Entry<Long, List<JobLogMessage>>> jobLogMessageGroupList = jobLogMessageList.stream().collect(Collectors.groupingBy(JobLogMessage::getTaskId)).entrySet().stream()
.filter(entry -> entry.getValue().size() >= 2).collect(Collectors.toList());
List<Map.Entry<Long, List<JobLogMessage>>> jobLogMessageGroupList = jobLogMessageList.stream().collect(
groupingBy(JobLogMessage::getTaskId)).entrySet().stream()
.filter(entry -> entry.getValue().size() >= 2).collect(toList());
List<Long> jobLogMessageDeleteBatchIds = new ArrayList<>();
List<JobLogMessage> jobLogMessageUpdateList = new ArrayList<>();
for (Map.Entry<Long, List<JobLogMessage>> jobLogMessage : jobLogMessageGroupList) {
Integer sumLogNum = 0, jobLogMessageListNum = 0;
List mergeJobLogMessageList = new ArrayList();
for (int i = 0; i < jobLogMessage.getValue().size(); i++) {
for (Map.Entry<Long/*taskId*/, List<JobLogMessage>> jobLogMessageMap : jobLogMessageGroupList) {
List<Long> jobLogMessageDeleteBatchIds = new ArrayList<>();
List<JobLogMessage> jobLogMessageUpdateList = new ArrayList<>();
// 累加合并数
JobLogMessage logMessage = jobLogMessage.getValue().get(i);
sumLogNum = sumLogNum + logMessage.getLogNum();
List<String> mergeMessages = jobLogMessageMap.getValue().stream().map(k -> {
jobLogMessageDeleteBatchIds.add(k.getId());
return JsonUtil.parseObject(k.getMessage(), List.class);
})
.reduce((a, b) -> {
// 合并日志信息
List<String> list = new ArrayList<>();
list.addAll(a);
list.addAll(b);
return list;
}).get();
// 最后一次合并小于默认日志数量
if (jobLogMessageListNum == 0 && i == jobLogMessage.getValue().size() - 1) {
break;
}
// 需要合并日志数大于日志默认合并的条数,返回
if (jobLogMessageListNum == 0 && sumLogNum > systemProperties.getMergeLogNum()) {
sumLogNum = 0;
continue;
}
// 合并更新日志
logMessage.setLogNum(sumLogNum);
mergeJobLogMessageList.addAll(JsonUtil.parseObject(logMessage.getMessage(), List.class));
logMessage.setMessage(JsonUtil.toJsonString(mergeJobLogMessageList));
if (jobLogMessageListNum > 0 && sumLogNum > systemProperties.getMergeLogNum()) {
jobLogMessageUpdateList.add(logMessage);
mergeJobLogMessageList.clear();
sumLogNum = 0;
jobLogMessageListNum = 0;
} else if (jobLogMessageListNum > 0 && i == jobLogMessage.getValue().size() - 1) {
jobLogMessageUpdateList.add(logMessage);
mergeJobLogMessageList.clear();
} else {
jobLogMessageDeleteBatchIds.add(logMessage.getId());
jobLogMessageListNum++;
}
// 500条数据为一个批次
List<List<String>> partitionMessages = Lists.partition(mergeMessages,
systemProperties.getMergeLogNum());
for (int i = 0; i < partitionMessages.size(); i++) {
JobLogMessage jobLogMessage = jobLogMessageMap.getValue().get(i);
// 剔除不需要删除的数据
jobLogMessageDeleteBatchIds.remove(jobLogMessage.getId());
jobLogMessage.setMessage(JsonUtil.toJsonString(partitionMessages.get(0)));
jobLogMessageUpdateList.add(jobLogMessage);
}
// 批量删除更新日志
if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) {
jobLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds);
}
if (CollectionUtil.isNotEmpty(jobLogMessageUpdateList)) {
jobLogMessageMapper.batchUpdate(jobLogMessageUpdateList);
}
// GC
mergeJobLogMessageList.clear();
}
// 批量删除更新日志
if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) {
jobLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds);
}
if (CollectionUtil.isNotEmpty(jobLogMessageUpdateList)) {
jobLogMessageMapper.batchUpdate(jobLogMessageUpdateList);
}
}
});
}