定时任务实时日志合并提交

This commit is contained in:
zhengweilin 2024-03-19 10:24:08 +08:00
parent a76c8f45aa
commit 71a0e984f8
6 changed files with 236 additions and 16 deletions

View File

@ -18,4 +18,6 @@ import java.util.List;
public interface JobLogMessageMapper extends BaseMapper<JobLogMessage> {
int batchInsert(List<JobLogMessage> list);
int batchUpdate(List<JobLogMessage> list);
}

View File

@ -35,4 +35,19 @@
)
</foreach>
</insert>
<update id="batchUpdate" parameterType="java.util.List">
UPDATE job_log_message jlm,
(
<foreach collection="list" item="item" index="index" separator=" UNION ALL ">
SELECT
#{item.message} AS message,
#{item.logNum} AS log_num,
#{item.id} AS id
</foreach>
) tt
SET
jlm.message = tt.message, jlm.log_num = tt.log_num
WHERE jlm.id = tt.id
</update>
</mapper>

View File

@ -63,6 +63,16 @@ public class SystemProperties {
*/
private int logStorage = 90;
/**
* 合并日志默认保存天数
*/
private int mergeLogDays = 1;
/**
* 合并日志默认的条数
*/
private int mergeLogNum = 500;
/**
* 数据库类型
*/

View File

@ -72,6 +72,7 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle {
@Override
protected void doExecute() {
try {
// 清楚日志默认保存天数大于零最少保留最近一天的日志数据
if (systemProperties.getLogStorage() <= 0 || System.currentTimeMillis() - lastCleanLogTime < 24 * 60 * 60 * 1000) {
return;
}

View File

@ -1,16 +0,0 @@
package com.aizuda.easy.retry.server.job.task.support.schedule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* jogLogMessage 日志归档
*
* @author zhengweilin
* @version 2.6.0
* @date 2024/01/04
*/
@Slf4j
@Component
public class JobLogArchivingSchedule {
}

View File

@ -0,0 +1,208 @@
package com.aizuda.easy.retry.server.job.task.support.schedule;
import cn.hutool.core.collection.CollectionUtil;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* jogLogMessage 日志合并归档
*
* @author zhengweilin
* @version 3.2.0
* @date 2024/03/15
*/
@Slf4j
@Component
public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle {
@Autowired
private SystemProperties systemProperties;
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private JobLogMessageMapper jobLogMessageMapper;
@Autowired
private TransactionTemplate transactionTemplate;
// last merge log time
private static Long lastMergeLogTime = 0L;
@Override
public String lockName() {
return "jobLogMerge";
}
@Override
public String lockAtMost() {
return "PT1H";
}
@Override
public String lockAtLeast() {
return "PT1M";
}
@Override
protected void doExecute() {
try {
// 合并日志数据最少保留最近一天的日志数据
if (System.currentTimeMillis() - lastMergeLogTime < 24 * 60 * 60 * 1000) {
return;
}
// merge job log
long total;
LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getMergeLogDays());
total = PartitionTaskUtils.process(startId -> jobTaskBatchList(startId, endTime),
this::processJobLogPartitionTasks, 0);
EasyRetryLog.LOCAL.debug("job merge success total:[{}]", total);
} catch (Exception e) {
EasyRetryLog.LOCAL.error("job merge log error", e);
} finally {
// update merge time
lastMergeLogTime = System.currentTimeMillis();
}
}
/**
* JobLog List
*
* @param startId
* @param endTime
* @return
*/
private List<JobPartitionTaskDTO> jobTaskBatchList(Long startId, LocalDateTime endTime) {
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectPage(
new Page<>(0, 1000),
new LambdaUpdateWrapper<JobTaskBatch>().ge(JobTaskBatch::getId, startId)
.eq(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.SUCCESS.getStatus())
.le(JobTaskBatch::getCreateDt, endTime)).getRecords();
return JobTaskConverter.INSTANCE.toJobTaskBatchPartitionTasks(jobTaskBatchList);
}
/**
* merge job_log_message
*
* @param partitionTasks
*/
public void processJobLogPartitionTasks(List<? extends PartitionTask> partitionTasks) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
// Waiting for merge JobTaskBatchList
List<Long> ids = partitionTasks.stream().map(i -> i.getId()).collect(Collectors.toList());
if (ids == null || ids.size() == 0) {
return;
}
// Waiting for deletion JobTaskList
List<JobTask> jobTaskList = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>().in(JobTask::getTaskBatchId, ids));
if (jobTaskList == null || jobTaskList.size() == 0) {
return;
}
// Waiting for deletion JobLogMessageList
List<JobLogMessage> jobLogMessageList = jobLogMessageMapper.selectList(new LambdaQueryWrapper<JobLogMessage>().in(JobLogMessage::getTaskBatchId, ids));
if (jobLogMessageList == null || jobLogMessageList.size() == 0) {
return;
}
List<Map.Entry<Long, List<JobLogMessage>>> jobLogMessageGroupList = jobLogMessageList.stream().collect(Collectors.groupingBy(JobLogMessage::getTaskId)).entrySet().stream()
.filter(entry -> entry.getValue().size() >= 2).collect(Collectors.toList());
List<Long> jobLogMessageDeleteBatchIds = new ArrayList<>();
List<JobLogMessage> jobLogMessageUpdateList = new ArrayList<>();
for (Map.Entry<Long, List<JobLogMessage>> jobLogMessage : jobLogMessageGroupList) {
Integer sumLogNum = 0, jobLogMessageListNum = 0;
List mergeJobLogMessageList = new ArrayList();
for (int i = 0; i < jobLogMessage.getValue().size(); i++) {
// 累加合并数
JobLogMessage logMessage = jobLogMessage.getValue().get(i);
sumLogNum = sumLogNum + logMessage.getLogNum();
// 最后一次合并小于默认日志数量
if (jobLogMessageListNum == 0 && i == jobLogMessage.getValue().size() - 1) {
break;
}
// 需要合并日志数大于日志默认合并的条数,返回
if (jobLogMessageListNum == 0 && sumLogNum > systemProperties.getMergeLogNum()) {
sumLogNum = 0;
continue;
}
// 合并更新日志
logMessage.setLogNum(sumLogNum);
mergeJobLogMessageList.addAll(JsonUtil.parseObject(logMessage.getMessage(), List.class));
logMessage.setMessage(JsonUtil.toJsonString(mergeJobLogMessageList));
if (jobLogMessageListNum > 0 && sumLogNum > systemProperties.getMergeLogNum()) {
jobLogMessageUpdateList.add(logMessage);
mergeJobLogMessageList.clear();
sumLogNum = 0;
jobLogMessageListNum = 0;
} else if (jobLogMessageListNum > 0 && i == jobLogMessage.getValue().size() - 1) {
jobLogMessageUpdateList.add(logMessage);
mergeJobLogMessageList.clear();
} else {
jobLogMessageDeleteBatchIds.add(logMessage.getId());
jobLogMessageListNum++;
}
}
// GC
mergeJobLogMessageList.clear();
}
// 批量删除更新日志
if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) {
jobLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds);
}
if (CollectionUtil.isNotEmpty(jobLogMessageUpdateList)) {
jobLogMessageMapper.batchUpdate(jobLogMessageUpdateList);
}
}
});
}
@Override
public void start() {
taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT1H"));
}
@Override
public void close() {
}
}