From 9a0ffa4dce9e48d648453a065201ad99f26147f8 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Tue, 2 Jan 2024 12:23:36 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E8=AF=A6=E6=83=85=E9=94=99=E8=AF=AF=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/enums/JobOperationReasonEnum.java | 2 + .../server/model/dto/CallbackParamsDTO.java | 23 +++++ .../task/support/WorkflowTaskConverter.java | 4 + .../workflow/CallbackWorkflowExecutor.java | 86 ++++++++++++++++--- .../workflow/ConditionWorkflowExecutor.java | 1 - .../workflow/JobTaskWorkflowExecutor.java | 2 +- .../service/convert/WorkflowConverter.java | 4 +- 7 files changed, 109 insertions(+), 13 deletions(-) create mode 100644 easy-retry-common/easy-retry-common-server-api/src/main/java/com/aizuda/easy/retry/server/model/dto/CallbackParamsDTO.java diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java index 18fc320d..df3a8ecc 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java @@ -26,6 +26,8 @@ public enum JobOperationReasonEnum { MANNER_STOP(8, "手动停止"), WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(8, "条件节点执行异常"), JOB_TASK_INTERRUPTED(9, "任务中断"), + WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR(8, "条件节点执行异常"), + ; private final int reason; diff --git a/easy-retry-common/easy-retry-common-server-api/src/main/java/com/aizuda/easy/retry/server/model/dto/CallbackParamsDTO.java b/easy-retry-common/easy-retry-common-server-api/src/main/java/com/aizuda/easy/retry/server/model/dto/CallbackParamsDTO.java new file mode 100644 index 00000000..4fd874cb --- /dev/null +++ b/easy-retry-common/easy-retry-common-server-api/src/main/java/com/aizuda/easy/retry/server/model/dto/CallbackParamsDTO.java @@ -0,0 +1,23 @@ +package com.aizuda.easy.retry.server.model.dto; + +import lombok.Data; + +/** + * @author: xiaowoniu + * @date : 2024-01-02 + * @since : 2.6.0 + */ +@Data +public class CallbackParamsDTO { + + /** + * 执行结果 + */ + private String resultMessage; + + /** + * 客户端ID + */ + private String clientInfo; + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java index 7d375b12..6d5ab6a0 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java @@ -6,6 +6,8 @@ import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBloc import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; +import com.aizuda.easy.retry.server.model.dto.CallbackParamsDTO; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; @@ -46,4 +48,6 @@ public interface WorkflowTaskConverter { WorkflowTaskBatchGeneratorContext toWorkflowTaskBatchGeneratorContext(WorkflowBlockStrategyContext context); WorkflowBlockStrategyContext toWorkflowBlockStrategyContext(WorkflowTaskPrepareDTO prepareDTO); + + List toCallbackParamsDTO(List tasks); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java index 93141fb1..5336ae45 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java @@ -1,21 +1,39 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow; +import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.dto.CallbackConfig; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; +import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; +import com.aizuda.easy.retry.server.model.dto.CallbackParamsDTO; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** * @author xiaowoniu @@ -24,9 +42,13 @@ import java.util.Map; */ @Component @RequiredArgsConstructor +@Slf4j public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { + private static final String SECRET = "secret"; private final RestTemplate restTemplate; private final JobTaskMapper jobTaskMapper; + private final JobTaskBatchGenerator jobTaskBatchGenerator; + @Override public WorkflowNodeTypeEnum getWorkflowNodeType() { return WorkflowNodeTypeEnum.CALLBACK; @@ -35,23 +57,67 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { @Override protected void doExecute(WorkflowExecutorContext context) { CallbackConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), CallbackConfig.class); + int taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus(); + int operationReason = JobOperationReasonEnum.NONE.getReason(); + int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus(); + + String message = StrUtil.EMPTY; HttpHeaders requestHeaders = new HttpHeaders(); requestHeaders.set(HttpHeaders.CONTENT_TYPE, decisionConfig.getContentType()); - requestHeaders.set("secret", decisionConfig.getSecret()); + requestHeaders.set(SECRET, decisionConfig.getSecret()); - // TODO 拼接所有的任务结果值传递给下游节点 List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() - .select(JobTask::getResultMessage) + .select(JobTask::getResultMessage, JobTask::getClientInfo) .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); + List callbackParamsList = WorkflowTaskConverter.INSTANCE.toCallbackParamsDTO(jobTasks); - Map uriVariables = new HashMap<>(); - uriVariables.put("secret", decisionConfig.getSecret()); - restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST, - new HttpEntity<>("", requestHeaders), Object.class, uriVariables); + String result = StrUtil.EMPTY; + try { + Map uriVariables = new HashMap<>(); + uriVariables.put(SECRET, decisionConfig.getSecret()); + // TODO 添加重试 + ResponseEntity exchange = restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST, + new HttpEntity<>(callbackParamsList, requestHeaders), String.class, uriVariables); + result = exchange.getBody(); + log.info("回调结果. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), result); + } catch (Exception e) { + log.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), context.getResult(), e); + taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); + operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason(); + jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus(); + message = e.getMessage(); + } - // TODO 保存批次 - // TODO 保存任务 - // TODO 保存日志 + JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context); + generatorContext.setTaskBatchStatus(taskBatchStatus); + generatorContext.setOperationReason(operationReason); + generatorContext.setJobId(SystemConstants.DECISION_JOB_ID); + JobTaskBatch jobTaskBatch = jobTaskBatchGenerator.generateJobTaskBatch(generatorContext); + + // 生成执行任务实例 + JobTask jobTask = new JobTask(); + jobTask.setGroupName(context.getGroupName()); + jobTask.setNamespaceId(context.getNamespaceId()); + jobTask.setJobId(SystemConstants.CALLBACK_JOB_ID); + jobTask.setClientInfo(StrUtil.EMPTY); + jobTask.setTaskBatchId(jobTaskBatch.getId()); + jobTask.setArgsType(1); + jobTask.setArgsStr(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY)); + jobTask.setTaskStatus(jobTaskStatus); + jobTask.setResultMessage(result); + Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); + + // 保存执行的日志 + JobLogDTO jobLogDTO = new JobLogDTO(); + // TODO 等实时日志处理完毕后,再处理 + jobLogDTO.setMessage(message); + jobLogDTO.setTaskId(jobTask.getId()); + jobLogDTO.setJobId(SystemConstants.CALLBACK_JOB_ID); + jobLogDTO.setGroupName(context.getGroupName()); + jobLogDTO.setNamespaceId(context.getNamespaceId()); + jobLogDTO.setTaskBatchId(jobTaskBatch.getId()); + ActorRef actorRef = ActorGenerator.jobLogActor(); + actorRef.tell(jobLogDTO, actorRef); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java index 265ff76f..ef272e8e 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java @@ -105,7 +105,6 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { } if (result) { - // 若是工作流则开启下一个任务 try { WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java index 6c811935..c800a561 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java @@ -27,7 +27,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor { // 生成任务批次 JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob()); jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType()); - jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 1000); + jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 5000); jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId()); jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId()); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/convert/WorkflowConverter.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/convert/WorkflowConverter.java index fe5682a1..17cc3d40 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/convert/WorkflowConverter.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/convert/WorkflowConverter.java @@ -86,7 +86,9 @@ public interface WorkflowConverter { static JobTaskConfig parseJobTaskConfig(WorkflowNode workflowNode) { if (WorkflowNodeTypeEnum.JOB_TASK.getType() == workflowNode.getNodeType()) { - return JsonUtil.parseObject(workflowNode.getNodeInfo(), JobTaskConfig.class); + JobTaskConfig jobTaskConfig = new JobTaskConfig(); + jobTaskConfig.setJobId(workflowNode.getJobId()); + return jobTaskConfig; } return null;