feat(sj_1.1.0-beta2): 优化参数信息
This commit is contained in:
parent
dd03b837ef
commit
ada349a88f
@ -16,13 +16,12 @@ import java.util.Objects;
|
||||
public class JobArgs {
|
||||
|
||||
/**
|
||||
* 此字段不在投递任何参数
|
||||
* see: jobParams
|
||||
* 此字段即将废弃,请使用see: jobParams
|
||||
*/
|
||||
@Deprecated
|
||||
private String argsStr;
|
||||
|
||||
private String jobParams;
|
||||
private Object jobParams;
|
||||
|
||||
private String executorInfo;
|
||||
|
||||
|
@ -15,6 +15,6 @@ public class MapArgs extends JobArgs {
|
||||
|
||||
private String taskName;
|
||||
|
||||
private String mapResult;
|
||||
private Object mapResult;
|
||||
|
||||
}
|
||||
|
@ -24,7 +24,9 @@ import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -100,6 +102,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||
|
||||
private static JobArgs buildJobArgs(JobContext jobContext) {
|
||||
JobArgs jobArgs = new JobArgs();
|
||||
// 下一个版本即将删除,本期兼容此问题
|
||||
jobArgs.setArgsStr(JsonUtil.toJsonString(jobContext.getJobArgsHolder().getJobParams()));
|
||||
jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams());
|
||||
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
||||
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
|
||||
@ -109,6 +113,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||
private static JobArgs buildShardingJobArgs(JobContext jobContext) {
|
||||
ShardingJobArgs jobArgs = new ShardingJobArgs();
|
||||
jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams());
|
||||
// 下一个版本即将删除,本期兼容此问题
|
||||
jobArgs.setArgsStr(JsonUtil.toJsonString(jobContext.getJobArgsHolder().getJobParams()));
|
||||
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
||||
jobArgs.setShardingIndex(jobContext.getShardingIndex());
|
||||
jobArgs.setShardingTotal(jobContext.getShardingTotal());
|
||||
@ -118,6 +124,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||
private static JobArgs buildMapJobArgs(JobContext jobContext) {
|
||||
MapArgs jobArgs = new MapArgs();
|
||||
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
|
||||
// 下一个版本即将删除,本期兼容此问题
|
||||
jobArgs.setArgsStr(JsonUtil.toJsonString(jobContext.getJobArgsHolder().getJobParams()));
|
||||
jobArgs.setJobParams(jobArgsHolder.getJobParams());
|
||||
jobArgs.setMapResult(jobArgsHolder.getMaps());
|
||||
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
||||
@ -130,9 +138,13 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||
ReduceArgs jobArgs = new ReduceArgs();
|
||||
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
|
||||
jobArgs.setJobParams(jobArgsHolder.getJobParams());
|
||||
String maps = jobArgsHolder.getMaps();
|
||||
if (StrUtil.isNotBlank(maps)) {
|
||||
jobArgs.setMapResult(JsonUtil.parseList(maps, Object.class));
|
||||
Object maps = jobArgsHolder.getMaps();
|
||||
if (Objects.nonNull(maps)) {
|
||||
if (maps instanceof String) {
|
||||
jobArgs.setMapResult(JsonUtil.parseList((String) maps, Object.class));
|
||||
} else {
|
||||
jobArgs.setMapResult((List<?>) maps);
|
||||
}
|
||||
}
|
||||
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
||||
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
|
||||
@ -144,10 +156,15 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||
MergeReduceArgs jobArgs = new MergeReduceArgs();
|
||||
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
|
||||
jobArgs.setJobParams(jobArgsHolder.getJobParams());
|
||||
String reduces = jobArgsHolder.getReduces();
|
||||
if (StrUtil.isNotBlank(reduces)) {
|
||||
jobArgs.setReduces(JsonUtil.parseList(reduces, Object.class));
|
||||
Object reduces = jobArgsHolder.getReduces();
|
||||
if (Objects.nonNull(reduces)) {
|
||||
if (reduces instanceof String) {
|
||||
jobArgs.setReduces(JsonUtil.parseList((String) reduces, Object.class));
|
||||
} else {
|
||||
jobArgs.setReduces((List<?>) reduces);
|
||||
}
|
||||
}
|
||||
|
||||
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
||||
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
|
||||
jobArgs.setWfContext(jobContext.getWfContext());
|
||||
|
@ -16,16 +16,16 @@ public class JobArgsHolder {
|
||||
/**
|
||||
* sj_job表输入的参数
|
||||
*/
|
||||
private String jobParams;
|
||||
private Object jobParams;
|
||||
|
||||
/**
|
||||
* 动态分片 map节点的结果
|
||||
*/
|
||||
private String maps;
|
||||
private Object maps;
|
||||
|
||||
/**
|
||||
* 动态分片 reduce执行的结果
|
||||
*/
|
||||
private String reduces;
|
||||
private Object reduces;
|
||||
|
||||
}
|
||||
|
@ -57,7 +57,12 @@ public class JobExecutorResultActor extends AbstractActor {
|
||||
jobTask.setTaskStatus(result.getTaskStatus());
|
||||
jobTask.setWfContext(result.getWfContext());
|
||||
if (Objects.nonNull(result.getResult())) {
|
||||
jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult()));
|
||||
if (result.getResult() instanceof String) {
|
||||
jobTask.setResultMessage((String) result.getResult());
|
||||
} else {
|
||||
jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult()));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Assert.isTrue(1 == jobTaskMapper.update(jobTask,
|
||||
|
@ -97,7 +97,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
||||
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
|
||||
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
|
||||
jobTask.setArgsType(context.getArgsType());
|
||||
|
||||
JobArgsHolder jobArgsHolder = new JobArgsHolder();
|
||||
jobArgsHolder.setJobParams(context.getArgsStr());
|
||||
jobArgsHolder.setReduces(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage)));
|
||||
@ -152,7 +151,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
||||
jobTask.setArgsType(context.getArgsType());
|
||||
JobArgsHolder jobArgsHolder = new JobArgsHolder();
|
||||
jobArgsHolder.setJobParams(finalJobParams);
|
||||
jobArgsHolder.setMaps(JsonUtil.toJsonString(partition.get(index)));
|
||||
jobArgsHolder.setMaps(partition.get(index));
|
||||
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
|
||||
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
||||
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
|
||||
@ -203,7 +202,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
||||
jobTask.setArgsType(context.getArgsType());
|
||||
JobArgsHolder jobArgsHolder = new JobArgsHolder();
|
||||
jobArgsHolder.setJobParams(context.getArgsStr());
|
||||
jobArgsHolder.setMaps(JsonUtil.toJsonString(mapSubTask.get(index)));
|
||||
jobArgsHolder.setMaps(mapSubTask.get(index));
|
||||
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
|
||||
jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
|
||||
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
||||
|
Loading…
Reference in New Issue
Block a user