feat(sj_1.1.0-beta2): optimize batch insertion of job tasks

opensnail 2024-07-07 18:56:57 +08:00
parent 8de35d775d
commit d9cd271be2
4 changed files with 24 additions and 25 deletions

@@ -1138,7 +1138,7 @@ GO
 -- sj_distributed_lock
 CREATE TABLE sj_distributed_lock
 (
-    name           nvarchar(64)  NOT NULL PRIMARY KEY IDENTITY,
+    name           nvarchar(64)  NOT NULL PRIMARY KEY,
     lock_until     datetime2     NOT NULL DEFAULT CURRENT_TIMESTAMP,
     locked_at      datetime2     NOT NULL DEFAULT CURRENT_TIMESTAMP,
     locked_by      nvarchar(255) NOT NULL,
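
Note: dropping IDENTITY from name is what makes this lock table usable. A lock row is created by inserting an explicit, caller-chosen lock name, and SQL Server rejects explicit values for an IDENTITY column unless IDENTITY_INSERT is switched on. A minimal sketch of the acquire path this schema has to support; tryLock, the 30-second TTL, and the exact SQL are illustrative assumptions, not snail-job's actual lock implementation:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;

public class DistributedLockSketch {
    // Try to take over an expired lock first, then try to create the row.
    // Returns true if this node now holds the lock identified by name.
    public static boolean tryLock(Connection conn, String name, String owner) throws SQLException {
        Timestamp until = Timestamp.from(Instant.now().plusSeconds(30));
        try (PreparedStatement update = conn.prepareStatement(
                "UPDATE sj_distributed_lock SET lock_until = ?, locked_at = CURRENT_TIMESTAMP, locked_by = ? "
                        + "WHERE name = ? AND lock_until <= CURRENT_TIMESTAMP")) {
            update.setTimestamp(1, until);
            update.setString(2, owner);
            update.setString(3, name);
            if (update.executeUpdate() == 1) {
                return true;
            }
        }
        try (PreparedStatement insert = conn.prepareStatement(
                "INSERT INTO sj_distributed_lock (name, lock_until, locked_at, locked_by) "
                        + "VALUES (?, ?, CURRENT_TIMESTAMP, ?)")) {
            // this explicit insert into name is exactly what IDENTITY would forbid
            insert.setString(1, name);
            insert.setTimestamp(2, until);
            insert.setString(3, owner);
            return insert.executeUpdate() == 1;
        } catch (SQLException duplicateKey) {
            return false; // row exists and the lock is still held by someone else
        }
    }
}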

@@ -4,7 +4,6 @@ import akka.actor.AbstractActor;
 import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.lang.Assert;
 import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
-import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
 import com.aizuda.snailjob.common.core.util.JsonUtil;
 import com.aizuda.snailjob.common.log.SnailJobLog;
 import com.aizuda.snailjob.server.common.akka.ActorGenerator;
@@ -63,7 +62,7 @@ public class ReduceActor extends AbstractActor {
         Assert.notNull(reduceTask.getMrStage(), () -> new SnailJobServerException("mrStage can not be null"));
         Assert.notNull(reduceTask.getJobId(), () -> new SnailJobServerException("jobId can not be null"));
         Assert.notNull(reduceTask.getTaskBatchId(),
-            () -> new SnailJobServerException("taskBatchId can not be null"));
+                () -> new SnailJobServerException("taskBatchId can not be null"));
         String key = MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId());
         distributedLockHandler.lockWithDisposableAndRetry(() -> {
             doReduce(reduceTask);
@@ -78,10 +77,11 @@ public class ReduceActor extends AbstractActor {
     private void doReduce(final ReduceTaskDTO reduceTask) {
         List<JobTask> jobTasks = jobTaskMapper.selectList(new PageDTO<>(1, 1),
-            new LambdaQueryWrapper<JobTask>()
-                .select(JobTask::getId)
-                .eq(JobTask::getTaskBatchId, reduceTask.getTaskBatchId())
-                .eq(JobTask::getMrStage, reduceTask.getMrStage())
+                new LambdaQueryWrapper<JobTask>()
+                        .select(JobTask::getId)
+                        .eq(JobTask::getTaskBatchId, reduceTask.getTaskBatchId())
+                        .eq(JobTask::getMrStage, reduceTask.getMrStage())
+                        .orderByAsc(JobTask::getId)
         );
         if (CollUtil.isNotEmpty(jobTasks)) {
@@ -110,9 +110,9 @@ public class ReduceActor extends AbstractActor {
         String wfContext = null;
         if (Objects.nonNull(reduceTask.getWorkflowTaskBatchId())) {
             WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
-                new LambdaQueryWrapper<WorkflowTaskBatch>()
-                    .select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getId)
-                    .eq(WorkflowTaskBatch::getId, reduceTask.getWorkflowTaskBatchId())
+                    new LambdaQueryWrapper<WorkflowTaskBatch>()
+                            .select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getId)
+                            .eq(WorkflowTaskBatch::getId, reduceTask.getWorkflowTaskBatchId())
             );
             wfContext = workflowTaskBatch.getWfContext();
         }
@@ -124,10 +124,10 @@ public class ReduceActor extends AbstractActor {
     }

     private static JobExecutorContext buildJobExecutorContext(
-        ReduceTaskDTO reduceTask,
-        Job job,
-        List<JobTask> taskList,
-        String wfContext) {
+            ReduceTaskDTO reduceTask,
+            Job job,
+            List<JobTask> taskList,
+            String wfContext) {
         JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
         context.setTaskList(taskList);
         context.setTaskBatchId(reduceTask.getTaskBatchId());
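
Note: the only behavioral change in this file is the added orderByAsc(JobTask::getId); the remaining hunks are re-indentation. The ordering matters because the reduce stage reads job tasks through new PageDTO<>(1, 1), one page at a time, and SQL guarantees no row order without an ORDER BY, so successive pages could overlap or skip rows. Below is a hedged sketch of the paging pattern this stabilizes; the helper name, the page size of 100, and the list accumulation are illustrative, not code from this commit:

import java.util.ArrayList;
import java.util.List;

import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;

public class JobTaskPaging {
    // Drain every task of one batch, one page at a time. The stable sort key
    // (id) is what keeps page boundaries deterministic across queries.
    static List<JobTask> selectAllTasks(JobTaskMapper jobTaskMapper, Long taskBatchId) {
        List<JobTask> all = new ArrayList<>();
        long page = 1;
        List<JobTask> chunk;
        do {
            chunk = jobTaskMapper.selectList(new PageDTO<>(page++, 100),
                    new LambdaQueryWrapper<JobTask>()
                            .select(JobTask::getId)
                            .eq(JobTask::getTaskBatchId, taskBatchId)
                            .orderByAsc(JobTask::getId)); // without this, pages may repeat or miss rows
            all.addAll(chunk);
        } while (chunk.size() == 100);
        return all;
    }
}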

@@ -7,6 +7,7 @@ import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
 import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
 import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
 import com.aizuda.snailjob.template.datasource.utils.DbUtils;
+import com.google.common.collect.Sets;
 import org.apache.ibatis.executor.BatchResult;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -39,17 +40,14 @@ public abstract class AbstractJobTaskGenerator implements JobTaskGenerator, InitializingBean {
     protected void batchSaveJobTasks(List<JobTask> jobTasks) {
         // ORACLE batch inserts cannot return generated ids directly, so this case is special-cased here
         // a later release will refactor snail-job-datasource and revisit this compatibility logic then
-        if (DbUtils.getDbType().getDb().equals(DbTypeEnum.ORACLE.getDb())) {
-            List<BatchResult> inserts = jobTaskMapper.insert(jobTasks);
-            if (CollUtil.isNotEmpty(inserts)) {
-                BatchResult batchResult = inserts.get(0);
-                Assert.isTrue(jobTasks.size() == Arrays.stream(batchResult.getUpdateCounts()).sum(), () -> new SnailJobServerException("failed to insert job task instances"));
-            } else {
-                throw new SnailJobServerException("failed to insert job task instances");
-            }
-        } else {
-            Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("failed to insert job task instances"));
+        if (Sets.newHashSet(DbTypeEnum.ORACLE.getDb(), DbTypeEnum.SQLSERVER.getDb())
+                .contains(DbUtils.getDbType().getDb())) {
+            // sqlserver and oracle do not support returning batch ids, so handle it row by row for now
+            for (JobTask jobTask : jobTasks) {
+                Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("failed to insert job task instances"));
+            }
+            return;
         }
+        Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("failed to insert job task instances"));
     }
 }
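
Note: the rewritten branch trades one multi-row INSERT for n single-row INSERTs on Oracle and SQL Server, because their drivers do not reliably hand back every generated key for a batched statement, while a single insert lets MyBatis populate each JobTask id. A hedged sketch of the dialect-dispatch idea in isolation; the class, constant, and method names are invented for illustration:

import java.util.Set;

import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
import com.aizuda.snailjob.template.datasource.utils.DbUtils;
import com.google.common.collect.Sets;

public final class DialectSupport {
    // Dialects whose drivers cannot return generated keys for multi-row inserts.
    private static final Set<String> NO_BATCH_GENERATED_KEYS =
            Sets.newHashSet(DbTypeEnum.ORACLE.getDb(), DbTypeEnum.SQLSERVER.getDb());

    private DialectSupport() {
    }

    // True when insertBatch can be trusted to fill in every generated id.
    public static boolean supportsBatchGeneratedKeys() {
        return !NO_BATCH_GENERATED_KEYS.contains(DbUtils.getDbType().getDb());
    }
}

Keeping the set in a constant also avoids rebuilding it on every call, which the inline Sets.newHashSet(...) in the committed code does; a micro-cost, but this path runs once per batch.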

@@ -124,6 +124,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
                 MapReduceArgsStrDTO.class);
             reduceParallel = Optional.ofNullable(mapReduceArgsStrDTO.getShardNum()).orElse(1);
             jobParams = mapReduceArgsStrDTO.getArgsStr();
+            reduceParallel = Math.max(1, reduceParallel);
         } catch (Exception e) {
             SnailJobLog.LOCAL.error("map reduce args parse error. argsStr:[{}]", context.getArgsStr());
         }
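
Note: the new clamp guards against a user-supplied shardNum of zero or a negative number, which would otherwise flow straight into the reduce fan-out and create no reduce tasks at all. A self-contained illustration of the combined defaulting and clamping; the class and method are hypothetical:

import java.util.Optional;

public class ReduceParallelDemo {
    // Mirrors the two guards in the generator: null falls back to 1,
    // and non-positive values are clamped up to 1.
    static int reduceParallel(Integer shardNum) {
        int parallel = Optional.ofNullable(shardNum).orElse(1);
        return Math.max(1, parallel);
    }

    public static void main(String[] args) {
        System.out.println(reduceParallel(null)); // 1
        System.out.println(reduceParallel(0));    // 1
        System.out.println(reduceParallel(8));    // 8
    }
}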
@@ -190,7 +191,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
         JobTask parentJobTask = jobTaskMapper.selectOne(
             new LambdaQueryWrapper<JobTask>()
                 .select(JobTask::getId)
-                .eq(JobTask::getId, context.getParentId())
+                .eq(JobTask::getId, Optional.ofNullable(context.getParentId()).orElse(0L))
                 .eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
         );
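
Note: defaulting a missing parentId to 0L avoids binding a null into the eq condition: id = NULL never matches in SQL, and depending on the configured null-handling strategy MyBatis-Plus may reject the null parameter outright, so a sentinel id makes "no parent" explicit and null-safe. A minimal illustration with hypothetical variable names:

import java.util.Optional;

public class ParentIdDefaultDemo {
    public static void main(String[] args) {
        Long parentId = null; // root tasks have no parent
        // sentinel 0L matches no row, assuming generated ids are positive
        long queryId = Optional.ofNullable(parentId).orElse(0L);
        System.out.println(queryId); // 0
    }
}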