feat: 2.6.0

1. 优化重试导致事务超时问题
This commit is contained in:
byteblogs168 2024-01-26 18:14:52 +08:00
parent e3206c611f
commit 0d9f885cc5
4 changed files with 52 additions and 41 deletions

View File

@ -62,9 +62,6 @@ public class JobExecutorResultActor extends AbstractActor {
return receiveBuilder().match(JobExecutorResultDTO.class, result -> { return receiveBuilder().match(JobExecutorResultDTO.class, result -> {
log.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); log.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
try { try {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
JobTask jobTask = new JobTask(); JobTask jobTask = new JobTask();
jobTask.setTaskStatus(result.getTaskStatus()); jobTask.setTaskStatus(result.getTaskStatus());
if (Objects.nonNull(result.getResult())) { if (Objects.nonNull(result.getResult())) {
@ -81,10 +78,8 @@ public class JobExecutorResultActor extends AbstractActor {
distributedLockHandler.lockWithDisposableAndRetry(() -> { distributedLockHandler.lockWithDisposableAndRetry(() -> {
tryCompleteAndStop(result); tryCompleteAndStop(result);
}, MessageFormat.format(KEY, result.getTaskBatchId(), }, MessageFormat.format(KEY, result.getTaskBatchId(),
result.getJobId()), Duration.ofSeconds(3), Duration.ofSeconds(1), 6); result.getJobId()), Duration.ofSeconds(2), Duration.ofSeconds(1), 3);
} }
}
});
} catch (Exception e) { } catch (Exception e) {
EasyRetryLog.LOCAL.error(" job executor result exception. [{}]", result, e); EasyRetryLog.LOCAL.error(" job executor result exception. [{}]", result, e);
} finally { } finally {

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.FailStrategyEnum; import com.aizuda.easy.retry.common.core.enums.FailStrategyEnum;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
@ -80,7 +81,9 @@ public class WorkflowExecutorActor extends AbstractActor {
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId()); WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId());
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在")); Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
if (SystemConstants.ROOT.equals(taskExecute.getParentId()) && JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) {
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason()); handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason());
}
// 获取DAG图 // 获取DAG图
String flowInfo = workflowTaskBatch.getFlowInfo(); String flowInfo = workflowTaskBatch.getFlowInfo();

View File

@ -80,24 +80,18 @@ public class JobTaskBatchGenerator {
// 非待处理状态无需进入时间轮中 // 非待处理状态无需进入时间轮中
if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) { if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override @Override
public void afterCompletion(int status) { public void afterCompletion(int status) {
if (Objects.nonNull(context.getWorkflowNodeId()) && Objects.nonNull(context.getWorkflowTaskBatchId())) { openNextNode(context);
// 若是工作流则开启下一个任务
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
}
}
} }
}); });
} else {
openNextNode(context);
}
return jobTaskBatch; return jobTaskBatch;
} }
@ -115,4 +109,24 @@ public class JobTaskBatchGenerator {
return jobTaskBatch; return jobTaskBatch;
} }
/**
* 工作流开启下一个节点
* @param context
*/
private static void openNextNode(final JobTaskBatchGeneratorContext context) {
if (Objects.nonNull(context.getWorkflowNodeId()) && Objects.nonNull(context.getWorkflowTaskBatchId())) {
// 若是工作流则开启下一个任务
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
}
}
}
} }

View File

@ -50,7 +50,6 @@ public class ReportLogHttpRequestHandler extends PostHttpRequestHandler {
} }
@Override @Override
@Transactional
public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) { public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
EasyRetryLog.LOCAL.info("Begin Handler Log Report Data. [{}]", content); EasyRetryLog.LOCAL.info("Begin Handler Log Report Data. [{}]", content);