feat: 2.6.0

1. Fix the error when querying details
byteblogs168 2024-01-02 12:23:36 +08:00
parent d7a53ae3e6
commit 9a0ffa4dce
7 changed files with 109 additions and 13 deletions

View File

@@ -26,6 +26,8 @@ public enum JobOperationReasonEnum {
MANNER_STOP(8, "手动停止"),
WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(8, "条件节点执行异常"),
JOB_TASK_INTERRUPTED(9, "任务中断"),
WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR(8, "回调节点执行异常"),
;
private final int reason;

View File

@@ -0,0 +1,23 @@
package com.aizuda.easy.retry.server.model.dto;
import lombok.Data;
/**
* @author: xiaowoniu
* @date : 2024-01-02
* @since : 2.6.0
*/
@Data
public class CallbackParamsDTO {
/**
* 执行结果
*/
private String resultMessage;
/**
* 客户端ID
*/
private String clientInfo;
}

View File

@@ -6,6 +6,8 @@ import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBloc
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.model.dto.CallbackParamsDTO;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
@@ -46,4 +48,6 @@ public interface WorkflowTaskConverter {
WorkflowTaskBatchGeneratorContext toWorkflowTaskBatchGeneratorContext(WorkflowBlockStrategyContext context);
WorkflowBlockStrategyContext toWorkflowBlockStrategyContext(WorkflowTaskPrepareDTO prepareDTO);
List<CallbackParamsDTO> toCallbackParamsDTO(List<JobTask> tasks);
}
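WorkflowTaskConverter is referenced as WorkflowTaskConverter.INSTANCE in the executor below, which points to a MapStruct-style mapper; the new toCallbackParamsDTO method simply copies the two columns selected from JobTask into the DTO. A hand-written sketch of the equivalent mapping, for illustration only (the real implementation is generated; the helper name and the java.util.ArrayList import are assumptions):

// Illustrative equivalent of the generated mapping: copy resultMessage and
// clientInfo field-for-field from each JobTask into a CallbackParamsDTO.
static List<CallbackParamsDTO> toCallbackParamsDTOSketch(List<JobTask> tasks) {
    List<CallbackParamsDTO> result = new ArrayList<>();
    if (tasks == null) {
        return result;
    }
    for (JobTask task : tasks) {
        CallbackParamsDTO dto = new CallbackParamsDTO();
        dto.setResultMessage(task.getResultMessage());
        dto.setClientInfo(task.getClientInfo());
        result.add(dto);
    }
    return result;
}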

View File

@@ -1,21 +1,39 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.model.dto.CallbackParamsDTO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* @author xiaowoniu
@@ -24,9 +42,13 @@ import java.util.Map;
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
private static final String SECRET = "secret";
private final RestTemplate restTemplate;
private final JobTaskMapper jobTaskMapper;
private final JobTaskBatchGenerator jobTaskBatchGenerator;
@Override
public WorkflowNodeTypeEnum getWorkflowNodeType() {
return WorkflowNodeTypeEnum.CALLBACK;
@@ -35,23 +57,67 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
@Override
protected void doExecute(WorkflowExecutorContext context) {
CallbackConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), CallbackConfig.class);
int taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason();
int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
String message = StrUtil.EMPTY;
HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.set(HttpHeaders.CONTENT_TYPE, decisionConfig.getContentType());
requestHeaders.set("secret", decisionConfig.getSecret());
requestHeaders.set(SECRET, decisionConfig.getSecret());
// TODO: concatenate all task result values and pass them to the downstream node
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.select(JobTask::getResultMessage, JobTask::getClientInfo)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
List<CallbackParamsDTO> callbackParamsList = WorkflowTaskConverter.INSTANCE.toCallbackParamsDTO(jobTasks);
Map<String, String> uriVariables = new HashMap<>();
uriVariables.put("secret", decisionConfig.getSecret());
restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST,
new HttpEntity<>("", requestHeaders), Object.class, uriVariables);
String result = StrUtil.EMPTY;
try {
Map<String, String> uriVariables = new HashMap<>();
uriVariables.put(SECRET, decisionConfig.getSecret());
// TODO: add retries
ResponseEntity<String> exchange = restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST,
new HttpEntity<>(callbackParamsList, requestHeaders), String.class, uriVariables);
result = exchange.getBody();
log.info("回调结果. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), result);
} catch (Exception e) {
log.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), context.getResult(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
message = e.getMessage();
}
// TODO: save the batch
// TODO: save the task
// TODO: save the log
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(taskBatchStatus);
generatorContext.setOperationReason(operationReason);
generatorContext.setJobId(SystemConstants.CALLBACK_JOB_ID);
JobTaskBatch jobTaskBatch = jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
// Generate the job task instance
JobTask jobTask = new JobTask();
jobTask.setGroupName(context.getGroupName());
jobTask.setNamespaceId(context.getNamespaceId());
jobTask.setJobId(SystemConstants.CALLBACK_JOB_ID);
jobTask.setClientInfo(StrUtil.EMPTY);
jobTask.setTaskBatchId(jobTaskBatch.getId());
jobTask.setArgsType(1);
jobTask.setArgsStr(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY));
jobTask.setTaskStatus(jobTaskStatus);
jobTask.setResultMessage(result);
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
// Save the execution log
JobLogDTO jobLogDTO = new JobLogDTO();
// TODO: handle this once real-time log processing is finished
jobLogDTO.setMessage(message);
jobLogDTO.setTaskId(jobTask.getId());
jobLogDTO.setJobId(SystemConstants.CALLBACK_JOB_ID);
jobLogDTO.setGroupName(context.getGroupName());
jobLogDTO.setNamespaceId(context.getNamespaceId());
jobLogDTO.setTaskBatchId(jobTaskBatch.getId());
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef);
}
}
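The "add retries" TODO above leaves the webhook call as a single attempt. A minimal sketch of how a bounded retry could wrap the exchange call, assuming a fixed attempt count and linear backoff (the helper name and the retry parameters are illustrative, not part of this commit):

// Illustrative only: bounded retry around the webhook POST; the commit itself
// performs a single attempt. Attempt count and backoff are example values.
private String postWithRetry(String webhook, HttpEntity<?> entity, Map<String, String> uriVariables) throws InterruptedException {
    RuntimeException lastError = null;
    for (int attempt = 1; attempt <= 3; attempt++) {
        try {
            return restTemplate.exchange(webhook, HttpMethod.POST, entity, String.class, uriVariables).getBody();
        } catch (RuntimeException e) {
            lastError = e;
            if (attempt < 3) {
                Thread.sleep(500L * attempt); // linear backoff before the next attempt
            }
        }
    }
    throw lastError;
}

A RetryTemplate or the project's own retry support would be more idiomatic, but the plain loop keeps the sketch dependency-free.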

View File

@@ -105,7 +105,6 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
}
if (result) {
// If this is a workflow, start the next task
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());

View File

@@ -27,7 +27,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
// Generate the job task batch
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 1000);
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 5000);
jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId());
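For context on the change above: DateUtils.toNowMilli() % 5000 is always in [0, 5000), so a workflow-triggered batch is now scheduled up to 5 seconds after the current time instead of up to 1 second. A quick check of the arithmetic with an illustrative timestamp:

long now = 1_704_168_216_000L;      // example value of DateUtils.toNowMilli()
long offset = now % 5000;           // 1000 here; always in [0, 5000)
long nextTriggerAt = now + offset;  // 1_704_168_217_000L, 1 second after now in this example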

View File

@@ -86,7 +86,9 @@ public interface WorkflowConverter {
static JobTaskConfig parseJobTaskConfig(WorkflowNode workflowNode) {
if (WorkflowNodeTypeEnum.JOB_TASK.getType() == workflowNode.getNodeType()) {
return JsonUtil.parseObject(workflowNode.getNodeInfo(), JobTaskConfig.class);
JobTaskConfig jobTaskConfig = new JobTaskConfig();
jobTaskConfig.setJobId(workflowNode.getJobId());
return jobTaskConfig;
}
return null;