diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java index ad2fef16..a9261c5f 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java @@ -16,13 +16,12 @@ import java.util.Objects; public class JobArgs { /** - * 此字段不在投递任何参数 - * see: jobParams + * 此字段即将废弃,请使用see: jobParams */ @Deprecated private String argsStr; - private String jobParams; + private Object jobParams; private String executorInfo; diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java index 2e552e0a..3006a637 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java @@ -15,6 +15,6 @@ public class MapArgs extends JobArgs { private String taskName; - private String mapResult; + private Object mapResult; } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index 07414ed0..3eeb378a 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -24,7 +24,9 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -100,6 +102,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor { private static JobArgs buildJobArgs(JobContext jobContext) { JobArgs jobArgs = new JobArgs(); + // 下一个版本即将删除,本期兼容此问题 + jobArgs.setArgsStr(JsonUtil.toJsonString(jobContext.getJobArgsHolder().getJobParams())); jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); @@ -109,6 +113,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor { private static JobArgs buildShardingJobArgs(JobContext jobContext) { ShardingJobArgs jobArgs = new ShardingJobArgs(); jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams()); + // 下一个版本即将删除,本期兼容此问题 + jobArgs.setArgsStr(JsonUtil.toJsonString(jobContext.getJobArgsHolder().getJobParams())); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setShardingIndex(jobContext.getShardingIndex()); jobArgs.setShardingTotal(jobContext.getShardingTotal()); @@ -118,6 +124,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor { private static JobArgs buildMapJobArgs(JobContext jobContext) { MapArgs jobArgs = new MapArgs(); JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); + // 下一个版本即将删除,本期兼容此问题 + jobArgs.setArgsStr(JsonUtil.toJsonString(jobContext.getJobArgsHolder().getJobParams())); jobArgs.setJobParams(jobArgsHolder.getJobParams()); jobArgs.setMapResult(jobArgsHolder.getMaps()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); @@ -130,9 +138,13 @@ public abstract class AbstractJobExecutor implements IJobExecutor { ReduceArgs jobArgs = new ReduceArgs(); JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); jobArgs.setJobParams(jobArgsHolder.getJobParams()); - String maps = jobArgsHolder.getMaps(); - if (StrUtil.isNotBlank(maps)) { - jobArgs.setMapResult(JsonUtil.parseList(maps, Object.class)); + Object maps = jobArgsHolder.getMaps(); + if (Objects.nonNull(maps)) { + if (maps instanceof String) { + jobArgs.setMapResult(JsonUtil.parseList((String) maps, Object.class)); + } else { + jobArgs.setMapResult((List) maps); + } } jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); @@ -144,10 +156,15 @@ public abstract class AbstractJobExecutor implements IJobExecutor { MergeReduceArgs jobArgs = new MergeReduceArgs(); JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); jobArgs.setJobParams(jobArgsHolder.getJobParams()); - String reduces = jobArgsHolder.getReduces(); - if (StrUtil.isNotBlank(reduces)) { - jobArgs.setReduces(JsonUtil.parseList(reduces, Object.class)); + Object reduces = jobArgsHolder.getReduces(); + if (Objects.nonNull(reduces)) { + if (reduces instanceof String) { + jobArgs.setReduces(JsonUtil.parseList((String) reduces, Object.class)); + } else { + jobArgs.setReduces((List) reduces); + } } + jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); jobArgs.setWfContext(jobContext.getWfContext()); diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobArgsHolder.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobArgsHolder.java index 02ac88d3..10eb54fe 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobArgsHolder.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobArgsHolder.java @@ -16,16 +16,16 @@ public class JobArgsHolder { /** * sj_job表输入的参数 */ - private String jobParams; + private Object jobParams; /** * 动态分片 map节点的结果 */ - private String maps; + private Object maps; /** * 动态分片 reduce执行的结果 */ - private String reduces; + private Object reduces; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index 3a3439c8..f250c69d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -57,7 +57,12 @@ public class JobExecutorResultActor extends AbstractActor { jobTask.setTaskStatus(result.getTaskStatus()); jobTask.setWfContext(result.getWfContext()); if (Objects.nonNull(result.getResult())) { - jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult())); + if (result.getResult() instanceof String) { + jobTask.setResultMessage((String) result.getResult()); + } else { + jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult())); + } + } Assert.isTrue(1 == jobTaskMapper.update(jobTask, diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 107c796d..72e2b7f9 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -97,7 +97,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); jobTask.setArgsType(context.getArgsType()); - JobArgsHolder jobArgsHolder = new JobArgsHolder(); jobArgsHolder.setJobParams(context.getArgsStr()); jobArgsHolder.setReduces(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage))); @@ -152,7 +151,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTask.setArgsType(context.getArgsType()); JobArgsHolder jobArgsHolder = new JobArgsHolder(); jobArgsHolder.setJobParams(finalJobParams); - jobArgsHolder.setMaps(JsonUtil.toJsonString(partition.get(index))); + jobArgsHolder.setMaps(partition.get(index)); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); @@ -203,7 +202,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTask.setArgsType(context.getArgsType()); JobArgsHolder jobArgsHolder = new JobArgsHolder(); jobArgsHolder.setJobParams(context.getArgsStr()); - jobArgsHolder.setMaps(JsonUtil.toJsonString(mapSubTask.get(index))); + jobArgsHolder.setMaps(mapSubTask.get(index)); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType()); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());