fix(sj_1.1.0): Improve how scheduled job parameters are passed

1. Pass execution arguments through a JobArgsHolder (see the sketch below)
2. Support MAP-type tasks
3. Generate job task instances with batch inserts
opensnail 2024-06-19 14:28:59 +08:00
parent e544f31c4a
commit 40ee403534
15 changed files with 204 additions and 32 deletions
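
In practical terms, the commit changes what sj_job_task.args_str holds (a serialized JobArgsHolder instead of the raw argument string) and what client executors read (JobArgs#getJobParams() instead of the now-deprecated argsStr). A minimal sketch of that round trip, using only the classes shown in this diff; the demo class name and literal values are illustrative, not part of the commit:

import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;

public class JobArgsHolderRoundTrip {
    public static void main(String[] args) {
        // Server side: wrap the job's configured arguments before persisting args_str
        JobArgsHolder holder = new JobArgsHolder();
        holder.setJobParams("{\"orderId\":10086}");    // value copied from sj_job.args_str (illustrative)
        String argsStr = JsonUtil.toJsonString(holder);
        System.out.println(argsStr);                    // now a JSON object, no longer the bare parameter string

        // Client side: JobEndPoint parses args_str back into a holder before building JobArgs
        JobArgsHolder parsed = JsonUtil.parseObject(argsStr, JobArgsHolder.class);
        System.out.println(parsed.getJobParams());
    }
}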

View File

@ -25,7 +25,7 @@
<dingding-talk.version>1.0.0</dingding-talk.version>
<netty-all.version>4.1.94.Final</netty-all.version>
<hutool-all.version>5.8.25</hutool-all.version>
<mybatis-plus.version>3.5.5</mybatis-plus.version>
<mybatis-plus.version>3.5.7</mybatis-plus.version>
<alibaba-dingtalk.version>2.0.0</alibaba-dingtalk.version>
<guava-retrying.version>2.0.0</guava-retrying.version>
<tinylog.version>1.3.6</tinylog.version>

View File

@ -18,6 +18,7 @@ import com.aizuda.snailjob.client.model.StopJobDTO;
import com.aizuda.snailjob.client.model.request.DispatchJobRequest;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
@ -113,7 +114,6 @@ public class JobEndPoint {
jobContext.setParallelNum(dispatchJob.getParallelNum());
jobContext.setTaskType(dispatchJob.getTaskType());
jobContext.setExecutorTimeout(dispatchJob.getExecutorTimeout());
jobContext.setArgsStr(dispatchJob.getArgsStr());
jobContext.setWorkflowNodeId(dispatchJob.getWorkflowNodeId());
jobContext.setWorkflowTaskBatchId(dispatchJob.getWorkflowTaskBatchId());
jobContext.setRetry(dispatchJob.isRetry());
@ -121,6 +121,20 @@ public class JobEndPoint {
jobContext.setTaskName(dispatchJob.getTaskName());
jobContext.setMrStage(dispatchJob.getMrStage());
if (StrUtil.isNotBlank(dispatchJob.getArgsStr())) {
try {
jobContext.setJobArgsHolder(JsonUtil.parseObject(dispatchJob.getArgsStr(), JobArgsHolder.class));
} catch (Exception e) {
SnailJobLog.REMOTE.warn("workflow context parse error", e);
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(dispatchJob.getArgsStr());
jobContext.setJobArgsHolder(jobArgsHolder);
}
} else {
// No arguments supplied: set an empty holder so later parameter reads need no null checks
jobContext.setJobArgsHolder(new JobArgsHolder());
}
String wfContext = dispatchJob.getWfContext();
if (StrUtil.isNotBlank(wfContext)) {
try {
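
The argsStr handling added above is backward compatible: a JobArgsHolder JSON from an upgraded server is parsed directly, a legacy plain string falls back into jobParams, and a blank value yields an empty holder. A standalone sketch of that branch, assuming only the utilities shown in this diff; the class and method names are illustrative:

import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;

public class ArgsCompatSketch {
    // Mirrors the branch in JobEndPoint; not part of the commit.
    static JobArgsHolder resolve(String argsStr) {
        if (StrUtil.isBlank(argsStr)) {
            return new JobArgsHolder();                // empty holder so callers can skip null checks
        }
        try {
            return JsonUtil.parseObject(argsStr, JobArgsHolder.class);  // new format
        } catch (Exception e) {
            JobArgsHolder holder = new JobArgsHolder();
            holder.setJobParams(argsStr);              // legacy format: raw string becomes jobParams
            return holder;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("{\"jobParams\":\"a=1\"}").getJobParams()); // a=1
        System.out.println(resolve("a=1").getJobParams());                     // a=1 (fallback path)
    }
}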

View File

@ -15,8 +15,15 @@ import java.util.Objects;
@Data
public class JobArgs {
/**
* This field no longer carries any parameters.
* see: jobParams
*/
@Deprecated
private String argsStr;
private String jobParams;
private String executorInfo;
private Long taskBatchId;
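
For user executors this amounts to a one-line migration: argsStr stays on JobArgs but is deprecated and no longer populated, so parameter reads should move to getJobParams(). A hypothetical handler showing the before and after; the class name and method signature are illustrative, not the client SDK contract:

// Import of JobArgs omitted because its package is not shown in this diff.
public class OrderSyncHandler {
    public void handle(JobArgs jobArgs) {
        // Before sj_1.1.0 (still compiles, but deprecated and no longer filled):
        // String params = jobArgs.getArgsStr();

        // From sj_1.1.0 on:
        String params = jobArgs.getJobParams();
        Long taskBatchId = jobArgs.getTaskBatchId();
        System.out.println("batch " + taskBatchId + " params=" + params);
    }
}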

View File

@ -15,4 +15,6 @@ public class MapArgs extends JobArgs {
private String taskName;
private String mapResult;
}

View File

@ -12,7 +12,9 @@ import com.aizuda.snailjob.client.job.core.timer.TimerManager;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -22,6 +24,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -96,7 +99,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
private static JobArgs buildJobArgs(JobContext jobContext) {
JobArgs jobArgs = new JobArgs();
jobArgs.setArgsStr(jobContext.getArgsStr());
jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
return jobArgs;
@ -104,7 +107,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
private static JobArgs buildShardingJobArgs(JobContext jobContext) {
ShardingJobArgs jobArgs = new ShardingJobArgs();
jobArgs.setArgsStr(jobContext.getArgsStr());
jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setShardingIndex(jobContext.getShardingIndex());
jobArgs.setShardingTotal(jobContext.getShardingTotal());
@ -113,7 +116,9 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
private static JobArgs buildMapJobArgs(JobContext jobContext) {
MapArgs jobArgs = new MapArgs();
jobArgs.setArgsStr(jobContext.getArgsStr());
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
jobArgs.setJobParams(jobArgsHolder.getJobParams());
jobArgs.setMapResult(jobArgsHolder.getMapResult());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskName(jobContext.getTaskName());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
@ -122,7 +127,9 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
private static JobArgs buildReduceJobArgs(JobContext jobContext) {
ReduceArgs jobArgs = new ReduceArgs();
jobArgs.setArgsStr(jobContext.getArgsStr());
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
jobArgs.setJobParams(jobArgsHolder.getJobParams());
jobArgs.setMapResult(JsonUtil.parseList(jobArgsHolder.getMapResult(), List.class));
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
jobArgs.setWfContext(jobContext.getWfContext());
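
Note the asymmetry between the map and reduce builders above: MapArgs receives mapResult as the raw JSON string, while ReduceArgs gets it pre-parsed via JsonUtil.parseList. A small sketch of both consumption styles, with illustrative map output values:

import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import java.util.List;

public class MapResultShapes {
    public static void main(String[] args) {
        JobArgsHolder holder = new JobArgsHolder();
        holder.setJobParams("global-job-params");
        // For a REDUCE task the server stores a JSON array of map outputs (illustrative values)
        holder.setMapResult(JsonUtil.toJsonString(List.of(List.of(1, 2), List.of(3, 4))));

        // buildMapJobArgs: the executor gets the raw string and parses it however it likes
        String rawForMapStage = holder.getMapResult();

        // buildReduceJobArgs: the executor gets a List, mirroring JsonUtil.parseList(..., List.class)
        List<?> parsedForReduce = JsonUtil.parseList(holder.getMapResult(), List.class);

        System.out.println(rawForMapStage);
        System.out.println(parsedForReduce.size()); // 2
    }
}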

View File

@ -0,0 +1,31 @@
package com.aizuda.snailjob.common.core.model;
import lombok.Data;
/**
* Parameter model backing the args_str column of the
* scheduled job task table sj_job_task
*
* @author: opensnail
* @date : 2024-06-19
* @since : sj_1.1.0
*/
@Data
public class JobArgsHolder {
/**
* Parameters configured on the sj_job table
*/
private String jobParams;
/**
* Dynamic sharding: result produced by the map node
*/
private String mapResult;
/**
* Dynamic sharding: result produced by the reduce execution
*/
private String reduceResult;
}
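
JobArgsHolder is a plain @Data bean, so each generator fills only the fields relevant to its task type before serializing it into args_str. A sketch of the payload shapes produced by the generators changed below, with illustrative values:

import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;

public class ArgsStrShapes {
    public static void main(String[] args) {
        // Broadcast / cluster / sharding tasks: only jobParams
        JobArgsHolder plain = new JobArgsHolder();
        plain.setJobParams("param-from-sj_job");
        System.out.println(JsonUtil.toJsonString(plain));

        // MAP tasks: jobParams plus the map slice assigned to this task
        JobArgsHolder map = new JobArgsHolder();
        map.setJobParams("param-from-sj_job");
        map.setMapResult("[\"subtask-1\",\"subtask-2\"]");
        System.out.println(JsonUtil.toJsonString(map));

        // MERGE_REDUCE tasks: jobParams plus the aggregated reduce results
        JobArgsHolder merge = new JobArgsHolder();
        merge.setJobParams("param-from-sj_job");
        merge.setReduceResult("[\"reduce-result-1\"]");
        System.out.println(JsonUtil.toJsonString(merge));
    }
}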

View File

@ -38,7 +38,7 @@ public class JobContext {
private Integer executorTimeout;
private String argsStr;
// private String argsStr;
/**
* Retry scenario: auto or manual
@ -70,5 +70,9 @@ public class JobContext {
*/
private Map<String, Object> wfContext;
/**
* Scheduled job arguments
*/
private JobArgsHolder jobArgsHolder;
}

View File

@ -0,0 +1,22 @@
package com.aizuda.snailjob.server.job.task.support.executor.job;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import org.springframework.stereotype.Component;
/**
* @author: shuguang.zhang
* @date : 2024-06-19
*/
@Component
public class MapJobExecutor extends MapReduceJobExecutor {
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.MAP;
}
@Override
protected void doExecute(final JobExecutorContext context) {
super.doExecute(context);
}
}

View File

@ -5,6 +5,8 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
@ -57,15 +59,18 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator {
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(serverNode));
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(context.getArgsStr());
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败"));
clientInfoSet.add(address);
jobTasks.add(jobTask);
}
Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
return jobTasks;
}
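
Two things change in this generator: args_str now stores a serialized JobArgsHolder rather than the raw argument string, and the per-node single-row insert is replaced by one batch insert whose returned size is asserted. This is presumably why the root pom bumps mybatis-plus to 3.5.7: the Collection-accepting insert used here is a newer BaseMapper addition. An illustrative helper showing the pattern (field population trimmed; not part of the commit):

import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import java.util.List;

public class BatchTaskPersistSketch {
    static List<JobTask> persist(JobTaskMapper jobTaskMapper, List<JobTask> jobTasks, String rawArgs) {
        for (JobTask jobTask : jobTasks) {
            JobArgsHolder holder = new JobArgsHolder();
            holder.setJobParams(rawArgs);                        // raw value configured on sj_job
            jobTask.setArgsStr(JsonUtil.toJsonString(holder));   // stored as JSON, not the raw string
            jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
        }
        // One batched call instead of N single inserts; the size check mirrors the assertion above
        Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(),
                () -> new SnailJobServerException("failed to insert job task instances"));
        return jobTasks;
    }
}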

View File

@ -4,6 +4,8 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
@ -53,7 +55,9 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator {
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(serverNode));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(context.getArgsStr());
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败"));

View File

@ -9,7 +9,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @date 2023-10-02 13:04:09
* @since 2.4.0
*/
public class JobTaskGeneratorFactory {
public final class JobTaskGeneratorFactory {
private static final ConcurrentHashMap<JobTaskTypeEnum, JobTaskGenerator> CACHE = new ConcurrentHashMap<>();

View File

@ -7,6 +7,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -28,7 +29,6 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.*;
import java.util.stream.Collectors;
/**
* Generates Map Reduce job tasks
@ -95,7 +95,11 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage)));
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobArgsHolder.setReduceResult(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage)));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage());
@ -135,16 +139,23 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(JsonUtil.toJsonString(partition.get(index)));
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobArgsHolder.setMapResult(JsonUtil.toJsonString(partition.get(index)));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage());
jobTask.setTaskName("REDUCE_TASK");
// TODO switch to batch insert
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
// Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
// () -> new SnailJobServerException("新增任务实例失败"));
finalJobTasks.add(jobTask);
}
Assert.isTrue(finalJobTasks.size() == jobTaskMapper.insert(finalJobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
}
});
@ -179,18 +190,22 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index)));
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(context.getArgsStr());
jobArgsHolder.setMapResult(JsonUtil.toJsonString(mapSubTask.get(index)));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setMrStage(MapReduceStageEnum.MAP.getStage());
jobTask.setTaskName(context.getTaskName());
jobTask.setLeaf(StatusEnum.YES.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
// TODO switch to batch insert
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
// Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
// () -> new SnailJobServerException("新增任务实例失败"));
jobTasks.add(jobTask);
}
Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
// Mark the parent task as a non-leaf node
if (CollUtil.isNotEmpty(parentJobTasks)) {
JobTask parentJobTask = new JobTask();
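
In the map branch above, each MAP task gets its own JobArgsHolder whose mapResult is one element of the user's map output, and the rows are then written with a single batch insert. A rough sketch of how those per-task payloads come about, with illustrative data (the real slicing of map output happens elsewhere in this generator):

import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import java.util.ArrayList;
import java.util.List;

public class MapPayloadSketch {
    public static void main(String[] args) {
        // Output of the user's map phase, one entry per MAP task (illustrative values)
        List<String> mapSubTask = List.of("file-0-100", "file-100-200", "file-200-300");

        List<String> argsStrs = new ArrayList<>();
        for (int index = 0; index < mapSubTask.size(); index++) {
            JobArgsHolder holder = new JobArgsHolder();
            holder.setJobParams("param-from-sj_job");
            holder.setMapResult(JsonUtil.toJsonString(mapSubTask.get(index))); // this MAP task's slice
            argsStrs.add(JsonUtil.toJsonString(holder));
        }
        argsStrs.forEach(System.out::println);
    }
}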

View File

@ -0,0 +1,33 @@
package com.aizuda.snailjob.server.job.task.support.generator.task;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.List;
/**
* @author: opensnail
* @date : 2024-06-19
* @since : sj_1.1.0
*/
@Component
public class MapTaskGenerator extends MapReduceTaskGenerator {
public MapTaskGenerator(final JobTaskMapper jobTaskMapper,
final TransactionTemplate transactionTemplate) {
super(jobTaskMapper, transactionTemplate);
}
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.MAP;
}
@Override
protected List<JobTask> doGenerate(final JobTaskGenerateContext context) {
return super.doGenerate(context);
}
}

View File

@ -5,7 +5,9 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
@ -15,6 +17,7 @@ import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -33,13 +36,10 @@ import java.util.Set;
* @since 2.4.0
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
@Autowired
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
@Autowired
private JobTaskMapper jobTaskMapper;
private final ClientNodeAllocateHandler clientNodeAllocateHandler;
private final JobTaskMapper jobTaskMapper;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
@ -51,13 +51,13 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
if (CollUtil.isEmpty(serverNodes)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
String argsStr = context.getArgsStr();
if (StrUtil.isBlank(argsStr)) {
log.error("切片参数为空. jobId:[{}]", context.getJobId());
SnailJobLog.LOCAL.error("切片参数为空. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
@ -65,7 +65,7 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
try {
argsStrs = JsonUtil.parseList(argsStr, String.class);
} catch (Exception e) {
log.error("切片参数解析失败. jobId:[{}]", context.getJobId(), e);
SnailJobLog.LOCAL.error("切片参数解析失败. jobId:[{}]", context.getJobId(), e);
return Lists.newArrayList();
}
@ -77,13 +77,18 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(argsStrs.get(index));
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(argsStrs.get(index));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败"));
// Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败"));
jobTasks.add(jobTask);
}
Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
return jobTasks;
}
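
For sharding jobs the contract is unchanged: the job's args_str is expected to be a JSON array with one element per shard, but each shard's slice is now wrapped in a JobArgsHolder before it is stored on the task row. A minimal sketch of that expectation, with illustrative shard values:

import com.aizuda.snailjob.common.core.model.JobArgsHolder;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import java.util.List;

public class ShardingArgsSketch {
    public static void main(String[] args) {
        // sj_job.args_str for a sharding job: a JSON array, one element per shard (illustrative)
        String argsStr = "[\"range:0-999\",\"range:1000-1999\"]";
        List<String> argsStrs = JsonUtil.parseList(argsStr, String.class);

        for (int index = 0; index < argsStrs.size(); index++) {
            JobArgsHolder holder = new JobArgsHolder();
            holder.setJobParams(argsStrs.get(index)); // shard `index` only sees its own slice
            System.out.println("task " + index + " args_str = " + JsonUtil.toJsonString(holder));
        }
    }
}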

View File

@ -0,0 +1,23 @@
package com.aizuda.snailjob.server.job.task.support.stop;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import org.springframework.stereotype.Component;
/**
* @author: opensnail
* @date : 2024-06-19
* @since : sj_1.1.0
*/
@Component
public class MapTaskStopHandler extends MapReduceTaskStopHandler {
@Override
public JobTaskTypeEnum getTaskType() {
return JobTaskTypeEnum.MAP;
}
@Override
protected void doStop(final TaskStopJobContext context) {
super.doStop(context);
}
}