fix: 3.2.0

重试模块接入实时日志
This commit is contained in:
byteblogs168 2024-03-21 18:34:29 +08:00
parent 9465234369
commit 9cd2d0f62e
47 changed files with 941 additions and 327 deletions

View File

@ -158,6 +158,8 @@ CREATE TABLE `retry_task_log_message`
`unique_id` varchar(64) NOT NULL COMMENT '同组下id唯一',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`message` text NOT NULL COMMENT '异常信息',
`log_num` int(11) NOT NULL DEFAULT 1 COMMENT '日志数量',
`real_time` bigint(13) NOT NULL DEFAULT 0 COMMENT '上报时间',
`client_info` varchar(128) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port',
PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `unique_id`),

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.client.common.appender;
import com.aizuda.easy.retry.client.common.report.AsyncReportLog;
import com.aizuda.easy.retry.client.common.report.LogReportFactory;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
@ -19,6 +20,7 @@ import org.apache.logging.log4j.core.util.Throwables;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
/**
* @author wodeyangzipingpingwuqi
@ -46,7 +48,7 @@ public class EasyRetryLog4j2Appender extends AbstractAppender {
logContentDTO.addMessageField(event.getMessage().getFormattedMessage());
// slidingWindow syncReportLog
SpringContext.getBeanByType(AsyncReportLog.class).syncReportLog(logContentDTO);
Optional.ofNullable(LogReportFactory.get()).ifPresent(logReport -> logReport.report(logContentDTO));
}
protected EasyRetryLog4j2Appender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions) {

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.client.common.appender;
import com.aizuda.easy.retry.client.common.report.AsyncReportLog;
import com.aizuda.easy.retry.client.common.report.LogReportFactory;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
@ -11,6 +12,7 @@ import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;
import java.util.Objects;
import java.util.Optional;
/**
* @author wodeyangzipingpingwuqi
@ -42,7 +44,7 @@ public class EasyRetryLog4jAppender extends AppenderSkeleton {
logContentDTO.addThrowableField(getThrowableField(event));
// slidingWindow syncReportLog
SpringContext.getBeanByType(AsyncReportLog.class).syncReportLog(logContentDTO);
Optional.ofNullable(LogReportFactory.get()).ifPresent(logReport -> logReport.report(logContentDTO));
}
@Override

View File

@ -7,6 +7,7 @@ import ch.qos.logback.classic.spi.ThrowableProxyUtil;
import ch.qos.logback.core.CoreConstants;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import com.aizuda.easy.retry.client.common.report.AsyncReportLog;
import com.aizuda.easy.retry.client.common.report.LogReportFactory;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
@ -14,6 +15,7 @@ import com.aizuda.easy.retry.common.core.context.SpringContext;
import org.slf4j.MDC;
import java.util.Objects;
import java.util.Optional;
/**
* @author wodeyangzipingpingwuqi
@ -52,7 +54,7 @@ public class EasyRetryLogbackAppender<E> extends UnsynchronizedAppenderBase<E> {
logContentDTO.addThrowableField(getThrowableField(event));
// slidingWindow syncReportLog
SpringContext.getBeanByType(AsyncReportLog.class).syncReportLog(logContentDTO);
Optional.ofNullable(LogReportFactory.get()).ifPresent(logReport -> logReport.report(logContentDTO));
}
private String getThrowableField(LoggingEvent event) {

View File

@ -3,9 +3,11 @@ package com.aizuda.easy.retry.client.common.report;
import com.aizuda.easy.retry.client.common.Lifecycle;
import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.common.window.SlidingWindow;
import com.aizuda.easy.retry.common.core.window.Listener;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.server.model.dto.LogTaskDTO;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Objects;
@ -15,29 +17,34 @@ import java.util.Objects;
* @date 2024-03-20 22:56:53
* @since 3.2.0
*/
public abstract class AbstractLogReport implements Lifecycle, LogReport {
public abstract class AbstractLogReport<T extends LogTaskDTO> implements Lifecycle, InitializingBean, LogReport {
@Autowired
private EasyRetryProperties easyRetryProperties;
private SlidingWindow<? super LogTaskDTO> slidingWindow;
private SlidingWindow<LogTaskDTO> slidingWindow;
@Override
public void report(LogContentDTO logContentDTO) {
slidingWindow.add(buildLogTaskDTO(logContentDTO));
}
protected abstract LogTaskDTO buildLogTaskDTO(LogContentDTO logContentDTO);
protected abstract T buildLogTaskDTO(LogContentDTO logContentDTO);
@Override
public void start() {
if (Objects.nonNull(slidingWindow)) {
return;
}
EasyRetryProperties.LogSlidingWindowConfig logSlidingWindow = easyRetryProperties.getLogSlidingWindow();
Listener<LogTaskDTO> reportLogListener = new ReportLogListener();
slidingWindow = SlidingWindow
.Builder
.<LogTaskDTO>newBuilder()
.withTotalThreshold(logSlidingWindow.getTotalThreshold())
.withWindowTotalThreshold(logSlidingWindow.getWindowTotalThreshold())
.withDuration(logSlidingWindow.getDuration(), logSlidingWindow.getChronoUnit())
.withListener(new ReportLogListener())
.withListener(reportLogListener)
.build();
slidingWindow.start();
@ -45,10 +52,17 @@ public abstract class AbstractLogReport implements Lifecycle, LogReport {
@Override
public void close() {
EasyRetryLog.LOCAL.info("AsyncReport Log about to shutdown");
if (Objects.nonNull(slidingWindow)) {
slidingWindow.end();
if (Objects.isNull(slidingWindow)) {
return;
}
EasyRetryLog.LOCAL.info("AsyncReport Log about to shutdown");
slidingWindow.end();
EasyRetryLog.LOCAL.info("AsyncReport Log has been shutdown");
}
@Override
public void afterPropertiesSet() throws Exception {
LogReportFactory.add(this);
}
}

View File

@ -1,11 +1,9 @@
package com.aizuda.easy.retry.client.common.report;
import com.aizuda.easy.retry.client.common.Lifecycle;
import com.aizuda.easy.retry.client.common.Report;
import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.client.common.window.SlidingWindow;
import com.aizuda.easy.retry.common.core.model.JobContext;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.server.model.dto.LogTaskDTO;
@ -22,8 +20,9 @@ import java.util.Objects;
* @date 2023-12-27
* @since 2.6.0
*/
@Component
//@Component
@Slf4j
@Deprecated
public class AsyncReportLog implements Lifecycle {
@Autowired
@ -36,8 +35,8 @@ public class AsyncReportLog implements Lifecycle {
*/
public Boolean syncReportLog(LogContentDTO logContent) {
LogTaskDTO logTaskDTO = buildLogTaskDTO(logContent);
slidingWindow.add(logTaskDTO);
// LogTaskDTO logTaskDTO = buildLogTaskDTO(logContent);
// slidingWindow.add(logTaskDTO);
return Boolean.TRUE;
}
@ -67,22 +66,22 @@ public class AsyncReportLog implements Lifecycle {
EasyRetryLog.LOCAL.info("AsyncReport Log has been shutdown");
}
/**
* 构建上报任务对象
*
* @return logContent 上报服务端对象
*/
protected LogTaskDTO buildLogTaskDTO(LogContentDTO logContentDTO) {
JobContext context = ThreadLocalLogUtil.getContext();
LogTaskDTO logTaskDTO = new LogTaskDTO();
logTaskDTO.setJobId(context.getJobId());
logTaskDTO.setTaskId(context.getTaskId());
logTaskDTO.setTaskBatchId(context.getTaskBatchId());
logTaskDTO.setRealTime(logContentDTO.getTimeStamp());
logTaskDTO.setNamespaceId(context.getNamespaceId());
logTaskDTO.setGroupName(context.getGroupName());
logTaskDTO.setFieldList(logContentDTO.getFieldList());
return logTaskDTO;
}
// /**
// * 构建上报任务对象
// *
// * @return logContent 上报服务端对象
// */
// protected LogTaskDTO buildLogTaskDTO(LogContentDTO logContentDTO) {
// LogMeta context = ThreadLocalLogUtil.getContext();
//
// LogTaskDTO logTaskDTO = new LogTaskDTO();
// logTaskDTO.setJobId(context.getJobId());
// logTaskDTO.setTaskId(context.getTaskId());
// logTaskDTO.setTaskBatchId(context.getTaskBatchId());
// logTaskDTO.setRealTime(logContentDTO.getTimeStamp());
// logTaskDTO.setNamespaceId(context.getNamespaceId());
// logTaskDTO.setGroupName(context.getGroupName());
// logTaskDTO.setFieldList(logContentDTO.getFieldList());
// return logTaskDTO;
// }
}

View File

@ -0,0 +1,21 @@
package com.aizuda.easy.retry.client.common.report;
import lombok.Data;
/**
* @author: xiaowoniu
* @date : 2024-03-21
* @since : 3.2.0
*/
@Data
public class LogMeta {
/**
* 命名空间
*/
private String namespaceId;
/**
* 组名称
*/
private String groupName;
}

View File

@ -9,5 +9,7 @@ import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
*/
public interface LogReport {
boolean supports();
void report(LogContentDTO logContentDTO);
}

View File

@ -0,0 +1,31 @@
package com.aizuda.easy.retry.client.common.report;
import com.google.common.collect.Lists;
import java.util.List;
/**
* @author: xiaowoniu
* @date : 2024-03-21
* @since : 3.2.0
*/
public final class LogReportFactory {
private static final List<LogReport> reports = Lists.newArrayList();
static void add(LogReport logReport) {
reports.add(logReport);
}
public static LogReport get() {
for (final LogReport report : reports) {
if (report.supports()) {
return report;
}
}
return null;
}
}

View File

@ -1,7 +1,8 @@
package com.aizuda.easy.retry.client.common.util;
import com.aizuda.easy.retry.common.core.model.JobContext;
import com.aizuda.easy.retry.client.common.report.LogMeta;
import com.aizuda.easy.retry.common.log.enums.LogTypeEnum;
/**
* @author wodeyangzipingpingwuqi·
@ -9,18 +10,42 @@ import com.aizuda.easy.retry.common.core.model.JobContext;
* @since 1.0.0
*/
public class ThreadLocalLogUtil {
private static final ThreadLocal<LogTypeEnum> LOG_TYPE = new ThreadLocal<>();
private static final ThreadLocal<LogMeta> JOB_CONTEXT_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<JobContext> JOB_CONTEXT_LOCAL = new ThreadLocal<>();
public static void setContext(JobContext jobContext) {
JOB_CONTEXT_LOCAL.set(jobContext);
public static void initLogInfo(LogMeta logMeta, LogTypeEnum logType) {
setContext(logMeta);
setLogType(logType);
}
public static JobContext getContext() {
public static void setContext(LogMeta logMeta) {
JOB_CONTEXT_LOCAL.set(logMeta);
}
public static LogMeta getContext() {
return JOB_CONTEXT_LOCAL.get();
}
public static void removeContext() {
JOB_CONTEXT_LOCAL.remove();
}
public static void removeAll() {
JOB_CONTEXT_LOCAL.remove();
LOG_TYPE.remove();
}
public static void setLogType (LogTypeEnum logType) {
LOG_TYPE.set(logType);
}
public static LogTypeEnum getLogType () {
return LOG_TYPE.get();
}
public static void removeLogType () {
LOG_TYPE.remove();
}
}

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.client.core.client;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.client.core.IdempotentIdGenerate;
import com.aizuda.easy.retry.client.core.RetryArgSerializer;
import com.aizuda.easy.retry.client.common.cache.GroupVersionCache;
@ -9,6 +10,7 @@ import com.aizuda.easy.retry.client.core.callback.RetryCompleteCallback;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.core.intercepter.RetrySiteSnapshot;
import com.aizuda.easy.retry.client.core.loader.EasyRetrySpiLoader;
import com.aizuda.easy.retry.client.core.log.RetryLogMeta;
import com.aizuda.easy.retry.client.core.retryer.RetryerInfo;
import com.aizuda.easy.retry.client.core.retryer.RetryerResultContext;
import com.aizuda.easy.retry.client.core.serializer.JacksonSerializer;
@ -24,6 +26,7 @@ import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.core.model.IdempotentIdContext;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.enums.LogTypeEnum;
import com.aizuda.easy.retry.server.model.dto.ConfigDTO;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
@ -64,7 +67,8 @@ public class RetryEndPoint {
RetryerInfo retryerInfo = RetryerInfoCache.get(executeReqDto.getScene(), executeReqDto.getExecutorName());
if (Objects.isNull(retryerInfo)) {
EasyRetryLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", executeReqDto.getScene());
throw new EasyRetryClientException("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", executeReqDto.getScene());
throw new EasyRetryClientException("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在",
executeReqDto.getScene());
}
RetryArgSerializer retryArgSerializer = EasyRetrySpiLoader.loadRetryArgSerializer();
@ -83,6 +87,13 @@ public class RetryEndPoint {
try {
RetrySiteSnapshot.setAttemptNumber(executeReqDto.getRetryCount());
// 初始化实时日志上下文
RetryLogMeta retryLogMeta = new RetryLogMeta();
retryLogMeta.setGroupName(executeReqDto.getGroupName());
retryLogMeta.setNamespaceId(executeReqDto.getNamespaceId());
retryLogMeta.setUniqueId(executeReqDto.getUniqueId());
ThreadLocalLogUtil.initLogInfo(retryLogMeta, LogTypeEnum.RETRY);
RetryerResultContext retryerResultContext = retryStrategy.openRetry(executeReqDto.getScene(),
executeReqDto.getExecutorName(), deSerialize);
@ -101,15 +112,20 @@ public class RetryEndPoint {
}
if (Objects.equals(RetryResultStatusEnum.SUCCESS.getStatus(), executeRespDto.getStatusCode())) {
EasyRetryLog.REMOTE.info("remote retry complete. count:[{}] result:[{}]", executeReqDto.getRetryCount(), executeRespDto.getResultJson());
} if (Objects.equals(RetryResultStatusEnum.STOP.getStatus(), executeRespDto.getStatusCode())) {
EasyRetryLog.REMOTE.info("remote retry complete. count:[{}] exceptionMsg:[{}]", executeReqDto.getRetryCount(), executeRespDto.getExceptionMsg());
EasyRetryLog.REMOTE.info("remote retry complete. count:[{}] result:[{}]", executeReqDto.getRetryCount(),
executeRespDto.getResultJson());
}
if (Objects.equals(RetryResultStatusEnum.STOP.getStatus(), executeRespDto.getStatusCode())) {
EasyRetryLog.REMOTE.warn("remote retry complete. count:[{}] exceptionMsg:[{}]",
executeReqDto.getRetryCount(), executeRespDto.getExceptionMsg());
} else {
EasyRetryLog.REMOTE.info("remote retry complete. count:[{}] ", executeReqDto.getRetryCount(), retryerResultContext.getThrowable());
EasyRetryLog.REMOTE.error("remote retry complete. count:[{}] ", executeReqDto.getRetryCount(),
retryerResultContext.getThrowable());
}
} finally {
RetrySiteSnapshot.removeAll();
ThreadLocalLogUtil.removeAll();
}
return new Result<>(executeRespDto);
@ -126,27 +142,39 @@ public class RetryEndPoint {
@PostMapping("/callback/v1")
public Result callback(@RequestBody @Validated RetryCallbackDTO callbackDTO) {
RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getScene(), callbackDTO.getExecutorName());
if (Objects.isNull(retryerInfo)) {
throw new EasyRetryClientException("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", callbackDTO.getScene());
}
RetryArgSerializer retryArgSerializer = EasyRetrySpiLoader.loadRetryArgSerializer();
Object[] deSerialize;
RetryerInfo retryerInfo = null;
Object[] deSerialize = null;
try {
// 初始化实时日志上下文
RetryLogMeta retryLogMeta = new RetryLogMeta();
retryLogMeta.setGroupName(callbackDTO.getGroup());
retryLogMeta.setNamespaceId(callbackDTO.getNamespaceId());
retryLogMeta.setUniqueId(callbackDTO.getUniqueId());
ThreadLocalLogUtil.initLogInfo(retryLogMeta, LogTypeEnum.RETRY);
retryerInfo = RetryerInfoCache.get(callbackDTO.getScene(), callbackDTO.getExecutorName());
if (Objects.isNull(retryerInfo)) {
EasyRetryLog.REMOTE.error("场景:[{}]配置不存在, 请检查您的场景和执行器是否存在", callbackDTO.getScene());
return new Result(0, "回调失败");
}
RetryArgSerializer retryArgSerializer = EasyRetrySpiLoader.loadRetryArgSerializer();
deSerialize = (Object[]) retryArgSerializer.deSerialize(callbackDTO.getArgsStr(),
retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
} catch (JsonProcessingException e) {
throw new EasyRetryClientException("参数解析异常", e);
}
try {
// 以Spring Bean模式回调
return doCallbackForSpringBean(callbackDTO, retryerInfo, deSerialize);
} catch (JsonProcessingException e) {
EasyRetryLog.REMOTE.error("参数解析异常", e);
return new Result(0, "回调失败");
} catch (NoSuchBeanDefinitionException e) {
// 若不是SpringBean 则直接反射以普通类调用
return doCallbackForOrdinaryClass(callbackDTO, retryerInfo, deSerialize);
} finally {
ThreadLocalLogUtil.removeAll();
}
}
@ -196,7 +224,8 @@ public class RetryEndPoint {
* @param deSerialize 参数信息
* @return Result
*/
private Result doCallbackForSpringBean(RetryCallbackDTO callbackDTO, RetryerInfo retryerInfo, Object[] deSerialize) {
private Result doCallbackForSpringBean(RetryCallbackDTO callbackDTO, RetryerInfo retryerInfo,
Object[] deSerialize) {
Class<? extends RetryCompleteCallback> retryCompleteCallbackClazz = retryerInfo.getRetryCompleteCallback();
RetryCompleteCallback retryCompleteCallback = SpringContext.getBeanByType(retryCompleteCallbackClazz);

View File

@ -27,11 +27,11 @@ public abstract class AbstractRetryExecutor<BR, SR> implements RetryExecutor<BR,
Class<? extends ExecutorMethod> retryMethodClass = retryerInfo.getExecutorMethod();
if (retryMethodClass.isAssignableFrom(ExecutorAnnotationMethod.class)) {
EasyRetryLog.LOCAL.info("执行注解重试方法:{},参数为:{}", retryMethodClass.getName(), JsonUtil.toJsonString(params));
EasyRetryLog.LOCAL.debug("执行注解重试方法:{},参数为:{}", retryMethodClass.getName(), JsonUtil.toJsonString(params));
ExecutorAnnotationMethod retryAnnotationMethod = new ExecutorAnnotationMethod(retryerInfo);
return retryAnnotationMethod.doExecute(params);
} else {
EasyRetryLog.LOCAL.info("执行自定义重试方法:{},参数为:{}", retryMethodClass.getName(), JsonUtil.toJsonString(params));
EasyRetryLog.LOCAL.debug("执行自定义重试方法:{},参数为:{}", retryMethodClass.getName(), JsonUtil.toJsonString(params));
ExecutorMethod executorMethod = SpringContext.getBeanByType(retryMethodClass);
return executorMethod.doExecute(params);
}

View File

@ -57,7 +57,7 @@ public class GuavaRetryExecutor extends AbstractRetryExecutor<WaitStrategy, Stop
retrySuccess.accept(result);
} catch (RetryException e){
// 重试完成仍然失败
EasyRetryLog.LOCAL.error("业务系统重试异常:",e.getLastFailedAttempt().getExceptionCause());
EasyRetryLog.LOCAL.debug("业务系统重试异常:",e.getLastFailedAttempt().getExceptionCause());
retryError.accept(e.getLastFailedAttempt().getExceptionCause());
}

View File

@ -0,0 +1,18 @@
package com.aizuda.easy.retry.client.core.log;
import com.aizuda.easy.retry.client.common.report.LogMeta;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author: xiaowoniu
* @date : 2024-03-21
* @since : 3.2.0
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class RetryLogMeta extends LogMeta {
private String uniqueId;
}

View File

@ -2,9 +2,9 @@ package com.aizuda.easy.retry.client.core.log;
import com.aizuda.easy.retry.client.common.report.AbstractLogReport;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.common.core.model.JobContext;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.server.model.dto.LogTaskDTO;
import com.aizuda.easy.retry.common.log.enums.LogTypeEnum;
import com.aizuda.easy.retry.server.model.dto.RetryLogTaskDTO;
import org.springframework.stereotype.Component;
/**
@ -12,20 +12,23 @@ import org.springframework.stereotype.Component;
* @date 2024-03-20 23:25:24
* @since 3.2.0
*/
//@Component
public class RetryLogReport extends AbstractLogReport {
@Component
public class RetryLogReport extends AbstractLogReport<RetryLogTaskDTO> {
@Override
protected LogTaskDTO buildLogTaskDTO(LogContentDTO logContentDTO) {
// JobContext context = ThreadLocalLogUtil.getContext();
LogTaskDTO logTaskDTO = new LogTaskDTO();
// logTaskDTO.setJobId(context.getJobId());
// logTaskDTO.setTaskId(context.getTaskId());
// logTaskDTO.setTaskBatchId(context.getTaskBatchId());
// logTaskDTO.setRealTime(logContentDTO.getTimeStamp());
// logTaskDTO.setNamespaceId(context.getNamespaceId());
// logTaskDTO.setGroupName(context.getGroupName());
protected RetryLogTaskDTO buildLogTaskDTO(LogContentDTO logContentDTO) {
RetryLogMeta context = (RetryLogMeta) ThreadLocalLogUtil.getContext();
RetryLogTaskDTO logTaskDTO = new RetryLogTaskDTO();
logTaskDTO.setLogType(LogTypeEnum.RETRY.name());
logTaskDTO.setUniqueId(context.getUniqueId());
logTaskDTO.setRealTime(logContentDTO.getTimeStamp());
logTaskDTO.setNamespaceId(context.getNamespaceId());
logTaskDTO.setGroupName(context.getGroupName());
logTaskDTO.setFieldList(logContentDTO.getFieldList());
return logTaskDTO;
}
@Override
public boolean supports() {
return LogTypeEnum.RETRY == ThreadLocalLogUtil.getLogType();
}
}

View File

@ -108,9 +108,7 @@ public class RemoteRetryStrategies extends AbstractRetryStrategies {
return Collections.singletonList(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = Objects.requireNonNull(attributes).getRequest();
Integer attemptNumber = (Integer) request.getAttribute("attemptNumber");
Integer attemptNumber = RetrySiteSnapshot.getAttemptNumber();
if (attempt.hasResult()) {
EasyRetryLog.LOCAL.info("easy-retry 远程重试成功,第[{}]次调度", attemptNumber);
}

View File

@ -7,12 +7,14 @@ import com.aizuda.easy.retry.client.job.core.cache.ThreadPoolCache;
import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo;
import com.aizuda.easy.retry.client.job.core.executor.AbstractJobExecutor;
import com.aizuda.easy.retry.client.job.core.executor.AnnotationJobExecutor;
import com.aizuda.easy.retry.client.job.core.log.JobLogMeta;
import com.aizuda.easy.retry.client.model.StopJobDTO;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.model.JobContext;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.log.enums.LogTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
@ -39,7 +41,7 @@ public class JobEndPoint {
JobContext jobContext = buildJobContext(dispatchJob);
// 初始化调度信息日志上报LogUtil
ThreadLocalLogUtil.setContext(jobContext);
initLogContext(jobContext);
if (Objects.nonNull(dispatchJob.getRetryCount()) && dispatchJob.getRetryCount() > 0) {
EasyRetryLog.REMOTE.info("任务执行/调度失败执行重试. 重试次数:[{}]",
@ -75,6 +77,17 @@ public class JobEndPoint {
return new Result<>(Boolean.TRUE);
}
private void initLogContext(JobContext jobContext) {
JobLogMeta logMeta = new JobLogMeta();
logMeta.setNamespaceId(jobContext.getNamespaceId());
logMeta.setTaskId(jobContext.getTaskId());
logMeta.setGroupName(jobContext.getGroupName());
logMeta.setJobId(jobContext.getJobId());
logMeta.setTaskBatchId(jobContext.getTaskBatchId());
ThreadLocalLogUtil.initLogInfo(logMeta, LogTypeEnum.JOB);
}
private static JobContext buildJobContext(DispatchJobRequest dispatchJob) {
JobContext jobContext = new JobContext();
jobContext.setJobId(dispatchJob.getJobId());

View File

@ -6,11 +6,13 @@ import com.aizuda.easy.retry.client.job.core.cache.FutureCache;
import com.aizuda.easy.retry.client.job.core.cache.ThreadPoolCache;
import com.aizuda.easy.retry.client.job.core.dto.JobArgs;
import com.aizuda.easy.retry.client.job.core.dto.ShardingJobArgs;
import com.aizuda.easy.retry.client.job.core.log.JobLogMeta;
import com.aizuda.easy.retry.client.job.core.timer.StopTaskTimerTask;
import com.aizuda.easy.retry.client.job.core.timer.TimerManager;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
import com.aizuda.easy.retry.common.core.model.JobContext;
import com.aizuda.easy.retry.common.log.enums.LogTypeEnum;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -51,7 +53,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
try {
// 初始化调度信息日志上报LogUtil
ThreadLocalLogUtil.setContext(jobContext);
initLogContext(jobContext);
return doJobExecute(jobArgs);
} finally {
ThreadLocalLogUtil.removeContext();
@ -63,6 +65,16 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext), decorator);
}
private void initLogContext(JobContext jobContext) {
JobLogMeta logMeta = new JobLogMeta();
logMeta.setNamespaceId(jobContext.getNamespaceId());
logMeta.setTaskId(jobContext.getTaskId());
logMeta.setGroupName(jobContext.getGroupName());
logMeta.setJobId(jobContext.getJobId());
logMeta.setTaskBatchId(jobContext.getTaskBatchId());
ThreadLocalLogUtil.initLogInfo(logMeta, LogTypeEnum.JOB);
}
private static JobArgs buildJobArgs(JobContext jobContext) {
JobArgs jobArgs = new JobArgs();
jobArgs.setArgsStr(jobContext.getArgsStr());

View File

@ -4,6 +4,7 @@ import com.aizuda.easy.retry.client.common.proxy.RequestBuilder;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.client.job.core.cache.ThreadPoolCache;
import com.aizuda.easy.retry.client.job.core.client.JobNettyClient;
import com.aizuda.easy.retry.client.job.core.log.JobLogMeta;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import com.aizuda.easy.retry.client.model.request.DispatchJobResultRequest;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
@ -13,7 +14,7 @@ import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.core.model.JobContext;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.log.enums.LogTypeEnum;
import com.google.common.util.concurrent.FutureCallback;
import lombok.extern.slf4j.Slf4j;
@ -43,7 +44,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
try {
// 初始化调度信息日志上报LogUtil
ThreadLocalLogUtil.setContext(jobContext);
initLogContext();
// 上报执行成功
EasyRetryLog.REMOTE.info("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(),
@ -74,7 +75,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
try {
// 初始化调度信息日志上报LogUtil
ThreadLocalLogUtil.setContext(jobContext);
initLogContext();
// 上报执行失败
EasyRetryLog.REMOTE.error("任务执行失败 taskBatchId:[{}]", jobContext.getTaskBatchId(), t);
@ -97,6 +98,16 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
}
}
private void initLogContext() {
JobLogMeta logMeta = new JobLogMeta();
logMeta.setNamespaceId(jobContext.getNamespaceId());
logMeta.setTaskId(jobContext.getTaskId());
logMeta.setGroupName(jobContext.getGroupName());
logMeta.setJobId(jobContext.getJobId());
logMeta.setTaskBatchId(jobContext.getTaskBatchId());
ThreadLocalLogUtil.initLogInfo(logMeta, LogTypeEnum.JOB);
}
private void stopThreadPool() {
if (jobContext.getTaskType() == JobTaskTypeEnum.CLUSTER.getType()) {
ThreadPoolCache.stopThreadPool(jobContext.getTaskBatchId());

View File

@ -0,0 +1,30 @@
package com.aizuda.easy.retry.client.job.core.log;
import com.aizuda.easy.retry.client.common.report.LogMeta;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author: xiaowoniu
* @date : 2024-03-21
* @since : 3.2.0
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class JobLogMeta extends LogMeta {
/**
* 任务信息id
*/
private Long jobId;
/**
* 任务实例id
*/
private Long taskBatchId;
/**
* 调度任务id
*/
private Long taskId;
}

View File

@ -2,9 +2,9 @@ package com.aizuda.easy.retry.client.job.core.log;
import com.aizuda.easy.retry.client.common.report.AbstractLogReport;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.common.core.model.JobContext;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.server.model.dto.LogTaskDTO;
import com.aizuda.easy.retry.common.log.enums.LogTypeEnum;
import com.aizuda.easy.retry.server.model.dto.JobLogTaskDTO;
import org.springframework.stereotype.Component;
/**
@ -12,14 +12,20 @@ import org.springframework.stereotype.Component;
* @date 2024-03-20 23:25:24
* @since 3.2.0
*/
//@Component
public class JobLogReport extends AbstractLogReport {
@Override
protected LogTaskDTO buildLogTaskDTO(LogContentDTO logContentDTO) {
JobContext context = ThreadLocalLogUtil.getContext();
@Component
public class JobLogReport extends AbstractLogReport<JobLogTaskDTO> {
LogTaskDTO logTaskDTO = new LogTaskDTO();
@Override
public boolean supports () {
return LogTypeEnum.JOB == ThreadLocalLogUtil.getLogType();
}
@Override
protected JobLogTaskDTO buildLogTaskDTO(LogContentDTO logContentDTO) {
JobLogMeta context = (JobLogMeta) ThreadLocalLogUtil.getContext();
JobLogTaskDTO logTaskDTO = new JobLogTaskDTO();
logTaskDTO.setJobId(context.getJobId());
logTaskDTO.setLogType(LogTypeEnum.JOB.name());
logTaskDTO.setTaskId(context.getTaskId());
logTaskDTO.setTaskBatchId(context.getTaskBatchId());
logTaskDTO.setRealTime(logContentDTO.getTimeStamp());

View File

@ -13,6 +13,10 @@ import lombok.Data;
*/
@Data
public class DispatchRetryDTO {
@NotBlank(message = "namespaceId 不能为空")
private String namespaceId;
@NotBlank(message = "group 不能为空")
private String groupName;
@NotBlank(message = "scene 不能为空")
private String scene;
@NotBlank(message = "参数 不能为空")

View File

@ -27,4 +27,6 @@ public class RetryCallbackDTO {
private Integer retryStatus;
@NotBlank(message = "uniqueId 不能为空")
private String uniqueId;
@NotBlank(message = "namespaceId 不能为空")
private String namespaceId;
}

View File

@ -0,0 +1,12 @@
package com.aizuda.easy.retry.common.log.enums;
/**
* @author: xiaowoniu
* @date : 2024-03-21
* @since : 3.2.0
*/
public enum LogTypeEnum {
RETRY,
JOB
}

View File

@ -16,6 +16,11 @@ import java.util.List;
@Data
public class LogTaskDTO implements Serializable {
/**
* 日志类型
*/
private String logType;
/**
* 命名空间
*/
@ -44,15 +49,15 @@ public class LogTaskDTO implements Serializable {
@Deprecated
private Long taskId;
/**
* 创建时间
*/
private LocalDateTime createDt;
/**
* 调度信息
*/
private String message;
// /**
// * 创建时间
// */
// private LocalDateTime createDt;
//
// /**
// * 调度信息
// */
// private String message;
/**
* 上报时间

View File

@ -12,4 +12,7 @@ import lombok.EqualsAndHashCode;
@Data
public class RetryLogTaskDTO extends LogTaskDTO {
private String uniqueId;
private String clientInfo;
}

View File

@ -1,9 +1,12 @@
package com.aizuda.easy.retry.template.datasource.persistence.mapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* <p>
* 重试日志异常信息记录表 Mapper 接口
@ -15,4 +18,7 @@ import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface RetryTaskLogMessageMapper extends BaseMapper<RetryTaskLogMessage> {
int batchInsert(List<RetryTaskLogMessage> list);
int batchUpdate(List<RetryTaskLogMessage> list);
}

View File

@ -59,4 +59,15 @@ public class RetryTaskLogMessage implements Serializable {
* 异常信息
*/
private String message;
/**
* 日志数量
*/
private Integer logNum;
/**
* 真实上报时间
*/
private Long realTime;
}

View File

@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMessageMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="unique_id" property="uniqueId" />
<result column="create_dt" property="createDt" />
<result column="log_num" property="logNum"/>
<result column="message" property="message"/>
<result column="real_time" property="realTime"/>
<result column="client_info" property="clientInfo"/>
</resultMap>
<!-- 定义批量新增的 SQL 映射 -->
<insert id="batchInsert" parameterType="java.util.List">
INSERT INTO retry_task_log_message (namespace_id, group_name, unique_id, log_num, message,
create_dt, real_time, client_info)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.namespaceId},
#{item.groupName},
#{item.uniqueId},
#{item.logNum},
#{item.message},
#{item.createDt},
#{item.realTime},
#{item.clientInfo}
)
</foreach>
</insert>
<update id="batchUpdate" parameterType="java.util.List">
UPDATE retry_task_log_message jlm,
(
<foreach collection="list" item="item" index="index" separator=" UNION ALL ">
SELECT
#{item.message} AS message,
#{item.logNum} AS log_num,
#{item.id} AS id
</foreach>
) tt
SET
jlm.message = tt.message, jlm.log_num = tt.log_num
WHERE jlm.id = tt.id
</update>
</mapper>

View File

@ -31,7 +31,7 @@ public class ActorGenerator {
public static final String NO_RETRY_ACTOR = "NoRetryActor";
public static final String EXEC_CALLBACK_UNIT_ACTOR = "ExecCallbackUnitActor";
public static final String EXEC_UNIT_ACTOR = "ExecUnitActor";
public static final String LOG_ACTOR = "LogActor";
public static final String LOG_ACTOR = "RetryLogActor";
private static final String RETRY_TASK_EXECUTOR_DISPATCHER = "akka.actor.retry-task-executor-dispatcher";
private static final String RETRY_TASK_EXECUTOR_RESULT_DISPATCHER = "akka.actor.retry-task-executor-result-dispatcher";

View File

@ -2,9 +2,13 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.dto.TaskLogFieldDTO;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.model.dto.JobLogTaskDTO;
import com.aizuda.easy.retry.server.model.dto.LogTaskDTO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import lombok.extern.slf4j.Slf4j;
@ -12,10 +16,15 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* @author www.byteblogs.com
@ -33,24 +42,47 @@ public class JobLogActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(List.class, (list -> {
try {
jobLogMessageMapper.batchInsert(list);
} catch (Exception e) {
log.error("保存客户端日志异常.", e);
} finally {
getContext().stop(getSelf());
.match(List.class, (list -> {
try {
if (CollectionUtils.isEmpty(list)) {
return;
}
}))
.match(JobLogDTO.class, (jobLogDTO -> {
try {
saveLogMessage(jobLogDTO);
} catch (Exception e) {
log.error("保存日志异常.", e);
} finally {
getContext().stop(getSelf());
List<JobLogTaskDTO> jobLogTasks = (List<JobLogTaskDTO>) list;
Map<Long, List<JobLogTaskDTO>> logTaskDTOMap = jobLogTasks.
stream().collect(Collectors.groupingBy(JobLogTaskDTO::getTaskId, Collectors.toList()));
List<JobLogMessage> jobLogMessageList = new ArrayList<>();
for (List<JobLogTaskDTO> logTaskDTOList : logTaskDTOMap.values()) {
JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(logTaskDTOList.get(0));
jobLogMessage.setCreateDt(LocalDateTime.now());
jobLogMessage.setLogNum(logTaskDTOList.size());
List<Map<String, String>> messageMapList = logTaskDTOList.stream()
.map(taskDTO -> taskDTO.getFieldList()
.stream().filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue)))
.collect(Collectors.toList());
jobLogMessage.setMessage(JsonUtil.toJsonString(messageMapList));
jobLogMessageList.add(jobLogMessage);
}
})).build();
jobLogMessageMapper.batchInsert(jobLogMessageList);
} catch (Exception e) {
log.error("保存客户端日志异常.", e);
} finally {
getContext().stop(getSelf());
}
}))
.match(JobLogDTO.class, (jobLogDTO -> {
try {
saveLogMessage(jobLogDTO);
} catch (Exception e) {
log.error("保存日志异常.", e);
} finally {
getContext().stop(getSelf());
}
})).build();
}

View File

@ -1,81 +0,0 @@
package com.aizuda.easy.retry.server.job.task.support.request;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.log.dto.TaskLogFieldDTO;
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.PostHttpRequestHandler;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.model.dto.LogTaskDTO;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH.BATCH_LOG_REPORT;
/**
* 处理日志上报数据
*
* @author: wodeyangzipingpingwuqi
* @date : 2023-12-26
* @since 1.0.0
*/
@Component
public class ReportLogHttpRequestHandler extends PostHttpRequestHandler {
@Override
public boolean supports(String path) {
return BATCH_LOG_REPORT.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
EasyRetryLog.LOCAL.info("Begin Handler Log Report Data. [{}]", content);
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
Object[] args = retryRequest.getArgs();
Assert.notEmpty(args, () -> new EasyRetryServerException("日志上报的数据不能为空. ReqId:[{}]", retryRequest.getReqId()));
Map<Long, List<LogTaskDTO>> logTaskDTOMap = JsonUtil.parseList(JsonUtil.toJsonString(args[0]), LogTaskDTO.class).stream().collect(Collectors.groupingBy(i -> i.getTaskId(), Collectors.toList()));
List<JobLogMessage> jobLogMessageList = new ArrayList<>();
for (List<LogTaskDTO> logTaskDTOList : logTaskDTOMap.values()) {
JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(logTaskDTOList.get(0));
jobLogMessage.setCreateDt(LocalDateTime.now());
jobLogMessage.setLogNum(logTaskDTOList.size());
List<Map<String, String>> messageMapList = logTaskDTOList.stream().map(taskDTO -> taskDTO.getFieldList()
.stream().filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue))).collect(Collectors.toList());
jobLogMessage.setMessage(JsonUtil.toJsonString(messageMapList));
jobLogMessageList.add(jobLogMessage);
}
// 批量新增日志数据
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogMessageList, actorRef);
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "Batch Log Retry Data Upload Processed Successfully", Boolean.TRUE, retryRequest.getReqId()));
}
}

View File

@ -0,0 +1,40 @@
package com.aizuda.easy.retry.server.retry.task.dto;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import lombok.Builder;
import lombok.Data;
/**
* @author xiaowoniu
* @date 2024-01-10 22:56:33
* @since 3.2.0
*/
@Data
@Builder
public class LogMetaDTO {
/**
* 命名空间
*/
private String namespaceId;
/**
* 组名称
*/
private String groupName;
/**
* 组名称
*/
private String uniqueId;
/**
* 时间
*/
private Long timestamp;
@Override
public String toString() {
return JsonUtil.toJsonString(this);
}
}

View File

@ -1,6 +1,8 @@
package com.aizuda.easy.retry.server.retry.task.support;
import com.aizuda.easy.retry.server.model.dto.RetryLogTaskDTO;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.retry.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.retry.task.dto.NotifyConfigPartitionTask;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext;
@ -9,6 +11,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
@ -45,4 +48,9 @@ public interface RetryTaskConverter {
RetryTimerContext toRetryTimerContext(RetryPartitionTask retryPartitionTask);
List<NotifyConfigPartitionTask> toNotifyConfigPartitionTask(List<NotifyConfig> notifyConfigs);
RetryTaskLogMessage toRetryTaskLogMessage(RetryLogTaskDTO retryLogTaskDTO);
LogMetaDTO toLogMetaDTO(RetryTask retryTask);
}

View File

@ -0,0 +1,141 @@
package com.aizuda.easy.retry.server.retry.task.support.appender;
import akka.actor.ActorRef;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.classic.spi.StackTraceElementProxy;
import ch.qos.logback.classic.spi.ThrowableProxyUtil;
import ch.qos.logback.core.CoreConstants;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.dto.TaskLogFieldDTO;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.retry.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
import com.google.common.collect.Lists;
import org.slf4j.MDC;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* @author xiaowoniu
* @date 2023-12-27
* @since 3.2.0
*/
public class EasyRetryServerLogbackAppender<E> extends UnsynchronizedAppenderBase<E> {
@Override
protected void append(E eventObject) {
// Not job context
if (!(eventObject instanceof LoggingEvent) || Objects.isNull(
MDC.getMDCAdapter().get(LogFieldConstants.MDC_REMOTE))) {
return;
}
MDC.getMDCAdapter().remove(LogFieldConstants.MDC_REMOTE);
// Prepare processing
((LoggingEvent) eventObject).prepareForDeferredProcessing();
LoggingEvent event = (LoggingEvent) eventObject;
LogContentDTO logContentDTO = new LogContentDTO();
logContentDTO.addLevelField(event.getLevel().levelStr);
logContentDTO.addThreadField(event.getThreadName());
logContentDTO.addLocationField(getLocationField(event));
logContentDTO.addThrowableField(getThrowableField(event));
LogMetaDTO logMetaDTO = null;
try {
String patternString = "<\\|>(.*?)<\\|>";
Pattern pattern = Pattern.compile(patternString);
Matcher matcher = pattern.matcher(event.getFormattedMessage());
while (matcher.find()) {
String extractedData = matcher.group(1);
if (StrUtil.isBlank(extractedData)) {
continue;
}
logMetaDTO = JsonUtil.parseObject(extractedData, LogMetaDTO.class);
String message = event.getFormattedMessage().replaceFirst(patternString, StrUtil.EMPTY);
logContentDTO.addMessageField(message);
logContentDTO.addTimeStamp(Optional.ofNullable(logMetaDTO.getTimestamp()).orElse(event.getTimeStamp()));
break;
}
} catch (Exception e) {
EasyRetryLog.LOCAL.error("日志解析失败. msg:[{}]", event.getFormattedMessage(), e);
}
if (Objects.isNull(logMetaDTO)) {
return;
}
// 保存执行的日志
saveLog(logContentDTO, logMetaDTO);
}
/**
* 保存日志
*
* @param logContentDTO 日志内容
* @param logMetaDTO 日志元数据
*/
private void saveLog(final LogContentDTO logContentDTO, final LogMetaDTO logMetaDTO) {
RetryTaskLogDTO jobLogDTO = new RetryTaskLogDTO();
Map<String, String> messageMap = logContentDTO.getFieldList()
.stream()
.filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue));
jobLogDTO.setMessage(JsonUtil.toJsonString(Lists.newArrayList(messageMap)));
jobLogDTO.setGroupName(logMetaDTO.getGroupName());
jobLogDTO.setNamespaceId(logMetaDTO.getNamespaceId());
jobLogDTO.setUniqueId(logMetaDTO.getUniqueId());
jobLogDTO.setRealTime(logMetaDTO.getTimestamp());
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef);
}
private String getThrowableField(LoggingEvent event) {
IThrowableProxy iThrowableProxy = event.getThrowableProxy();
if (iThrowableProxy != null) {
String throwable = getExceptionInfo(iThrowableProxy);
throwable += formatThrowable(event.getThrowableProxy().getStackTraceElementProxyArray());
return throwable;
}
return null;
}
private String getLocationField(LoggingEvent event) {
StackTraceElement[] caller = event.getCallerData();
if (caller != null && caller.length > 0) {
return caller[0].toString();
}
return null;
}
private String formatThrowable(StackTraceElementProxy[] stackTraceElementProxyArray) {
StringBuilder builder = new StringBuilder();
for (StackTraceElementProxy step : stackTraceElementProxyArray) {
builder.append(CoreConstants.LINE_SEPARATOR);
String string = step.toString();
builder.append(CoreConstants.TAB).append(string);
ThrowableProxyUtil.subjoinPackagingData(builder, step);
}
return builder.toString();
}
private String getExceptionInfo(IThrowableProxy iThrowableProxy) {
String s = iThrowableProxy.getClassName();
String message = iThrowableProxy.getMessage();
return (message != null) ? (s + ": " + message) : s;
}
}

View File

@ -14,7 +14,10 @@ import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient;
import com.aizuda.easy.retry.server.retry.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
@ -62,9 +65,9 @@ public class ExecCallbackUnitActor extends AbstractActor {
RegisterNodeInfo serverNode = context.getServerNode();
SceneConfig sceneConfig = context.getSceneConfig();
RetryTaskLogDTO retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(retryTask);
retryTaskLog.setTriggerTime(LocalDateTime.now());
retryTaskLog.setClientInfo(ClientInfoUtils.generate(serverNode));
// RetryTaskLogDTO retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(retryTask);
// retryTaskLog.setTriggerTime(LocalDateTime.now());
// retryTaskLog.setClientInfo(ClientInfoUtils.generate(serverNode));
try {
@ -80,23 +83,24 @@ public class ExecCallbackUnitActor extends AbstractActor {
message = "回调客户端失败: 异常信息为空";
}
}
retryTaskLog.setMessage(message);
// retryTaskLog.setMessage(message);
return result;
});
if (context.hasException()) {
retryTaskLog.setMessage(context.getException().getMessage());
// retryTaskLog.setMessage(context.getException().getMessage());
}
} else {
retryTaskLog.setMessage("There are currently no available client PODs.");
// retryTaskLog.setMessage("There are currently no available client PODs.");
}
} catch (Exception e) {
EasyRetryLog.LOCAL.error("callback client error. retryTask:[{}]", JsonUtil.toJsonString(retryTask), e);
retryTaskLog.setMessage(StringUtils.isBlank(e.getMessage()) ? StrUtil.EMPTY : e.getMessage());
LogMetaDTO logMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retryTask);
logMetaDTO.setTimestamp(DateUtils.toNowMilli());
EasyRetryLog.REMOTE.error("请求客户端异常. <|>{}<|>", retryTask.getUniqueId(), logMetaDTO, e);// retryTaskLog.setMessage(StringUtils.isBlank(e.getMessage()) ? StrUtil.EMPTY : e.getMessage());
} finally {
ActorRef actorRef = ActorGenerator.logActor();
actorRef.tell(retryTaskLog, actorRef);
// ActorRef actorRef = ActorGenerator.logActor();
// actorRef.tell(retryTaskLog, actorRef);
getContext().stop(getSelf());
@ -135,6 +139,7 @@ public class ExecCallbackUnitActor extends AbstractActor {
retryCallbackDTO.setGroup(callbackTask.getGroupName());
retryCallbackDTO.setExecutorName(callbackTask.getExecutorName());
retryCallbackDTO.setUniqueId(callbackTask.getUniqueId());
retryCallbackDTO.setNamespaceId(callbackTask.getNamespaceId());
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
.nodeInfo(serverNode)

View File

@ -15,7 +15,10 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient;
import com.aizuda.easy.retry.server.retry.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
@ -52,9 +55,9 @@ public class ExecUnitActor extends AbstractActor {
RegisterNodeInfo serverNode = context.getServerNode();
SceneConfig sceneConfig = context.getSceneConfig();
RetryTaskLogDTO retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(retryTask);
retryTaskLog.setTriggerTime(LocalDateTime.now());
retryTaskLog.setClientInfo(ClientInfoUtils.generate(serverNode));
// RetryTaskLogDTO retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(retryTask);
// retryTaskLog.setTriggerTime(LocalDateTime.now());
// retryTaskLog.setClientInfo(ClientInfoUtils.generate(serverNode));
try {
@ -67,9 +70,9 @@ public class ExecUnitActor extends AbstractActor {
// 回调接口请求成功处理返回值
if (StatusEnum.YES.getStatus() != result.getStatus()) {
if (StrUtil.isNotBlank(result.getMessage())) {
retryTaskLog.setMessage(result.getMessage());
// retryTaskLog.setMessage(result.getMessage());
} else {
retryTaskLog.setMessage("客户端执行失败: 异常信息为空");
// retryTaskLog.setMessage("客户端执行失败: 异常信息为空");
}
} else {
DispatchRetryResultDTO data = JsonUtil.parseObject(JsonUtil.toJsonString(result.getData()), DispatchRetryResultDTO.class);
@ -77,14 +80,14 @@ public class ExecUnitActor extends AbstractActor {
if (Objects.nonNull(data)) {
if (RetryResultStatusEnum.FAILURE.getStatus().equals(data.getStatusCode())) {
if (StrUtil.isNotBlank(data.getExceptionMsg())) {
retryTaskLog.setMessage(data.getExceptionMsg());
// retryTaskLog.setMessage(data.getExceptionMsg());
} else {
retryTaskLog.setMessage("客户端重试失败: 异常信息为空");
// retryTaskLog.setMessage("客户端重试失败: 异常信息为空");
}
} else if (RetryResultStatusEnum.STOP.getStatus().equals(data.getStatusCode())) {
retryTaskLog.setMessage("客户端主动停止任务");
// retryTaskLog.setMessage("客户端主动停止任务");
} else {
retryTaskLog.setMessage("客户端执行成功");
// retryTaskLog.setMessage("客户端执行成功");
}
}
@ -95,19 +98,20 @@ public class ExecUnitActor extends AbstractActor {
// 请求发生异常
if (context.hasException()) {
retryTaskLog.setMessage(context.getException().getMessage());
// retryTaskLog.setMessage(context.getException().getMessage());
}
} else {
retryTaskLog.setMessage("There are currently no available client PODs.");
// retryTaskLog.setMessage("There are currently no available client PODs.");
}
} catch (Exception e) {
EasyRetryLog.LOCAL.error("callback client error. retryTask:[{}]", JsonUtil.toJsonString(retryTask), e);
retryTaskLog.setMessage(e.getMessage());
LogMetaDTO logMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retryTask);
logMetaDTO.setTimestamp(DateUtils.toNowMilli());
EasyRetryLog.REMOTE.error("请求客户端异常. <|>{}<|>", retryTask.getUniqueId(), logMetaDTO, e);
} finally {
ActorRef actorRef = ActorGenerator.logActor();
actorRef.tell(retryTaskLog, actorRef);
// ActorRef actorRef = ActorGenerator.logActor();
// actorRef.tell(retryTaskLog, actorRef);
getContext().stop(getSelf());
}
@ -130,6 +134,8 @@ public class ExecUnitActor extends AbstractActor {
dispatchRetryDTO.setArgsStr(retryTask.getArgsStr());
dispatchRetryDTO.setUniqueId(retryTask.getUniqueId());
dispatchRetryDTO.setRetryCount(retryTask.getRetryCount());
dispatchRetryDTO.setGroupName(retryTask.getGroupName());
dispatchRetryDTO.setNamespaceId(retryTask.getNamespaceId());
// 设置header
EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders();

View File

@ -1,80 +0,0 @@
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log;
import akka.actor.AbstractActor;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Optional;
/**
* 处理日志信息
*
* @author: www.byteblogs.com
* @date : 2023-06-16 11:33
* @since 2.0.0
*/
@Component(ActorGenerator.LOG_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class LogActor extends AbstractActor {
@Autowired
private RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryTaskLogDTO.class,
retryTaskLogDTO -> RetryStatusEnum.RUNNING.getStatus().equals(retryTaskLogDTO.getRetryStatus()),
retryTaskLogDTO -> {
saveRetryTaskLogMessage(retryTaskLogDTO);
getContext().stop(getSelf());
}).match(RetryTaskLogDTO.class, (retryTaskLogDTO) ->
RetryStatusEnum.MAX_COUNT.getStatus().equals(retryTaskLogDTO.getRetryStatus())
|| RetryStatusEnum.FINISH.getStatus().equals(retryTaskLogDTO.getRetryStatus()),
retryTaskLogDTO -> {
// 变动日志的状态
RetryTaskLog retryTaskLog = new RetryTaskLog();
retryTaskLog.setRetryStatus(retryTaskLogDTO.getRetryStatus());
retryTaskLogMapper.update(retryTaskLog, new LambdaUpdateWrapper<RetryTaskLog>()
.eq(RetryTaskLog::getNamespaceId, retryTaskLogDTO.getNamespaceId())
.eq(RetryTaskLog::getUniqueId, retryTaskLogDTO.getUniqueId())
.eq(RetryTaskLog::getGroupName, retryTaskLogDTO.getGroupName()));
getContext().stop(getSelf());
}).build();
}
/**
* 报错日志详情
*/
private void saveRetryTaskLogMessage(final RetryTaskLogDTO retryTaskLogDTO) {
// 记录重试日志
RetryTaskLogMessage retryTaskLogMessage = new RetryTaskLogMessage();
retryTaskLogMessage.setUniqueId(retryTaskLogDTO.getUniqueId());
retryTaskLogMessage.setGroupName(retryTaskLogDTO.getGroupName());
retryTaskLogMessage.setClientInfo(retryTaskLogDTO.getClientInfo());
retryTaskLogMessage.setNamespaceId(retryTaskLogDTO.getNamespaceId());
String errorMessage = retryTaskLogDTO.getMessage();
retryTaskLogMessage.setMessage(
StrUtil.isBlank(errorMessage) ? StrUtil.EMPTY : errorMessage);
retryTaskLogMessage.setCreateDt(Optional.ofNullable(retryTaskLogDTO.getTriggerTime()).orElse(LocalDateTime.now()));
retryTaskLogMessageMapper.insert(retryTaskLogMessage);
}
}

View File

@ -0,0 +1,100 @@
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log;
import akka.actor.AbstractActor;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.dto.TaskLogFieldDTO;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.model.dto.RetryLogTaskDTO;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* 处理日志信息
*
* @author: www.byteblogs.com
* @date : 2023-06-16 11:33
* @since 2.0.0
*/
@Component(ActorGenerator.LOG_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class RetryLogActor extends AbstractActor {
@Autowired
private RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(List.class,
list -> {
if (CollectionUtils.isEmpty(list)) {
return;
}
saveRetryTaskLogMessage((List<RetryLogTaskDTO>) list);
getContext().stop(getSelf());
}).match(RetryTaskLogDTO.class,
retryTaskLogDTO -> {
saveRetryTaskLogMessage(retryTaskLogDTO);
getContext().stop(getSelf());
}).build();
}
private void saveRetryTaskLogMessage(final List<RetryLogTaskDTO> list) {
List<RetryLogTaskDTO> jobLogTasks = list;
Map<String, List<RetryLogTaskDTO>> logTaskDTOMap = jobLogTasks.
stream().collect(Collectors.groupingBy(RetryLogTaskDTO::getUniqueId, Collectors.toList()));
List<RetryTaskLogMessage> retryTaskLogMessages = new ArrayList<>();
for (List<RetryLogTaskDTO> logTaskDTOList : logTaskDTOMap.values()) {
RetryTaskLogMessage retryTaskLogMessage = RetryTaskConverter.INSTANCE.toRetryTaskLogMessage(
logTaskDTOList.get(0));
retryTaskLogMessage.setCreateDt(LocalDateTime.now());
retryTaskLogMessage.setLogNum(logTaskDTOList.size());
List<Map<String, String>> messageMapList = logTaskDTOList.stream()
.map(taskDTO -> taskDTO.getFieldList()
.stream().filter(logTaskDTO_ -> !Objects.isNull(logTaskDTO_.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue)))
.collect(Collectors.toList());
retryTaskLogMessage.setMessage(JsonUtil.toJsonString(messageMapList));
retryTaskLogMessages.add(retryTaskLogMessage);
}
retryTaskLogMessageMapper.batchInsert(retryTaskLogMessages);
}
/**
* 报错日志详情
*/
private void saveRetryTaskLogMessage(final RetryTaskLogDTO retryTaskLogDTO) {
// 记录重试日志
RetryTaskLogMessage retryTaskLogMessage = new RetryTaskLogMessage();
retryTaskLogMessage.setUniqueId(retryTaskLogDTO.getUniqueId());
retryTaskLogMessage.setGroupName(retryTaskLogDTO.getGroupName());
retryTaskLogMessage.setClientInfo(retryTaskLogDTO.getClientInfo());
retryTaskLogMessage.setNamespaceId(retryTaskLogDTO.getNamespaceId());
String errorMessage = retryTaskLogDTO.getMessage();
retryTaskLogMessage.setMessage(
StrUtil.isBlank(errorMessage) ? StrUtil.EMPTY : errorMessage);
retryTaskLogMessage.setCreateDt(Optional.ofNullable(retryTaskLogDTO.getTriggerTime()).orElse(LocalDateTime.now()));
retryTaskLogMessageMapper.insert(retryTaskLogMessage);
}
}

View File

@ -49,4 +49,9 @@ public class RetryTaskLogDTO {
*/
private String clientInfo;
/**
* 真实上报时间
*/
private Long realTime;
}

View File

@ -11,13 +11,17 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailMoreThresholdAlarmEvent;
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@ -55,13 +59,13 @@ public class FailureActor extends AbstractActor {
@Autowired
@Qualifier("retryIdempotentStrategyHandler")
private IdempotentStrategy<Pair<String/*groupName*/, String/*namespaceId*/>, Long> idempotentStrategy;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryTask.class, retryTask -> {
EasyRetryLog.LOCAL.debug("FailureActor params:[{}]", retryTask);
try {
// 超过最大等级
SceneConfig sceneConfig =
@ -91,6 +95,14 @@ public class FailureActor extends AbstractActor {
() -> new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
retryTask.getGroupName(), retryTask.getUniqueId()));
// 变动日志的状态
RetryTaskLog retryTaskLog = new RetryTaskLog();
retryTaskLog.setRetryStatus(retryTask.getRetryStatus());
retryTaskLogMapper.update(retryTaskLog, new LambdaUpdateWrapper<RetryTaskLog>()
.eq(RetryTaskLog::getNamespaceId, retryTask.getNamespaceId())
.eq(RetryTaskLog::getUniqueId, retryTask.getUniqueId())
.eq(RetryTaskLog::getGroupName, retryTask.getGroupName()));
context.publishEvent(new RetryTaskFailMoreThresholdAlarmEvent(retryTask));
}
});
@ -99,15 +111,6 @@ public class FailureActor extends AbstractActor {
} finally {
// 清除幂等标识位
idempotentStrategy.clear(Pair.of(retryTask.getGroupName(), retryTask.getNamespaceId()), retryTask.getId());
if (RetryStatusEnum.MAX_COUNT.getStatus().equals(retryTask.getRetryStatus())) {
RetryTaskLogDTO retryTaskLogDTO = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(retryTask);
retryTaskLogDTO.setMessage("任务已经到达最大执行次数了.");
ActorRef actorRef = ActorGenerator.logActor();
actorRef.tell(retryTaskLogDTO, actorRef);
}
getContext().stop(getSelf());
}

View File

@ -13,7 +13,10 @@ import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@ -40,6 +43,7 @@ import java.time.LocalDateTime;
@Slf4j
public class FinishActor extends AbstractActor {
@Autowired
private AccessTemplate accessTemplate;
@Autowired
@ -49,6 +53,8 @@ public class FinishActor extends AbstractActor {
@Autowired
@Qualifier("retryIdempotentStrategyHandler")
private IdempotentStrategy<Pair<String/*groupName*/, String/*namespaceId*/>, Long> idempotentStrategy;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Override
public Receive createReceive() {
@ -70,6 +76,15 @@ public class FinishActor extends AbstractActor {
// 创建一个回调任务
callbackRetryTaskHandler.create(retryTask);
// 变动日志的状态
RetryTaskLog retryTaskLog = new RetryTaskLog();
retryTaskLog.setRetryStatus(retryTask.getRetryStatus());
retryTaskLogMapper.update(retryTaskLog, new LambdaUpdateWrapper<RetryTaskLog>()
.eq(RetryTaskLog::getNamespaceId, retryTask.getNamespaceId())
.eq(RetryTaskLog::getUniqueId, retryTask.getUniqueId())
.eq(RetryTaskLog::getGroupName, retryTask.getGroupName()));
}
});
@ -79,11 +94,6 @@ public class FinishActor extends AbstractActor {
// 清除幂等标识位
idempotentStrategy.clear(Pair.of(retryTask.getGroupName(), retryTask.getNamespaceId()), retryTask.getId());
RetryTaskLogDTO retryTaskLogDTO = RetryTaskLogConverter.INSTANCE.toRetryTaskLogDTO(retryTask);
retryTaskLogDTO.setMessage("任务已经执行成功了.");
ActorRef actorRef = ActorGenerator.logActor();
actorRef.tell(retryTaskLogDTO, actorRef);
getContext().stop(getSelf());
}

View File

@ -2,11 +2,15 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.task;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.retry.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
@ -66,14 +70,18 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing
retryTask.getUniqueId(), pair.getValue().toString());
// 记录日志
RetryTaskLogDTO retryTaskLog = new RetryTaskLogDTO();
retryTaskLog.setGroupName(retryTask.getGroupName());
retryTaskLog.setUniqueId(retryTask.getUniqueId());
retryTaskLog.setRetryStatus(retryTask.getRetryStatus());
retryTaskLog.setMessage(pair.getValue().toString());
retryTaskLog.setTriggerTime(LocalDateTime.now());
ActorRef actorRef = ActorGenerator.logActor();
actorRef.tell(retryTaskLog, actorRef);
// RetryTaskLogDTO retryTaskLog = new RetryTaskLogDTO();
// retryTaskLog.setGroupName(retryTask.getGroupName());
// retryTaskLog.setUniqueId(retryTask.getUniqueId());
// retryTaskLog.setRetryStatus(retryTask.getRetryStatus());
// retryTaskLog.setMessage(pair.getValue().toString());
// retryTaskLog.setTriggerTime(LocalDateTime.now());
// ActorRef actorRef = ActorGenerator.logActor();
LogMetaDTO logMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retryTask);
logMetaDTO.setTimestamp(DateUtils.toNowMilli());
EasyRetryLog.REMOTE.error("触发条件不满足 原因: [{}] <|>{}<|>", pair.getValue().toString(), logMetaDTO);
return false;
}

View File

@ -103,7 +103,6 @@ public class RequestHandlerActor extends AbstractActor {
EasyRetryLog.LOCAL.warn("client register error. groupName:[{}]", groupName);
}
UrlBuilder builder = UrlBuilder.ofHttp(uri);
Collection<HttpRequestHandler> httpRequestHandlers = SpringContext.CONTEXT
.getBeansOfType(HttpRequestHandler.class).values();

View File

@ -37,7 +37,7 @@ public class BeatHttpRequestHandler extends GetHttpRequestHandler {
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
EasyRetryLog.LOCAL.info("Beat check content:[{}]", content);
EasyRetryLog.LOCAL.debug("Beat check content:[{}]", content);
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
return JsonUtil.toJsonString(new NettyResult(PONG, retryRequest.getReqId()));
}

View File

@ -0,0 +1,102 @@
package com.aizuda.easy.retry.server.starter.server.handler;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.easy.retry.common.core.enums.HeadersEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.log.enums.LogTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.PostHttpRequestHandler;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.model.dto.JobLogTaskDTO;
import com.aizuda.easy.retry.server.model.dto.RetryLogTaskDTO;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import static com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH.BATCH_LOG_REPORT;
/**
* 处理日志上报数据
*
* @author: wodeyangzipingpingwuqi
* @date : 2023-12-26
* @since 1.0.0
*/
@Component
public class ReportLogHttpRequestHandler extends PostHttpRequestHandler {
private static final String JSON_FILED = "logType" ;
@Override
public boolean supports(String path) {
return BATCH_LOG_REPORT.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
EasyRetryLog.LOCAL.info("Begin Handler Log Report Data. [{}]", content);
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
Object[] args = retryRequest.getArgs();
Assert.notEmpty(args, () -> new EasyRetryServerException("日志上报的数据不能为空. ReqId:[{}]", retryRequest.getReqId()));
JsonNode jsonNode = JsonUtil.toJson(args[0]);
List<RetryLogTaskDTO> retryTasks = Lists.newArrayList();
List<JobLogTaskDTO> jobTasks = Lists.newArrayList();
for (final JsonNode node : jsonNode) {
if (node.findValue(JSON_FILED).asText().equals(LogTypeEnum.JOB.name())) {
jobTasks.add(JsonUtil.parseObject(node.toPrettyString(), JobLogTaskDTO.class));
}
if (node.findValue(JSON_FILED).asText().equals(LogTypeEnum.RETRY.name())) {
retryTasks.add(JsonUtil.parseObject(node.toPrettyString(), RetryLogTaskDTO.class));
}
}
// 批量新增日志数据
if (!CollectionUtils.isEmpty(jobTasks)) {
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobTasks, actorRef);
}
if (!CollectionUtils.isEmpty(retryTasks)) {
String clintInfo = getClientInfo(headers);
for (final RetryLogTaskDTO retryTask : retryTasks) {
retryTask.setClientInfo(clintInfo);
}
ActorRef actorRef = ActorGenerator.logActor();
actorRef.tell(retryTasks, actorRef);
}
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "Batch Log Retry Data Upload Processed Successfully", Boolean.TRUE, retryRequest.getReqId()));
}
private String getClientInfo(final HttpHeaders headers) {
String hostId = headers.get(HeadersEnum.HOST_ID.getKey());
String hostIp = headers.get(HeadersEnum.HOST_IP.getKey());
Integer hostPort = headers.getInt(HeadersEnum.HOST_PORT.getKey());
RegisterNodeInfo registerNodeInfo = new RegisterNodeInfo();
registerNodeInfo.setHostIp(hostIp);
registerNodeInfo.setHostPort(hostPort);
registerNodeInfo.setHostId(hostId);
return ClientInfoUtils.generate(registerNodeInfo);
}
}

View File

@ -81,6 +81,8 @@
<!-- EasyRetry appender -->
<appender name="easyLogServerAppender" class="com.aizuda.easy.retry.server.job.task.support.appender.EasyRetryServerLogbackAppender">
</appender>
<appender name="easyRetryLogServerAppender" class="com.aizuda.easy.retry.server.retry.task.support.appender.EasyRetryServerLogbackAppender">
</appender>
<!-- 控制台输出日志级别 -->
<root level="info">
@ -89,5 +91,6 @@
<appender-ref ref="asyncWarn" />
<appender-ref ref="asyncError" />
<appender-ref ref="easyLogServerAppender" />
<appender-ref ref="easyRetryLogServerAppender" />
</root>
</configuration>