From 6f1d04b854d2b040acdd4ed69d768cbd414129b7 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Fri, 21 Jun 2024 17:53:11 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0):=20=E4=BC=98=E5=8C=96map=20redu?= =?UTF-8?q?ce?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/job/core/dto/MergeReduceArgs.java | 20 +++++++++++ .../client/job/core/dto/ReduceArgs.java | 4 --- .../core/executor/AbstractJobExecutor.java | 26 +++++++++++--- .../core/executor/AbstractMapExecutor.java | 3 +- .../executor/AbstractMapReduceExecutor.java | 5 +-- .../common/core/model/JobArgsHolder.java | 4 +-- .../persistence/mapper/JobTaskMapper.java | 5 +++ .../template/mapper/JobTaskMapper.xml | 34 +++++++++++++++++++ .../workflow/CallbackWorkflowExecutor.java | 5 ++- .../workflow/DecisionWorkflowExecutor.java | 13 +++---- .../task/BroadcastTaskGenerator.java | 9 ++++- .../generator/task/ClusterTaskGenerator.java | 3 +- .../task/MapReduceTaskGenerator.java | 30 +++++++++------- .../generator/task/ShardingTaskGenerator.java | 13 +++++-- .../support/handler/JobTaskBatchHandler.java | 34 ++++++++++++++++--- 15 files changed, 165 insertions(+), 43 deletions(-) create mode 100644 snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MergeReduceArgs.java create mode 100644 snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/JobTaskMapper.xml diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MergeReduceArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MergeReduceArgs.java new file mode 100644 index 000000000..59b023e27 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MergeReduceArgs.java @@ -0,0 +1,20 @@ +package com.aizuda.snailjob.client.job.core.dto; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.List; + +/** + * Task执行结果 + * + * @author: opensnail + * @date : 2024-06-12 13:59 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class MergeReduceArgs extends JobArgs { + + private List reduces; + +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/ReduceArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/ReduceArgs.java index 043aa6f2c..e4e0c1a1a 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/ReduceArgs.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/ReduceArgs.java @@ -1,6 +1,5 @@ package com.aizuda.snailjob.client.job.core.dto; -import com.aizuda.snailjob.common.core.util.JsonUtil; import lombok.Data; import lombok.EqualsAndHashCode; @@ -18,7 +17,4 @@ public class ReduceArgs extends JobArgs { private List mapResult; - public List getMapResult() { - return JsonUtil.parseList(getArgsStr(), List.class); - } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index 86f52fcd8..07414ed0e 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; -import java.util.List; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -60,8 +59,10 @@ public abstract class AbstractJobExecutor implements IJobExecutor { .contains(jobContext.getTaskType())) { if (MapReduceStageEnum.MAP.getStage() == jobContext.getMrStage()) { jobArgs = buildMapJobArgs(jobContext); - } else { + } else if (MapReduceStageEnum.REDUCE.getStage() == jobContext.getMrStage()) { jobArgs = buildReduceJobArgs(jobContext); + } else { + jobArgs = buildMergeReduceJobArgs(jobContext); } } else { @@ -118,7 +119,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { MapArgs jobArgs = new MapArgs(); JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); jobArgs.setJobParams(jobArgsHolder.getJobParams()); - jobArgs.setMapResult(jobArgsHolder.getMapResult()); + jobArgs.setMapResult(jobArgsHolder.getMaps()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setTaskName(jobContext.getTaskName()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); @@ -129,7 +130,24 @@ public abstract class AbstractJobExecutor implements IJobExecutor { ReduceArgs jobArgs = new ReduceArgs(); JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); jobArgs.setJobParams(jobArgsHolder.getJobParams()); - jobArgs.setMapResult(JsonUtil.parseList(jobArgsHolder.getMapResult(), List.class)); + String maps = jobArgsHolder.getMaps(); + if (StrUtil.isNotBlank(maps)) { + jobArgs.setMapResult(JsonUtil.parseList(maps, Object.class)); + } + jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); + jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); + jobArgs.setWfContext(jobContext.getWfContext()); + return jobArgs; + } + + private static JobArgs buildMergeReduceJobArgs(JobContext jobContext) { + MergeReduceArgs jobArgs = new MergeReduceArgs(); + JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder(); + jobArgs.setJobParams(jobArgsHolder.getJobParams()); + String reduces = jobArgsHolder.getReduces(); + if (StrUtil.isNotBlank(reduces)) { + jobArgs.setReduces(JsonUtil.parseList(reduces, Object.class)); + } jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); jobArgs.setWfContext(jobContext.getWfContext()); diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java index c5a6aa47e..b4bae628f 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java @@ -9,6 +9,7 @@ import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.client.model.request.MapTaskRequest; import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; import com.aizuda.snailjob.common.core.model.JobContext; import com.aizuda.snailjob.common.core.model.NettyResult; @@ -67,7 +68,7 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements // 2. 同步发送请求 Result result = CLIENT.batchReportMapTask(mapTaskRequest); - if (result.getData()) { + if (StatusEnum.NO.getStatus() == result.getStatus() || result.getData()) { SnailJobLog.LOCAL.info("Map task create successfully!. taskName:[{}] TaskId:[{}] ", nextTaskName, jobContext.getTaskId()); } else { throw new SnailJobMapReduceException("map failed for task: " + nextTaskName); diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java index 4a59f00f8..ce7085d87 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.client.job.core.executor; import com.aizuda.snailjob.client.job.core.dto.JobArgs; import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; @@ -27,7 +28,7 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor { ReduceArgs reduceArgs = (ReduceArgs) jobArgs; return this.doReduceExecute(reduceArgs); } else if (jobContext.getMrStage().equals(MapReduceStageEnum.MERGE_REDUCE.getStage())) { - ReduceArgs reduceArgs = (ReduceArgs) jobArgs; + MergeReduceArgs reduceArgs = (MergeReduceArgs) jobArgs; return this.doMergeReduceExecute(reduceArgs); } @@ -36,5 +37,5 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor { protected abstract ExecuteResult doReduceExecute(ReduceArgs reduceArgs); - protected abstract ExecuteResult doMergeReduceExecute(ReduceArgs reduceArgs); + protected abstract ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs); } diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobArgsHolder.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobArgsHolder.java index a299b78d9..02ac88d33 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobArgsHolder.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobArgsHolder.java @@ -21,11 +21,11 @@ public class JobArgsHolder { /** * 动态分片 map节点的结果 */ - private String mapResult; + private String maps; /** * 动态分片 reduce执行的结果 */ - private String reduceResult; + private String reduces; } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/JobTaskMapper.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/JobTaskMapper.java index 901ae7b25..c4b5b7db4 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/JobTaskMapper.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/JobTaskMapper.java @@ -1,8 +1,12 @@ package com.aizuda.snailjob.template.datasource.persistence.mapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.List; /** *

@@ -15,4 +19,5 @@ import org.apache.ibatis.annotations.Mapper; @Mapper public interface JobTaskMapper extends BaseMapper { + int insertBatch(@Param("list") List list); } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/JobTaskMapper.xml b/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/JobTaskMapper.xml new file mode 100644 index 000000000..2ae359ede --- /dev/null +++ b/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/JobTaskMapper.xml @@ -0,0 +1,34 @@ + + + + + + + INSERT INTO sj_job_task (namespace_id, group_name, job_id, task_batch_id, parent_id, task_status, + retry_count, mr_stage, leaf, task_name, client_info, wf_context, args_str, result_message, args_type, ext_attrs, + create_dt, update_dt) + VALUES + + ( + #{item.namespaceId}, + #{item.groupName}, + #{item.jobId}, + #{item.taskBatchId}, + #{item.parentId}, + #{item.taskStatus}, + #{item.retryCount}, + #{item.mrStage}, + #{item.leaf}, + #{item.taskName}, + #{item.clientInfo}, + #{item.wfContext}, + #{item.argsStr}, + #{item.resultMessage}, + #{item.argsType}, + #{item.extAttrs}, + #{item.createDt}, + #{item.updateDt} + ) + + + diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java index 7d9a96455..e49dcdc3b 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java @@ -42,7 +42,6 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { private static final String CALLBACK_TIMEOUT = "10"; private final RestTemplate restTemplate; - private final JobTaskMapper jobTaskMapper; @Override public WorkflowNodeTypeEnum getWorkflowNodeType() { @@ -70,8 +69,8 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { invokeCallback(context); } - // 执行下一个节点 - workflowTaskExecutor(context); + // ToDo 执行下一个节点 +// workflowTaskExecutor(context); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java index 9ebd4b516..cd9d32c94 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java @@ -55,6 +55,7 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { int operationReason = JobOperationReasonEnum.NONE.getReason(); int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus(); String message = StrUtil.EMPTY; + String wfContext = ""; Boolean result = (Boolean) Optional.ofNullable(context.getEvaluationResult()).orElse(Boolean.FALSE); @@ -66,27 +67,30 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { } else { DecisionConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), DecisionConfig.class); if (StatusEnum.NO.getStatus().equals(decisionConfig.getDefaultDecision())) { + try { // 这里重新加载一次最新的上下文 WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(new LambdaQueryWrapper() .select(WorkflowTaskBatch::getWfContext) .eq(WorkflowTaskBatch::getId, context.getWorkflowTaskBatchId()) ); + if (Objects.isNull(workflowTaskBatch)) { operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason(); } else { + wfContext = workflowTaskBatch.getWfContext(); ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType()); Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("表达式引擎不存在")); ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine); ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler); - result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), workflowTaskBatch.getWfContext())).orElse(Boolean.FALSE); + result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), wfContext)).orElse(Boolean.FALSE); if (!result) { operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason(); } } } catch (Exception e) { - log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getWfContext(), e); + log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), wfContext, e); taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason(); jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus(); @@ -98,16 +102,13 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { } } -// if (JobTaskBatchStatusEnum.SUCCESS.getStatus() == taskBatchStatus && result) { -// workflowTaskExecutor(context); -// } - // 回传执行结果 context.setEvaluationResult(result); context.setTaskBatchStatus(taskBatchStatus); context.setOperationReason(operationReason); context.setJobTaskStatus(jobTaskStatus); context.setLogMessage(message); + context.setWfContext(wfContext); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java index 3cf5a871f..a2ac0bb51 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/BroadcastTaskGenerator.java @@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; @@ -20,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import java.time.LocalDateTime; import java.util.*; /** @@ -65,11 +67,16 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator { jobTask.setArgsType(context.getArgsType()); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); + jobTask.setParentId(0L); + jobTask.setLeaf(StatusEnum.YES.getStatus()); + jobTask.setRetryCount(0); + jobTask.setCreateDt(LocalDateTime.now()); + jobTask.setUpdateDt(LocalDateTime.now()); clientInfoSet.add(address); jobTasks.add(jobTask); } - Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败")); + Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败")); return jobTasks; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java index e8cc76a8b..7f67777ed 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ClusterTaskGenerator.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.task; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.model.JobArgsHolder; @@ -54,7 +55,7 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator { // 新增任务实例 JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientInfo(ClientInfoUtils.generate(serverNode)); - jobTask.setArgsType(context.getArgsType()); + jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType()); JobArgsHolder jobArgsHolder = new JobArgsHolder(); jobArgsHolder.setJobParams(context.getArgsStr()); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java index 5c30e66ba..3c66b5e24 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.task; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; @@ -28,6 +29,7 @@ import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; +import java.time.LocalDateTime; import java.util.*; /** @@ -98,7 +100,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { JobArgsHolder jobArgsHolder = new JobArgsHolder(); jobArgsHolder.setJobParams(context.getArgsStr()); - jobArgsHolder.setReduceResult(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage))); + jobArgsHolder.setReduces(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage))); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); @@ -139,22 +141,23 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); jobTask.setArgsType(context.getArgsType()); - JobArgsHolder jobArgsHolder = new JobArgsHolder(); - jobArgsHolder.setJobParams(context.getArgsStr()); - jobArgsHolder.setMapResult(JsonUtil.toJsonString(partition.get(index))); - + jobArgsHolder.setJobParams(StrUtil.isBlank(context.getArgsStr()) ? null : context.getArgsStr()); + jobArgsHolder.setMaps(JsonUtil.toJsonString(partition.get(index))); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage()); jobTask.setTaskName("REDUCE_TASK"); -// Assert.isTrue(1 == jobTaskMapper.insert(jobTask), -// () -> new SnailJobServerException("新增任务实例失败")); + jobTask.setParentId(0L); + jobTask.setRetryCount(0); + jobTask.setLeaf(StatusEnum.YES.getStatus()); + jobTask.setCreateDt(LocalDateTime.now()); + jobTask.setUpdateDt(LocalDateTime.now()); finalJobTasks.add(jobTask); } - Assert.isTrue(finalJobTasks.size() == jobTaskMapper.insert(finalJobTasks).size(), () -> new SnailJobServerException("新增任务实例失败")); + Assert.isTrue(finalJobTasks.size() == jobTaskMapper.insertBatch(finalJobTasks), () -> new SnailJobServerException("新增任务实例失败")); } }); @@ -192,19 +195,22 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { jobTask.setArgsType(context.getArgsType()); JobArgsHolder jobArgsHolder = new JobArgsHolder(); jobArgsHolder.setJobParams(context.getArgsStr()); - jobArgsHolder.setMapResult(JsonUtil.toJsonString(mapSubTask.get(index))); + jobArgsHolder.setMaps(JsonUtil.toJsonString(mapSubTask.get(index))); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); + jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType()); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setMrStage(MapReduceStageEnum.MAP.getStage()); jobTask.setTaskName(context.getTaskName()); jobTask.setLeaf(StatusEnum.YES.getStatus()); + jobTask.setParentId(Objects.isNull(context.getParentId()) ? 0L : context.getParentId()); + jobTask.setRetryCount(0); + jobTask.setCreateDt(LocalDateTime.now()); + jobTask.setUpdateDt(LocalDateTime.now()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); -// Assert.isTrue(1 == jobTaskMapper.insert(jobTask), -// () -> new SnailJobServerException("新增任务实例失败")); jobTasks.add(jobTask); } - Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败")); + Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败")); // 更新父节点的为非叶子节点 if (CollUtil.isNotEmpty(parentJobTasks)) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java index d1fe27bb2..b66e88754 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/ShardingTaskGenerator.java @@ -3,8 +3,10 @@ package com.aizuda.snailjob.server.job.task.support.generator.task; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.JobArgsHolder; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; @@ -22,6 +24,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -76,17 +79,21 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator { // 新增任务实例 JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); - jobTask.setArgsType(context.getArgsType()); JobArgsHolder jobArgsHolder = new JobArgsHolder(); jobArgsHolder.setJobParams(argsStrs.get(index)); jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder)); + jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType()); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); -// Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败")); + jobTask.setParentId(0L); + jobTask.setRetryCount(0); + jobTask.setLeaf(StatusEnum.YES.getStatus()); + jobTask.setCreateDt(LocalDateTime.now()); + jobTask.setUpdateDt(LocalDateTime.now()); jobTasks.add(jobTask); } - Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败")); + Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败")); return jobTasks; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index f64613a22..80004964e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -135,13 +135,39 @@ public class JobTaskBatchHandler { */ private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List jobTasks) { Integer mrStage = null; - if (isAllMapTask(jobTasks)) { + + int reduceCount = 0; + int mapCount = 0; + for (final JobTask jobTask : jobTasks) { + if (Objects.isNull(jobTask.getMrStage())) { + continue; + } + + // 存在MERGE_REDUCE任务了不需要生成 + if (MERGE_REDUCE.getStage() == jobTask.getMrStage()) { + return false; + } + + // REDUCE任务累加 + if (REDUCE.getStage() == jobTask.getMrStage()) { + reduceCount++; + continue; + } + + // MAP任务累加 + if (MAP.getStage() == jobTask.getMrStage()) { + mapCount++; + } + } + + // 若存在2个以上的reduce任务则开启merge reduce任务 + if (reduceCount > 1) { + mrStage = MERGE_REDUCE.getStage(); + } else if (mapCount == jobTasks.size()) { // 若都是MAP任务则开启Reduce任务 mrStage = REDUCE.getStage(); - } else if (isALeastOneReduceTask(jobTasks)) { - // 若存在2个以上的reduce任务则开启merge reduce任务 - mrStage = MERGE_REDUCE.getStage(); } else { + // 若既不是MAP也是不REDUCE则是其他类型的任务,直接返回即可 return false; }