feat(3.2.0): fix the transaction issue in log merging

byteblogs168 2024-04-01 18:52:28 +08:00
parent 328d4f0569
commit 6d0c331ffc
5 changed files with 119 additions and 119 deletions
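
The Java diffs below adjust where the transaction boundary sits in the log-merge schedules: each merge pass deletes the original per-task log rows and inserts the merged rows, and those two steps have to commit or roll back together. As background, here is a minimal, self-contained sketch of that pattern with Spring's TransactionTemplate; the LogMessageDao interface is a hypothetical stand-in for the project's JobLogMessageMapper / RetryTaskLogMessageMapper, not actual project code.

import java.util.List;

import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;

public class LogMergeTransactionSketch {

    // Hypothetical DAO, standing in for JobLogMessageMapper / RetryTaskLogMessageMapper.
    interface LogMessageDao {
        void deleteBatchIds(List<Long> ids);
        void batchInsert(List<String> mergedRows);
    }

    private final TransactionTemplate transactionTemplate;
    private final LogMessageDao logMessageDao;

    public LogMergeTransactionSketch(TransactionTemplate transactionTemplate, LogMessageDao logMessageDao) {
        this.transactionTemplate = transactionTemplate;
        this.logMessageDao = logMessageDao;
    }

    // Delete the source log rows and insert the merged rows in a single transaction.
    public void mergeAtomically(List<Long> deleteIds, List<String> mergedRows) {
        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
            @Override
            protected void doInTransactionWithoutResult(TransactionStatus status) {
                // If either statement fails, both roll back, so a partial merge can
                // neither lose log rows nor leave duplicates behind.
                logMessageDao.deleteBatchIds(deleteIds);
                logMessageDao.batchInsert(mergedRows);
            }
        });
    }
}

The schedules in this commit do the same thing with their injected TransactionTemplate bean; as the hunks below show, the change is about where exactly transactionTemplate.execute(...) wraps that delete-and-insert work in JobLogMergeSchedule and RetryLogMergeSchedule.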


@@ -159,7 +159,7 @@ CREATE TABLE `retry_task_log_message`
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
- `message` text NOT NULL COMMENT '异常信息',
+ `message` longtext NOT NULL COMMENT '异常信息',
`log_num` int(11) NOT NULL DEFAULT 1 COMMENT '日志数量',
`real_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '上报时间',
`client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port',


@@ -22,12 +22,14 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
+ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.LocalDateTime;
@@ -50,18 +52,13 @@ import static java.util.stream.Collectors.*;
*/
@Slf4j
@Component
+ @RequiredArgsConstructor
public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle {
- @Autowired
- private SystemProperties systemProperties;
- @Autowired
- private JobTaskBatchMapper jobTaskBatchMapper;
- @Autowired
- private JobTaskMapper jobTaskMapper;
- @Autowired
- private JobLogMessageMapper jobLogMessageMapper;
- @Autowired
- private TransactionTemplate transactionTemplate;
+ private final SystemProperties systemProperties;
+ private final JobTaskBatchMapper jobTaskBatchMapper;
+ private final JobLogMessageMapper jobLogMessageMapper;
+ private final TransactionTemplate transactionTemplate;
// last merge log time
private static Long lastMergeLogTime = 0L;
@@ -92,7 +89,7 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle {
long total;
LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getMergeLogDays());
total = PartitionTaskUtils.process(startId -> jobTaskBatchList(startId, endTime),
this::processJobLogPartitionTasks, 0);
this::processJobLogPartitionTasks, 0);
EasyRetryLog.LOCAL.debug("job merge success total:[{}]", total);
} catch (Exception e) {
@@ -113,10 +110,10 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle {
private List<JobPartitionTaskDTO> jobTaskBatchList(Long startId, LocalDateTime endTime) {
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectPage(
new Page<>(0, 1000),
new LambdaUpdateWrapper<JobTaskBatch>().ge(JobTaskBatch::getId, startId)
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.COMPLETED)
.le(JobTaskBatch::getCreateDt, endTime)).getRecords();
new Page<>(0, 1000),
new LambdaUpdateWrapper<JobTaskBatch>().ge(JobTaskBatch::getId, startId)
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.COMPLETED)
.le(JobTaskBatch::getCreateDt, endTime)).getRecords();
return JobTaskConverter.INSTANCE.toJobTaskBatchPartitionTasks(jobTaskBatchList);
}
@@ -127,54 +124,56 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle {
*/
public void processJobLogPartitionTasks(List<? extends PartitionTask> partitionTasks) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
// Waiting for merge JobTaskBatchList
List<Long> ids = partitionTasks.stream().map(PartitionTask::getId).collect(Collectors.toList());
if (CollectionUtils.isEmpty(ids)) {
return;
}
// Waiting for merge JobTaskBatchList
List<Long> ids = partitionTasks.stream().map(PartitionTask::getId).collect(Collectors.toList());
if (ids == null || ids.isEmpty()) {
return;
}
// Waiting for deletion JobLogMessageList
List<JobLogMessage> jobLogMessageList = jobLogMessageMapper.selectList(
new LambdaQueryWrapper<JobLogMessage>().in(JobLogMessage::getTaskBatchId, ids));
if (CollectionUtils.isEmpty(jobLogMessageList)) {
return;
}
// Waiting for deletion JobLogMessageList
List<JobLogMessage> jobLogMessageList = jobLogMessageMapper.selectList(new LambdaQueryWrapper<JobLogMessage>().in(JobLogMessage::getTaskBatchId, ids));
if (jobLogMessageList == null || jobLogMessageList.isEmpty()) {
return;
}
List<Map.Entry<Long, List<JobLogMessage>>> jobLogMessageGroupList = jobLogMessageList.stream().collect(
groupingBy(JobLogMessage::getTaskId)).entrySet().stream()
.filter(entry -> entry.getValue().size() >= 2).collect(toList());
List<Map.Entry<Long, List<JobLogMessage>>> jobLogMessageGroupList = jobLogMessageList.stream().collect(
groupingBy(JobLogMessage::getTaskId)).entrySet().stream()
.filter(entry -> entry.getValue().size() >= 2).collect(toList());
for (Map.Entry<Long/*taskId*/, List<JobLogMessage>> jobLogMessageMap : jobLogMessageGroupList) {
List<Long> jobLogMessageDeleteBatchIds = new ArrayList<>();
List<JobLogMessage> jobLogMessageInsertBatchIds = new ArrayList<>();
for (Map.Entry<Long/*taskId*/, List<JobLogMessage>> jobLogMessageMap : jobLogMessageGroupList) {
List<Long> jobLogMessageDeleteBatchIds = new ArrayList<>();
List<JobLogMessage> jobLogMessageInsertBatchIds = new ArrayList<>();
List<String> mergeMessages = jobLogMessageMap.getValue().stream().map(k -> {
jobLogMessageDeleteBatchIds.add(k.getId());
return JsonUtil.parseObject(k.getMessage(), List.class);
})
.reduce((a, b) -> {
// Merge the log messages
List<String> list = new ArrayList<>();
list.addAll(a);
list.addAll(b);
return list;
}).get();
List<String> mergeMessages = jobLogMessageMap.getValue().stream().map(k -> {
jobLogMessageDeleteBatchIds.add(k.getId());
return JsonUtil.parseObject(k.getMessage(), List.class);
})
.reduce((a, b) -> {
// Merge the log messages
List<String> list = new ArrayList<>();
list.addAll(a);
list.addAll(b);
return list;
}).get();
// 500 records per batch
List<List<String>> partitionMessages = Lists.partition(mergeMessages, systemProperties.getMergeLogNum());
// 500 records per batch
List<List<String>> partitionMessages = Lists.partition(mergeMessages, systemProperties.getMergeLogNum());
for (List<String> partitionMessage : partitionMessages) {
// Deep copy
JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(
jobLogMessageMap.getValue().get(0));
for (int i = 0; i < partitionMessages.size(); i++) {
// Deep copy
JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(jobLogMessageMap.getValue().get(0));
List<String> messages = partitionMessages.get(i);
jobLogMessage.setLogNum(partitionMessage.size());
jobLogMessage.setMessage(JsonUtil.toJsonString(partitionMessage));
jobLogMessageInsertBatchIds.add(jobLogMessage);
}
jobLogMessage.setLogNum(messages.size());
jobLogMessage.setMessage(JsonUtil.toJsonString(messages));
jobLogMessageInsertBatchIds.add(jobLogMessage);
}
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
// Batch delete and update logs
if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) {
@@ -184,8 +183,9 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle {
jobLogMessageMapper.batchInsert(jobLogMessageInsertBatchIds);
}
}
}
});
});
}
}
@Override
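
A side note on the merge logic shown above: the schedule concatenates each task's individual log messages and then splits them back into fixed-size chunks before re-inserting them, using Guava's Lists.partition. Below is a small, self-contained illustration of that chunking step; the batch size of 500 is taken from the comment in the diff, while the real code reads it from systemProperties.getMergeLogNum().

import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.Lists;

public class MergeChunkSketch {
    public static void main(String[] args) {
        // Pretend these are the individual log lines collected from several
        // job_log_message rows that belong to the same task.
        List<String> mergedMessages = new ArrayList<>();
        for (int i = 0; i < 1200; i++) {
            mergedMessages.add("log line " + i);
        }

        // Lists.partition splits the combined list into fixed-size chunks;
        // the last chunk simply holds the remainder.
        int mergeLogNum = 500;
        List<List<String>> partitions = Lists.partition(mergedMessages, mergeLogNum);

        // Each chunk would become one merged log row whose log_num is the chunk
        // size and whose message column stores the chunk serialized as JSON.
        for (List<String> chunk : partitions) {
            System.out.println("merged row with log_num=" + chunk.size());
        }
        // Prints 500, 500, 200 for the 1200 sample lines.
    }
}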


@@ -16,6 +16,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobSummary;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
+ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -40,14 +41,11 @@ import java.util.stream.Collectors;
*/
@Component
@Slf4j
+ @RequiredArgsConstructor
public class WorkflowJobSummarySchedule extends AbstractSchedule implements Lifecycle {
- @Autowired
- private JobTaskBatchMapper jobTaskBatchMapper;
- @Autowired
- private JobSummaryMapper jobSummaryMapper;
- @Autowired
- private SystemProperties systemProperties;
+ private final JobTaskBatchMapper jobTaskBatchMapper;
+ private final JobSummaryMapper jobSummaryMapper;
+ private final SystemProperties systemProperties;
@Override
public String lockName() {


@@ -32,7 +32,7 @@ public interface RetryRpcClient {
@Mapping(path = "/retry/generate/idempotent-id/v1", method = RequestMethod.POST)
Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO);
- @Mapping(path = "/retry/sync/version/v1")
+ @Mapping(path = "/retry/sync/version/v1", method = RequestMethod.POST)
Result syncConfig(@Body ConfigDTO configDTO);
}


@@ -29,9 +29,10 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
@@ -90,7 +91,7 @@ public class RetryLogMergeSchedule extends AbstractSchedule implements Lifecycle
// merge job log
long total;
LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getMergeLogDays());
- total = PartitionTaskUtils.process(startId -> jobTaskBatchList(startId, endTime),
+ total = PartitionTaskUtils.process(startId -> retryLogList(startId, endTime),
this::processJobLogPartitionTasks, 0);
EasyRetryLog.LOCAL.debug("job merge success total:[{}]", total);
@@ -109,7 +110,7 @@ public class RetryLogMergeSchedule extends AbstractSchedule implements Lifecycle
* @param endTime
* @return
*/
- private List<RetryMergePartitionTaskDTO> jobTaskBatchList(Long startId, LocalDateTime endTime) {
+ private List<RetryMergePartitionTaskDTO> retryLogList(Long startId, LocalDateTime endTime) {
List<RetryTaskLog> jobTaskBatchList = retryTaskLogMapper.selectPage(
new Page<>(0, 1000),
@@ -129,62 +130,62 @@ public class RetryLogMergeSchedule extends AbstractSchedule implements Lifecycle
*/
public void processJobLogPartitionTasks(List<? extends PartitionTask> partitionTasks) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
// Waiting for merge RetryTaskLog
List<String> ids = partitionTasks.stream().map(PartitionTask::getUniqueId).collect(Collectors.toList());
if (CollectionUtil.isEmpty(ids)) {
return;
}
// Waiting for merge RetryTaskLog
List<String> ids = partitionTasks.stream().map(PartitionTask::getUniqueId).collect(Collectors.toList());
if (CollectionUtil.isEmpty(ids)) {
return;
}
// Waiting for deletion RetryTaskLogMessage
List<RetryTaskLogMessage> retryLogMessageList = retryTaskLogMessageMapper.selectList(
new LambdaQueryWrapper<RetryTaskLogMessage>().in(RetryTaskLogMessage::getUniqueId, ids));
if (CollectionUtil.isEmpty(retryLogMessageList)) {
return;
}
// Waiting for deletion RetryTaskLogMessage
List<RetryTaskLogMessage> retryLogMessageList = retryTaskLogMessageMapper.selectList(
new LambdaQueryWrapper<RetryTaskLogMessage>().in(RetryTaskLogMessage::getUniqueId, ids));
if (CollectionUtil.isEmpty(retryLogMessageList)) {
return;
}
List<Map.Entry<Triple<String, String, String>, List<RetryTaskLogMessage>>> jobLogMessageGroupList = retryLogMessageList.stream()
.collect(
groupingBy(message -> Triple.of(message.getNamespaceId(), message.getGroupName(),
message.getUniqueId())))
.entrySet().stream()
.filter(entry -> entry.getValue().size() >= 2).collect(toList());
List<Map.Entry<Triple<String, String, String>, List<RetryTaskLogMessage>>> jobLogMessageGroupList = retryLogMessageList.stream()
.collect(
groupingBy(message -> Triple.of(message.getNamespaceId(), message.getGroupName(),
message.getUniqueId())))
.entrySet().stream()
.filter(entry -> entry.getValue().size() >= 2).collect(toList());
for (Map.Entry<Triple<String, String, String>/*taskId*/, List<RetryTaskLogMessage>> jobLogMessageMap : jobLogMessageGroupList) {
List<Long> jobLogMessageDeleteBatchIds = new ArrayList<>();
for (Map.Entry<Triple<String, String, String>/*taskId*/, List<RetryTaskLogMessage>> jobLogMessageMap : jobLogMessageGroupList) {
List<Long> jobLogMessageDeleteBatchIds = new ArrayList<>();
List<String> mergeMessages = jobLogMessageMap.getValue().stream().map(k -> {
jobLogMessageDeleteBatchIds.add(k.getId());
return JsonUtil.parseObject(k.getMessage(), List.class);
})
.reduce((a, b) -> {
// Merge the log messages
List<String> list = new ArrayList<>();
list.addAll(a);
list.addAll(b);
return list;
}).get();
List<String> mergeMessages = jobLogMessageMap.getValue().stream().map(k -> {
jobLogMessageDeleteBatchIds.add(k.getId());
return JsonUtil.parseObject(k.getMessage(), List.class);
})
.reduce((a, b) -> {
// Merge the log messages
List<String> list = new ArrayList<>();
list.addAll(a);
list.addAll(b);
return list;
}).get();
// 500 records per batch
List<List<String>> partitionMessages = Lists.partition(mergeMessages,
systemProperties.getMergeLogNum());
// 500 records per batch
List<List<String>> partitionMessages = Lists.partition(mergeMessages,
systemProperties.getMergeLogNum());
List<RetryTaskLogMessage> jobLogMessageUpdateList = new ArrayList<>();
List<RetryTaskLogMessage> jobLogMessageUpdateList = new ArrayList<>();
for (int i = 0; i < partitionMessages.size(); i++) {
// Deep copy
RetryTaskLogMessage jobLogMessage = RetryTaskLogConverter.INSTANCE.toRetryTaskLogMessage(
jobLogMessageMap.getValue().get(0));
List<String> messages = partitionMessages.get(i);
for (int i = 0; i < partitionMessages.size(); i++) {
// Deep copy
RetryTaskLogMessage jobLogMessage = RetryTaskLogConverter.INSTANCE.toRetryTaskLogMessage(
jobLogMessageMap.getValue().get(0));
List<String> messages = partitionMessages.get(i);
jobLogMessage.setLogNum(messages.size());
jobLogMessage.setMessage(JsonUtil.toJsonString(messages));
jobLogMessageUpdateList.add(jobLogMessage);
}
jobLogMessage.setLogNum(messages.size());
jobLogMessage.setMessage(JsonUtil.toJsonString(messages));
jobLogMessageUpdateList.add(jobLogMessage);
}
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
// Batch delete and update logs
if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) {
retryTaskLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds);
@@ -193,9 +194,10 @@ public class RetryLogMergeSchedule extends AbstractSchedule implements Lifecycle
retryTaskLogMessageMapper.batchInsert(jobLogMessageUpdateList);
}
}
});
}
}
});
}
@Override