fix(sj_1.1.0): 优化map reduce
This commit is contained in:
parent
5343299ff3
commit
6f1d04b854
@ -0,0 +1,20 @@
|
||||
package com.aizuda.snailjob.client.job.core.dto;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Task执行结果
|
||||
*
|
||||
* @author: opensnail
|
||||
* @date : 2024-06-12 13:59
|
||||
*/
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class MergeReduceArgs extends JobArgs {
|
||||
|
||||
private List<?> reduces;
|
||||
|
||||
}
|
@ -1,6 +1,5 @@
|
||||
package com.aizuda.snailjob.client.job.core.dto;
|
||||
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
@ -18,7 +17,4 @@ public class ReduceArgs extends JobArgs {
|
||||
|
||||
private List<?> mapResult;
|
||||
|
||||
public List<?> getMapResult() {
|
||||
return JsonUtil.parseList(getArgsStr(), List.class);
|
||||
}
|
||||
}
|
||||
|
@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -60,8 +59,10 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||
.contains(jobContext.getTaskType())) {
|
||||
if (MapReduceStageEnum.MAP.getStage() == jobContext.getMrStage()) {
|
||||
jobArgs = buildMapJobArgs(jobContext);
|
||||
} else {
|
||||
} else if (MapReduceStageEnum.REDUCE.getStage() == jobContext.getMrStage()) {
|
||||
jobArgs = buildReduceJobArgs(jobContext);
|
||||
} else {
|
||||
jobArgs = buildMergeReduceJobArgs(jobContext);
|
||||
}
|
||||
|
||||
} else {
|
||||
@ -118,7 +119,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||
MapArgs jobArgs = new MapArgs();
|
||||
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
|
||||
jobArgs.setJobParams(jobArgsHolder.getJobParams());
|
||||
jobArgs.setMapResult(jobArgsHolder.getMapResult());
|
||||
jobArgs.setMapResult(jobArgsHolder.getMaps());
|
||||
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
||||
jobArgs.setTaskName(jobContext.getTaskName());
|
||||
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
|
||||
@ -129,7 +130,24 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||
ReduceArgs jobArgs = new ReduceArgs();
|
||||
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
|
||||
jobArgs.setJobParams(jobArgsHolder.getJobParams());
|
||||
jobArgs.setMapResult(JsonUtil.parseList(jobArgsHolder.getMapResult(), List.class));
|
||||
String maps = jobArgsHolder.getMaps();
|
||||
if (StrUtil.isNotBlank(maps)) {
|
||||
jobArgs.setMapResult(JsonUtil.parseList(maps, Object.class));
|
||||
}
|
||||
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
||||
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
|
||||
jobArgs.setWfContext(jobContext.getWfContext());
|
||||
return jobArgs;
|
||||
}
|
||||
|
||||
private static JobArgs buildMergeReduceJobArgs(JobContext jobContext) {
|
||||
MergeReduceArgs jobArgs = new MergeReduceArgs();
|
||||
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
|
||||
jobArgs.setJobParams(jobArgsHolder.getJobParams());
|
||||
String reduces = jobArgsHolder.getReduces();
|
||||
if (StrUtil.isNotBlank(reduces)) {
|
||||
jobArgs.setReduces(JsonUtil.parseList(reduces, Object.class));
|
||||
}
|
||||
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
||||
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
|
||||
jobArgs.setWfContext(jobContext.getWfContext());
|
||||
|
@ -9,6 +9,7 @@ import com.aizuda.snailjob.client.job.core.dto.MapArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
|
||||
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
|
||||
import com.aizuda.snailjob.common.core.model.JobContext;
|
||||
import com.aizuda.snailjob.common.core.model.NettyResult;
|
||||
@ -67,7 +68,7 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
|
||||
|
||||
// 2. 同步发送请求
|
||||
Result<Boolean> result = CLIENT.batchReportMapTask(mapTaskRequest);
|
||||
if (result.getData()) {
|
||||
if (StatusEnum.NO.getStatus() == result.getStatus() || result.getData()) {
|
||||
SnailJobLog.LOCAL.info("Map task create successfully!. taskName:[{}] TaskId:[{}] ", nextTaskName, jobContext.getTaskId());
|
||||
} else {
|
||||
throw new SnailJobMapReduceException("map failed for task: " + nextTaskName);
|
||||
|
@ -3,6 +3,7 @@ package com.aizuda.snailjob.client.job.core.executor;
|
||||
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
|
||||
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
|
||||
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
|
||||
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
|
||||
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
||||
@ -27,7 +28,7 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor {
|
||||
ReduceArgs reduceArgs = (ReduceArgs) jobArgs;
|
||||
return this.doReduceExecute(reduceArgs);
|
||||
} else if (jobContext.getMrStage().equals(MapReduceStageEnum.MERGE_REDUCE.getStage())) {
|
||||
ReduceArgs reduceArgs = (ReduceArgs) jobArgs;
|
||||
MergeReduceArgs reduceArgs = (MergeReduceArgs) jobArgs;
|
||||
return this.doMergeReduceExecute(reduceArgs);
|
||||
}
|
||||
|
||||
@ -36,5 +37,5 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor {
|
||||
|
||||
protected abstract ExecuteResult doReduceExecute(ReduceArgs reduceArgs);
|
||||
|
||||
protected abstract ExecuteResult doMergeReduceExecute(ReduceArgs reduceArgs);
|
||||
protected abstract ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs);
|
||||
}
|
||||
|
@ -21,11 +21,11 @@ public class JobArgsHolder {
|
||||
/**
|
||||
* 动态分片 map节点的结果
|
||||
*/
|
||||
private String mapResult;
|
||||
private String maps;
|
||||
|
||||
/**
|
||||
* 动态分片 reduce执行的结果
|
||||
*/
|
||||
private String reduceResult;
|
||||
private String reduces;
|
||||
|
||||
}
|
||||
|
@ -1,8 +1,12 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.mapper;
|
||||
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
@ -15,4 +19,5 @@ import org.apache.ibatis.annotations.Mapper;
|
||||
@Mapper
|
||||
public interface JobTaskMapper extends BaseMapper<JobTask> {
|
||||
|
||||
int insertBatch(@Param("list") List<JobTask> list);
|
||||
}
|
||||
|
@ -0,0 +1,34 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper">
|
||||
|
||||
<!-- 定义批量新增的 SQL 映射 -->
|
||||
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
|
||||
INSERT INTO sj_job_task (namespace_id, group_name, job_id, task_batch_id, parent_id, task_status,
|
||||
retry_count, mr_stage, leaf, task_name, client_info, wf_context, args_str, result_message, args_type, ext_attrs,
|
||||
create_dt, update_dt)
|
||||
VALUES
|
||||
<foreach collection="list" item="item" separator=",">
|
||||
(
|
||||
#{item.namespaceId},
|
||||
#{item.groupName},
|
||||
#{item.jobId},
|
||||
#{item.taskBatchId},
|
||||
#{item.parentId},
|
||||
#{item.taskStatus},
|
||||
#{item.retryCount},
|
||||
#{item.mrStage},
|
||||
#{item.leaf},
|
||||
#{item.taskName},
|
||||
#{item.clientInfo},
|
||||
#{item.wfContext},
|
||||
#{item.argsStr},
|
||||
#{item.resultMessage},
|
||||
#{item.argsType},
|
||||
#{item.extAttrs},
|
||||
#{item.createDt},
|
||||
#{item.updateDt}
|
||||
)
|
||||
</foreach>
|
||||
</insert>
|
||||
</mapper>
|
@ -42,7 +42,6 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||
|
||||
private static final String CALLBACK_TIMEOUT = "10";
|
||||
private final RestTemplate restTemplate;
|
||||
private final JobTaskMapper jobTaskMapper;
|
||||
|
||||
@Override
|
||||
public WorkflowNodeTypeEnum getWorkflowNodeType() {
|
||||
@ -70,8 +69,8 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||
invokeCallback(context);
|
||||
}
|
||||
|
||||
// 执行下一个节点
|
||||
workflowTaskExecutor(context);
|
||||
// ToDo 执行下一个节点
|
||||
// workflowTaskExecutor(context);
|
||||
|
||||
}
|
||||
|
||||
|
@ -55,6 +55,7 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||
int operationReason = JobOperationReasonEnum.NONE.getReason();
|
||||
int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
|
||||
String message = StrUtil.EMPTY;
|
||||
String wfContext = "";
|
||||
|
||||
Boolean result = (Boolean) Optional.ofNullable(context.getEvaluationResult()).orElse(Boolean.FALSE);
|
||||
|
||||
@ -66,27 +67,30 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||
} else {
|
||||
DecisionConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), DecisionConfig.class);
|
||||
if (StatusEnum.NO.getStatus().equals(decisionConfig.getDefaultDecision())) {
|
||||
|
||||
try {
|
||||
// 这里重新加载一次最新的上下文
|
||||
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(new LambdaQueryWrapper<WorkflowTaskBatch>()
|
||||
.select(WorkflowTaskBatch::getWfContext)
|
||||
.eq(WorkflowTaskBatch::getId, context.getWorkflowTaskBatchId())
|
||||
);
|
||||
|
||||
if (Objects.isNull(workflowTaskBatch)) {
|
||||
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
|
||||
} else {
|
||||
wfContext = workflowTaskBatch.getWfContext();
|
||||
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType());
|
||||
Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("表达式引擎不存在"));
|
||||
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
|
||||
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
|
||||
result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), workflowTaskBatch.getWfContext())).orElse(Boolean.FALSE);
|
||||
result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), wfContext)).orElse(Boolean.FALSE);
|
||||
if (!result) {
|
||||
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getWfContext(), e);
|
||||
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), wfContext, e);
|
||||
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
|
||||
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason();
|
||||
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
|
||||
@ -98,16 +102,13 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
// if (JobTaskBatchStatusEnum.SUCCESS.getStatus() == taskBatchStatus && result) {
|
||||
// workflowTaskExecutor(context);
|
||||
// }
|
||||
|
||||
// 回传执行结果
|
||||
context.setEvaluationResult(result);
|
||||
context.setTaskBatchStatus(taskBatchStatus);
|
||||
context.setOperationReason(operationReason);
|
||||
context.setJobTaskStatus(jobTaskStatus);
|
||||
context.setLogMessage(message);
|
||||
context.setWfContext(wfContext);
|
||||
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
|
||||
@ -20,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
@ -65,11 +67,16 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator {
|
||||
jobTask.setArgsType(context.getArgsType());
|
||||
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
||||
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
|
||||
jobTask.setParentId(0L);
|
||||
jobTask.setLeaf(StatusEnum.YES.getStatus());
|
||||
jobTask.setRetryCount(0);
|
||||
jobTask.setCreateDt(LocalDateTime.now());
|
||||
jobTask.setUpdateDt(LocalDateTime.now());
|
||||
clientInfoSet.add(address);
|
||||
jobTasks.add(jobTask);
|
||||
}
|
||||
|
||||
Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
|
||||
Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败"));
|
||||
|
||||
return jobTasks;
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.task;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
|
||||
@ -54,7 +55,7 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator {
|
||||
// 新增任务实例
|
||||
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
|
||||
jobTask.setClientInfo(ClientInfoUtils.generate(serverNode));
|
||||
jobTask.setArgsType(context.getArgsType());
|
||||
jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
|
||||
JobArgsHolder jobArgsHolder = new JobArgsHolder();
|
||||
jobArgsHolder.setJobParams(context.getArgsStr());
|
||||
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
|
||||
|
@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.task;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
||||
@ -28,6 +29,7 @@ import org.springframework.transaction.TransactionStatus;
|
||||
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
@ -98,7 +100,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
||||
|
||||
JobArgsHolder jobArgsHolder = new JobArgsHolder();
|
||||
jobArgsHolder.setJobParams(context.getArgsStr());
|
||||
jobArgsHolder.setReduceResult(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage)));
|
||||
jobArgsHolder.setReduces(JsonUtil.toJsonString(StreamUtils.toSet(jobTasks, JobTask::getResultMessage)));
|
||||
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
|
||||
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
||||
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
|
||||
@ -139,22 +141,23 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
||||
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
|
||||
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
|
||||
jobTask.setArgsType(context.getArgsType());
|
||||
|
||||
JobArgsHolder jobArgsHolder = new JobArgsHolder();
|
||||
jobArgsHolder.setJobParams(context.getArgsStr());
|
||||
jobArgsHolder.setMapResult(JsonUtil.toJsonString(partition.get(index)));
|
||||
|
||||
jobArgsHolder.setJobParams(StrUtil.isBlank(context.getArgsStr()) ? null : context.getArgsStr());
|
||||
jobArgsHolder.setMaps(JsonUtil.toJsonString(partition.get(index)));
|
||||
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
|
||||
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
||||
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
|
||||
jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage());
|
||||
jobTask.setTaskName("REDUCE_TASK");
|
||||
// Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
|
||||
// () -> new SnailJobServerException("新增任务实例失败"));
|
||||
jobTask.setParentId(0L);
|
||||
jobTask.setRetryCount(0);
|
||||
jobTask.setLeaf(StatusEnum.YES.getStatus());
|
||||
jobTask.setCreateDt(LocalDateTime.now());
|
||||
jobTask.setUpdateDt(LocalDateTime.now());
|
||||
finalJobTasks.add(jobTask);
|
||||
}
|
||||
|
||||
Assert.isTrue(finalJobTasks.size() == jobTaskMapper.insert(finalJobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
|
||||
Assert.isTrue(finalJobTasks.size() == jobTaskMapper.insertBatch(finalJobTasks), () -> new SnailJobServerException("新增任务实例失败"));
|
||||
|
||||
}
|
||||
});
|
||||
@ -192,19 +195,22 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
||||
jobTask.setArgsType(context.getArgsType());
|
||||
JobArgsHolder jobArgsHolder = new JobArgsHolder();
|
||||
jobArgsHolder.setJobParams(context.getArgsStr());
|
||||
jobArgsHolder.setMapResult(JsonUtil.toJsonString(mapSubTask.get(index)));
|
||||
jobArgsHolder.setMaps(JsonUtil.toJsonString(mapSubTask.get(index)));
|
||||
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
|
||||
jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
|
||||
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
||||
jobTask.setMrStage(MapReduceStageEnum.MAP.getStage());
|
||||
jobTask.setTaskName(context.getTaskName());
|
||||
jobTask.setLeaf(StatusEnum.YES.getStatus());
|
||||
jobTask.setParentId(Objects.isNull(context.getParentId()) ? 0L : context.getParentId());
|
||||
jobTask.setRetryCount(0);
|
||||
jobTask.setCreateDt(LocalDateTime.now());
|
||||
jobTask.setUpdateDt(LocalDateTime.now());
|
||||
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
|
||||
// Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
|
||||
// () -> new SnailJobServerException("新增任务实例失败"));
|
||||
jobTasks.add(jobTask);
|
||||
}
|
||||
|
||||
Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
|
||||
Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败"));
|
||||
|
||||
// 更新父节点的为非叶子节点
|
||||
if (CollUtil.isNotEmpty(parentJobTasks)) {
|
||||
|
@ -3,8 +3,10 @@ package com.aizuda.snailjob.server.job.task.support.generator.task;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||
import com.aizuda.snailjob.common.core.model.JobArgsHolder;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
@ -22,6 +24,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -76,17 +79,21 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
|
||||
// 新增任务实例
|
||||
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
|
||||
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
|
||||
jobTask.setArgsType(context.getArgsType());
|
||||
JobArgsHolder jobArgsHolder = new JobArgsHolder();
|
||||
jobArgsHolder.setJobParams(argsStrs.get(index));
|
||||
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
|
||||
jobTask.setArgsType(JobArgsTypeEnum.JSON.getArgsType());
|
||||
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
||||
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
|
||||
// Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败"));
|
||||
jobTask.setParentId(0L);
|
||||
jobTask.setRetryCount(0);
|
||||
jobTask.setLeaf(StatusEnum.YES.getStatus());
|
||||
jobTask.setCreateDt(LocalDateTime.now());
|
||||
jobTask.setUpdateDt(LocalDateTime.now());
|
||||
jobTasks.add(jobTask);
|
||||
}
|
||||
|
||||
Assert.isTrue(jobTasks.size() == jobTaskMapper.insert(jobTasks).size(), () -> new SnailJobServerException("新增任务实例失败"));
|
||||
Assert.isTrue(jobTasks.size() == jobTaskMapper.insertBatch(jobTasks), () -> new SnailJobServerException("新增任务实例失败"));
|
||||
|
||||
|
||||
return jobTasks;
|
||||
|
@ -135,13 +135,39 @@ public class JobTaskBatchHandler {
|
||||
*/
|
||||
private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List<JobTask> jobTasks) {
|
||||
Integer mrStage = null;
|
||||
if (isAllMapTask(jobTasks)) {
|
||||
|
||||
int reduceCount = 0;
|
||||
int mapCount = 0;
|
||||
for (final JobTask jobTask : jobTasks) {
|
||||
if (Objects.isNull(jobTask.getMrStage())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 存在MERGE_REDUCE任务了不需要生成
|
||||
if (MERGE_REDUCE.getStage() == jobTask.getMrStage()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// REDUCE任务累加
|
||||
if (REDUCE.getStage() == jobTask.getMrStage()) {
|
||||
reduceCount++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// MAP任务累加
|
||||
if (MAP.getStage() == jobTask.getMrStage()) {
|
||||
mapCount++;
|
||||
}
|
||||
}
|
||||
|
||||
// 若存在2个以上的reduce任务则开启merge reduce任务
|
||||
if (reduceCount > 1) {
|
||||
mrStage = MERGE_REDUCE.getStage();
|
||||
} else if (mapCount == jobTasks.size()) {
|
||||
// 若都是MAP任务则开启Reduce任务
|
||||
mrStage = REDUCE.getStage();
|
||||
} else if (isALeastOneReduceTask(jobTasks)) {
|
||||
// 若存在2个以上的reduce任务则开启merge reduce任务
|
||||
mrStage = MERGE_REDUCE.getStage();
|
||||
} else {
|
||||
// 若既不是MAP也是不REDUCE则是其他类型的任务,直接返回即可
|
||||
return false;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user