feat: 2.6.0

1. 服务端添加日志解析功能
This commit is contained in:
byteblogs168 2024-01-11 00:12:57 +08:00
parent 75c8309fba
commit 304fab86f8
10 changed files with 137 additions and 77 deletions

View File

@ -319,6 +319,8 @@ CREATE TABLE `job_log_message`
`task_batch_id` bigint(20) NOT NULL COMMENT '任务批次id', `task_batch_id` bigint(20) NOT NULL COMMENT '任务批次id',
`task_id` bigint(20) NOT NULL COMMENT '调度任务id', `task_id` bigint(20) NOT NULL COMMENT '调度任务id',
`message` text NOT NULL COMMENT '调度信息', `message` text NOT NULL COMMENT '调度信息',
`log_num` int(11) NULL DEFAULT NULL COMMENT '日志数量',
`real_time` bigint(13) NULL DEFAULT NULL COMMENT '上报时间',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段', `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),

View File

@ -17,21 +17,6 @@ public class LogFieldConstant {
public static final String LEVEL = "level"; public static final String LEVEL = "level";
public static final String LOCATION = "location"; public static final String LOCATION = "location";
public static final String THROWABLE = "throwable"; public static final String THROWABLE = "throwable";
public static final String LOG = "log"; public static final String LOG_META = "log_meta";
public static final String JOB_ID = "jobId";
public static final String JOB_INSTANCE_ID = "job_instance_id";
public static final String JOB_INSTANCE_TASK_ID = "job_instance_task_id";
public static final String JOB_DISPATCH_VERSION = "job_dispatch_version";
public static final String TASK_ID = "job_instance_taskId";
public static final String CIRCLE_ID = "circle_id";
public static final String WORKER_ADDRESS = "worker_address";
public static final String DELAY_TOPIC = "delay_topic";
} }

View File

@ -59,39 +59,4 @@ public class LogContentDTO {
this.addField(LogFieldConstant.THROWABLE, throwable); this.addField(LogFieldConstant.THROWABLE, throwable);
} }
public void addLogField(String log) {
this.addField(LogFieldConstant.LOG, log);
}
public void addJobIdField(Long jobInstanceId) {
this.addField(LogFieldConstant.JOB_ID, String.valueOf(jobInstanceId));
}
public void addJobInstanceIdField(Long jobInstanceId) {
this.addField(LogFieldConstant.JOB_INSTANCE_ID, String.valueOf(jobInstanceId));
}
public void addJobInstanceTaskIdField(Long taskId) {
this.addField(LogFieldConstant.JOB_INSTANCE_TASK_ID, String.valueOf(taskId));
}
public void addJobDispatchVersionTaskIdField(Long version) {
this.addField(LogFieldConstant.JOB_DISPATCH_VERSION, String.valueOf(version));
}
public void addTaskIdField(String taskId) {
this.addField(LogFieldConstant.TASK_ID, taskId);
}
public void addCircleIdField(Long circleId) {
this.addField(LogFieldConstant.CIRCLE_ID, String.valueOf(circleId));
}
public void addDelayTopic(String topic) {
this.addField(LogFieldConstant.DELAY_TOPIC, topic);
}
public void addWorkerAddressField(String workerAddress) {
this.addField(LogFieldConstant.WORKER_ADDRESS, workerAddress);
}
} }

View File

@ -117,9 +117,6 @@ public class JdbcLockProvider extends AbstractLockProvider implements Lifecycle
@Override @Override
public void start() { public void start() {
// 删除已经过期的锁记录
distributedLockMapper.delete(new LambdaQueryWrapper<DistributedLock>()
.le(DistributedLock::getLockUntil, LocalDateTime.now().minusSeconds(10)));
} }
@Override @Override

View File

@ -41,5 +41,9 @@ public class JobLogDTO {
*/ */
private String message; private String message;
/**
* 真实上报时间
*/
private Long realTime;
} }

