From 65f83004a7c965cdbde93e2635f192e7e8d046c7 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sun, 21 Jan 2024 00:38:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=E6=97=A5=E5=BF=97=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/config/EasyRetryProperties.java | 2 +- .../client/job/core/client/JobEndPoint.java | 30 +++++++++++---- .../model/request/DispatchJobRequest.java | 1 + .../retry/common/log/dto/LogContentDTO.java | 4 -- .../job/task/dto/RealJobExecutorDTO.java | 2 + .../AbstractClientCallbackHandler.java | 4 +- .../executor/job/RequestClientActor.java | 12 +++--- .../workflow/AbstractWorkflowExecutor.java | 14 ------- .../workflow/DecisionWorkflowExecutor.java | 38 ++++++++++--------- .../web/service/impl/JobLogServiceImpl.java | 24 +++++++++--- 10 files changed, 72 insertions(+), 59 deletions(-) diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/config/EasyRetryProperties.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/config/EasyRetryProperties.java index 11915f49..e6e27acd 100644 --- a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/config/EasyRetryProperties.java +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/config/EasyRetryProperties.java @@ -113,7 +113,7 @@ public class EasyRetryProperties { /** * 窗口期时间长度 */ - private long duration = 10; + private long duration = 5; /** * 窗口期单位 diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java index ed585898..31c91e38 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java @@ -1,5 +1,6 @@ package com.aizuda.easy.retry.client.job.core.client; +import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil; import com.aizuda.easy.retry.client.job.core.IJobExecutor; import com.aizuda.easy.retry.client.job.core.cache.JobExecutorInfoCache; import com.aizuda.easy.retry.client.job.core.cache.ThreadPoolCache; @@ -34,14 +35,23 @@ public class JobEndPoint { @PostMapping("/dispatch/v1") public Result dispatchJob(@RequestBody @Validated DispatchJobRequest dispatchJob) { - JobContext jobContext = buildJobContext(dispatchJob); - JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo()); - if (Objects.isNull(jobExecutorInfo)) { - EasyRetryLog.REMOTE.error("执行器配置有误. executorInfo:[{}]", dispatchJob.getExecutorInfo()); - return new Result<>("执行器配置有误", Boolean.FALSE); - } - try { + JobContext jobContext = buildJobContext(dispatchJob); + + // 初始化调度信息(日志上报LogUtil) + ThreadLocalLogUtil.setContext(jobContext); + + if (Objects.nonNull(dispatchJob.getRetryCount()) && dispatchJob.getRetryCount() > 0) { + EasyRetryLog.REMOTE.info("任务执行/调度失败执行重试. 重试次数:[{}]", + dispatchJob.getRetryCount()); + } + + JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo()); + if (Objects.isNull(jobExecutorInfo)) { + EasyRetryLog.REMOTE.error("执行器配置有误. executorInfo:[{}]", dispatchJob.getExecutorInfo()); + return new Result<>("执行器配置有误", Boolean.FALSE); + } + // 选择执行器 Object executor = jobExecutorInfo.getExecutor(); IJobExecutor jobExecutor; @@ -51,13 +61,17 @@ public class JobEndPoint { jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class); } + EasyRetryLog.REMOTE.info("批次:[{}] 任务调度成功. ", dispatchJob.getTaskBatchId()); + jobExecutor.jobExecute(jobContext); + } catch (Exception e) { EasyRetryLog.REMOTE.error("客户端发生非预期异常. taskBatchId:[{}]", dispatchJob.getTaskBatchId()); throw e; + } finally { + ThreadLocalLogUtil.removeContext(); } - return new Result<>(Boolean.TRUE); } diff --git a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java index 946062b3..5ff00d13 100644 --- a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java +++ b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java @@ -52,5 +52,6 @@ public class DispatchJobRequest { private Long workflowNodeId; + private Integer retryCount; } diff --git a/easy-retry-common/easy-retry-common-log/src/main/java/com/aizuda/easy/retry/common/log/dto/LogContentDTO.java b/easy-retry-common/easy-retry-common-log/src/main/java/com/aizuda/easy/retry/common/log/dto/LogContentDTO.java index e3798cd3..e482cb81 100644 --- a/easy-retry-common/easy-retry-common-log/src/main/java/com/aizuda/easy/retry/common/log/dto/LogContentDTO.java +++ b/easy-retry-common/easy-retry-common-log/src/main/java/com/aizuda/easy/retry/common/log/dto/LogContentDTO.java @@ -36,10 +36,6 @@ public class LogContentDTO { fieldList.add(new TaskLogFieldDTO(name, value)); } - public void addTimeField(String time) { - this.addField(LogFieldConstants.TIME, time); - } - public void addTimeStamp(Long timeStamp) { this.addField(LogFieldConstants.TIME_STAMP, String.valueOf(timeStamp)); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/RealJobExecutorDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/RealJobExecutorDTO.java index 5ac25117..27c14049 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/RealJobExecutorDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/RealJobExecutorDTO.java @@ -62,6 +62,8 @@ public class RealJobExecutorDTO extends BaseDTO { */ private Integer retryInterval; + private Integer retryCount; + private Integer shardingTotal; private Integer shardingIndex; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/AbstractClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/AbstractClientCallbackHandler.java index 6bc8d8f6..12aff79d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/AbstractClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/AbstractClientCallbackHandler.java @@ -49,11 +49,9 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan realJobExecutor.setClientId(ClientInfoUtils.clientId(context.getClientInfo())); realJobExecutor.setWorkflowNodeId(context.getWorkflowNodeId()); realJobExecutor.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); + realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); actorRef.tell(realJobExecutor, actorRef); - LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(context); - EasyRetryLog.REMOTE.info("任务执行/调度失败执行重试. 重试次数:[{}] <|>{}<|>", - jobTask.getRetryCount() + 1, logMetaDTO); return; } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java index e0aaba5b..a93a5098 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java @@ -70,13 +70,10 @@ public class RequestClientActor extends AbstractActor { try { // 构建请求客户端对象 - Long timestamp = DateUtils.toNowMilli(); JobRpcClient rpcClient = buildRpcClient(registerNodeInfo, realJobExecutorDTO); Result dispatch = rpcClient.dispatch(dispatchJobRequest); if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.equals(dispatch.getData(), Boolean.TRUE)) { - LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO); - logMetaDTO.setTimestamp(timestamp); - EasyRetryLog.REMOTE.info("taskId:[{}] 任务调度成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO); + EasyRetryLog.LOCAL.info("taskId:[{}] 任务调度成功.", realJobExecutorDTO.getTaskId()); } else { // 客户端返回失败,则认为任务执行失败 ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType()); @@ -87,13 +84,16 @@ public class RequestClientActor extends AbstractActor { } } catch (Exception e) { - log.error("调用客户端失败.", e); - Throwable throwable; + Throwable throwable = e; if (e.getClass().isAssignableFrom(RetryException.class)) { RetryException re = (RetryException) e; throwable = re.getLastFailedAttempt().getExceptionCause(); taskExecuteFailure(realJobExecutorDTO, throwable.getMessage()); } + + LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO); + logMetaDTO.setTimestamp( DateUtils.toNowMilli()); + EasyRetryLog.REMOTE.error("taskId:[{}] 任务调度成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO, throwable); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java index e6e52f62..99116067 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java @@ -167,20 +167,6 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init return jobTask; } - public void generate(WorkflowExecutorContext context) { - if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.YES.getStatus())) { - return; - } - - JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context); - generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); - generatorContext.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason()); - generatorContext.setJobId(context.getJobId()); - generatorContext.setTaskExecutorScene(context.getTaskExecutorScene()); - jobTaskBatchGenerator.generateJobTaskBatch(generatorContext); - workflowBatchHandler.complete(context.getWorkflowTaskBatchId()); - } - @Override public void afterPropertiesSet() { WorkflowExecutorFactory.registerJobExecutor(getWorkflowNodeType(), this); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java index 504f0b5a..c44bdc45 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.util.List; import java.util.Objects; @@ -74,27 +75,30 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() .select(JobTask::getResultMessage) .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); - - Boolean tempResult = null; List taskResult = Lists.newArrayList(); - for (JobTask jobTask : jobTasks) { - taskResult.add(jobTask.getResultMessage()); - boolean execResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), jobTask.getResultMessage())).orElse(Boolean.FALSE); + Boolean tempResult = null; + if (CollectionUtils.isEmpty(jobTasks)) { + tempResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), StrUtil.EMPTY)).orElse(Boolean.FALSE); + } else { + for (JobTask jobTask : jobTasks) { + taskResult.add(jobTask.getResultMessage()); + boolean execResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), jobTask.getResultMessage())).orElse(Boolean.FALSE); - if (Objects.isNull(tempResult)) { - tempResult = execResult; - } - - if (Objects.equals(decisionConfig.getLogicalCondition(), LogicalConditionEnum.AND.getCode())) { - tempResult = tempResult && execResult; - } else { - tempResult = tempResult || execResult; - if (tempResult) { - break; + if (Objects.isNull(tempResult)) { + tempResult = execResult; } - } - log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result); + if (Objects.equals(decisionConfig.getLogicalCondition(), LogicalConditionEnum.AND.getCode())) { + tempResult = tempResult && execResult; + } else { + tempResult = tempResult || execResult; + if (tempResult) { + break; + } + } + + log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result); + } } context.setTaskResult(JsonUtil.toJsonString(taskResult)); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java index cda816c6..31dba737 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobLogServiceImpl.java @@ -2,6 +2,8 @@ package com.aizuda.easy.retry.server.web.service.impl; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.common.log.constant.LogFieldConstants; +import com.aizuda.easy.retry.common.log.dto.LogContentDTO; import com.aizuda.easy.retry.server.web.model.request.JobLogQueryVO; import com.aizuda.easy.retry.server.web.model.response.JobLogResponseVO; import com.aizuda.easy.retry.server.web.service.JobLogService; @@ -18,9 +20,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; -import java.util.List; -import java.util.Objects; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; /** @@ -85,7 +85,7 @@ public class JobLogServiceImpl implements JobLogService { } long nextStartId = 0; - List messages = Lists.newArrayList(); + List> messages = Lists.newArrayList(); List jobLogMessages = jobLogMessageMapper.selectList( new LambdaQueryWrapper() .in(JobLogMessage::getId, ids) @@ -95,9 +95,9 @@ public class JobLogServiceImpl implements JobLogService { for (final JobLogMessage jobLogMessage : jobLogMessages) { - List originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class); + List> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class); int size = originalList.size() - fromIndex; - List pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize()) + List> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize()) .collect(Collectors.toList()); if (messages.size() + size >= queryVO.getSize()) { @@ -112,6 +112,18 @@ public class JobLogServiceImpl implements JobLogService { fromIndex = 0; } + messages = messages.stream().sorted((o1, o2) -> { + long value = Long.parseLong(o1.get(LogFieldConstants.TIME_STAMP)) - Long.parseLong(o2.get(LogFieldConstants.TIME_STAMP)); + + if (value > 0) { + return 1; + } else if (value < 0) { + return -1; + } + + return 0; + }).collect(Collectors.toList()); + JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); jobLogResponseVO.setMessage(messages); jobLogResponseVO.setNextStartId(nextStartId);