diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index d9a980b9..a2195248 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -159,7 +159,7 @@ CREATE TABLE `retry_task_log_message` `group_name` varchar(64) NOT NULL COMMENT '组名称', `unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', - `message` text NOT NULL COMMENT '异常信息', + `message` longtext NOT NULL COMMENT '异常信息', `log_num` int(11) NOT NULL DEFAULT 1 COMMENT '日志数量', `real_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '上报时间', `client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port', diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java index ee250c5c..d9cc8e94 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java @@ -22,12 +22,14 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; +import org.springframework.util.CollectionUtils; import java.time.Duration; import java.time.LocalDateTime; @@ -50,18 +52,13 @@ import static java.util.stream.Collectors.*; */ @Slf4j @Component +@RequiredArgsConstructor public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle { - @Autowired - private SystemProperties systemProperties; - @Autowired - private JobTaskBatchMapper jobTaskBatchMapper; - @Autowired - private JobTaskMapper jobTaskMapper; - @Autowired - private JobLogMessageMapper jobLogMessageMapper; - @Autowired - private TransactionTemplate transactionTemplate; + private final SystemProperties systemProperties; + private final JobTaskBatchMapper jobTaskBatchMapper; + private final JobLogMessageMapper jobLogMessageMapper; + private final TransactionTemplate transactionTemplate; // last merge log time private static Long lastMergeLogTime = 0L; @@ -92,7 +89,7 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle { long total; LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getMergeLogDays()); total = PartitionTaskUtils.process(startId -> jobTaskBatchList(startId, endTime), - this::processJobLogPartitionTasks, 0); + this::processJobLogPartitionTasks, 0); EasyRetryLog.LOCAL.debug("job merge success total:[{}]", total); } catch (Exception e) { @@ -113,10 +110,10 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle { private List jobTaskBatchList(Long startId, LocalDateTime endTime) { List jobTaskBatchList = jobTaskBatchMapper.selectPage( - new Page<>(0, 1000), - new LambdaUpdateWrapper().ge(JobTaskBatch::getId, startId) - .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.COMPLETED) - .le(JobTaskBatch::getCreateDt, endTime)).getRecords(); + new Page<>(0, 1000), + new LambdaUpdateWrapper().ge(JobTaskBatch::getId, startId) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.COMPLETED) + .le(JobTaskBatch::getCreateDt, endTime)).getRecords(); return JobTaskConverter.INSTANCE.toJobTaskBatchPartitionTasks(jobTaskBatchList); } @@ -127,54 +124,56 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle { */ public void processJobLogPartitionTasks(List partitionTasks) { - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(final TransactionStatus status) { + // Waiting for merge JobTaskBatchList + List ids = partitionTasks.stream().map(PartitionTask::getId).collect(Collectors.toList()); + if (CollectionUtils.isEmpty(ids)) { + return; + } - // Waiting for merge JobTaskBatchList - List ids = partitionTasks.stream().map(PartitionTask::getId).collect(Collectors.toList()); - if (ids == null || ids.isEmpty()) { - return; - } + // Waiting for deletion JobLogMessageList + List jobLogMessageList = jobLogMessageMapper.selectList( + new LambdaQueryWrapper().in(JobLogMessage::getTaskBatchId, ids)); + if (CollectionUtils.isEmpty(jobLogMessageList)) { + return; + } - // Waiting for deletion JobLogMessageList - List jobLogMessageList = jobLogMessageMapper.selectList(new LambdaQueryWrapper().in(JobLogMessage::getTaskBatchId, ids)); - if (jobLogMessageList == null || jobLogMessageList.isEmpty()) { - return; - } + List>> jobLogMessageGroupList = jobLogMessageList.stream().collect( + groupingBy(JobLogMessage::getTaskId)).entrySet().stream() + .filter(entry -> entry.getValue().size() >= 2).collect(toList()); - List>> jobLogMessageGroupList = jobLogMessageList.stream().collect( - groupingBy(JobLogMessage::getTaskId)).entrySet().stream() - .filter(entry -> entry.getValue().size() >= 2).collect(toList()); + for (Map.Entry> jobLogMessageMap : jobLogMessageGroupList) { + List jobLogMessageDeleteBatchIds = new ArrayList<>(); + List jobLogMessageInsertBatchIds = new ArrayList<>(); - for (Map.Entry> jobLogMessageMap : jobLogMessageGroupList) { - List jobLogMessageDeleteBatchIds = new ArrayList<>(); - List jobLogMessageInsertBatchIds = new ArrayList<>(); + List mergeMessages = jobLogMessageMap.getValue().stream().map(k -> { + jobLogMessageDeleteBatchIds.add(k.getId()); + return JsonUtil.parseObject(k.getMessage(), List.class); + }) + .reduce((a, b) -> { + // 合并日志信息 + List list = new ArrayList<>(); + list.addAll(a); + list.addAll(b); + return list; + }).get(); - List mergeMessages = jobLogMessageMap.getValue().stream().map(k -> { - jobLogMessageDeleteBatchIds.add(k.getId()); - return JsonUtil.parseObject(k.getMessage(), List.class); - }) - .reduce((a, b) -> { - // 合并日志信息 - List list = new ArrayList<>(); - list.addAll(a); - list.addAll(b); - return list; - }).get(); + // 500条数据为一个批次 + List> partitionMessages = Lists.partition(mergeMessages, systemProperties.getMergeLogNum()); - // 500条数据为一个批次 - List> partitionMessages = Lists.partition(mergeMessages, systemProperties.getMergeLogNum()); + for (List partitionMessage : partitionMessages) { + // 深拷贝 + JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage( + jobLogMessageMap.getValue().get(0)); - for (int i = 0; i < partitionMessages.size(); i++) { - // 深拷贝 - JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(jobLogMessageMap.getValue().get(0)); - List messages = partitionMessages.get(i); + jobLogMessage.setLogNum(partitionMessage.size()); + jobLogMessage.setMessage(JsonUtil.toJsonString(partitionMessage)); + jobLogMessageInsertBatchIds.add(jobLogMessage); + } - jobLogMessage.setLogNum(messages.size()); - jobLogMessage.setMessage(JsonUtil.toJsonString(messages)); - jobLogMessageInsertBatchIds.add(jobLogMessage); - } + transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { // 批量删除、更新日志 if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) { @@ -184,8 +183,9 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle { jobLogMessageMapper.batchInsert(jobLogMessageInsertBatchIds); } } - } - }); + }); + } + } @Override diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/WorkflowJobSummarySchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/WorkflowJobSummarySchedule.java index 7e0aec74..3ec83368 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/WorkflowJobSummarySchedule.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/WorkflowJobSummarySchedule.java @@ -16,6 +16,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobSummary; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -40,14 +41,11 @@ import java.util.stream.Collectors; */ @Component @Slf4j +@RequiredArgsConstructor public class WorkflowJobSummarySchedule extends AbstractSchedule implements Lifecycle { - - @Autowired - private JobTaskBatchMapper jobTaskBatchMapper; - @Autowired - private JobSummaryMapper jobSummaryMapper; - @Autowired - private SystemProperties systemProperties; + private final JobTaskBatchMapper jobTaskBatchMapper; + private final JobSummaryMapper jobSummaryMapper; + private final SystemProperties systemProperties; @Override public String lockName() { diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java index 512de391..eac313d2 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java @@ -32,7 +32,7 @@ public interface RetryRpcClient { @Mapping(path = "/retry/generate/idempotent-id/v1", method = RequestMethod.POST) Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO); - @Mapping(path = "/retry/sync/version/v1") + @Mapping(path = "/retry/sync/version/v1", method = RequestMethod.POST) Result syncConfig(@Body ConfigDTO configDTO); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/RetryLogMergeSchedule.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/RetryLogMergeSchedule.java index 24cf4fe1..4db4efee 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/RetryLogMergeSchedule.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/RetryLogMergeSchedule.java @@ -29,9 +29,10 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; @@ -90,7 +91,7 @@ public class RetryLogMergeSchedule extends AbstractSchedule implements Lifecycle // merge job log long total; LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getMergeLogDays()); - total = PartitionTaskUtils.process(startId -> jobTaskBatchList(startId, endTime), + total = PartitionTaskUtils.process(startId -> retryLogList(startId, endTime), this::processJobLogPartitionTasks, 0); EasyRetryLog.LOCAL.debug("job merge success total:[{}]", total); @@ -109,7 +110,7 @@ public class RetryLogMergeSchedule extends AbstractSchedule implements Lifecycle * @param endTime * @return */ - private List jobTaskBatchList(Long startId, LocalDateTime endTime) { + private List retryLogList(Long startId, LocalDateTime endTime) { List jobTaskBatchList = retryTaskLogMapper.selectPage( new Page<>(0, 1000), @@ -129,62 +130,62 @@ public class RetryLogMergeSchedule extends AbstractSchedule implements Lifecycle */ public void processJobLogPartitionTasks(List partitionTasks) { - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(final TransactionStatus status) { + // Waiting for merge RetryTaskLog + List ids = partitionTasks.stream().map(PartitionTask::getUniqueId).collect(Collectors.toList()); + if (CollectionUtil.isEmpty(ids)) { + return; + } - // Waiting for merge RetryTaskLog - List ids = partitionTasks.stream().map(PartitionTask::getUniqueId).collect(Collectors.toList()); - if (CollectionUtil.isEmpty(ids)) { - return; - } + // Waiting for deletion RetryTaskLogMessage + List retryLogMessageList = retryTaskLogMessageMapper.selectList( + new LambdaQueryWrapper().in(RetryTaskLogMessage::getUniqueId, ids)); + if (CollectionUtil.isEmpty(retryLogMessageList)) { + return; + } - // Waiting for deletion RetryTaskLogMessage - List retryLogMessageList = retryTaskLogMessageMapper.selectList( - new LambdaQueryWrapper().in(RetryTaskLogMessage::getUniqueId, ids)); - if (CollectionUtil.isEmpty(retryLogMessageList)) { - return; - } + List, List>> jobLogMessageGroupList = retryLogMessageList.stream() + .collect( + groupingBy(message -> Triple.of(message.getNamespaceId(), message.getGroupName(), + message.getUniqueId()))) + .entrySet().stream() + .filter(entry -> entry.getValue().size() >= 2).collect(toList()); - List, List>> jobLogMessageGroupList = retryLogMessageList.stream() - .collect( - groupingBy(message -> Triple.of(message.getNamespaceId(), message.getGroupName(), - message.getUniqueId()))) - .entrySet().stream() - .filter(entry -> entry.getValue().size() >= 2).collect(toList()); + for (Map.Entry/*taskId*/, List> jobLogMessageMap : jobLogMessageGroupList) { + List jobLogMessageDeleteBatchIds = new ArrayList<>(); - for (Map.Entry/*taskId*/, List> jobLogMessageMap : jobLogMessageGroupList) { - List jobLogMessageDeleteBatchIds = new ArrayList<>(); + List mergeMessages = jobLogMessageMap.getValue().stream().map(k -> { + jobLogMessageDeleteBatchIds.add(k.getId()); + return JsonUtil.parseObject(k.getMessage(), List.class); + }) + .reduce((a, b) -> { + // 合并日志信息 + List list = new ArrayList<>(); + list.addAll(a); + list.addAll(b); + return list; + }).get(); - List mergeMessages = jobLogMessageMap.getValue().stream().map(k -> { - jobLogMessageDeleteBatchIds.add(k.getId()); - return JsonUtil.parseObject(k.getMessage(), List.class); - }) - .reduce((a, b) -> { - // 合并日志信息 - List list = new ArrayList<>(); - list.addAll(a); - list.addAll(b); - return list; - }).get(); + // 500条数据为一个批次 + List> partitionMessages = Lists.partition(mergeMessages, + systemProperties.getMergeLogNum()); - // 500条数据为一个批次 - List> partitionMessages = Lists.partition(mergeMessages, - systemProperties.getMergeLogNum()); + List jobLogMessageUpdateList = new ArrayList<>(); - List jobLogMessageUpdateList = new ArrayList<>(); + for (int i = 0; i < partitionMessages.size(); i++) { + // 深拷贝 + RetryTaskLogMessage jobLogMessage = RetryTaskLogConverter.INSTANCE.toRetryTaskLogMessage( + jobLogMessageMap.getValue().get(0)); + List messages = partitionMessages.get(i); - for (int i = 0; i < partitionMessages.size(); i++) { - // 深拷贝 - RetryTaskLogMessage jobLogMessage = RetryTaskLogConverter.INSTANCE.toRetryTaskLogMessage( - jobLogMessageMap.getValue().get(0)); - List messages = partitionMessages.get(i); - - jobLogMessage.setLogNum(messages.size()); - jobLogMessage.setMessage(JsonUtil.toJsonString(messages)); - jobLogMessageUpdateList.add(jobLogMessage); - } + jobLogMessage.setLogNum(messages.size()); + jobLogMessage.setMessage(JsonUtil.toJsonString(messages)); + jobLogMessageUpdateList.add(jobLogMessage); + } + transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { // 批量删除、更新日志 if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) { retryTaskLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds); @@ -193,9 +194,10 @@ public class RetryLogMergeSchedule extends AbstractSchedule implements Lifecycle retryTaskLogMessageMapper.batchInsert(jobLogMessageUpdateList); } } + }); + } + - } - }); } @Override