feat: 2.6.0

1.日志优化
This commit is contained in:
byteblogs168 2024-01-21 00:38:39 +08:00
parent 28096b946d
commit 65f83004a7
10 changed files with 72 additions and 59 deletions

View File

@ -113,7 +113,7 @@ public class EasyRetryProperties {
/**
* 窗口期时间长度
*/
private long duration = 10;
private long duration = 5;
/**
* 窗口期单位

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.client.job.core.client;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.client.job.core.IJobExecutor;
import com.aizuda.easy.retry.client.job.core.cache.JobExecutorInfoCache;
import com.aizuda.easy.retry.client.job.core.cache.ThreadPoolCache;
@ -34,14 +35,23 @@ public class JobEndPoint {
@PostMapping("/dispatch/v1")
public Result<Boolean> dispatchJob(@RequestBody @Validated DispatchJobRequest dispatchJob) {
JobContext jobContext = buildJobContext(dispatchJob);
JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo());
if (Objects.isNull(jobExecutorInfo)) {
EasyRetryLog.REMOTE.error("执行器配置有误. executorInfo:[{}]", dispatchJob.getExecutorInfo());
return new Result<>("执行器配置有误", Boolean.FALSE);
}
try {
JobContext jobContext = buildJobContext(dispatchJob);
// 初始化调度信息日志上报LogUtil
ThreadLocalLogUtil.setContext(jobContext);
if (Objects.nonNull(dispatchJob.getRetryCount()) && dispatchJob.getRetryCount() > 0) {
EasyRetryLog.REMOTE.info("任务执行/调度失败执行重试. 重试次数:[{}]",
dispatchJob.getRetryCount());
}
JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo());
if (Objects.isNull(jobExecutorInfo)) {
EasyRetryLog.REMOTE.error("执行器配置有误. executorInfo:[{}]", dispatchJob.getExecutorInfo());
return new Result<>("执行器配置有误", Boolean.FALSE);
}
// 选择执行器
Object executor = jobExecutorInfo.getExecutor();
IJobExecutor jobExecutor;
@ -51,13 +61,17 @@ public class JobEndPoint {
jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class);
}
EasyRetryLog.REMOTE.info("批次:[{}] 任务调度成功. ", dispatchJob.getTaskBatchId());
jobExecutor.jobExecute(jobContext);
} catch (Exception e) {
EasyRetryLog.REMOTE.error("客户端发生非预期异常. taskBatchId:[{}]", dispatchJob.getTaskBatchId());
throw e;
} finally {
ThreadLocalLogUtil.removeContext();
}
return new Result<>(Boolean.TRUE);
}

View File

@ -52,5 +52,6 @@ public class DispatchJobRequest {
private Long workflowNodeId;
private Integer retryCount;
}

View File

@ -36,10 +36,6 @@ public class LogContentDTO {
fieldList.add(new TaskLogFieldDTO(name, value));
}
public void addTimeField(String time) {
this.addField(LogFieldConstants.TIME, time);
}
public void addTimeStamp(Long timeStamp) {
this.addField(LogFieldConstants.TIME_STAMP, String.valueOf(timeStamp));
}

View File

@ -62,6 +62,8 @@ public class RealJobExecutorDTO extends BaseDTO {
*/
private Integer retryInterval;
private Integer retryCount;
private Integer shardingTotal;
private Integer shardingIndex;

View File

@ -49,11 +49,9 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
realJobExecutor.setClientId(ClientInfoUtils.clientId(context.getClientInfo()));
realJobExecutor.setWorkflowNodeId(context.getWorkflowNodeId());
realJobExecutor.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1);
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(context);
EasyRetryLog.REMOTE.info("任务执行/调度失败执行重试. 重试次数:[{}] <|>{}<|>",
jobTask.getRetryCount() + 1, logMetaDTO);
return;
}
}

View File

