fix(sj_1.1.0-beta2): 优化工作流代码

This commit is contained in:
opensnail 2024-07-04 11:25:43 +08:00
parent 19590c5422
commit 27adf03420

View File

@ -8,12 +8,12 @@ import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.allocate.client.ClientLoadBalanceManager;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
@ -24,7 +24,6 @@ import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;
@ -90,10 +89,10 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
private List<JobTask> createMergeReduceJobTasks(JobTaskGenerateContext context) {
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
.eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage())
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
.eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage())
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
);
Pair<String, Integer> clientInfo = getClientNodeInfo(context);
@ -110,7 +109,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage());
jobTask.setTaskName(MERGE_REDUCE_TASK);
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
() -> new SnailJobServerException("新增任务实例失败"));
return Lists.newArrayList(jobTask);
}
@ -120,7 +119,8 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
int reduceParallel = 1;
String jobParams = null;
try {
MapReduceArgsStrDTO mapReduceArgsStrDTO = JsonUtil.parseObject(context.getArgsStr(), MapReduceArgsStrDTO.class);
MapReduceArgsStrDTO mapReduceArgsStrDTO = JsonUtil.parseObject(context.getArgsStr(),
MapReduceArgsStrDTO.class);
reduceParallel = Optional.ofNullable(mapReduceArgsStrDTO.getShardNum()).orElse(1);
jobParams = mapReduceArgsStrDTO.getArgsStr();
} catch (Exception e) {
@ -128,10 +128,10 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
}
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
.eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage())
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
.select(JobTask::getResultMessage, JobTask::getId)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
.eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage())
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
);
if (CollUtil.isEmpty(jobTasks)) {
@ -187,10 +187,11 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
}
// 判定父节点是不是叶子节点若是则不更新否则更新为非叶子节点
List<JobTask> parentJobTasks = jobTaskMapper.selectList(new PageDTO<>(1, 1),
new LambdaQueryWrapper<JobTask>().select(JobTask::getId)
.eq(JobTask::getId, context.getParentId())
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
JobTask parentJobTask = jobTaskMapper.selectOne(
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getId)
.eq(JobTask::getId, context.getParentId())
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
);
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
@ -226,26 +227,26 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
batchSaveJobTasks(jobTasks);
// 更新父节点的为非叶子节点
if (CollUtil.isNotEmpty(parentJobTasks)) {
if (Objects.nonNull(parentJobTask)) {
JobTask parentJobTask = new JobTask();
parentJobTask.setId(context.getParentId());
parentJobTask.setLeaf(StatusEnum.NO.getStatus());
jobTaskMapper.updateById(parentJobTask);
Assert.isTrue(1 == jobTaskMapper.updateById(parentJobTask),
() -> new SnailJobMapReduceException("更新父节点失败"));
}
}
});
return jobTasks;
}
private Pair<String, Integer> getClientNodeInfo(JobTaskGenerateContext context) {
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(
context.getJobId().toString(),
context.getGroupName(),
context.getNamespaceId(),
ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType()
context.getJobId().toString(),
context.getGroupName(),
context.getNamespaceId(),
ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType()
);
String clientInfo = StrUtil.EMPTY;
int JobTaskStatus = JobTaskStatusEnum.RUNNING.getStatus();