feat: 2.6.0

1. 修复广播模式并发问题
This commit is contained in:
byteblogs168 2024-01-21 18:18:06 +08:00
parent 39f7041472
commit 0440e127ca
6 changed files with 71 additions and 48 deletions

View File

@ -107,6 +107,8 @@ public interface JobTaskConverter {
)
JobExecutorResultDTO toJobExecutorResultDTO(JobTask jobTask);
JobExecutorResultDTO toJobExecutorResultDTO(RealJobExecutorDTO realJobExecutorDTO);
RealStopTaskInstanceDTO toRealStopTaskInstanceDTO(TaskStopJobContext context);
List<JobPartitionTaskDTO> toJobPartitionTasks(List<Job> jobs);

View File

@ -35,4 +35,20 @@ public class BlockStrategyContext {
*/
private Integer taskExecutorScene;
/**
* 工作流任务批次id
*/
private Long workflowTaskBatchId;
/**
* 工作流节点id
*/
private Long workflowNodeId;
/**
* 工作流父节点id
*/
private Long parentWorkflowNodeId;
}

View File

@ -16,6 +16,8 @@ import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.LockExecutor;
import com.aizuda.easy.retry.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.easy.retry.server.job.task.support.handler.JobTaskBatchHandler;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
@ -31,6 +33,8 @@ import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;
@ -43,13 +47,15 @@ import java.util.Objects;
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class JobExecutorResultActor extends AbstractActor {
private static final String KEY = "job_complete_{0}_{1}";
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
private JobTaskBatchHandler jobTaskBatchHandler;
@Autowired
private DistributedLockHandler distributedLockHandler;
@Override
public Receive createReceive() {
@ -69,30 +75,26 @@ public class JobExecutorResultActor extends AbstractActor {
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
()-> new EasyRetryServerException("更新任务实例失败"));
// 更新批次上的状态
CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result);
boolean complete = jobTaskBatchHandler.complete(completeJobBatchDTO);
if (complete) {
// 尝试停止任务
// 若是集群任务则客户端会主动关闭
if (result.getTaskType() != JobTaskTypeEnum.CLUSTER.getType()) {
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result);
stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE);
stopJobContext.setForceStop(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
// 存在并发问题
distributedLockHandler.lockWithDisposableAndRetry(() -> {
// 更新批次上的状态
CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result);
boolean complete = jobTaskBatchHandler.complete(completeJobBatchDTO);
if (complete) {
// 尝试停止任务
// 若是集群任务则客户端会主动关闭
if (result.getTaskType() != JobTaskTypeEnum.CLUSTER.getType()) {
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result);
stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE);
stopJobContext.setForceStop(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
}
}
}
}, MessageFormat.format(KEY, result.getTaskBatchId(), result.getJobId()), Duration.ofSeconds(3), Duration.ofSeconds(1), 3);
}
});
// LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result);
// // 防止客户端日志还未上报完成导致日志时序错误
// logMetaDTO.setTimestamp(DateUtils.toEpochMilli(LocalDateTime.now().plusHours(1)));
// EasyRetryLog.REMOTE.info("taskId:[{}] 任务执行成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO);
} catch (Exception e) {
EasyRetryLog.LOCAL.error(" job executor result exception. [{}]", result, e);
} finally {

View File

@ -19,6 +19,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT_COMPLETE;
/**
@ -60,6 +61,9 @@ public class JobTaskPrepareActor extends AbstractActor {
prepare.getTaskExecutorScene());
if (TaskTypeEnum.WORKFLOW.getType().equals(jobTaskExecutorSceneEnum.getTaskType().getType())) {
queryWrapper.eq(JobTaskBatch::getWorkflowNodeId, prepare.getWorkflowNodeId());
queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.WORKFLOW.getType());
} else {
queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.JOB.getType());
}
List<JobTaskBatch> notCompleteJobTaskBatchList = jobTaskBatchMapper
@ -75,7 +79,7 @@ public class JobTaskPrepareActor extends AbstractActor {
for (JobTaskBatch jobTaskBatch : notCompleteJobTaskBatchList) {
prepare.setExecutionAt(jobTaskBatch.getExecutionAt());
prepare.setTaskBatchId(jobTaskBatch.getId());
prepare.setWorkflowTaskBatchId(jobTaskBatch.getWorkflowTaskBatchId());
prepare.setWorkflowTaskBatchId(prepare.getWorkflowTaskBatchId());
prepare.setWorkflowNodeId(jobTaskBatch.getWorkflowNodeId());
prepare.setOnlyTimeoutCheck(onlyTimeoutCheck);
for (JobPrePareHandler prePareHandler : prePareHandlers) {

View File

@ -88,12 +88,14 @@ public class RequestClientActor extends AbstractActor {
if (e.getClass().isAssignableFrom(RetryException.class)) {
RetryException re = (RetryException) e;
throwable = re.getLastFailedAttempt().getExceptionCause();
taskExecuteFailure(realJobExecutorDTO, throwable.getMessage());
}
LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO);
logMetaDTO.setTimestamp( DateUtils.toNowMilli());
EasyRetryLog.REMOTE.error("taskId:[{}] 任务调度成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO, throwable);
EasyRetryLog.REMOTE.error("taskId:[{}] 任务调度失败. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO, throwable);
taskExecuteFailure(realJobExecutorDTO, throwable.getMessage());
}
}
@ -135,10 +137,7 @@ public class RequestClientActor extends AbstractActor {
private static void taskExecuteFailure(RealJobExecutorDTO realJobExecutorDTO, String message) {
ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
JobExecutorResultDTO jobExecutorResultDTO = new JobExecutorResultDTO();
jobExecutorResultDTO.setTaskId(realJobExecutorDTO.getTaskId());
jobExecutorResultDTO.setJobId(realJobExecutorDTO.getJobId());
jobExecutorResultDTO.setTaskBatchId(realJobExecutorDTO.getTaskBatchId());
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(realJobExecutorDTO);
jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
jobExecutorResultDTO.setMessage(message);
actorRef.tell(jobExecutorResultDTO, actorRef);

View File

@ -60,25 +60,6 @@ public class JobTaskBatchGenerator {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
jobTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason());
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {
if (Objects.nonNull(context.getWorkflowNodeId()) && Objects.nonNull(context.getWorkflowTaskBatchId())) {
// 若是工作流则开启下一个任务
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
}
}
}
});
} else {
// 生成一个新的任务
jobTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus()));
@ -98,6 +79,25 @@ public class JobTaskBatchGenerator {
// 非待处理状态无需进入时间轮中
if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {
if (Objects.nonNull(context.getWorkflowNodeId()) && Objects.nonNull(context.getWorkflowTaskBatchId())) {
// 若是工作流则开启下一个任务
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
}
}
}
});
return jobTaskBatch;
}