diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/PartitionTask.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/PartitionTask.java index 982b68b2..eeb9b419 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/PartitionTask.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/PartitionTask.java @@ -11,4 +11,6 @@ public class PartitionTask { protected Long id; + protected String uniqueId; + } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java index 62427f42..ce90c332 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java @@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Mappings; @@ -92,4 +93,6 @@ public interface JobTaskConverter { List toJobPartitionTasks(List jobs); + List toJobTaskBatchPartitionTasks(List jobTaskBatches); + } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java new file mode 100644 index 00000000..8a914ba7 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java @@ -0,0 +1,154 @@ +package com.aizuda.easy.retry.server.job.task.support.schedule; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.config.SystemProperties; +import com.aizuda.easy.retry.server.common.dto.PartitionTask; +import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule; +import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Job清理日志 一小时运行一次 + * + * @author: www.byteblogs.com + * @date : 2023-07-21 13:32 + * @since 2.1.0 + */ +@Component +@Slf4j +public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle { + + @Autowired + private SystemProperties systemProperties; + @Autowired + private JobTaskBatchMapper jobTaskBatchMapper; + @Autowired + private JobTaskMapper jobTaskMapper; + @Autowired + private JobLogMessageMapper jobLogMessageMapper; + @Autowired + private TransactionTemplate transactionTemplate; + + // last clean log time + private static Long lastCleanLogTime = 0L; + + @Override + public String lockName() { + return "jobClearLog"; + } + + @Override + public String lockAtMost() { + return "PT1H"; + } + + @Override + public String lockAtLeast() { + return "PT1M"; + } + + @Override + protected void doExecute() { + try { + if (systemProperties.getLogStorage() <= 0 || System.currentTimeMillis() - lastCleanLogTime < 24 * 60 * 60 * 1000) { + return; + } + // clean job log + long total; + LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage()); + total = PartitionTaskUtils.process(startId -> jobTaskBatchList(startId, endTime), + this::processJobLogPartitionTasks, 0); + + LogUtils.debug(log, "Job clear success total:[{}]", total); + } catch (Exception e) { + LogUtils.error(log, "job clear log error", e); + } finally { + // update clean time + lastCleanLogTime = System.currentTimeMillis(); + } + } + + /** + * JobLog List + * + * @param startId + * @param endTime + * @return + */ + private List jobTaskBatchList(Long startId, LocalDateTime endTime) { + + List jobTaskBatchList = jobTaskBatchMapper.selectPage( + new Page<>(0, 1000), + new LambdaUpdateWrapper().ge(JobTaskBatch::getId, startId).le(JobTaskBatch::getCreateDt, endTime)).getRecords(); + return JobTaskConverter.INSTANCE.toJobTaskBatchPartitionTasks(jobTaskBatchList); + } + + /** + * clean table JobTaskBatch & JobTask & JobLogMessage + * + * @param partitionTasks + */ + public void processJobLogPartitionTasks(List partitionTasks) { + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { + + // Waiting for deletion JobTaskBatchList + List ids = partitionTasks.stream().map(i -> i.getId()).collect(Collectors.toList()); + if (ids == null || ids.size() == 0) { + return; + } + jobTaskBatchMapper.deleteBatchIds(ids); + + // Waiting for deletion JobTaskList + List jobTaskList = jobTaskMapper.selectList(new LambdaQueryWrapper().in(JobTask::getTaskBatchId, ids)); + if (jobTaskList == null || jobTaskList.size() == 0) { + return; + } + List jobTaskListIds = jobTaskList.stream().map(i -> i.getId()).collect(Collectors.toList()); + jobTaskMapper.deleteBatchIds(jobTaskListIds); + + // Waiting for deletion JobLogMessageList + List jobLogMessageList = jobLogMessageMapper.selectList(new LambdaQueryWrapper().in(JobLogMessage::getTaskBatchId, ids)); + if (jobLogMessageList == null || jobLogMessageList.size() == 0) { + return; + } + List jobLogMessageListIds = jobLogMessageList.stream().map(i -> i.getId()).collect(Collectors.toList()); + jobTaskMapper.deleteBatchIds(jobLogMessageListIds); + } + }); + } + + @Override + public void start() { + taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT1H")); + } + + @Override + public void close() { + + } +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskConverter.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskConverter.java index 7a078061..1f00083f 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskConverter.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskConverter.java @@ -5,6 +5,7 @@ import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask; import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Mappings; @@ -26,7 +27,7 @@ public interface RetryTaskConverter { RetryTask toRetryTask(RetryTask retryTask); @Mappings({ - @Mapping(target = "id", ignore = true), + @Mapping(target = "id", ignore = true), }) RetryTask toRetryTask(RetryDeadLetter retryDeadLetter); @@ -35,4 +36,7 @@ public interface RetryTaskConverter { RetryTask toRetryTask(TaskContext.TaskInfo taskInfo); List toRetryPartitionTasks(List retryTasks); + + List toRetryTaskLogPartitionTasks(List retryTaskLogList); + } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/ClearLogSchedule.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/ClearLogSchedule.java index f7ece827..bceecdba 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/ClearLogSchedule.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/ClearLogSchedule.java @@ -3,21 +3,31 @@ package com.aizuda.easy.retry.server.retry.task.support.schedule; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.config.SystemProperties; +import com.aizuda.easy.retry.server.common.dto.PartitionTask; import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule; +import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; +import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask; +import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; -import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog; -import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage; +import com.aizuda.easy.retry.template.datasource.persistence.po.*; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; import java.time.Duration; import java.time.LocalDateTime; +import java.util.List; +import java.util.stream.Collectors; /** - * 清理日志 一小时运行一次 + * Retry清理日志 一小时运行一次 * * @author: www.byteblogs.com * @date : 2023-07-21 13:32 @@ -33,6 +43,11 @@ public class ClearLogSchedule extends AbstractSchedule implements Lifecycle { private SystemProperties systemProperties; @Autowired private RetryTaskLogMessageMapper retryTaskLogMessageMapper; + @Autowired + private TransactionTemplate transactionTemplate; + + // last clean log time + private static Long lastCleanLogTime = 0L; @Override public String lockName() { @@ -52,14 +67,70 @@ public class ClearLogSchedule extends AbstractSchedule implements Lifecycle { @Override protected void doExecute() { try { + if (systemProperties.getLogStorage() <= 0 || System.currentTimeMillis() - lastCleanLogTime < 24 * 60 * 60 * 1000) { + return; + } + // clean retry log LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage()); - retryTaskLogMapper.delete(new LambdaUpdateWrapper().le(RetryTaskLog::getCreateDt, endTime)); - retryTaskLogMessageMapper.delete(new LambdaUpdateWrapper().le(RetryTaskLogMessage::getCreateDt, endTime)); + long total = PartitionTaskUtils.process(startId -> retryTaskBatchList(startId, endTime), + this::processRetryLogPartitionTasks, 0); + + LogUtils.debug(log, "Retry clear success total:[{}]", total); } catch (Exception e) { LogUtils.error(log, "clear log error", e); + } finally { + // update clean time + lastCleanLogTime = System.currentTimeMillis(); } } + /** + * RetryLog List + * + * @param startId + * @param endTime + * @return + */ + private List retryTaskBatchList(Long startId, LocalDateTime endTime) { + + List retryTaskLogList = retryTaskLogMapper.selectPage( + new Page<>(0, 1000), + new LambdaUpdateWrapper().ge(RetryTaskLog::getId, startId).le(RetryTaskLog::getCreateDt, endTime)).getRecords(); + return RetryTaskConverter.INSTANCE.toRetryTaskLogPartitionTasks(retryTaskLogList); + } + + /** + * clean table RetryTaskLog & RetryTaskLogMessage + * + * @param partitionTasks + */ + public void processRetryLogPartitionTasks(List partitionTasks) { + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { + + List uniqueIdIds = partitionTasks.stream().map(i -> i.getUniqueId()).collect(Collectors.toList()); + if (uniqueIdIds == null || uniqueIdIds.size() == 0) { + return; + } + // Waiting for deletion RetryLog + List retryTaskLogList = retryTaskLogMapper.selectList(new LambdaQueryWrapper().in(RetryTaskLog::getUniqueId, uniqueIdIds)); + if (retryTaskLogList != null && retryTaskLogList.size() > 0) { + List retryTaskListIds = retryTaskLogList.stream().map(i -> i.getId()).collect(Collectors.toList()); + retryTaskLogMapper.deleteBatchIds(retryTaskListIds); + } + + // Waiting for deletion RetryTaskLogMessage + List retryTaskLogMessageList = retryTaskLogMessageMapper.selectList(new LambdaQueryWrapper().in(RetryTaskLogMessage::getUniqueId, uniqueIdIds)); + if (retryTaskLogMessageList != null && retryTaskLogMessageList.size() > 0) { + List retryTaskListIds = retryTaskLogMessageList.stream().map(i -> i.getId()).collect(Collectors.toList()); + retryTaskLogMessageMapper.deleteBatchIds(retryTaskListIds); + } + } + }); + } + @Override public void start() { taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT1H"));