View File

@ -0,0 +1,43 @@
package com.aizuda.easy.retry.server.job.task.dto;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import lombok.Data;
/**
* @author xiaowoniu
* @date 2024-01-10 22:56:33
* @since 2.6.0
*/
@Data
public class LogMetaDTO {
/**
* 命名空间
*/
private String namespaceId;
/**
* 组名称
*/
private String groupName;
/**
* 任务信息id
*/
private Long jobId;
/**
* 任务实例id
*/
private Long taskBatchId;
/**
* 调度任务id
*/
private Long taskId;
@Override
public String toString() {
return JsonUtil.toJsonString(taskBatchId);
}
}

View File

@ -1,16 +1,30 @@
package com.aizuda.easy.retry.server.job.task.support.appender; package com.aizuda.easy.retry.server.job.task.support.appender;
import akka.actor.ActorRef;
import ch.qos.logback.classic.spi.IThrowableProxy; import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggingEvent; import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.classic.spi.StackTraceElementProxy; import ch.qos.logback.classic.spi.StackTraceElementProxy;
import ch.qos.logback.classic.spi.ThrowableProxyUtil; import ch.qos.logback.classic.spi.ThrowableProxyUtil;
import ch.qos.logback.core.CoreConstants; import ch.qos.logback.core.CoreConstants;
import ch.qos.logback.core.UnsynchronizedAppenderBase; import ch.qos.logback.core.UnsynchronizedAppenderBase;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.LogFieldConstant; import com.aizuda.easy.retry.common.core.constant.LogFieldConstant;
import com.aizuda.easy.retry.common.core.log.LogContentDTO; import com.aizuda.easy.retry.common.core.log.LogContentDTO;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import org.slf4j.MDC; import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** /**
* @author wodeyangzipingpingwuqi * @author wodeyangzipingpingwuqi
@ -32,20 +46,53 @@ public class EasyRetryServerLogbackAppender<E> extends UnsynchronizedAppenderBas
return; return;
} }
MDC.getMDCAdapter().remove(LogFieldConstant.MDC_REMOTE);
// Prepare processing // Prepare processing
((LoggingEvent) eventObject).prepareForDeferredProcessing(); ((LoggingEvent) eventObject).prepareForDeferredProcessing();
LoggingEvent event = (LoggingEvent) eventObject; LoggingEvent event = (LoggingEvent) eventObject;
LogContentDTO logContentDTO = new LogContentDTO(); LogMetaDTO logMetaDTO = null;
logContentDTO.addTimeStamp(event.getTimeStamp()); try {
logContentDTO.addLevelField(event.getLevel().levelStr); // 第一种是MDC
logContentDTO.addThreadField(event.getThreadName()); String logMetaStr = MDC.getMDCAdapter().get(LogFieldConstant.LOG_META);
logContentDTO.addMessageField(event.getFormattedMessage()); if (StrUtil.isNotBlank(logMetaStr)) {
logContentDTO.addLocationField(getLocationField(event)); logMetaDTO = JsonUtil.parseObject(logMetaStr, LogMetaDTO.class);
logContentDTO.addThrowableField(getThrowableField(event)); } else {
// 第二种规则是按照正则匹配
String patternString = "<\\|>(.*?)<\\|>";
Pattern pattern = Pattern.compile(patternString);
Matcher matcher = pattern.matcher(event.getFormattedMessage());
while (matcher.find()) {
String extractedData = matcher.group(1);
if (StrUtil.isBlank(extractedData)) {
continue;
}
logMetaDTO = JsonUtil.parseObject(extractedData, LogMetaDTO.class);
}
}
} catch (Exception e) {
EasyRetryLog.LOCAL.error("日志解析失败. msg:[{}]", event.getFormattedMessage(), e);
} finally {
MDC.getMDCAdapter().remove(LogFieldConstant.MDC_REMOTE);
MDC.getMDCAdapter().remove(LogFieldConstant.LOG_META);
}
if (Objects.isNull(logMetaDTO)) {
return;
}
// 保存执行的日志
JobLogDTO jobLogDTO = new JobLogDTO();
jobLogDTO.setMessage(event.getFormattedMessage());
jobLogDTO.setTaskId(logMetaDTO.getTaskId());
jobLogDTO.setJobId(logMetaDTO.getJobId());
jobLogDTO.setGroupName(logMetaDTO.getGroupName());
jobLogDTO.setNamespaceId(logMetaDTO.getNamespaceId());
jobLogDTO.setTaskBatchId(logMetaDTO.getTaskBatchId());
jobLogDTO.setRealTime(DateUtils.toNowMilli());
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef);
} }

View File

@ -2,11 +2,9 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.log.LogContentDTO;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage; import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -16,7 +14,8 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.*; import java.util.List;
import java.util.Optional;
/** /**
* @author www.byteblogs.com * @author www.byteblogs.com

View File

@ -8,11 +8,13 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.client.RequestInterceptor; import com.aizuda.easy.retry.server.common.client.RequestInterceptor;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig; import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.enums.ContentTypeEnum; import com.aizuda.easy.retry.server.common.enums.ContentTypeEnum;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.model.dto.CallbackParamsDTO; import com.aizuda.easy.retry.server.model.dto.CallbackParamsDTO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
@ -120,15 +122,24 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
JobTask jobTask = generateJobTask(context, jobTaskBatch); JobTask jobTask = generateJobTask(context, jobTaskBatch);
// 保存执行的日志 // 保存执行的日志
JobLogDTO jobLogDTO = new JobLogDTO(); // JobLogDTO jobLogDTO = new JobLogDTO();
// TODO 等实时日志处理完毕后再处理 // // TODO 等实时日志处理完毕后再处理
jobLogDTO.setMessage(context.getLogMessage()); // jobLogDTO.setMessage(context.getLogMessage());
jobLogDTO.setTaskId(jobTask.getId()); // jobLogDTO.setTaskId(jobTask.getId());
jobLogDTO.setJobId(SystemConstants.CALLBACK_JOB_ID); // jobLogDTO.setJobId(SystemConstants.CALLBACK_JOB_ID);
jobLogDTO.setGroupName(context.getGroupName()); // jobLogDTO.setGroupName(context.getGroupName());
jobLogDTO.setNamespaceId(context.getNamespaceId()); // jobLogDTO.setNamespaceId(context.getNamespaceId());
jobLogDTO.setTaskBatchId(jobTaskBatch.getId()); // jobLogDTO.setTaskBatchId(jobTaskBatch.getId());
ActorRef actorRef = ActorGenerator.jobLogActor(); // ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef); // actorRef.tell(jobLogDTO, actorRef);
LogMetaDTO logMetaDTO = new LogMetaDTO();
logMetaDTO.setNamespaceId(context.getNamespaceId());
logMetaDTO.setGroupName(context.getGroupName());
logMetaDTO.setTaskBatchId(context.getTaskBatchId());
logMetaDTO.setJobId(SystemConstants.CALLBACK_JOB_ID);
logMetaDTO.setTaskId(jobTask.getId());
EasyRetryLog.REMOTE.info("回调成功. logMeta:<|>{}<|>", logMetaDTO);
} }
} }

View File

@ -89,6 +89,13 @@ public class DistributedLockHandler {
return lock; return lock;
} }
/**
* TODO 超时处理自旋处理
*
* @param lockName
* @param lockAtMost
* @param lockExecutor
*/
public void lockAndProcessAfterUnlockDel(String lockName, String lockAtMost, LockExecutor lockExecutor) { public void lockAndProcessAfterUnlockDel(String lockName, String lockAtMost, LockExecutor lockExecutor) {
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost), LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost),
Duration.ofMillis(1), Duration.ofMillis(1),