feat:2.5.0

1. #I8DAMH 定时批量清除调度任务的历史日志
2. https://gitee.com/aizuda/easy-retry/issues/I8GROG
This commit is contained in:
zhengweilin 2023-11-18 18:58:34 +08:00 committed by byteblogs168
parent 56edc00458
commit 73629980e0
5 changed files with 240 additions and 6 deletions

View File

@ -11,4 +11,6 @@ public class PartitionTask {
protected Long id;
protected String uniqueId;
}

View File

@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.job.task.support.strategy.BlockStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
@ -92,4 +93,6 @@ public interface JobTaskConverter {
List<JobPartitionTask> toJobPartitionTasks(List<Job> jobs);
List<JobPartitionTask> toJobTaskBatchPartitionTasks(List<JobTaskBatch> jobTaskBatches);
}

View File

@ -0,0 +1,154 @@
package com.aizuda.easy.retry.server.job.task.support.schedule;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
/**
* Job清理日志 一小时运行一次
*
* @author: www.byteblogs.com
* @date : 2023-07-21 13:32
* @since 2.1.0
*/
@Component
@Slf4j
public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle {
@Autowired
private SystemProperties systemProperties;
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private JobLogMessageMapper jobLogMessageMapper;
@Autowired
private TransactionTemplate transactionTemplate;
// last clean log time
private static Long lastCleanLogTime = 0L;
@Override
public String lockName() {
return "jobClearLog";
}
@Override
public String lockAtMost() {
return "PT1H";
}
@Override
public String lockAtLeast() {
return "PT1M";
}
@Override
protected void doExecute() {
try {
if (systemProperties.getLogStorage() <= 0 || System.currentTimeMillis() - lastCleanLogTime < 24 * 60 * 60 * 1000) {
return;
}
// clean job log
long total;
LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage());
total = PartitionTaskUtils.process(startId -> jobTaskBatchList(startId, endTime),
this::processJobLogPartitionTasks, 0);
LogUtils.debug(log, "Job clear success total:[{}]", total);
} catch (Exception e) {
LogUtils.error(log, "job clear log error", e);
} finally {
// update clean time
lastCleanLogTime = System.currentTimeMillis();
}
}
/**
* JobLog List
*
* @param startId
* @param endTime
* @return
*/
private List<JobPartitionTask> jobTaskBatchList(Long startId, LocalDateTime endTime) {
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectPage(
new Page<>(0, 1000),
new LambdaUpdateWrapper<JobTaskBatch>().ge(JobTaskBatch::getId, startId).le(JobTaskBatch::getCreateDt, endTime)).getRecords();
return JobTaskConverter.INSTANCE.toJobTaskBatchPartitionTasks(jobTaskBatchList);
}
/**
* clean table JobTaskBatch & JobTask & JobLogMessage
*
* @param partitionTasks
*/
public void processJobLogPartitionTasks(List<? extends PartitionTask> partitionTasks) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
// Waiting for deletion JobTaskBatchList
List<Long> ids = partitionTasks.stream().map(i -> i.getId()).collect(Collectors.toList());
if (ids == null || ids.size() == 0) {
return;
}
jobTaskBatchMapper.deleteBatchIds(ids);
// Waiting for deletion JobTaskList
List<JobTask> jobTaskList = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>().in(JobTask::getTaskBatchId, ids));
if (jobTaskList == null || jobTaskList.size() == 0) {
return;
}
List<Long> jobTaskListIds = jobTaskList.stream().map(i -> i.getId()).collect(Collectors.toList());
jobTaskMapper.deleteBatchIds(jobTaskListIds);
// Waiting for deletion JobLogMessageList
List<JobLogMessage> jobLogMessageList = jobLogMessageMapper.selectList(new LambdaQueryWrapper<JobLogMessage>().in(JobLogMessage::getTaskBatchId, ids));
if (jobLogMessageList == null || jobLogMessageList.size() == 0) {
return;
}
List<Long> jobLogMessageListIds = jobLogMessageList.stream().map(i -> i.getId()).collect(Collectors.toList());
jobTaskMapper.deleteBatchIds(jobLogMessageListIds);
}
});
}
@Override
public void start() {
taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT1H"));
}
@Override
public void close() {
}
}

View File

