diff --git a/pom.xml b/pom.xml index e72ac3da..3f21f751 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ 1.0.0 4.1.94.Final 5.8.25 - 3.5.5 + 3.5.7 2.0.0 2.0.0 1.3.6 diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java index 84c89245..bbb6231b 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java @@ -18,6 +18,7 @@ import com.aizuda.snailjob.client.model.StopJobDTO; import com.aizuda.snailjob.client.model.request.DispatchJobRequest; import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.model.JobContext; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.core.util.JsonUtil; @@ -113,7 +114,6 @@ public class JobEndPoint { jobContext.setParallelNum(dispatchJob.getParallelNum()); jobContext.setTaskType(dispatchJob.getTaskType()); jobContext.setExecutorTimeout(dispatchJob.getExecutorTimeout()); - jobContext.setArgsStr(dispatchJob.getArgsStr()); jobContext.setWorkflowNodeId(dispatchJob.getWorkflowNodeId()); jobContext.setWorkflowTaskBatchId(dispatchJob.getWorkflowTaskBatchId()); jobContext.setRetry(dispatchJob.isRetry()); @@ -121,6 +121,20 @@ public class JobEndPoint { jobContext.setTaskName(dispatchJob.getTaskName()); jobContext.setMrStage(dispatchJob.getMrStage()); + if (StrUtil.isNotBlank(dispatchJob.getArgsStr())) { + try { + jobContext.setJobArgsHolder(JsonUtil.parseObject(dispatchJob.getArgsStr(), JobArgsHolder.class)); + } catch (Exception e) { + SnailJobLog.REMOTE.warn("workflow context parse error", e); + JobArgsHolder jobArgsHolder = new JobArgsHolder(); + jobArgsHolder.setJobParams(dispatchJob.getArgsStr()); + jobContext.setJobArgsHolder(jobArgsHolder); + } + } else { + // 没有数据给个空对象,方便后面取参数 + jobContext.setJobArgsHolder(new JobArgsHolder()); + } + String wfContext = dispatchJob.getWfContext(); if (StrUtil.isNotBlank(wfContext)) { try { diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java index c7160575..ad2fef16 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java @@ -15,8 +15,15 @@ import java.util.Objects; @Data public class JobArgs { + /** + * 此字段不在投递任何参数 + * see: jobParams + */ + @Deprecated private String argsStr; + private String jobParams; + private String executorInfo; private Long taskBatchId; diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java index 8c18744d..2e552e0a 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapArgs.java @@ -15,4 +15,6 @@ public class MapArgs extends JobArgs { private String taskName; + private String mapResult; + } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index 8056b1be..86f52fcd 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -12,7 +12,9 @@ import com.aizuda.snailjob.client.job.core.timer.TimerManager; import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; +import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.model.JobContext; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.enums.LogTypeEnum; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -22,6 +24,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import java.util.List; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -96,7 +99,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { private static JobArgs buildJobArgs(JobContext jobContext) { JobArgs jobArgs = new JobArgs(); - jobArgs.setArgsStr(jobContext.getArgsStr()); + jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); return jobArgs; @@ -104,7 +107,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { private static JobArgs buildShardingJobArgs(JobContext jobContext) { ShardingJobArgs jobArgs = new ShardingJobArgs(); - jobArgs.setArgsStr(jobContext.getArgsStr()); + jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setShardingIndex(jobContext.getShardingIndex()); jobArgs.setShardingTotal(jobContext.getShardingTotal()); @@ -113,7 +116,9 @@ public abstract class AbstractJobExecutor implements IJobExecutor { private static JobArgs buildMapJobArgs(JobContext jobContext) { MapArgs jobArgs = new MapArgs(); - jobArgs.setArgsStr(jobContext.getArgsStr()); + JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); + jobArgs.setJobParams(jobArgsHolder.getJobParams()); + jobArgs.setMapResult(jobArgsHolder.getMapResult()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setTaskName(jobContext.getTaskName()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); @@ -122,7 +127,9 @@ public abstract class AbstractJobExecutor implements IJobExecutor { private static JobArgs buildReduceJobArgs(JobContext jobContext) { ReduceArgs jobArgs = new ReduceArgs(); - jobArgs.setArgsStr(jobContext.getArgsStr()); + JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); + jobArgs.setJobParams(jobArgsHolder.getJobParams()); + jobArgs.setMapResult(JsonUtil.parseList(jobArgsHolder.getMapResult(), List.class)); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); jobArgs.setWfContext(jobContext.getWfContext()); diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobArgsHolder.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobArgsHolder.java new file mode 100644 index 00000000..a299b78d --- /dev/null +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobArgsHolder.java @@ -0,0 +1,31 @@ +package com.aizuda.snailjob.common.core.model; + +import lombok.Data; + +/** + * 定时任务 sj_job_task的args_str对应的 + * 参数模型 + * + * @author: opensnail + * @date : 2024-06-19 + * @since : sj_1.1.0 + */ +@Data +public class JobArgsHolder { + + /** + * sj_job表输入的参数 + */ + private String jobParams; + + /** + * 动态分片 map节点的结果 + */ + private String mapResult; + + /** + * 动态分片 reduce执行的结果 + */ + private String reduceResult; + +} diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java index e9ee837d..a0be939e 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java @@ -38,7 +38,7 @@ public class JobContext { private Integer executorTimeout; - private String argsStr; +// private String argsStr; /** * 重试场景 auto、manual @@ -70,5 +70,9 @@ public class JobContext { */ private Map wfContext; + /** + * 定时任务参数 + */ + private JobArgsHolder jobArgsHolder; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapJobExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapJobExecutor.java new file mode 100644 index 00000000..bcbbfb83 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapJobExecutor.java @@ -0,0 +1,22 @@ +package com.aizuda.snailjob.server.job.task.support.executor.job; + +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import org.springframework.stereotype.Component; + +/** + * @author: shuguang.zhang + * @date : 2024-06-19 + */ +@Component +public class MapJobExecutor extends MapReduceJobExecutor { + + @Override + public JobTaskTypeEnum getTaskInstanceType() { + return JobTaskTypeEnum.MAP; + } + + @Override + protected void doExecute(final JobExecutorContext context) { + super.doExecute(context); + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java index 21123c1b..3cf5a871 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java @@ -5,6 +5,8 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.model.JobArgsHolder; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; @@ -57,15 +59,18 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator { JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientInfo(ClientInfoUtils.generate(serverNode)); + JobArgsHolder jobArgsHolder = new JobArgsHolder(); + jobArgsHolder.setJobParams(context.getArgsStr()); + jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setArgsType(context.getArgsType()); - jobTask.setArgsStr(context.getArgsStr()); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); - Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败")); clientInfoSet.add(address); jobTasks.add(jobTask); } + Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败")); + return jobTasks; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java index 59d5715f..e8cc76a8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java @@ -4,6 +4,8 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.model.JobArgsHolder; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; @@ -53,7 +55,9 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator { JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientInfo(ClientInfoUtils.generate(serverNode)); jobTask.setArgsType(context.getArgsType()); - jobTask.setArgsStr(context.getArgsStr()); + JobArgsHolder jobArgsHolder = new JobArgsHolder(); + jobArgsHolder.setJobParams(context.getArgsStr()); + jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败")); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGeneratorFactory.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGeneratorFactory.java index 4ed21245..d1f3a3c2 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGeneratorFactory.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGeneratorFactory.java @@ -9,7 +9,7 @@ import java.util.concurrent.ConcurrentHashMap; * @date 2023-10-02 13:04:09 * @since 2.4.0 */ -public class JobTaskGeneratorFactory { +public final class JobTaskGeneratorFactory { private static final ConcurrentHashMap CACHE = new ConcurrentHashMap<>(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 9e96d7ae..5c30e66b 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -7,6 +7,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; @@ -28,7 +29,6 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; import java.util.*; -import java.util.stream.Collectors; /** * 生成Map Reduce任务 @@ -95,7 +95,11 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); jobTask.setArgsType(context.getArgsType()); - jobTask.setArgsStr(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage))); + + JobArgsHolder jobArgsHolder = new JobArgsHolder(); + jobArgsHolder.setJobParams(context.getArgsStr()); + jobArgsHolder.setReduceResult(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage))); + jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage()); @@ -135,16 +139,23 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); jobTask.setArgsType(context.getArgsType()); - jobTask.setArgsStr(JsonUtil.toJsonString(partition.get(index))); + + JobArgsHolder jobArgsHolder = new JobArgsHolder(); + jobArgsHolder.setJobParams(context.getArgsStr()); + jobArgsHolder.setMapResult(JsonUtil.toJsonString(partition.get(index))); + + jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage()); jobTask.setTaskName("REDUCE_TASK"); - // TODO 改批量插入 - Assert.isTrue(1 == jobTaskMapper.insert(jobTask), - () -> new SnailJobServerException("新增任务实例失败")); +// Assert.isTrue(1 == jobTaskMapper.insert(jobTask), +// () -> new SnailJobServerException("新增任务实例失败")); finalJobTasks.add(jobTask); } + + Assert.isTrue(finalJobTasks.size() == jobTaskMapper.insert(finalJobTasks).size(), () -> new SnailJobServerException("新增任务实例失败")); + } }); @@ -179,18 +190,22 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); jobTask.setArgsType(context.getArgsType()); - jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index))); + JobArgsHolder jobArgsHolder = new JobArgsHolder(); + jobArgsHolder.setJobParams(context.getArgsStr()); + jobArgsHolder.setMapResult(JsonUtil.toJsonString(mapSubTask.get(index))); + jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setMrStage(MapReduceStageEnum.MAP.getStage()); jobTask.setTaskName(context.getTaskName()); jobTask.setLeaf(StatusEnum.YES.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); - // TODO 改批量插入 - Assert.isTrue(1 == jobTaskMapper.insert(jobTask), - () -> new SnailJobServerException("新增任务实例失败")); +// Assert.isTrue(1 == jobTaskMapper.insert(jobTask), +// () -> new SnailJobServerException("新增任务实例失败")); jobTasks.add(jobTask); } + Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败")); + // 更新父节点的为非叶子节点 if (CollUtil.isNotEmpty(parentJobTasks)) { JobTask parentJobTask = new JobTask(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapTaskGenerator.java new file mode 100644 index 00000000..ab7341f8 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapTaskGenerator.java @@ -0,0 +1,33 @@ +package com.aizuda.snailjob.server.job.task.support.generator.task; + +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.List; + +/** + * @author: opensnail + * @date : 2024-06-19 + * @since : sj_1.1.0 + */ +@Component +public class MapTaskGenerator extends MapReduceTaskGenerator { + + public MapTaskGenerator(final JobTaskMapper jobTaskMapper, + final TransactionTemplate transactionTemplate) { + super(jobTaskMapper, transactionTemplate); + } + + @Override + public JobTaskTypeEnum getTaskInstanceType() { + return JobTaskTypeEnum.MAP; + } + + @Override + protected List doGenerate(final JobTaskGenerateContext context) { + return super.doGenerate(context); + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java index 2ba10484..d1fe27bb 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java @@ -5,7 +5,9 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; @@ -15,6 +17,7 @@ import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -33,13 +36,10 @@ import java.util.Set; * @since 2.4.0 */ @Component -@Slf4j +@RequiredArgsConstructor public class ShardingTaskGenerator extends AbstractJobTaskGenerator { - - @Autowired - protected ClientNodeAllocateHandler clientNodeAllocateHandler; - @Autowired - private JobTaskMapper jobTaskMapper; + private final ClientNodeAllocateHandler clientNodeAllocateHandler; + private final JobTaskMapper jobTaskMapper; @Override public JobTaskTypeEnum getTaskInstanceType() { @@ -51,13 +51,13 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator { Set serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()); if (CollUtil.isEmpty(serverNodes)) { - log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); + SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); return Lists.newArrayList(); } String argsStr = context.getArgsStr(); if (StrUtil.isBlank(argsStr)) { - log.error("切片参数为空. jobId:[{}]", context.getJobId()); + SnailJobLog.LOCAL.error("切片参数为空. jobId:[{}]", context.getJobId()); return Lists.newArrayList(); } @@ -65,7 +65,7 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator { try { argsStrs = JsonUtil.parseList(argsStr, String.class); } catch (Exception e) { - log.error("切片参数解析失败. jobId:[{}]", context.getJobId(), e); + SnailJobLog.LOCAL.error("切片参数解析失败. jobId:[{}]", context.getJobId(), e); return Lists.newArrayList(); } @@ -77,13 +77,18 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator { JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); jobTask.setArgsType(context.getArgsType()); - jobTask.setArgsStr(argsStrs.get(index)); + JobArgsHolder jobArgsHolder = new JobArgsHolder(); + jobArgsHolder.setJobParams(argsStrs.get(index)); + jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); - Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败")); +// Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败")); jobTasks.add(jobTask); } + Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败")); + + return jobTasks; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/MapTaskStopHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/MapTaskStopHandler.java new file mode 100644 index 00000000..820b8d87 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/stop/MapTaskStopHandler.java @@ -0,0 +1,23 @@ +package com.aizuda.snailjob.server.job.task.support.stop; + +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import org.springframework.stereotype.Component; + +/** + * @author: opensnail + * @date : 2024-06-19 + * @since : sj_1.1.0 + */ +@Component +public class MapTaskStopHandler extends MapReduceTaskStopHandler { + + @Override + public JobTaskTypeEnum getTaskType() { + return JobTaskTypeEnum.MAP; + } + + @Override + protected void doStop(final TaskStopJobContext context) { + super.doStop(context); + } +}