feat(sj_1.1.0-beta2): complete the RECOVERY feature; make batch jobTask insertion compatible with Oracle

This commit is contained in:
opensnail 2024-06-26 12:29:17 +08:00
parent ca2ada3875
commit b60892b76f
8 changed files with 79 additions and 26 deletions

View File

@@ -5,6 +5,8 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* The block strategy applies a fault-tolerance policy to batches that are pending or running
*
* @author: xiaowoniu
* @date : 2024-01-18
* @since : 2.6.0
@@ -12,11 +14,21 @@ import lombok.Getter;
@AllArgsConstructor
@Getter
public enum BlockStrategyEnum {
/**
* Do not create a new batch; wait for the current batch to finish executing
*/
DISCARD(1),
/**
* Stop the current batch, then create a new one
*/
OVERLAY(2),
/**
* Create a new batch on every trigger
*/
CONCURRENCY(3),
/**
* Discard the new trigger and re-run the old (failed) tasks
* Do not create a new batch; re-execute the tasks in the current batch that have already failed
*/
RECOVERY(4);
;
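For orientation, the integer codes above are presumably what the job definition stores. A minimal self-contained sketch of the usual code-to-enum lookup (the field and helper names below are illustrative, not taken from this commit):

// Illustrative only: mirrors the BlockStrategyEnum pattern with an explicit code lookup.
enum BlockStrategySketch {
    DISCARD(1), OVERLAY(2), CONCURRENCY(3), RECOVERY(4);

    private final int code;

    BlockStrategySketch(int code) {
        this.code = code;
    }

    // Resolve the enum constant from the integer persisted on the job definition.
    static BlockStrategySketch ofCode(int code) {
        for (BlockStrategySketch strategy : values()) {
            if (strategy.code == code) {
                return strategy;
            }
        }
        throw new IllegalArgumentException("Unknown block strategy code: " + code);
    }
}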

View File

@@ -20,6 +20,7 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@@ -36,8 +37,10 @@ import java.util.stream.Stream;
@Component
@RequiredArgsConstructor
public class RecoveryBlockStrategy extends AbstracJobBlockStrategy {
private final JobTaskMapper jobTaskMapper;
private final JobMapper jobMapper;
@Override
protected void doBlock(BlockStrategyContext context) {
Assert.notNull(context.getJobId(), () -> new SnailJobServerException("job id can not be null"));
@@ -45,9 +48,8 @@ public class RecoveryBlockStrategy extends AbstracJobBlockStrategy {
Assert.notNull(context.getTaskType(), () -> new SnailJobServerException("task type can not be null"));
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getId, JobTask::getTaskStatus)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
new LambdaQueryWrapper<JobTask>()
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
);
// If there are no task items yet, generate them
@@ -67,7 +69,9 @@ public class RecoveryBlockStrategy extends AbstracJobBlockStrategy {
// Execute the tasks that are in Stop or Fail status
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(context.getTaskType());
jobExecutor.execute(buildJobExecutorContext(context, job,
StreamUtils.filter(jobTasks, (jobTask) -> JobTaskStatusEnum.NOT_SUCCESS.contains(jobTask.getTaskStatus()))));
StreamUtils.filter(jobTasks,
(jobTask) -> JobTaskStatusEnum.NOT_SUCCESS.contains(jobTask.getTaskStatus())
|| JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))));
}
@Override
@@ -76,7 +80,7 @@ public class RecoveryBlockStrategy extends AbstracJobBlockStrategy {
}
private static JobExecutorContext buildJobExecutorContext(BlockStrategyContext strategyContext, Job job,
List<JobTask> taskList) {
List<JobTask> taskList) {
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
context.setTaskList(taskList);
context.setTaskBatchId(strategyContext.getTaskBatchId());
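The substantive change in this strategy is the widened filter: a recovered batch re-dispatches both the tasks that already failed and those that never completed. A rough self-contained sketch of that predicate (the status codes below are placeholders, not the real JobTaskStatusEnum values):

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class RecoveryFilterSketch {
    // Placeholder status-code sets standing in for JobTaskStatusEnum.NOT_SUCCESS / NOT_COMPLETE.
    static final Set<Integer> NOT_SUCCESS = Set.of(4, 5, 6);
    static final Set<Integer> NOT_COMPLETE = Set.of(1, 2);

    // Keep only the statuses that still need work: failed/stopped plus never-finished ones.
    static List<Integer> statusesToRecover(List<Integer> taskStatuses) {
        return taskStatuses.stream()
                .filter(status -> NOT_SUCCESS.contains(status) || NOT_COMPLETE.contains(status))
                .collect(Collectors.toList());
    }
}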

View File

@@ -1,8 +1,10 @@
package com.aizuda.snailjob.server.job.task.support.executor.job;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
@@ -30,6 +32,10 @@ public class ClusterJobExecutor extends AbstractJobExecutor {
// Dispatch to the client
List<JobTask> taskList = context.getTaskList();
if (CollUtil.isEmpty(taskList)) {
throw new SnailJobServerException("No executable job task");
}
JobTask jobTask = taskList.get(0);
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));

View File

@@ -1,8 +1,17 @@
package com.aizuda.snailjob.server.job.task.support.generator.task;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.utils.DbUtils;
import org.apache.ibatis.executor.BatchResult;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Arrays;
import java.util.List;
/**
@@ -12,6 +21,9 @@ import java.util.List;
*/
public abstract class AbstractJobTaskGenerator implements JobTaskGenerator, InitializingBean {
@Autowired
private JobTaskMapper jobTaskMapper;
@Override
public List<JobTask> generate(JobTaskGenerateContext context) {
return doGenerate(context);
@@ -23,4 +35,21 @@ public abstract class AbstractJobTaskGenerator implements JobTaskGenerator, Init
public void afterPropertiesSet() throws Exception {
JobTaskGeneratorFactory.registerTaskInstance(getTaskInstanceType(), this);
}
protected void batchSaveJobTasks(List<JobTask> jobTasks) {
// Oracle batch insert cannot return the generated ids directly, so Oracle is special-cased here.
// A later release will refactor snail-job-datasource and revisit this compatibility logic.
if (DbUtils.getDbType().getDb().equals(DbTypeEnum.ORACLE.getDb())) {
List<BatchResult> inserts = jobTaskMapper.insert(jobTasks);
if (CollUtil.isNotEmpty(inserts)) {
BatchResult batchResult = inserts.get(0);
Assert.isTrue(jobTasks.size() == Arrays.stream(batchResult.getUpdateCounts()).sum(), () -> new SnailJobServerException("Failed to insert job task instances"));
} else {
throw new SnailJobServerException("Failed to insert job task instances");
}
// Return here so the Oracle path does not fall through to the default insertBatch below.
return;
}
Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("Failed to insert job task instances"));
}
}
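For reference, the generators touched later in this commit all converge on the same call pattern when persisting their generated tasks; a simplified usage sketch of a subclass doGenerate (assuming the task rows are built elsewhere in the method):

// Simplified usage sketch: build the task rows, persist them via the shared helper, return them.
@Override
public List<JobTask> doGenerate(JobTaskGenerateContext context) {
    List<JobTask> jobTasks = new ArrayList<>();
    // ... populate jobTasks from the generate context (client node, args, task name, timestamps) ...
    batchSaveJobTasks(jobTasks);
    return jobTasks;
}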

View File

@@ -8,6 +8,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
@@ -16,6 +17,7 @@ import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -30,11 +32,10 @@ import java.util.*;
* @since 2.4.0
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class BroadcastTaskGenerator extends AbstractJobTaskGenerator {
@Autowired
private JobTaskMapper jobTaskMapper;
private static final String TASK_NAME = "BROADCAST_TASK";
private final JobTaskMapper jobTaskMapper;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
@@ -46,7 +47,7 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator {
public List<JobTask> doGenerate(JobTaskGenerateContext context) {
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
if (CollUtil.isEmpty(serverNodes)) {
log.error("No available client to execute. jobId:[{}]", context.getJobId());
SnailJobLog.LOCAL.error("No available client to execute. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
@@ -70,13 +71,14 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator {
jobTask.setParentId(0L);
jobTask.setLeaf(StatusEnum.YES.getStatus());
jobTask.setRetryCount(0);
jobTask.setTaskName(TASK_NAME);
jobTask.setCreateDt(LocalDateTime.now());
jobTask.setUpdateDt(LocalDateTime.now());
clientInfoSet.add(address);
jobTasks.add(jobTask);
}
Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("Failed to insert job task instances"));
batchSaveJobTasks(jobTasks);
return jobTasks;
}

View File

@@ -7,6 +7,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
@@ -15,6 +16,7 @@ import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -29,13 +31,11 @@ import java.util.Optional;
* @since 2.4.0
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class ClusterTaskGenerator extends AbstractJobTaskGenerator {
@Autowired
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
@Autowired
private JobTaskMapper jobTaskMapper;
private static final String TASK_NAME = "CLUSTER_TASK";
private final ClientNodeAllocateHandler clientNodeAllocateHandler;
private final JobTaskMapper jobTaskMapper;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
@@ -48,7 +48,7 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator {
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getJobId().toString(),
context.getGroupName(), context.getNamespaceId(), context.getRouteKey());
if (Objects.isNull(serverNode)) {
log.error("No available client to execute. jobId:[{}]", context.getJobId());
SnailJobLog.LOCAL.error("No available client to execute. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
@@ -60,6 +60,7 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator {
jobArgsHolder.setJobParams(context.getArgsStr());
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setTaskName(TASK_NAME);
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("Failed to insert job task instances"));

View File

@@ -140,6 +140,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTasks = new ArrayList<>(partition.size());
final List<JobTask> finalJobTasks = jobTasks;
String finalJobParams = jobParams;
final List<JobTask> finalJobTasks1 = jobTasks;
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
@@ -165,8 +166,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
finalJobTasks.add(jobTask);
}
Assert.isTrue(finalJobTasks.size() == jobTaskMapper.insertBatch(finalJobTasks), () -> new SnailJobServerException("Failed to insert job task instances"));
batchSaveJobTasks(finalJobTasks1);
}
});
@@ -218,7 +218,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTasks.add(jobTask);
}
Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("Failed to insert job task instances"));
batchSaveJobTasks(jobTasks);
// Mark the parent node as a non-leaf node
if (CollUtil.isNotEmpty(parentJobTasks)) {

View File

@@ -41,7 +41,7 @@ import java.util.Set;
@Component
@RequiredArgsConstructor
public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
private final ClientNodeAllocateHandler clientNodeAllocateHandler;
private static final String TASK_NAME = "SHARDING_TASK";
private final JobTaskMapper jobTaskMapper;
@Override
@@ -90,12 +90,11 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
jobTask.setLeaf(StatusEnum.YES.getStatus());
jobTask.setCreateDt(LocalDateTime.now());
jobTask.setUpdateDt(LocalDateTime.now());
jobTask.setTaskName(TASK_NAME);
jobTasks.add(jobTask);
}
Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("Failed to insert job task instances"));
batchSaveJobTasks(jobTasks);
return jobTasks;
}