diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java index c0c846e9a..09f8c6be7 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java @@ -15,12 +15,6 @@ import java.util.Objects; @Data public class JobArgs { - /** - * 此字段即将废弃,请使用see: jobParams - */ - @Deprecated - private String argsStr; - private Object jobParams; private String executorInfo; diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index 3f4d72b77..35b800572 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -101,13 +101,6 @@ public abstract class AbstractJobExecutor implements IJobExecutor { private static JobArgs buildJobArgs(JobContext jobContext) { JobArgs jobArgs = new JobArgs(); - // 下一个版本即将删除,本期兼容此问题 - Object jobParams = jobContext.getJobArgsHolder().getJobParams(); - if (jobParams instanceof String) { - jobArgs.setArgsStr((String) jobParams); - } else { - jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams)); - } jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); @@ -117,14 +110,6 @@ public abstract class AbstractJobExecutor implements IJobExecutor { private static JobArgs buildShardingJobArgs(JobContext jobContext) { ShardingJobArgs jobArgs = new ShardingJobArgs(); jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams()); - // 下一个版本即将删除,本期兼容此问题 - Object jobParams = jobContext.getJobArgsHolder().getJobParams(); - if (jobParams instanceof String) { - jobArgs.setArgsStr((String) jobParams); - } else { - jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams)); - } - jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setShardingIndex(jobContext.getShardingIndex()); jobArgs.setShardingTotal(jobContext.getShardingTotal()); @@ -134,13 +119,6 @@ public abstract class AbstractJobExecutor implements IJobExecutor { private static JobArgs buildMapJobArgs(JobContext jobContext) { MapArgs jobArgs = new MapArgs(); JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); - // 下一个版本即将删除,本期兼容此问题 - Object jobParams = jobContext.getJobArgsHolder().getJobParams(); - if (jobParams instanceof String) { - jobArgs.setArgsStr((String) jobParams); - } else { - jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams)); - } jobArgs.setJobParams(jobArgsHolder.getJobParams()); jobArgs.setMapResult(jobArgsHolder.getMaps()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); @@ -152,13 +130,6 @@ public abstract class AbstractJobExecutor implements IJobExecutor { private static JobArgs buildReduceJobArgs(JobContext jobContext) { ReduceArgs jobArgs = new ReduceArgs(); JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); - // 下一个版本即将删除,本期兼容此问题 - Object jobParams = jobContext.getJobArgsHolder().getJobParams(); - if (jobParams instanceof String) { - jobArgs.setArgsStr((String) jobParams); - } else { - jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams)); - } jobArgs.setJobParams(jobArgsHolder.getJobParams()); Object maps = jobArgsHolder.getMaps(); if (Objects.nonNull(maps)) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 2f9272bf9..6d81806cb 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -3,11 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.task; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; -import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum; -import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; -import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; -import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; -import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.enums.*; import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.util.JsonUtil; @@ -26,15 +22,16 @@ import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; -import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; import java.time.LocalDateTime; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; /** * 生成Map Reduce任务 @@ -60,14 +57,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { @Override protected List doGenerate(final JobTaskGenerateContext context) { -// Set serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), -// context.getNamespaceId()); -// if (CollUtil.isEmpty(serverNodes)) { -// SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); -// return Lists.newArrayList(); -// } - -// List nodeInfoList = new ArrayList<>(serverNodes); MapReduceStageEnum mapReduceStageEnum = MapReduceStageEnum.ofStage(context.getMrStage()); Assert.notNull(mapReduceStageEnum, () -> new SnailJobServerException("Map reduce stage is not existed")); switch (Objects.requireNonNull(mapReduceStageEnum)) { @@ -90,19 +79,20 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { private List createMergeReduceJobTasks(JobTaskGenerateContext context) { List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() - .select(JobTask::getResultMessage) - .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) - .eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage()) - .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) + .select(JobTask::getResultMessage) + .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) + .eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage()) + .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) ); + MapReduceArgsStrDTO jobParams = getJobParams(context); Pair clientInfo = getClientNodeInfo(context); // 新增任务实例 JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientInfo(clientInfo.getKey()); jobTask.setArgsType(context.getArgsType()); JobArgsHolder jobArgsHolder = new JobArgsHolder(); - jobArgsHolder.setJobParams(context.getArgsStr()); + jobArgsHolder.setJobParams(jobParams.getArgsStr()); jobArgsHolder.setReduces(StreamUtils.toList(jobTasks, JobTask::getResultMessage)); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(clientInfo.getValue()); @@ -110,30 +100,22 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage()); jobTask.setTaskName(MERGE_REDUCE_TASK); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), - () -> new SnailJobServerException("新增任务实例失败")); + () -> new SnailJobServerException("新增任务实例失败")); return Lists.newArrayList(jobTask); } private List createReduceJobTasks(JobTaskGenerateContext context) { - int reduceParallel = 1; - String jobParams = null; - try { - MapReduceArgsStrDTO mapReduceArgsStrDTO = JsonUtil.parseObject(context.getArgsStr(), - MapReduceArgsStrDTO.class); - reduceParallel = Optional.ofNullable(mapReduceArgsStrDTO.getShardNum()).orElse(1); - jobParams = mapReduceArgsStrDTO.getArgsStr(); - reduceParallel = Math.max(1, reduceParallel); - } catch (Exception e) { - SnailJobLog.LOCAL.error("map reduce args parse error. argsStr:[{}]", context.getArgsStr()); - } + MapReduceArgsStrDTO jobParams = getJobParams(context); + int reduceParallel = Math.max(1, + Optional.ofNullable(jobParams.getShardNum()).orElse(1)); List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() - .select(JobTask::getResultMessage, JobTask::getId) - .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) - .eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage()) - .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) + .select(JobTask::getResultMessage, JobTask::getId) + .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) + .eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage()) + .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) ); if (CollUtil.isEmpty(jobTasks)) { @@ -146,7 +128,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTasks = new ArrayList<>(partition.size()); final List finalJobTasks = jobTasks; - String finalJobParams = jobParams; transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(final TransactionStatus status) { @@ -158,7 +139,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTask.setClientInfo(clientInfo.getKey()); jobTask.setArgsType(context.getArgsType()); JobArgsHolder jobArgsHolder = new JobArgsHolder(); - jobArgsHolder.setJobParams(finalJobParams); + jobArgsHolder.setJobParams(jobParams.getArgsStr()); jobArgsHolder.setMaps(partition.get(index)); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(clientInfo.getValue()); @@ -187,62 +168,74 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { return Lists.newArrayList(); } + MapReduceArgsStrDTO jobParams = getJobParams(context); + // 判定父节点是不是叶子节点,若是则不更新否则更新为非叶子节点 JobTask parentJobTask = jobTaskMapper.selectOne( - new LambdaQueryWrapper() - .select(JobTask::getId) - .eq(JobTask::getId, Optional.ofNullable(context.getParentId()).orElse(0L)) - .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) + new LambdaQueryWrapper() + .select(JobTask::getId) + .eq(JobTask::getId, Optional.ofNullable(context.getParentId()).orElse(0L)) + .eq(JobTask::getLeaf, StatusEnum.YES.getStatus()) ); - return transactionTemplate.execute(status -> { - List jobTasks = new ArrayList<>(mapSubTask.size()); - for (int index = 0; index < mapSubTask.size(); index++) { - Pair clientInfo = getClientNodeInfo(context); + return transactionTemplate.execute(status -> { + List jobTasks = new ArrayList<>(mapSubTask.size()); + for (int index = 0; index < mapSubTask.size(); index++) { + Pair clientInfo = getClientNodeInfo(context); - // 新增任务实例 - JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); - jobTask.setClientInfo(clientInfo.getKey()); - jobTask.setArgsType(context.getArgsType()); - JobArgsHolder jobArgsHolder = new JobArgsHolder(); - jobArgsHolder.setJobParams(context.getArgsStr()); - jobArgsHolder.setMaps(mapSubTask.get(index)); - jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); - jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType()); - jobTask.setTaskStatus(clientInfo.getValue()); - jobTask.setMrStage(MapReduceStageEnum.MAP.getStage()); - jobTask.setTaskName(context.getTaskName()); - jobTask.setLeaf(StatusEnum.YES.getStatus()); - jobTask.setParentId(Objects.isNull(context.getParentId()) ? 0L : context.getParentId()); - jobTask.setRetryCount(0); - jobTask.setCreateDt(LocalDateTime.now()); - jobTask.setUpdateDt(LocalDateTime.now()); - jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); - jobTasks.add(jobTask); - } + // 新增任务实例 + JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); + jobTask.setClientInfo(clientInfo.getKey()); + jobTask.setArgsType(context.getArgsType()); + JobArgsHolder jobArgsHolder = new JobArgsHolder(); + jobArgsHolder.setJobParams(jobParams.getArgsStr()); + jobArgsHolder.setMaps(mapSubTask.get(index)); + jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); + jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType()); + jobTask.setTaskStatus(clientInfo.getValue()); + jobTask.setMrStage(MapReduceStageEnum.MAP.getStage()); + jobTask.setTaskName(context.getTaskName()); + jobTask.setLeaf(StatusEnum.YES.getStatus()); + jobTask.setParentId(Objects.isNull(context.getParentId()) ? 0L : context.getParentId()); + jobTask.setRetryCount(0); + jobTask.setCreateDt(LocalDateTime.now()); + jobTask.setUpdateDt(LocalDateTime.now()); + jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); + jobTasks.add(jobTask); + } - batchSaveJobTasks(jobTasks); + batchSaveJobTasks(jobTasks); - // 更新父节点的为非叶子节点 - if (Objects.nonNull(parentJobTask)) { - JobTask parentJobTask1 = new JobTask(); - parentJobTask1.setId(context.getParentId()); - parentJobTask1.setLeaf(StatusEnum.NO.getStatus()); - Assert.isTrue(1 == jobTaskMapper.updateById(parentJobTask1), - () -> new SnailJobMapReduceException("更新父节点失败")); - } + // 更新父节点的为非叶子节点 + if (Objects.nonNull(parentJobTask)) { + JobTask parentJobTask1 = new JobTask(); + parentJobTask1.setId(context.getParentId()); + parentJobTask1.setLeaf(StatusEnum.NO.getStatus()); + Assert.isTrue(1 == jobTaskMapper.updateById(parentJobTask1), + () -> new SnailJobMapReduceException("更新父节点失败")); + } - return jobTasks; - }); + return jobTasks; + }); } + protected MapReduceArgsStrDTO getJobParams(JobTaskGenerateContext context) { + try { + return JsonUtil.parseObject(context.getArgsStr(), MapReduceArgsStrDTO.class); + } catch (Exception e) { + SnailJobLog.LOCAL.error("map reduce args parse error. argsStr:[{}]", context.getArgsStr()); + } + + return new MapReduceArgsStrDTO(); + } + private Pair getClientNodeInfo(JobTaskGenerateContext context) { RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode( - context.getJobId().toString(), - context.getGroupName(), - context.getNamespaceId(), - ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType() + context.getJobId().toString(), + context.getGroupName(), + context.getNamespaceId(), + ClientLoadBalanceManager.AllocationAlgorithmEnum.ROUND.getType() ); String clientInfo = StrUtil.EMPTY; int JobTaskStatus = JobTaskStatusEnum.RUNNING.getStatus(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapTaskGenerator.java index b52bbe9da..0513573f4 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapTaskGenerator.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.task; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.snailjob.server.job.task.dto.MapReduceArgsStrDTO; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import org.springframework.stereotype.Component; @@ -32,4 +33,12 @@ public class MapTaskGenerator extends MapReduceTaskGenerator { protected List doGenerate(final JobTaskGenerateContext context) { return super.doGenerate(context); } + + @Override + protected MapReduceArgsStrDTO getJobParams(JobTaskGenerateContext context) { + // 这里复用map reduce的参数能力 + MapReduceArgsStrDTO mapReduceArgsStrDTO = new MapReduceArgsStrDTO(); + mapReduceArgsStrDTO.setArgsStr(context.getArgsStr()); + return mapReduceArgsStrDTO; + } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java index bb5162185..7eabfdf71 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java @@ -69,10 +69,23 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { Object[] args = retryRequest.getArgs(); MapTaskRequest mapTaskRequest = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), MapTaskRequest.class); + Job job = jobMapper.selectOne(new LambdaQueryWrapper() + .eq(Job::getId, mapTaskRequest.getJobId()) + .eq(Job::getGroupName, groupName) + .eq(Job::getNamespaceId, namespace) + ); + + if (Objects.isNull(job)) { + return JsonUtil.toJsonString( + new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.FALSE, + retryRequest.getReqId())); + } + // 创建map任务 - JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType()); + JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType()); JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest); context.setGroupName(HttpHeaderUtil.getGroupName(headers)); + context.setArgsStr(job.getArgsStr()); context.setNamespaceId(HttpHeaderUtil.getNamespace(headers)); context.setMrStage(MapReduceStageEnum.MAP.getStage()); context.setMapSubTask(mapTaskRequest.getSubTask()); @@ -84,18 +97,6 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { retryRequest.getReqId())); } - Job job = jobMapper.selectOne(new LambdaQueryWrapper() - .eq(Job::getId, mapTaskRequest.getJobId()) - .eq(Job::getGroupName, groupName) - .eq(Job::getNamespaceId, namespace) - ); - - if (Objects.isNull(job)) { - return JsonUtil.toJsonString( - new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.FALSE, - retryRequest.getReqId())); - } - String newWfContext = null; if (Objects.nonNull(mapTaskRequest.getWorkflowTaskBatchId())) { WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(