feat: 2.6.0

1. 优化回调日志
This commit is contained in:
byteblogs168 2024-01-13 23:16:51 +08:00
parent c8a238b91c
commit fe9048ce8a
4 changed files with 41 additions and 46 deletions

View File

@ -57,7 +57,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
@Autowired
private JobTaskBatchGenerator jobTaskBatchGenerator;
@Autowired
private WorkflowBatchHandler workflowBatchHandler;
protected WorkflowBatchHandler workflowBatchHandler;
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
@ -107,12 +107,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
generatorContext.setJobId(context.getJobId());
generatorContext.setTaskExecutorScene(context.getTaskExecutorScene());
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
try {
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
} catch (IOException e) {
throw new EasyRetryServerException("工作流完成处理异常", e);
}
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
return false;
}

View File

@ -14,8 +14,10 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.client.RequestInterceptor;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.enums.ContentTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.model.dto.CallbackParamsDTO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
@ -32,6 +34,7 @@ import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -108,7 +111,6 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
context.setJobTaskStatus(jobTaskStatus);
context.setEvaluationResult(result);
context.setLogMessage(message);
}
@ -129,6 +131,14 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
logMetaDTO.setTaskBatchId(jobTaskBatch.getId());
logMetaDTO.setJobId(SystemConstants.CALLBACK_JOB_ID);
logMetaDTO.setTaskId(jobTask.getId());
EasyRetryLog.REMOTE.info("workflowNodeId:[{}] 回调成功. <|>{}<|>", context.getWorkflowNodeId(), logMetaDTO);
if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) {
EasyRetryLog.REMOTE.info("workflowNodeId:[{}] 回调成功. <|>{}<|>", context.getWorkflowNodeId(), logMetaDTO);
} else {
EasyRetryLog.REMOTE.info("workflowNodeId:[{}] 回调失败. 失败原因:[{}] <|>{}<|>", context.getWorkflowNodeId(),
context.getLogMessage(), logMetaDTO);
// 尝试完成任务
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
}
}
}

View File

@ -52,15 +52,14 @@ import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT
public class WorkflowBatchHandler {
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final WorkflowNodeMapper workflowNodeMapper;
private final JobMapper jobMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
public boolean complete(Long workflowTaskBatchId) throws IOException {
public boolean complete(Long workflowTaskBatchId) {
return complete(workflowTaskBatchId, null);
}
public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) throws IOException {
public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) {
workflowTaskBatch = Optional.ofNullable(workflowTaskBatch)
.orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
@ -83,10 +82,6 @@ public class WorkflowBatchHandler {
return false;
}
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.eq(WorkflowNode::getWorkflowNodeStatus, StatusEnum.YES.getStatus())
.in(WorkflowNode::getId, graph.nodes()));
Map<Long, List<JobTaskBatch>> currentWorkflowNodeMap = jobTaskBatches.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));

View File

@ -39,41 +39,36 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle
protected void doHandler(WorkflowTaskPrepareDTO prepare) {
log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare));
try {
// 1. 若DAG已经支持完成了由于异常原因导致的没有更新成终态此次进行一次更新操作
int blockStrategy = prepare.getBlockStrategy();
if (workflowBatchHandler.complete(prepare.getWorkflowTaskBatchId())) {
// 开启新的任务
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
} else {
// 计算超时时间
long delay = DateUtils.toNowMilli() - prepare.getExecutionAt();
// 2. 判断DAG是否已经支持超时
// 计算超时时间到达超时时间中断任务
if (delay > DateUtils.toEpochMilli(prepare.getExecutorTimeout())) {
log.info("任务执行超时.workflowTaskBatchId:[{}] delay:[{}] executorTimeout:[{}]",
// 1. 若DAG已经支持完成了由于异常原因导致的没有更新成终态此次进行一次更新操作
int blockStrategy = prepare.getBlockStrategy();
if (workflowBatchHandler.complete(prepare.getWorkflowTaskBatchId())) {
// 开启新的任务
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
} else {
// 计算超时时间
long delay = DateUtils.toNowMilli() - prepare.getExecutionAt();
// 2. 判断DAG是否已经支持超时
// 计算超时时间到达超时时间中断任务
if (delay > DateUtils.toEpochMilli(prepare.getExecutorTimeout())) {
log.info("任务执行超时.workflowTaskBatchId:[{}] delay:[{}] executorTimeout:[{}]",
prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout()));
// 超时停止任务
workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.EXECUTE_TIMEOUT.getReason());
}
// 超时停止任务
workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.EXECUTE_TIMEOUT.getReason());
}
// 仅是超时检测的不执行阻塞策略
if (prepare.isOnlyTimeoutCheck()) {
return;
}
// 3. 支持阻塞策略同JOB逻辑一致
BlockStrategy blockStrategyInterface = WorkflowBlockStrategyFactory.getJobTaskStop(blockStrategy);
WorkflowBlockStrategyContext workflowBlockStrategyContext = WorkflowTaskConverter.INSTANCE.toWorkflowBlockStrategyContext(
prepare);
blockStrategyInterface.block(workflowBlockStrategyContext);
} catch (IOException e) {
log.error("更新任务状态失败. prepare:[{}]", JsonUtil.toJsonString(prepare), e);
}
// 仅是超时检测的不执行阻塞策略
if (prepare.isOnlyTimeoutCheck()) {
return;
}
// 3. 支持阻塞策略同JOB逻辑一致
BlockStrategy blockStrategyInterface = WorkflowBlockStrategyFactory.getJobTaskStop(blockStrategy);
WorkflowBlockStrategyContext workflowBlockStrategyContext = WorkflowTaskConverter.INSTANCE.toWorkflowBlockStrategyContext(
prepare);
blockStrategyInterface.block(workflowBlockStrategyContext);
}
}