fix(sj_1.1.0-beta2): 优化事务问题

This commit is contained in:
opensnail 2024-07-04 13:03:47 +08:00
parent 27adf03420
commit 3a4382a9ab
2 changed files with 56 additions and 54 deletions

View File

@ -149,20 +149,27 @@ public class JobExecutorActor extends AbstractActor {
return;
}
// 获取工作流的上下文
WorkflowTaskBatch workflowTaskBatch = null;
Long workflowTaskBatchId = taskExecute.getWorkflowTaskBatchId();
if (Objects.nonNull(workflowTaskBatchId)) {
workflowTaskBatch = workflowTaskBatchMapper.selectOne(
new LambdaQueryWrapper<WorkflowTaskBatch>()
.select(WorkflowTaskBatch::getWfContext)
.eq(WorkflowTaskBatch::getId, taskExecute.getWorkflowTaskBatchId())
);
}
// 事务提交以后再执行任务
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(final int status) {
// 获取工作流的上下文
WorkflowTaskBatch workflowTaskBatch = null;
Long workflowTaskBatchId = taskExecute.getWorkflowTaskBatchId();
if (Objects.nonNull(workflowTaskBatchId)) {
workflowTaskBatch = workflowTaskBatchMapper.selectOne(
new LambdaQueryWrapper<WorkflowTaskBatch>()
.select(WorkflowTaskBatch::getWfContext)
.eq(WorkflowTaskBatch::getId, taskExecute.getWorkflowTaskBatchId())
);
}
// 执行任务
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList, workflowTaskBatch));
}
});
// 执行任务
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList, workflowTaskBatch));
} finally {
log.debug("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute));
final int finalTaskStatus = taskStatus;

View File

@ -29,6 +29,7 @@ import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
@ -178,7 +179,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
return finalJobTasks;
}
@NotNull
private List<JobTask> createMapJobTasks(final JobTaskGenerateContext context) {
List<?> mapSubTask = context.getMapSubTask();
if (CollUtil.isEmpty(mapSubTask)) {
@ -194,51 +194,46 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
);
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
return transactionTemplate.execute(status -> {
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
for (int index = 0; index < mapSubTask.size(); index++) {
Pair<String, Integer> clientInfo = getClientNodeInfo(context);
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(clientInfo.getKey());
jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobArgsHolder.setMaps(mapSubTask.get(index));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
jobTask.setTaskStatus(clientInfo.getValue());
jobTask.setMrStage(MapReduceStageEnum.MAP.getStage());
jobTask.setTaskName(context.getTaskName());
jobTask.setLeaf(StatusEnum.YES.getStatus());
jobTask.setParentId(Objects.isNull(context.getParentId()) ? 0L : context.getParentId());
jobTask.setRetryCount(0);
jobTask.setCreateDt(LocalDateTime.now());
jobTask.setUpdateDt(LocalDateTime.now());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTasks.add(jobTask);
}
for (int index = 0; index < mapSubTask.size(); index++) {
Pair<String, Integer> clientInfo = getClientNodeInfo(context);
batchSaveJobTasks(jobTasks);
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(clientInfo.getKey());
jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobArgsHolder.setMaps(mapSubTask.get(index));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
jobTask.setTaskStatus(clientInfo.getValue());
jobTask.setMrStage(MapReduceStageEnum.MAP.getStage());
jobTask.setTaskName(context.getTaskName());
jobTask.setLeaf(StatusEnum.YES.getStatus());
jobTask.setParentId(Objects.isNull(context.getParentId()) ? 0L : context.getParentId());
jobTask.setRetryCount(0);
jobTask.setCreateDt(LocalDateTime.now());
jobTask.setUpdateDt(LocalDateTime.now());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTasks.add(jobTask);
}
// 更新父节点的为非叶子节点
if (Objects.nonNull(parentJobTask)) {
JobTask parentJobTask1 = new JobTask();
parentJobTask1.setId(context.getParentId());
parentJobTask1.setLeaf(StatusEnum.NO.getStatus());
Assert.isTrue(1 == jobTaskMapper.updateById(parentJobTask1),
() -> new SnailJobMapReduceException("更新父节点失败"));
}
batchSaveJobTasks(jobTasks);
return jobTasks;
});
// 更新父节点的为非叶子节点
if (Objects.nonNull(parentJobTask)) {
JobTask parentJobTask = new JobTask();
parentJobTask.setId(context.getParentId());
parentJobTask.setLeaf(StatusEnum.NO.getStatus());
Assert.isTrue(1 == jobTaskMapper.updateById(parentJobTask),
() -> new SnailJobMapReduceException("更新父节点失败"));
}
}
});
return jobTasks;
}
private Pair<String, Integer> getClientNodeInfo(JobTaskGenerateContext context) {