diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index 4bbd8ce3..8e9ee0c7 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -138,6 +138,7 @@ CREATE TABLE `retry_task_log` `retry_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '重试状态 0、重试中 1、成功 2、最大次数', `task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), KEY `idx_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`), KEY `idx_retry_status` (`retry_status`), diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/RetryTaskLog.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/RetryTaskLog.java index 9e2735ae..b4c46f91 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/RetryTaskLog.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/RetryTaskLog.java @@ -39,6 +39,8 @@ public class RetryTaskLog implements Serializable { private LocalDateTime createDt; + private LocalDateTime updateDt; + private static final long serialVersionUID = 1L; } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/dto/RetryMergePartitionTaskDTO.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/dto/RetryMergePartitionTaskDTO.java new file mode 100644 index 00000000..c9799532 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/dto/RetryMergePartitionTaskDTO.java @@ -0,0 +1,16 @@ +package com.aizuda.easy.retry.server.retry.task.dto; + +import com.aizuda.easy.retry.server.common.dto.PartitionTask; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @author: xiaowoniu + * @date : 2024-03-27 + * @since : 3.2.0 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class RetryMergePartitionTaskDTO extends PartitionTask { + +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskLogConverter.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskLogConverter.java index 0a5f3675..b254c184 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskLogConverter.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskLogConverter.java @@ -1,13 +1,17 @@ package com.aizuda.easy.retry.server.retry.task.support; +import com.aizuda.easy.retry.server.retry.task.dto.RetryMergePartitionTaskDTO; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Mappings; import org.mapstruct.factory.Mappers; +import java.util.List; + /** * @author: www.byteblogs.com * @date : 2023-05-05 16:15 @@ -23,4 +27,8 @@ public interface RetryTaskLogConverter { RetryTaskLog toRetryTask(RetryTask retryTask); RetryTaskLogDTO toRetryTaskLogDTO(RetryTask retryTask); + + List toRetryMergePartitionTaskDTOs(List retryTaskLogList); + + RetryTaskLogMessage toRetryTaskLogMessage(RetryTaskLogMessage message); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/RetryLogMergeSchedule.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/RetryLogMergeSchedule.java new file mode 100644 index 00000000..24cf4fe1 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/RetryLogMergeSchedule.java @@ -0,0 +1,210 @@ +package com.aizuda.easy.retry.server.retry.task.support.schedule; + +import akka.io.SelectionHandler.Retry; +import cn.hutool.core.collection.CollectionUtil; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; +import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.common.log.EasyRetryLog; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.config.SystemProperties; +import com.aizuda.easy.retry.server.common.dto.PartitionTask; +import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule; +import com.aizuda.easy.retry.server.common.triple.Triple; +import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; +import com.aizuda.easy.retry.server.retry.task.dto.RetryMergePartitionTaskDTO; +import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toList; + +/** + * jogLogMessage 日志合并归档 + * + * @author zhengweilin + * @version 3.2.0 + * @date 2024/03/15 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class RetryLogMergeSchedule extends AbstractSchedule implements Lifecycle { + + private final SystemProperties systemProperties; + private final RetryTaskLogMapper retryTaskLogMapper; + private final RetryTaskLogMessageMapper retryTaskLogMessageMapper; + private final TransactionTemplate transactionTemplate; + + // last merge log time + private static Long lastMergeLogTime = 0L; + + @Override + public String lockName() { + return "retryLogMerge"; + } + + @Override + public String lockAtMost() { + return "PT1H"; + } + + @Override + public String lockAtLeast() { + return "PT1M"; + } + + @Override + protected void doExecute() { + try { + // 合并日志数据最少保留最近一天的日志数据 + if (System.currentTimeMillis() - lastMergeLogTime < 24 * 60 * 60 * 1000) { + return; + } + // merge job log + long total; + LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getMergeLogDays()); + total = PartitionTaskUtils.process(startId -> jobTaskBatchList(startId, endTime), + this::processJobLogPartitionTasks, 0); + + EasyRetryLog.LOCAL.debug("job merge success total:[{}]", total); + } catch (Exception e) { + EasyRetryLog.LOCAL.error("job merge log error", e); + } finally { + // update merge time + lastMergeLogTime = System.currentTimeMillis(); + } + } + + /** + * JobLog List + * + * @param startId + * @param endTime + * @return + */ + private List jobTaskBatchList(Long startId, LocalDateTime endTime) { + + List jobTaskBatchList = retryTaskLogMapper.selectPage( + new Page<>(0, 1000), + new LambdaUpdateWrapper() + .ge(RetryTaskLog::getId, startId) + .in(RetryTaskLog::getRetryStatus, Lists.newArrayList( + RetryStatusEnum.FINISH.getStatus(), + RetryStatusEnum.MAX_COUNT.getStatus())) + .le(RetryTaskLog::getCreateDt, endTime)).getRecords(); + return RetryTaskLogConverter.INSTANCE.toRetryMergePartitionTaskDTOs(jobTaskBatchList); + } + + /** + * merge job_log_message + * + * @param partitionTasks + */ + public void processJobLogPartitionTasks(List partitionTasks) { + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { + + // Waiting for merge RetryTaskLog + List ids = partitionTasks.stream().map(PartitionTask::getUniqueId).collect(Collectors.toList()); + if (CollectionUtil.isEmpty(ids)) { + return; + } + + // Waiting for deletion RetryTaskLogMessage + List retryLogMessageList = retryTaskLogMessageMapper.selectList( + new LambdaQueryWrapper().in(RetryTaskLogMessage::getUniqueId, ids)); + if (CollectionUtil.isEmpty(retryLogMessageList)) { + return; + } + + List, List>> jobLogMessageGroupList = retryLogMessageList.stream() + .collect( + groupingBy(message -> Triple.of(message.getNamespaceId(), message.getGroupName(), + message.getUniqueId()))) + .entrySet().stream() + .filter(entry -> entry.getValue().size() >= 2).collect(toList()); + + for (Map.Entry/*taskId*/, List> jobLogMessageMap : jobLogMessageGroupList) { + List jobLogMessageDeleteBatchIds = new ArrayList<>(); + + List mergeMessages = jobLogMessageMap.getValue().stream().map(k -> { + jobLogMessageDeleteBatchIds.add(k.getId()); + return JsonUtil.parseObject(k.getMessage(), List.class); + }) + .reduce((a, b) -> { + // 合并日志信息 + List list = new ArrayList<>(); + list.addAll(a); + list.addAll(b); + return list; + }).get(); + + // 500条数据为一个批次 + List> partitionMessages = Lists.partition(mergeMessages, + systemProperties.getMergeLogNum()); + + List jobLogMessageUpdateList = new ArrayList<>(); + + for (int i = 0; i < partitionMessages.size(); i++) { + // 深拷贝 + RetryTaskLogMessage jobLogMessage = RetryTaskLogConverter.INSTANCE.toRetryTaskLogMessage( + jobLogMessageMap.getValue().get(0)); + List messages = partitionMessages.get(i); + + jobLogMessage.setLogNum(messages.size()); + jobLogMessage.setMessage(JsonUtil.toJsonString(messages)); + jobLogMessageUpdateList.add(jobLogMessage); + } + + // 批量删除、更新日志 + if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) { + retryTaskLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds); + } + if (CollectionUtil.isNotEmpty(jobLogMessageUpdateList)) { + retryTaskLogMessageMapper.batchInsert(jobLogMessageUpdateList); + } + } + + } + }); + } + + @Override + public void start() { + taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT1H")); + } + + @Override + public void close() { + + } +}