feat: 2.6.0

1.定时任务流程接入实时日志
This commit is contained in:
byteblogs168 2024-01-11 17:43:24 +08:00
parent 2742fd0ff1
commit 8da7ec673a
7 changed files with 27 additions and 39 deletions

View File

@ -36,6 +36,11 @@ public class LogMetaDTO {
*/ */
private Long taskId; private Long taskId;
/**
* 时间
*/
private Long timestamp;
@Override @Override
public String toString() { public String toString() {
return JsonUtil.toJsonString(this); return JsonUtil.toJsonString(this);

View File

@ -65,9 +65,9 @@ public interface JobTaskConverter {
JobLogDTO toJobLogDTO(JobExecutorContext context); JobLogDTO toJobLogDTO(JobExecutorContext context);
JobLogDTO toJobLogDTO(JobExecutorResultDTO resultDTO); LogMetaDTO toJobLogDTO(JobExecutorResultDTO resultDTO);
JobLogDTO toJobLogDTO(BaseDTO baseDTO); LogMetaDTO toJobLogDTO(BaseDTO baseDTO);
ClientCallbackContext toClientCallbackContext(DispatchJobResultRequest request); ClientCallbackContext toClientCallbackContext(DispatchJobResultRequest request);

View File

@ -11,6 +11,7 @@ import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatc
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.Getter; import lombok.Getter;
@ -81,6 +82,11 @@ public class BlockStrategies {
private Integer operationReason; private Integer operationReason;
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer taskExecutorScene;
} }
private static final class DiscardBlockStrategy implements BlockStrategy { private static final class DiscardBlockStrategy implements BlockStrategy {

View File

@ -6,9 +6,12 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.CompleteJobBatchDTO; import com.aizuda.easy.retry.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
@ -28,6 +31,7 @@ import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.util.Objects; import java.util.Objects;
/** /**
@ -84,11 +88,11 @@ public class JobExecutorResultActor extends AbstractActor {
} }
}); });
JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result); LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result);
jobLogDTO.setMessage(result.getMessage()); // 防止客户端日志还未上报完成导致日志时序错误
jobLogDTO.setTaskId(result.getTaskId()); logMetaDTO.setTimestamp(DateUtils.toEpochMilli(LocalDateTime.now().plusHours(1)));
ActorRef actorRef = ActorGenerator.jobLogActor(); EasyRetryLog.REMOTE.info("taskId:[{}] 任务执行成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO);
actorRef.tell(jobLogDTO, actorRef);
} catch (Exception e) { } catch (Exception e) {
LogUtils.error(log, " job executor result exception. [{}]", result, e); LogUtils.error(log, " job executor result exception. [{}]", result, e);
} finally { } finally {

View File

@ -4,10 +4,12 @@ import akka.actor.AbstractActor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.client.model.ExecuteResult; import com.aizuda.easy.retry.client.model.ExecuteResult;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest; import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.client.RequestBuilder; import com.aizuda.easy.retry.server.common.client.RequestBuilder;
@ -15,6 +17,7 @@ import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.job.task.client.JobRpcClient; import com.aizuda.easy.retry.server.job.task.client.JobRpcClient;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.easy.retry.server.job.task.support.ClientCallbackHandler; import com.aizuda.easy.retry.server.job.task.support.ClientCallbackHandler;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
@ -62,7 +65,6 @@ public class RequestClientActor extends AbstractActor {
return; return;
} }
JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO);
DispatchJobRequest dispatchJobRequest = JobTaskConverter.INSTANCE.toDispatchJobRequest(realJobExecutorDTO); DispatchJobRequest dispatchJobRequest = JobTaskConverter.INSTANCE.toDispatchJobRequest(realJobExecutorDTO);
try { try {
@ -70,9 +72,8 @@ public class RequestClientActor extends AbstractActor {
JobRpcClient rpcClient = buildRpcClient(registerNodeInfo, realJobExecutorDTO); JobRpcClient rpcClient = buildRpcClient(registerNodeInfo, realJobExecutorDTO);
Result<Boolean> dispatch = rpcClient.dispatch(dispatchJobRequest); Result<Boolean> dispatch = rpcClient.dispatch(dispatchJobRequest);
if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.equals(dispatch.getData(), Boolean.TRUE)) { if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.equals(dispatch.getData(), Boolean.TRUE)) {
jobLogDTO.setMessage("任务调度成功"); LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO);
ActorRef actorRef = ActorGenerator.jobLogActor(); EasyRetryLog.REMOTE.info("taskId:[{}] 任务调度成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO);
actorRef.tell(jobLogDTO, actorRef);
} else { } else {
// 客户端返回失败则认为任务执行失败 // 客户端返回失败则认为任务执行失败
ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType()); ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType());

View File

@ -1,21 +0,0 @@
package com.aizuda.easy.retry.server.web.service.convert;
import com.aizuda.easy.retry.server.web.model.response.JobLogResponseVO;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
* @author: www.byteblogs.com
* @date : 2023-10-12 11:27
* @since : 2.4.0
*/
@Mapper
public interface JobLogResponseVOConverter {
JobLogResponseVOConverter INSTANCE = Mappers.getMapper(JobLogResponseVOConverter.class);
List<JobLogResponseVO> toJobLogResponseVOs(List<JobLogMessage> jobLogMessages);
}

View File

@ -1,12 +1,9 @@
package com.aizuda.easy.retry.server.web.service.impl; package com.aizuda.easy.retry.server.web.service.impl;
import cn.hutool.core.util.ArrayUtil;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.JobLogQueryVO; import com.aizuda.easy.retry.server.web.model.request.JobLogQueryVO;
import com.aizuda.easy.retry.server.web.model.response.JobLogResponseVO; import com.aizuda.easy.retry.server.web.model.response.JobLogResponseVO;
import com.aizuda.easy.retry.server.web.service.JobLogService; import com.aizuda.easy.retry.server.web.service.JobLogService;
import com.aizuda.easy.retry.server.web.service.convert.JobLogResponseVOConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage; import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -17,8 +14,6 @@ import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -42,8 +37,6 @@ public class JobLogServiceImpl implements JobLogService {
queryWrapper queryWrapper
.select(JobLogMessage::getId, JobLogMessage::getLogNum) .select(JobLogMessage::getId, JobLogMessage::getLogNum)
.ge(JobLogMessage::getId, queryVO.getStartId()) .ge(JobLogMessage::getId, queryVO.getStartId())
.eq(JobLogMessage::getJobId, queryVO.getJobId())
.eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId())
.eq(JobLogMessage::getTaskId, queryVO.getTaskId()); .eq(JobLogMessage::getTaskId, queryVO.getTaskId());
queryWrapper.orderByAsc(JobLogMessage::getRealTime).orderByDesc(JobLogMessage::getId); queryWrapper.orderByAsc(JobLogMessage::getRealTime).orderByDesc(JobLogMessage::getId);