@ -5,6 +5,7 @@ import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
@ -35,4 +36,7 @@ public interface RetryTaskConverter {
RetryTask toRetryTask(TaskContext.TaskInfo taskInfo);
List<RetryPartitionTask> toRetryPartitionTasks(List<RetryTask> retryTasks);
List<RetryPartitionTask> toRetryTaskLogPartitionTasks(List<RetryTaskLog> retryTaskLogList);
}

View File

@ -3,21 +3,31 @@ package com.aizuda.easy.retry.server.retry.task.support.schedule;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage;
import com.aizuda.easy.retry.template.datasource.persistence.po.*;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
/**
* 清理日志 一小时运行一次
* Retry清理日志 一小时运行一次
*
* @author: www.byteblogs.com
* @date : 2023-07-21 13:32
@ -33,6 +43,11 @@ public class ClearLogSchedule extends AbstractSchedule implements Lifecycle {
private SystemProperties systemProperties;
@Autowired
private RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Autowired
private TransactionTemplate transactionTemplate;
// last clean log time
private static Long lastCleanLogTime = 0L;
@Override
public String lockName() {
@ -52,14 +67,70 @@ public class ClearLogSchedule extends AbstractSchedule implements Lifecycle {
@Override
protected void doExecute() {
try {
if (systemProperties.getLogStorage() <= 0 || System.currentTimeMillis() - lastCleanLogTime < 24 * 60 * 60 * 1000) {
return;
}
// clean retry log
LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage());
retryTaskLogMapper.delete(new LambdaUpdateWrapper<RetryTaskLog>().le(RetryTaskLog::getCreateDt, endTime));
retryTaskLogMessageMapper.delete(new LambdaUpdateWrapper<RetryTaskLogMessage>().le(RetryTaskLogMessage::getCreateDt, endTime));
long total = PartitionTaskUtils.process(startId -> retryTaskBatchList(startId, endTime),
this::processRetryLogPartitionTasks, 0);
LogUtils.debug(log, "Retry clear success total:[{}]", total);
} catch (Exception e) {
LogUtils.error(log, "clear log error", e);
} finally {
// update clean time
lastCleanLogTime = System.currentTimeMillis();
}
}
/**
* RetryLog List
*
* @param startId
* @param endTime
* @return
*/
private List<RetryPartitionTask> retryTaskBatchList(Long startId, LocalDateTime endTime) {
List<RetryTaskLog> retryTaskLogList = retryTaskLogMapper.selectPage(
new Page<>(0, 1000),
new LambdaUpdateWrapper<RetryTaskLog>().ge(RetryTaskLog::getId, startId).le(RetryTaskLog::getCreateDt, endTime)).getRecords();
return RetryTaskConverter.INSTANCE.toRetryTaskLogPartitionTasks(retryTaskLogList);
}
/**
* clean table RetryTaskLog & RetryTaskLogMessage
*
* @param partitionTasks
*/
public void processRetryLogPartitionTasks(List<? extends PartitionTask> partitionTasks) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
List<String> uniqueIdIds = partitionTasks.stream().map(i -> i.getUniqueId()).collect(Collectors.toList());
if (uniqueIdIds == null || uniqueIdIds.size() == 0) {
return;
}
// Waiting for deletion RetryLog
List<RetryTaskLog> retryTaskLogList = retryTaskLogMapper.selectList(new LambdaQueryWrapper<RetryTaskLog>().in(RetryTaskLog::getUniqueId, uniqueIdIds));
if (retryTaskLogList != null && retryTaskLogList.size() > 0) {
List<Long> retryTaskListIds = retryTaskLogList.stream().map(i -> i.getId()).collect(Collectors.toList());
retryTaskLogMapper.deleteBatchIds(retryTaskListIds);
}
// Waiting for deletion RetryTaskLogMessage
List<RetryTaskLogMessage> retryTaskLogMessageList = retryTaskLogMessageMapper.selectList(new LambdaQueryWrapper<RetryTaskLogMessage>().in(RetryTaskLogMessage::getUniqueId, uniqueIdIds));
if (retryTaskLogMessageList != null && retryTaskLogMessageList.size() > 0) {
List<Long> retryTaskListIds = retryTaskLogMessageList.stream().map(i -> i.getId()).collect(Collectors.toList());
retryTaskLogMessageMapper.deleteBatchIds(retryTaskListIds);
}
}
});
}
@Override
public void start() {
taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT1H"));