feat(1.5.0-beta1): 定时清理日志中加入对 workflowBatch 的清理逻辑

This commit is contained in:
Srzou 2025-04-09 22:30:50 +08:00 committed by opensnail
parent 50003098ec
commit cf91d88ac2

View File

@ -12,6 +12,7 @@ import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
@ -48,6 +49,7 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle {
private final JobTaskBatchMapper jobTaskBatchMapper;
private final JobTaskMapper jobTaskMapper;
private final JobLogMessageMapper jobLogMessageMapper;
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final TransactionTemplate transactionTemplate;
@Override
@ -120,6 +122,7 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle {
Set<Long> jobTaskListIds = new HashSet<>();
Set<Long> jobLogMessageListIds = new HashSet<>();
Set<Long> workflowBatchIds = new HashSet<>();
for (List<Long> ids : idsPartition) {
// Waiting for deletion JobTaskList
@ -138,6 +141,15 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle {
Set<Long> jobLogMessage = jobLogMessageList.stream().map(JobLogMessage::getId).collect(Collectors.toSet());
jobLogMessageListIds.addAll(jobLogMessage);
}
// 先找出对应的 workflowTaskBatchId
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>().
select(JobTaskBatch::getWorkflowTaskBatchId)
.in(JobTaskBatch::getId, ids));
if (!CollectionUtils.isEmpty(jobTaskBatchList)) {
Set<Long> workflowTaskBatchId = jobTaskBatchList.stream().map(JobTaskBatch::getWorkflowTaskBatchId).collect(Collectors.toSet());
workflowBatchIds.addAll(workflowTaskBatchId);
}
}
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@ -151,6 +163,9 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle {
if (!CollectionUtils.isEmpty(jobLogMessageListIds)) {
Lists.partition(jobLogMessageListIds.stream().toList(), 500).forEach(jobLogMessageMapper::deleteByIds);
}
if (!CollectionUtils.isEmpty(workflowBatchIds)) {
Lists.partition(workflowBatchIds.stream().toList(), 500).forEach(workflowTaskBatchMapper::deleteByIds);
}
}
});
}