@ -70,13 +70,10 @@ public class RequestClientActor extends AbstractActor {
try {
// 构建请求客户端对象
Long timestamp = DateUtils.toNowMilli();
JobRpcClient rpcClient = buildRpcClient(registerNodeInfo, realJobExecutorDTO);
Result<Boolean> dispatch = rpcClient.dispatch(dispatchJobRequest);
if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.equals(dispatch.getData(), Boolean.TRUE)) {
LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO);
logMetaDTO.setTimestamp(timestamp);
EasyRetryLog.REMOTE.info("taskId:[{}] 任务调度成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO);
EasyRetryLog.LOCAL.info("taskId:[{}] 任务调度成功.", realJobExecutorDTO.getTaskId());
} else {
// 客户端返回失败则认为任务执行失败
ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType());
@ -87,13 +84,16 @@ public class RequestClientActor extends AbstractActor {
}
} catch (Exception e) {
log.error("调用客户端失败.", e);
Throwable throwable;
Throwable throwable = e;
if (e.getClass().isAssignableFrom(RetryException.class)) {
RetryException re = (RetryException) e;
throwable = re.getLastFailedAttempt().getExceptionCause();
taskExecuteFailure(realJobExecutorDTO, throwable.getMessage());
}
LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO);
logMetaDTO.setTimestamp( DateUtils.toNowMilli());
EasyRetryLog.REMOTE.error("taskId:[{}] 任务调度成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO, throwable);
}
}

View File

@ -167,20 +167,6 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
return jobTask;
}
public void generate(WorkflowExecutorContext context) {
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.YES.getStatus())) {
return;
}
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
generatorContext.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
generatorContext.setJobId(context.getJobId());
generatorContext.setTaskExecutorScene(context.getTaskExecutorScene());
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
}
@Override
public void afterPropertiesSet() {
WorkflowExecutorFactory.registerJobExecutor(getWorkflowNodeType(), this);

View File

@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Objects;
@ -74,27 +75,30 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
Boolean tempResult = null;
List<String> taskResult = Lists.newArrayList();
for (JobTask jobTask : jobTasks) {
taskResult.add(jobTask.getResultMessage());
boolean execResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), jobTask.getResultMessage())).orElse(Boolean.FALSE);
Boolean tempResult = null;
if (CollectionUtils.isEmpty(jobTasks)) {
tempResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), StrUtil.EMPTY)).orElse(Boolean.FALSE);
} else {
for (JobTask jobTask : jobTasks) {
taskResult.add(jobTask.getResultMessage());
boolean execResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), jobTask.getResultMessage())).orElse(Boolean.FALSE);
if (Objects.isNull(tempResult)) {
tempResult = execResult;
}
if (Objects.equals(decisionConfig.getLogicalCondition(), LogicalConditionEnum.AND.getCode())) {
tempResult = tempResult && execResult;
} else {
tempResult = tempResult || execResult;
if (tempResult) {
break;
if (Objects.isNull(tempResult)) {
tempResult = execResult;
}
}
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result);
if (Objects.equals(decisionConfig.getLogicalCondition(), LogicalConditionEnum.AND.getCode())) {
tempResult = tempResult && execResult;
} else {
tempResult = tempResult || execResult;
if (tempResult) {
break;
}
}
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result);
}
}
context.setTaskResult(JsonUtil.toJsonString(taskResult));

View File

@ -2,6 +2,8 @@ package com.aizuda.easy.retry.server.web.service.impl;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.server.web.model.request.JobLogQueryVO;
import com.aizuda.easy.retry.server.web.model.response.JobLogResponseVO;
import com.aizuda.easy.retry.server.web.service.JobLogService;
@ -18,9 +20,7 @@ import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
/**
@ -85,7 +85,7 @@ public class JobLogServiceImpl implements JobLogService {
}
long nextStartId = 0;
List<String> messages = Lists.newArrayList();
List<Map<String, String>> messages = Lists.newArrayList();
List<JobLogMessage> jobLogMessages = jobLogMessageMapper.selectList(
new LambdaQueryWrapper<JobLogMessage>()
.in(JobLogMessage::getId, ids)
@ -95,9 +95,9 @@ public class JobLogServiceImpl implements JobLogService {
for (final JobLogMessage jobLogMessage : jobLogMessages) {
List<String> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class);
List<Map<String, String>> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class);
int size = originalList.size() - fromIndex;
List<String> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize())
List<Map<String, String>> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize())
.collect(Collectors.toList());
if (messages.size() + size >= queryVO.getSize()) {
@ -112,6 +112,18 @@ public class JobLogServiceImpl implements JobLogService {
fromIndex = 0;
}
messages = messages.stream().sorted((o1, o2) -> {
long value = Long.parseLong(o1.get(LogFieldConstants.TIME_STAMP)) - Long.parseLong(o2.get(LogFieldConstants.TIME_STAMP));
if (value > 0) {
return 1;
} else if (value < 0) {
return -1;
}
return 0;
}).collect(Collectors.toList());
JobLogResponseVO jobLogResponseVO = new JobLogResponseVO();
jobLogResponseVO.setMessage(messages);
jobLogResponseVO.setNextStartId(nextStartId);