diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobLogMessageMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobLogMessageMapper.java index ba11c5ef9..497218efe 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobLogMessageMapper.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobLogMessageMapper.java @@ -18,4 +18,6 @@ import java.util.List; public interface JobLogMessageMapper extends BaseMapper { int batchInsert(List list); + + int batchUpdate(List list); } diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobLogMessageMapper.xml b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobLogMessageMapper.xml index 986964521..923a94010 100644 --- a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobLogMessageMapper.xml +++ b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobLogMessageMapper.xml @@ -35,4 +35,19 @@ ) + + + UPDATE job_log_message jlm, + ( + + SELECT + #{item.message} AS message, + #{item.logNum} AS log_num, + #{item.id} AS id + + ) tt + SET + jlm.message = tt.message, jlm.log_num = tt.log_num + WHERE jlm.id = tt.id + diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java index b07bcd358..6ed680e63 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java @@ -63,6 +63,16 @@ public class SystemProperties { */ private int logStorage = 90; + /** + * 合并日志默认保存天数 + */ + private int mergeLogDays = 1; + + /** + * 合并日志默认的条数 + */ + private int mergeLogNum = 500; + /** * 数据库类型 */ diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java index 36dad5142..692908d73 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java @@ -72,6 +72,7 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle { @Override protected void doExecute() { try { + // 清楚日志默认保存天数大于零、最少保留最近一天的日志数据 if (systemProperties.getLogStorage() <= 0 || System.currentTimeMillis() - lastCleanLogTime < 24 * 60 * 60 * 1000) { return; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogArchivingSchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogArchivingSchedule.java deleted file mode 100644 index ef663ec1a..000000000 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogArchivingSchedule.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.aizuda.easy.retry.server.job.task.support.schedule; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * jogLogMessage 日志归档 - * - * @author zhengweilin - * @version 2.6.0 - * @date 2024/01/04 - */ -@Slf4j -@Component -public class JobLogArchivingSchedule { -} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java new file mode 100644 index 000000000..da8918eaa --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java @@ -0,0 +1,208 @@ +package com.aizuda.easy.retry.server.job.task.support.schedule; + +import cn.hutool.core.collection.CollectionUtil; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.common.log.EasyRetryLog; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.config.SystemProperties; +import com.aizuda.easy.retry.server.common.dto.PartitionTask; +import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule; +import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * jogLogMessage 日志合并归档 + * + * @author zhengweilin + * @version 3.2.0 + * @date 2024/03/15 + */ +@Slf4j +@Component +public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle { + + @Autowired + private SystemProperties systemProperties; + @Autowired + private JobTaskBatchMapper jobTaskBatchMapper; + @Autowired + private JobTaskMapper jobTaskMapper; + @Autowired + private JobLogMessageMapper jobLogMessageMapper; + @Autowired + private TransactionTemplate transactionTemplate; + + // last merge log time + private static Long lastMergeLogTime = 0L; + + @Override + public String lockName() { + return "jobLogMerge"; + } + + @Override + public String lockAtMost() { + return "PT1H"; + } + + @Override + public String lockAtLeast() { + return "PT1M"; + } + + @Override + protected void doExecute() { + try { + // 合并日志数据最少保留最近一天的日志数据 + if (System.currentTimeMillis() - lastMergeLogTime < 24 * 60 * 60 * 1000) { + return; + } + // merge job log + long total; + LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getMergeLogDays()); + total = PartitionTaskUtils.process(startId -> jobTaskBatchList(startId, endTime), + this::processJobLogPartitionTasks, 0); + + EasyRetryLog.LOCAL.debug("job merge success total:[{}]", total); + } catch (Exception e) { + EasyRetryLog.LOCAL.error("job merge log error", e); + } finally { + // update merge time + lastMergeLogTime = System.currentTimeMillis(); + } + } + + /** + * JobLog List + * + * @param startId + * @param endTime + * @return + */ + private List jobTaskBatchList(Long startId, LocalDateTime endTime) { + + List jobTaskBatchList = jobTaskBatchMapper.selectPage( + new Page<>(0, 1000), + new LambdaUpdateWrapper().ge(JobTaskBatch::getId, startId) + .eq(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.SUCCESS.getStatus()) + .le(JobTaskBatch::getCreateDt, endTime)).getRecords(); + return JobTaskConverter.INSTANCE.toJobTaskBatchPartitionTasks(jobTaskBatchList); + } + + /** + * merge job_log_message + * + * @param partitionTasks + */ + public void processJobLogPartitionTasks(List partitionTasks) { + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { + + // Waiting for merge JobTaskBatchList + List ids = partitionTasks.stream().map(i -> i.getId()).collect(Collectors.toList()); + if (ids == null || ids.size() == 0) { + return; + } + + // Waiting for deletion JobTaskList + List jobTaskList = jobTaskMapper.selectList(new LambdaQueryWrapper().in(JobTask::getTaskBatchId, ids)); + if (jobTaskList == null || jobTaskList.size() == 0) { + return; + } + + // Waiting for deletion JobLogMessageList + List jobLogMessageList = jobLogMessageMapper.selectList(new LambdaQueryWrapper().in(JobLogMessage::getTaskBatchId, ids)); + if (jobLogMessageList == null || jobLogMessageList.size() == 0) { + return; + } + + List>> jobLogMessageGroupList = jobLogMessageList.stream().collect(Collectors.groupingBy(JobLogMessage::getTaskId)).entrySet().stream() + .filter(entry -> entry.getValue().size() >= 2).collect(Collectors.toList()); + + List jobLogMessageDeleteBatchIds = new ArrayList<>(); + List jobLogMessageUpdateList = new ArrayList<>(); + for (Map.Entry> jobLogMessage : jobLogMessageGroupList) { + Integer sumLogNum = 0, jobLogMessageListNum = 0; + List mergeJobLogMessageList = new ArrayList(); + for (int i = 0; i < jobLogMessage.getValue().size(); i++) { + + // 累加合并数 + JobLogMessage logMessage = jobLogMessage.getValue().get(i); + sumLogNum = sumLogNum + logMessage.getLogNum(); + + // 最后一次合并小于默认日志数量 + if (jobLogMessageListNum == 0 && i == jobLogMessage.getValue().size() - 1) { + break; + } + // 需要合并日志数大于日志默认合并的条数,返回 + if (jobLogMessageListNum == 0 && sumLogNum > systemProperties.getMergeLogNum()) { + sumLogNum = 0; + continue; + } + // 合并更新日志 + logMessage.setLogNum(sumLogNum); + mergeJobLogMessageList.addAll(JsonUtil.parseObject(logMessage.getMessage(), List.class)); + logMessage.setMessage(JsonUtil.toJsonString(mergeJobLogMessageList)); + if (jobLogMessageListNum > 0 && sumLogNum > systemProperties.getMergeLogNum()) { + jobLogMessageUpdateList.add(logMessage); + mergeJobLogMessageList.clear(); + sumLogNum = 0; + jobLogMessageListNum = 0; + } else if (jobLogMessageListNum > 0 && i == jobLogMessage.getValue().size() - 1) { + jobLogMessageUpdateList.add(logMessage); + mergeJobLogMessageList.clear(); + } else { + jobLogMessageDeleteBatchIds.add(logMessage.getId()); + jobLogMessageListNum++; + } + } + // GC + mergeJobLogMessageList.clear(); + } + // 批量删除、更新日志 + if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) { + jobLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds); + } + if (CollectionUtil.isNotEmpty(jobLogMessageUpdateList)) { + jobLogMessageMapper.batchUpdate(jobLogMessageUpdateList); + } + } + }); + } + + @Override + public void start() { + taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT1H")); + } + + @Override + public void close() { + + } +}