feat: 2.6.0

1.工作流决策和回调添加日志信息
This commit is contained in:
byteblogs168 2024-01-11 11:35:40 +08:00
parent 304fab86f8
commit c811d81fb0
9 changed files with 70 additions and 61 deletions

View File

@ -319,8 +319,8 @@ CREATE TABLE `job_log_message`
`task_batch_id` bigint(20) NOT NULL COMMENT '任务批次id',
`task_id` bigint(20) NOT NULL COMMENT '调度任务id',
`message` text NOT NULL COMMENT '调度信息',
`log_num` int(11) NULL DEFAULT NULL COMMENT '日志数量',
`real_time` bigint(13) NULL DEFAULT NULL COMMENT '上报时间',
`log_num` int(11) NOT NULL DEFAULT 1 COMMENT '日志数量',
`real_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '上报时间',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
PRIMARY KEY (`id`),

View File

@ -6,7 +6,7 @@ package com.aizuda.easy.retry.common.core.constant;
* @since 2.6.0
*/
public class LogFieldConstant {
public static final String MDC_REMOTE = "remote";
public static final String TIME = "time";
@ -17,6 +17,5 @@ public class LogFieldConstant {
public static final String LEVEL = "level";
public static final String LOCATION = "location";
public static final String THROWABLE = "throwable";
public static final String LOG_META = "log_meta";
}

View File

@ -5,7 +5,9 @@ import lombok.Data;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
@ -22,6 +24,13 @@ public class LogContentDTO {
this.fieldList = new ArrayList<>();
}
public Map<String, String> toMap() {
return fieldList
.stream()
.filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue));
}
public void addField(String name, String value) {
fieldList.add(new TaskLogFieldDTO(name, value));
}
@ -36,7 +45,8 @@ public class LogContentDTO {
public Long getTimeStamp() {
return Long.parseLong(fieldList.stream().filter(taskLogFieldDTO -> !Objects.isNull(taskLogFieldDTO.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue)).get(LogFieldConstant.TIME_STAMP));
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue))
.get(LogFieldConstant.TIME_STAMP));
}
public void addLevelField(String level) {

View File

@ -38,6 +38,6 @@ public class LogMetaDTO {
@Override
public String toString() {
return JsonUtil.toJsonString(taskBatchId);
return JsonUtil.toJsonString(this);
}
}

View File

@ -10,6 +10,7 @@ import ch.qos.logback.core.UnsynchronizedAppenderBase;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.LogFieldConstant;
import com.aizuda.easy.retry.common.core.log.LogContentDTO;
import com.aizuda.easy.retry.common.core.log.TaskLogFieldDTO;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
@ -22,9 +23,11 @@ import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* @author wodeyangzipingpingwuqi
@ -33,49 +36,46 @@ import java.util.regex.Pattern;
*/
public class EasyRetryServerLogbackAppender<E> extends UnsynchronizedAppenderBase<E> {
@Override
public void start() {
super.start();
}
@Override
protected void append(E eventObject) {
// Not job context
if (!(eventObject instanceof LoggingEvent) || Objects.isNull(MDC.getMDCAdapter().get(LogFieldConstant.MDC_REMOTE))) {
if (!(eventObject instanceof LoggingEvent) || Objects.isNull(
MDC.getMDCAdapter().get(LogFieldConstant.MDC_REMOTE))) {
return;
}
MDC.getMDCAdapter().remove(LogFieldConstant.MDC_REMOTE);
// Prepare processing
((LoggingEvent) eventObject).prepareForDeferredProcessing();
LoggingEvent event = (LoggingEvent) eventObject;
LogContentDTO logContentDTO = new LogContentDTO();
logContentDTO.addTimeStamp(event.getTimeStamp());
logContentDTO.addLevelField(event.getLevel().levelStr);
logContentDTO.addThreadField(event.getThreadName());
logContentDTO.addLocationField(getLocationField(event));
logContentDTO.addThrowableField(getThrowableField(event));
LogMetaDTO logMetaDTO = null;
try {
// 第一种是MDC
String logMetaStr = MDC.getMDCAdapter().get(LogFieldConstant.LOG_META);
if (StrUtil.isNotBlank(logMetaStr)) {
logMetaDTO = JsonUtil.parseObject(logMetaStr, LogMetaDTO.class);
} else {
// 第二种规则是按照正则匹配
String patternString = "<\\|>(.*?)<\\|>";
Pattern pattern = Pattern.compile(patternString);
Matcher matcher = pattern.matcher(event.getFormattedMessage());
while (matcher.find()) {
String extractedData = matcher.group(1);
if (StrUtil.isBlank(extractedData)) {
continue;
}
logMetaDTO = JsonUtil.parseObject(extractedData, LogMetaDTO.class);
String patternString = "<\\|>(.*?)<\\|>";
Pattern pattern = Pattern.compile(patternString);
Matcher matcher = pattern.matcher(event.getFormattedMessage());
while (matcher.find()) {
String extractedData = matcher.group(1);
if (StrUtil.isBlank(extractedData)) {
continue;
}
logMetaDTO = JsonUtil.parseObject(extractedData, LogMetaDTO.class);
String message = event.getFormattedMessage().replaceFirst(patternString, StrUtil.EMPTY);
logContentDTO.addMessageField(message);
break;
}
} catch (Exception e) {
EasyRetryLog.LOCAL.error("日志解析失败. msg:[{}]", event.getFormattedMessage(), e);
} finally {
MDC.getMDCAdapter().remove(LogFieldConstant.MDC_REMOTE);
MDC.getMDCAdapter().remove(LogFieldConstant.LOG_META);
}
if (Objects.isNull(logMetaDTO)) {
@ -83,17 +83,30 @@ public class EasyRetryServerLogbackAppender<E> extends UnsynchronizedAppenderBas
}
// 保存执行的日志
saveLog(logContentDTO, logMetaDTO);
}
/**
* 保存日志
*
* @param logContentDTO 日志内容
* @param logMetaDTO 日志元数据
*/
private void saveLog(final LogContentDTO logContentDTO, final LogMetaDTO logMetaDTO) {
JobLogDTO jobLogDTO = new JobLogDTO();
jobLogDTO.setMessage(event.getFormattedMessage());
Map<String, String> messageMap = logContentDTO.getFieldList()
.stream()
.filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue));
jobLogDTO.setMessage(JsonUtil.toJsonString(messageMap));
jobLogDTO.setTaskId(logMetaDTO.getTaskId());
jobLogDTO.setJobId(logMetaDTO.getJobId());
jobLogDTO.setGroupName(logMetaDTO.getGroupName());
jobLogDTO.setNamespaceId(logMetaDTO.getNamespaceId());
jobLogDTO.setTaskBatchId(logMetaDTO.getTaskBatchId());
jobLogDTO.setRealTime(DateUtils.toNowMilli());
jobLogDTO.setRealTime(logContentDTO.getTimeStamp());
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef);
}
private String getThrowableField(LoggingEvent event) {

View File

@ -22,6 +22,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
@ -121,25 +122,12 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
JobTask jobTask = generateJobTask(context, jobTaskBatch);
// 保存执行的日志
// JobLogDTO jobLogDTO = new JobLogDTO();
// // TODO 等实时日志处理完毕后再处理
// jobLogDTO.setMessage(context.getLogMessage());
// jobLogDTO.setTaskId(jobTask.getId());
// jobLogDTO.setJobId(SystemConstants.CALLBACK_JOB_ID);
// jobLogDTO.setGroupName(context.getGroupName());
// jobLogDTO.setNamespaceId(context.getNamespaceId());
// jobLogDTO.setTaskBatchId(jobTaskBatch.getId());
// ActorRef actorRef = ActorGenerator.jobLogActor();
// actorRef.tell(jobLogDTO, actorRef);
LogMetaDTO logMetaDTO = new LogMetaDTO();
logMetaDTO.setNamespaceId(context.getNamespaceId());
logMetaDTO.setGroupName(context.getGroupName());
logMetaDTO.setTaskBatchId(context.getTaskBatchId());
logMetaDTO.setTaskBatchId(jobTaskBatch.getId());
logMetaDTO.setJobId(SystemConstants.CALLBACK_JOB_ID);
logMetaDTO.setTaskId(jobTask.getId());
EasyRetryLog.REMOTE.info("回调成功. logMeta:<|>{}<|>", logMetaDTO);
EasyRetryLog.REMOTE.info("workflowNodeId:[{}] 回调成功. <|>{}<|>", context.getWorkflowNodeId(), logMetaDTO);
}
}

View File

@ -8,12 +8,14 @@ import com.aizuda.easy.retry.common.core.enums.*;
import com.aizuda.easy.retry.common.core.expression.ExpressionEngine;
import com.aizuda.easy.retry.common.core.expression.ExpressionFactory;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
import com.aizuda.easy.retry.server.common.enums.ExpressionTypeEnum;
import com.aizuda.easy.retry.server.common.enums.LogicalConditionEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.job.task.support.expression.ExpressionInvocationHandler;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
@ -132,16 +134,12 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
JobTask jobTask = generateJobTask(context, jobTaskBatch);
// 保存执行的日志
JobLogDTO jobLogDTO = new JobLogDTO();
// TODO 等实时日志处理完毕后再处理
jobLogDTO.setMessage(context.getLogMessage());
jobLogDTO.setTaskId(jobTask.getId());
jobLogDTO.setJobId(SystemConstants.DECISION_JOB_ID);
jobLogDTO.setGroupName(context.getGroupName());
jobLogDTO.setNamespaceId(context.getNamespaceId());
jobLogDTO.setTaskBatchId(jobTaskBatch.getId());
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef);
LogMetaDTO logMetaDTO = new LogMetaDTO();
logMetaDTO.setNamespaceId(context.getNamespaceId());
logMetaDTO.setGroupName(context.getGroupName());
logMetaDTO.setTaskBatchId(jobTaskBatch.getId());
logMetaDTO.setJobId(SystemConstants.DECISION_JOB_ID);
logMetaDTO.setTaskId(jobTask.getId());
EasyRetryLog.REMOTE.info("workflowNodeId:[{}]决策完成. <|>{}<|>", context.getWorkflowNodeId(), logMetaDTO);
}
}

View File

@ -31,7 +31,7 @@ public class WorkflowBatchController {
return workflowBatchService.listPage(queryVO);
}
@LoginRequired
// @LoginRequired
@GetMapping("{id}")
public WorkflowDetailResponseVO getWorkflowBatchDetail(@PathVariable("id") Long id) {
return workflowBatchService.getWorkflowBatchDetail(id);

View File

@ -36,6 +36,7 @@ import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.MDC;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;