From 304fab86f84974044bf86e240d9e4002f7a05047 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Thu, 11 Jan 2024 00:12:57 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E7=AB=AF=E6=B7=BB=E5=8A=A0=E6=97=A5=E5=BF=97=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/easy_retry_mysql.sql | 2 + .../core/constant/LogFieldConstant.java | 17 +---- .../retry/common/core/log/LogContentDTO.java | 35 ---------- .../server/common/lock/JdbcLockProvider.java | 3 - .../retry/server/job/task/dto/JobLogDTO.java | 4 ++ .../retry/server/job/task/dto/LogMetaDTO.java | 43 ++++++++++++ .../EasyRetryServerLogbackAppender.java | 65 ++++++++++++++++--- .../task/support/dispatch/JobLogActor.java | 7 +- .../workflow/CallbackWorkflowExecutor.java | 31 ++++++--- .../handler/DistributedLockHandler.java | 7 ++ 10 files changed, 137 insertions(+), 77 deletions(-) create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/LogMetaDTO.java diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index d01e50155..69732a6a7 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -319,6 +319,8 @@ CREATE TABLE `job_log_message` `task_batch_id` bigint(20) NOT NULL COMMENT '任务批次id', `task_id` bigint(20) NOT NULL COMMENT '调度任务id', `message` text NOT NULL COMMENT '调度信息', + `log_num` int(11) NULL DEFAULT NULL COMMENT '日志数量', + `real_time` bigint(13) NULL DEFAULT NULL COMMENT '上报时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段', PRIMARY KEY (`id`), diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/LogFieldConstant.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/LogFieldConstant.java index 421f3d1a9..be5f67c5e 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/LogFieldConstant.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/LogFieldConstant.java @@ -17,21 +17,6 @@ public class LogFieldConstant { public static final String LEVEL = "level"; public static final String LOCATION = "location"; public static final String THROWABLE = "throwable"; - public static final String LOG = "log"; + public static final String LOG_META = "log_meta"; - public static final String JOB_ID = "jobId"; - - public static final String JOB_INSTANCE_ID = "job_instance_id"; - - public static final String JOB_INSTANCE_TASK_ID = "job_instance_task_id"; - - public static final String JOB_DISPATCH_VERSION = "job_dispatch_version"; - - public static final String TASK_ID = "job_instance_taskId"; - - public static final String CIRCLE_ID = "circle_id"; - - public static final String WORKER_ADDRESS = "worker_address"; - - public static final String DELAY_TOPIC = "delay_topic"; } diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/log/LogContentDTO.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/log/LogContentDTO.java index a744c34e0..951af740e 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/log/LogContentDTO.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/log/LogContentDTO.java @@ -59,39 +59,4 @@ public class LogContentDTO { this.addField(LogFieldConstant.THROWABLE, throwable); } - public void addLogField(String log) { - this.addField(LogFieldConstant.LOG, log); - } - - public void addJobIdField(Long jobInstanceId) { - this.addField(LogFieldConstant.JOB_ID, String.valueOf(jobInstanceId)); - } - - public void addJobInstanceIdField(Long jobInstanceId) { - this.addField(LogFieldConstant.JOB_INSTANCE_ID, String.valueOf(jobInstanceId)); - } - - public void addJobInstanceTaskIdField(Long taskId) { - this.addField(LogFieldConstant.JOB_INSTANCE_TASK_ID, String.valueOf(taskId)); - } - - public void addJobDispatchVersionTaskIdField(Long version) { - this.addField(LogFieldConstant.JOB_DISPATCH_VERSION, String.valueOf(version)); - } - - public void addTaskIdField(String taskId) { - this.addField(LogFieldConstant.TASK_ID, taskId); - } - - public void addCircleIdField(Long circleId) { - this.addField(LogFieldConstant.CIRCLE_ID, String.valueOf(circleId)); - } - - public void addDelayTopic(String topic) { - this.addField(LogFieldConstant.DELAY_TOPIC, topic); - } - - public void addWorkerAddressField(String workerAddress) { - this.addField(LogFieldConstant.WORKER_ADDRESS, workerAddress); - } } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java index b86bf30bb..46e54a568 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java @@ -117,9 +117,6 @@ public class JdbcLockProvider extends AbstractLockProvider implements Lifecycle @Override public void start() { - // 删除已经过期的锁记录 - distributedLockMapper.delete(new LambdaQueryWrapper() - .le(DistributedLock::getLockUntil, LocalDateTime.now().minusSeconds(10))); } @Override diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobLogDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobLogDTO.java index 184b1b36a..789be467b 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobLogDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobLogDTO.java @@ -41,5 +41,9 @@ public class JobLogDTO { */ private String message; + /** + * 真实上报时间 + */ + private Long realTime; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/LogMetaDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/LogMetaDTO.java new file mode 100644 index 000000000..24a061df1 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/LogMetaDTO.java @@ -0,0 +1,43 @@ +package com.aizuda.easy.retry.server.job.task.dto; + +import com.aizuda.easy.retry.common.core.util.JsonUtil; +import lombok.Data; + +/** + * @author xiaowoniu + * @date 2024-01-10 22:56:33 + * @since 2.6.0 + */ +@Data +public class LogMetaDTO { + + /** + * 命名空间 + */ + private String namespaceId; + + /** + * 组名称 + */ + private String groupName; + + /** + * 任务信息id + */ + private Long jobId; + + /** + * 任务实例id + */ + private Long taskBatchId; + + /** + * 调度任务id + */ + private Long taskId; + + @Override + public String toString() { + return JsonUtil.toJsonString(taskBatchId); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/appender/EasyRetryServerLogbackAppender.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/appender/EasyRetryServerLogbackAppender.java index 4739b88fa..61c4563f9 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/appender/EasyRetryServerLogbackAppender.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/appender/EasyRetryServerLogbackAppender.java @@ -1,16 +1,30 @@ package com.aizuda.easy.retry.server.job.task.support.appender; +import akka.actor.ActorRef; import ch.qos.logback.classic.spi.IThrowableProxy; import ch.qos.logback.classic.spi.LoggingEvent; import ch.qos.logback.classic.spi.StackTraceElementProxy; import ch.qos.logback.classic.spi.ThrowableProxyUtil; import ch.qos.logback.core.CoreConstants; import ch.qos.logback.core.UnsynchronizedAppenderBase; +import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.constant.LogFieldConstant; import com.aizuda.easy.retry.common.core.log.LogContentDTO; +import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.common.log.EasyRetryLog; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; +import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO; +import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO; import org.slf4j.MDC; +import org.springframework.util.CollectionUtils; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * @author wodeyangzipingpingwuqi @@ -32,20 +46,53 @@ public class EasyRetryServerLogbackAppender extends UnsynchronizedAppenderBas return; } - MDC.getMDCAdapter().remove(LogFieldConstant.MDC_REMOTE); - // Prepare processing ((LoggingEvent) eventObject).prepareForDeferredProcessing(); LoggingEvent event = (LoggingEvent) eventObject; - LogContentDTO logContentDTO = new LogContentDTO(); - logContentDTO.addTimeStamp(event.getTimeStamp()); - logContentDTO.addLevelField(event.getLevel().levelStr); - logContentDTO.addThreadField(event.getThreadName()); - logContentDTO.addMessageField(event.getFormattedMessage()); - logContentDTO.addLocationField(getLocationField(event)); - logContentDTO.addThrowableField(getThrowableField(event)); + LogMetaDTO logMetaDTO = null; + try { + // 第一种是MDC + String logMetaStr = MDC.getMDCAdapter().get(LogFieldConstant.LOG_META); + if (StrUtil.isNotBlank(logMetaStr)) { + logMetaDTO = JsonUtil.parseObject(logMetaStr, LogMetaDTO.class); + } else { + // 第二种规则是按照正则匹配 + String patternString = "<\\|>(.*?)<\\|>"; + Pattern pattern = Pattern.compile(patternString); + Matcher matcher = pattern.matcher(event.getFormattedMessage()); + while (matcher.find()) { + String extractedData = matcher.group(1); + if (StrUtil.isBlank(extractedData)) { + continue; + } + logMetaDTO = JsonUtil.parseObject(extractedData, LogMetaDTO.class); + } + } + + } catch (Exception e) { + EasyRetryLog.LOCAL.error("日志解析失败. msg:[{}]", event.getFormattedMessage(), e); + } finally { + MDC.getMDCAdapter().remove(LogFieldConstant.MDC_REMOTE); + MDC.getMDCAdapter().remove(LogFieldConstant.LOG_META); + } + + if (Objects.isNull(logMetaDTO)) { + return; + } + + // 保存执行的日志 + JobLogDTO jobLogDTO = new JobLogDTO(); + jobLogDTO.setMessage(event.getFormattedMessage()); + jobLogDTO.setTaskId(logMetaDTO.getTaskId()); + jobLogDTO.setJobId(logMetaDTO.getJobId()); + jobLogDTO.setGroupName(logMetaDTO.getGroupName()); + jobLogDTO.setNamespaceId(logMetaDTO.getNamespaceId()); + jobLogDTO.setTaskBatchId(logMetaDTO.getTaskBatchId()); + jobLogDTO.setRealTime(DateUtils.toNowMilli()); + ActorRef actorRef = ActorGenerator.jobLogActor(); + actorRef.tell(jobLogDTO, actorRef); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobLogActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobLogActor.java index ac9b046ea..f2d19a86f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobLogActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobLogActor.java @@ -2,11 +2,9 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch; import akka.actor.AbstractActor; import cn.hutool.core.util.StrUtil; -import com.aizuda.easy.retry.common.core.log.LogContentDTO; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; -import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; -import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage; import lombok.extern.slf4j.Slf4j; @@ -16,7 +14,8 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.time.LocalDateTime; -import java.util.*; +import java.util.List; +import java.util.Optional; /** * @author www.byteblogs.com diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java index b63844538..5085ac60b 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java @@ -8,11 +8,13 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.client.RequestInterceptor; import com.aizuda.easy.retry.server.common.dto.CallbackConfig; import com.aizuda.easy.retry.server.common.enums.ContentTypeEnum; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; +import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.model.dto.CallbackParamsDTO; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; @@ -120,15 +122,24 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { JobTask jobTask = generateJobTask(context, jobTaskBatch); // 保存执行的日志 - JobLogDTO jobLogDTO = new JobLogDTO(); - // TODO 等实时日志处理完毕后,再处理 - jobLogDTO.setMessage(context.getLogMessage()); - jobLogDTO.setTaskId(jobTask.getId()); - jobLogDTO.setJobId(SystemConstants.CALLBACK_JOB_ID); - jobLogDTO.setGroupName(context.getGroupName()); - jobLogDTO.setNamespaceId(context.getNamespaceId()); - jobLogDTO.setTaskBatchId(jobTaskBatch.getId()); - ActorRef actorRef = ActorGenerator.jobLogActor(); - actorRef.tell(jobLogDTO, actorRef); +// JobLogDTO jobLogDTO = new JobLogDTO(); +// // TODO 等实时日志处理完毕后,再处理 +// jobLogDTO.setMessage(context.getLogMessage()); +// jobLogDTO.setTaskId(jobTask.getId()); +// jobLogDTO.setJobId(SystemConstants.CALLBACK_JOB_ID); +// jobLogDTO.setGroupName(context.getGroupName()); +// jobLogDTO.setNamespaceId(context.getNamespaceId()); +// jobLogDTO.setTaskBatchId(jobTaskBatch.getId()); +// ActorRef actorRef = ActorGenerator.jobLogActor(); +// actorRef.tell(jobLogDTO, actorRef); + + LogMetaDTO logMetaDTO = new LogMetaDTO(); + logMetaDTO.setNamespaceId(context.getNamespaceId()); + logMetaDTO.setGroupName(context.getGroupName()); + logMetaDTO.setTaskBatchId(context.getTaskBatchId()); + logMetaDTO.setJobId(SystemConstants.CALLBACK_JOB_ID); + logMetaDTO.setTaskId(jobTask.getId()); + + EasyRetryLog.REMOTE.info("回调成功. logMeta:<|>{}<|>", logMetaDTO); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/DistributedLockHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/DistributedLockHandler.java index 1bb1c1daf..0e02d8bf8 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/DistributedLockHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/DistributedLockHandler.java @@ -89,6 +89,13 @@ public class DistributedLockHandler { return lock; } + /** + * TODO 超时处理、自旋处理 + * + * @param lockName + * @param lockAtMost + * @param lockExecutor + */ public void lockAndProcessAfterUnlockDel(String lockName, String lockAtMost, LockExecutor lockExecutor) { LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost), Duration.ofMillis(1),