From 7c070bc3a8743cb691a1ecc768e7958de00661fc Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Thu, 14 Mar 2024 22:43:20 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:=203.1.1=201.=20=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E7=BA=A7=E5=88=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/job/task/support/dispatch/JobExecutorActor.java | 6 +++--- .../job/task/support/dispatch/JobExecutorResultActor.java | 2 +- .../server/job/task/support/dispatch/ScanJobTaskActor.java | 2 +- .../job/task/support/dispatch/WorkflowExecutorActor.java | 6 +++--- .../task/support/prepare/job/RunningJobPrepareHandler.java | 2 +- .../task/support/prepare/job/TerminalJobPrepareHandler.java | 2 +- .../job/task/support/prepare/job/WaitJobPrepareHandler.java | 2 +- .../request/ReportDispatchResultPostHttpRequestHandler.java | 2 +- .../retry/server/job/task/support/timer/JobTimerTask.java | 2 +- .../retry/server/job/task/support/timer/JobTimerWheel.java | 2 +- .../server/job/task/support/timer/WorkflowTimerTask.java | 2 +- pom.xml | 2 +- 12 files changed, 16 insertions(+), 16 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index 4c7a896e..2a9019ec 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -79,7 +79,7 @@ public class JobExecutorActor extends AbstractActor { public Receive createReceive() { return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> { try { - log.info("准备执行任务. [{}] [{}]", LocalDateTime.now(), JsonUtil.toJsonString(taskExecute)); + log.debug("准备执行任务. [{}] [{}]", LocalDateTime.now(), JsonUtil.toJsonString(taskExecute)); transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(final TransactionStatus status) { @@ -148,7 +148,7 @@ public class JobExecutorActor extends AbstractActor { JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType()); jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList)); } finally { - log.info("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute)); + log.debug("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute)); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCompletion(int status) { @@ -224,7 +224,7 @@ public class JobExecutorActor extends AbstractActor { // 获取时间差的毫秒数 long milliseconds = nextTriggerAt - preTriggerAt; - log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000); + log.debug("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000); job.setNextTriggerAt(nextTriggerAt); JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java index 2605c1b7..8e5fcb23 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -58,7 +58,7 @@ public class JobExecutorResultActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder().match(JobExecutorResultDTO.class, result -> { - log.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); + log.debug("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); try { JobTask jobTask = new JobTask(); jobTask.setTaskStatus(result.getTaskStatus()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java index bc8fe95b..386bd7a8 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java @@ -76,7 +76,7 @@ public class ScanJobTaskActor extends AbstractActor { long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), this::processJobPartitionTasks, 0); - log.info("job scan end. total:[{}]", total); + log.debug("job scan end. total:[{}]", total); } private void processJobPartitionTasks(List partitionTasks) { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java index 6f21d7f5..e2722c0d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -63,7 +63,7 @@ public class WorkflowExecutorActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> { - log.info("工作流开始执行. [{}]", JsonUtil.toJsonString(taskExecute)); + log.debug("工作流开始执行. [{}]", JsonUtil.toJsonString(taskExecute)); try { doExecutor(taskExecute); @@ -195,14 +195,14 @@ public class WorkflowExecutorActor extends AbstractActor { List jobTaskBatches = jobTaskBatchMap.get(nodeId); // 说明此节点未执行, 继续等待执行完成 if (CollectionUtils.isEmpty(jobTaskBatches)) { - EasyRetryLog.LOCAL.info("存在未完成的兄弟节点. [{}]", nodeId); + EasyRetryLog.LOCAL.debug("存在未完成的兄弟节点. [{}]", nodeId); return Boolean.FALSE; } boolean isCompleted = jobTaskBatches.stream().anyMatch( jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus())); if (isCompleted) { - EasyRetryLog.LOCAL.info("存在未完成的兄弟节点. [{}]", nodeId); + EasyRetryLog.LOCAL.debug("存在未完成的兄弟节点. [{}]", nodeId); return Boolean.FALSE; } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/RunningJobPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/RunningJobPrepareHandler.java index cf7f32d2..96c2765c 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/RunningJobPrepareHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/RunningJobPrepareHandler.java @@ -40,7 +40,7 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler { @Override protected void doHandler(JobTaskPrepareDTO prepare) { - log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare)); + log.debug("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare)); // 若存在所有的任务都是完成,但是批次上的状态为运行中,则是并发导致的未把批次状态变成为终态,此处做一次兜底处理 int blockStrategy = prepare.getBlockStrategy(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/TerminalJobPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/TerminalJobPrepareHandler.java index 0839aea7..dbd820cb 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/TerminalJobPrepareHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/TerminalJobPrepareHandler.java @@ -35,7 +35,7 @@ public class TerminalJobPrepareHandler extends AbstractJobPrePareHandler { @Override protected void doHandler(JobTaskPrepareDTO jobPrepareDTO) { - log.info("无处理中的数据. jobId:[{}]", jobPrepareDTO.getJobId()); + log.debug("无处理中的数据. jobId:[{}]", jobPrepareDTO.getJobId()); // 生成任务批次 jobTaskBatchGenerator.generateJobTaskBatch(JobTaskConverter.INSTANCE.toJobTaskGeneratorContext(jobPrepareDTO)); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/WaitJobPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/WaitJobPrepareHandler.java index 8dacb63f..5fdf67f4 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/WaitJobPrepareHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/job/WaitJobPrepareHandler.java @@ -30,7 +30,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler { @Override protected void doHandler(JobTaskPrepareDTO jobPrepareDTO) { - log.info("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId()); + log.debug("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId()); // 若时间轮中数据不存在则重新加入 if (!JobTimerWheel.isExisted(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId())) { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java index e4f17fec..2fb6a1b5 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java @@ -41,7 +41,7 @@ public class ReportDispatchResultPostHttpRequestHandler extends PostHttpRequestH @Override public String doHandler(String content, UrlQuery query, HttpHeaders headers) { - EasyRetryLog.LOCAL.info("Client Callback Request. content:[{}]", content); + EasyRetryLog.LOCAL.debug("Client Callback Request. content:[{}]", content); EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class); Object[] args = retryRequest.getArgs(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java index 6a11f8f6..a053b944 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java @@ -25,7 +25,7 @@ public class JobTimerTask implements TimerTask { @Override public void run(final Timeout timeout) throws Exception { // 执行任务调度 - log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); + log.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); try { TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java index 6e89d0f1..5b165f02 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerWheel.java @@ -44,7 +44,7 @@ public class JobTimerWheel implements Lifecycle { public static void register(Integer taskType, Long uniqueId, TimerTask task, long delay, TimeUnit unit) { if (!isExisted(taskType, uniqueId)) { - log.info("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId); + log.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId); delay = delay < 0 ? 0 : delay; try { timer.newTimeout(task, delay, unit); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java index 535d5081..fcd81956 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java @@ -26,7 +26,7 @@ public class WorkflowTimerTask implements TimerTask { @Override public void run(final Timeout timeout) throws Exception { // 执行任务调度 - log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId()); + log.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId()); try { diff --git a/pom.xml b/pom.xml index 5a2bc9ae..0a4e0979 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ 17 17 17 - 3.1.0 + 3.2.0-SNAPSHOT 1.0.0 4.1.94.Final 5.8.25 From 811663472246b386a4e9c30faae6295dd9da06db Mon Sep 17 00:00:00 2001 From: zhengweilin Date: Fri, 15 Mar 2024 16:23:55 +0800 Subject: [PATCH 2/2] =?UTF-8?q?[dev=5F3.2.0=5F]=201.1=20=E5=88=86=E7=89=87?= =?UTF-8?q?=E7=9A=84=E5=AE=9E=E6=97=B6=E6=97=A5=E5=BF=97=E5=AE=9A=E6=9C=9F?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E5=90=88=E5=B9=B6,I93NFY?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mapper/JobLogMessageMapper.java | 2 + .../mysql/mapper/JobLogMessageMapper.xml | 15 ++ .../common/config/SystemProperties.java | 10 + .../support/schedule/JobClearLogSchedule.java | 1 + .../schedule/JobLogArchivingSchedule.java | 16 -- .../support/schedule/JobLogMergeSchedule.java | 208 ++++++++++++++++++ 6 files changed, 236 insertions(+), 16 deletions(-) delete mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogArchivingSchedule.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobLogMessageMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobLogMessageMapper.java index ba11c5ef..497218ef 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobLogMessageMapper.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobLogMessageMapper.java @@ -18,4 +18,6 @@ import java.util.List; public interface JobLogMessageMapper extends BaseMapper { int batchInsert(List list); + + int batchUpdate(List list); } diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobLogMessageMapper.xml b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobLogMessageMapper.xml index 98696452..923a9401 100644 --- a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobLogMessageMapper.xml +++ b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobLogMessageMapper.xml @@ -35,4 +35,19 @@ ) + + + UPDATE job_log_message jlm, + ( + + SELECT + #{item.message} AS message, + #{item.logNum} AS log_num, + #{item.id} AS id + + ) tt + SET + jlm.message = tt.message, jlm.log_num = tt.log_num + WHERE jlm.id = tt.id + diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java index b07bcd35..6ed680e6 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java @@ -63,6 +63,16 @@ public class SystemProperties { */ private int logStorage = 90; + /** + * 合并日志默认保存天数 + */ + private int mergeLogDays = 1; + + /** + * 合并日志默认的条数 + */ + private int mergeLogNum = 500; + /** * 数据库类型 */ diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java index 36dad514..692908d7 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java @@ -72,6 +72,7 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle { @Override protected void doExecute() { try { + // 清楚日志默认保存天数大于零、最少保留最近一天的日志数据 if (systemProperties.getLogStorage() <= 0 || System.currentTimeMillis() - lastCleanLogTime < 24 * 60 * 60 * 1000) { return; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogArchivingSchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogArchivingSchedule.java deleted file mode 100644 index ef663ec1..00000000 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogArchivingSchedule.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.aizuda.easy.retry.server.job.task.support.schedule; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * jogLogMessage 日志归档 - * - * @author zhengweilin - * @version 2.6.0 - * @date 2024/01/04 - */ -@Slf4j -@Component -public class JobLogArchivingSchedule { -} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java new file mode 100644 index 00000000..da8918ea --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobLogMergeSchedule.java @@ -0,0 +1,208 @@ +package com.aizuda.easy.retry.server.job.task.support.schedule; + +import cn.hutool.core.collection.CollectionUtil; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.common.log.EasyRetryLog; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.config.SystemProperties; +import com.aizuda.easy.retry.server.common.dto.PartitionTask; +import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule; +import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * jogLogMessage 日志合并归档 + * + * @author zhengweilin + * @version 3.2.0 + * @date 2024/03/15 + */ +@Slf4j +@Component +public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle { + + @Autowired + private SystemProperties systemProperties; + @Autowired + private JobTaskBatchMapper jobTaskBatchMapper; + @Autowired + private JobTaskMapper jobTaskMapper; + @Autowired + private JobLogMessageMapper jobLogMessageMapper; + @Autowired + private TransactionTemplate transactionTemplate; + + // last merge log time + private static Long lastMergeLogTime = 0L; + + @Override + public String lockName() { + return "jobLogMerge"; + } + + @Override + public String lockAtMost() { + return "PT1H"; + } + + @Override + public String lockAtLeast() { + return "PT1M"; + } + + @Override + protected void doExecute() { + try { + // 合并日志数据最少保留最近一天的日志数据 + if (System.currentTimeMillis() - lastMergeLogTime < 24 * 60 * 60 * 1000) { + return; + } + // merge job log + long total; + LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getMergeLogDays()); + total = PartitionTaskUtils.process(startId -> jobTaskBatchList(startId, endTime), + this::processJobLogPartitionTasks, 0); + + EasyRetryLog.LOCAL.debug("job merge success total:[{}]", total); + } catch (Exception e) { + EasyRetryLog.LOCAL.error("job merge log error", e); + } finally { + // update merge time + lastMergeLogTime = System.currentTimeMillis(); + } + } + + /** + * JobLog List + * + * @param startId + * @param endTime + * @return + */ + private List jobTaskBatchList(Long startId, LocalDateTime endTime) { + + List jobTaskBatchList = jobTaskBatchMapper.selectPage( + new Page<>(0, 1000), + new LambdaUpdateWrapper().ge(JobTaskBatch::getId, startId) + .eq(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.SUCCESS.getStatus()) + .le(JobTaskBatch::getCreateDt, endTime)).getRecords(); + return JobTaskConverter.INSTANCE.toJobTaskBatchPartitionTasks(jobTaskBatchList); + } + + /** + * merge job_log_message + * + * @param partitionTasks + */ + public void processJobLogPartitionTasks(List partitionTasks) { + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { + + // Waiting for merge JobTaskBatchList + List ids = partitionTasks.stream().map(i -> i.getId()).collect(Collectors.toList()); + if (ids == null || ids.size() == 0) { + return; + } + + // Waiting for deletion JobTaskList + List jobTaskList = jobTaskMapper.selectList(new LambdaQueryWrapper().in(JobTask::getTaskBatchId, ids)); + if (jobTaskList == null || jobTaskList.size() == 0) { + return; + } + + // Waiting for deletion JobLogMessageList + List jobLogMessageList = jobLogMessageMapper.selectList(new LambdaQueryWrapper().in(JobLogMessage::getTaskBatchId, ids)); + if (jobLogMessageList == null || jobLogMessageList.size() == 0) { + return; + } + + List>> jobLogMessageGroupList = jobLogMessageList.stream().collect(Collectors.groupingBy(JobLogMessage::getTaskId)).entrySet().stream() + .filter(entry -> entry.getValue().size() >= 2).collect(Collectors.toList()); + + List jobLogMessageDeleteBatchIds = new ArrayList<>(); + List jobLogMessageUpdateList = new ArrayList<>(); + for (Map.Entry> jobLogMessage : jobLogMessageGroupList) { + Integer sumLogNum = 0, jobLogMessageListNum = 0; + List mergeJobLogMessageList = new ArrayList(); + for (int i = 0; i < jobLogMessage.getValue().size(); i++) { + + // 累加合并数 + JobLogMessage logMessage = jobLogMessage.getValue().get(i); + sumLogNum = sumLogNum + logMessage.getLogNum(); + + // 最后一次合并小于默认日志数量 + if (jobLogMessageListNum == 0 && i == jobLogMessage.getValue().size() - 1) { + break; + } + // 需要合并日志数大于日志默认合并的条数,返回 + if (jobLogMessageListNum == 0 && sumLogNum > systemProperties.getMergeLogNum()) { + sumLogNum = 0; + continue; + } + // 合并更新日志 + logMessage.setLogNum(sumLogNum); + mergeJobLogMessageList.addAll(JsonUtil.parseObject(logMessage.getMessage(), List.class)); + logMessage.setMessage(JsonUtil.toJsonString(mergeJobLogMessageList)); + if (jobLogMessageListNum > 0 && sumLogNum > systemProperties.getMergeLogNum()) { + jobLogMessageUpdateList.add(logMessage); + mergeJobLogMessageList.clear(); + sumLogNum = 0; + jobLogMessageListNum = 0; + } else if (jobLogMessageListNum > 0 && i == jobLogMessage.getValue().size() - 1) { + jobLogMessageUpdateList.add(logMessage); + mergeJobLogMessageList.clear(); + } else { + jobLogMessageDeleteBatchIds.add(logMessage.getId()); + jobLogMessageListNum++; + } + } + // GC + mergeJobLogMessageList.clear(); + } + // 批量删除、更新日志 + if (CollectionUtil.isNotEmpty(jobLogMessageDeleteBatchIds)) { + jobLogMessageMapper.deleteBatchIds(jobLogMessageDeleteBatchIds); + } + if (CollectionUtil.isNotEmpty(jobLogMessageUpdateList)) { + jobLogMessageMapper.batchUpdate(jobLogMessageUpdateList); + } + } + }); + } + + @Override + public void start() { + taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT1H")); + } + + @Override + public void close() { + + } +}