feat(sj_1.1.0): 完成reduce 任务
This commit is contained in:
parent
3fab4805c2
commit
7c84e9645b
@ -21,15 +21,10 @@ import com.aizuda.snailjob.common.core.model.JobContext;
|
|||||||
import com.aizuda.snailjob.common.core.model.Result;
|
import com.aizuda.snailjob.common.core.model.Result;
|
||||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
|
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
|
||||||
import jakarta.validation.ConstraintViolation;
|
|
||||||
import jakarta.validation.Valid;
|
import jakarta.validation.Valid;
|
||||||
import jakarta.validation.Validation;
|
|
||||||
import jakarta.validation.Validator;
|
|
||||||
import jakarta.validation.ValidatorFactory;
|
|
||||||
import org.springframework.validation.annotation.Validated;
|
import org.springframework.validation.annotation.Validated;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH;
|
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH;
|
||||||
@ -119,7 +114,7 @@ public class JobEndPoint {
|
|||||||
jobContext.setWorkflowTaskBatchId(dispatchJob.getWorkflowTaskBatchId());
|
jobContext.setWorkflowTaskBatchId(dispatchJob.getWorkflowTaskBatchId());
|
||||||
jobContext.setRetry(dispatchJob.isRetry());
|
jobContext.setRetry(dispatchJob.isRetry());
|
||||||
jobContext.setRetryScene(dispatchJob.getRetryScene());
|
jobContext.setRetryScene(dispatchJob.getRetryScene());
|
||||||
jobContext.setMapName(dispatchJob.getMapName());
|
jobContext.setTaskName(dispatchJob.getTaskName());
|
||||||
jobContext.setMrStage(dispatchJob.getMrStage());
|
jobContext.setMrStage(dispatchJob.getMrStage());
|
||||||
return jobContext;
|
return jobContext;
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,6 @@ import lombok.EqualsAndHashCode;
|
|||||||
@Data
|
@Data
|
||||||
public class MapArgs extends JobArgs {
|
public class MapArgs extends JobArgs {
|
||||||
|
|
||||||
private String mapName;
|
private String taskName;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ import java.util.List;
|
|||||||
@Data
|
@Data
|
||||||
public class MapReduceArgs extends JobArgs {
|
public class MapReduceArgs extends JobArgs {
|
||||||
|
|
||||||
private String mapName;
|
private String taskName;
|
||||||
|
|
||||||
private List<MapArgs> mapArgsList;
|
private List<MapArgs> mapArgsList;
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
|||||||
jobArgs = buildShardingJobArgs(jobContext);
|
jobArgs = buildShardingJobArgs(jobContext);
|
||||||
} else if (Lists.newArrayList(JobTaskTypeEnum.MAP_REDUCE.getType(), JobTaskTypeEnum.MAP.getType())
|
} else if (Lists.newArrayList(JobTaskTypeEnum.MAP_REDUCE.getType(), JobTaskTypeEnum.MAP.getType())
|
||||||
.contains(jobContext.getTaskType())) {
|
.contains(jobContext.getTaskType())) {
|
||||||
if (MapReduceStageEnum.MAP.name().equals(jobContext.getMrStage())) {
|
if (MapReduceStageEnum.MAP.getStage() == jobContext.getMrStage()) {
|
||||||
jobArgs = buildMapJobArgs(jobContext);
|
jobArgs = buildMapJobArgs(jobContext);
|
||||||
} else {
|
} else {
|
||||||
jobArgs = buildReduceJobArgs(jobContext);
|
jobArgs = buildReduceJobArgs(jobContext);
|
||||||
@ -106,7 +106,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
|||||||
MapArgs jobArgs = new MapArgs();
|
MapArgs jobArgs = new MapArgs();
|
||||||
jobArgs.setArgsStr(jobContext.getArgsStr());
|
jobArgs.setArgsStr(jobContext.getArgsStr());
|
||||||
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
||||||
jobArgs.setMapName(jobContext.getMapName());
|
jobArgs.setTaskName(jobContext.getTaskName());
|
||||||
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
|
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
|
||||||
return jobArgs;
|
return jobArgs;
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,7 @@ import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
|
|||||||
import com.aizuda.snailjob.common.core.model.JobContext;
|
import com.aizuda.snailjob.common.core.model.JobContext;
|
||||||
import com.aizuda.snailjob.common.core.model.NettyResult;
|
import com.aizuda.snailjob.common.core.model.NettyResult;
|
||||||
import com.aizuda.snailjob.common.core.model.Result;
|
import com.aizuda.snailjob.common.core.model.Result;
|
||||||
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
@ -39,15 +40,19 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
|
|||||||
|
|
||||||
public abstract ExecuteResult doJobMapExecute(MapArgs mapArgs);
|
public abstract ExecuteResult doJobMapExecute(MapArgs mapArgs);
|
||||||
|
|
||||||
public void doMapExecute(List<Object> taskList, String nextMapName) {
|
public ExecuteResult doMap(List<Object> taskList, String nextTaskName) {
|
||||||
|
|
||||||
if (CollectionUtils.isEmpty(taskList) || StrUtil.isBlank(nextMapName)) {
|
if (StrUtil.isBlank(nextTaskName)) {
|
||||||
return;
|
throw new SnailJobMapReduceException("The next task name can not blank or null {}", nextTaskName);
|
||||||
}
|
}
|
||||||
|
|
||||||
// mapName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败)
|
if (CollectionUtils.isEmpty(taskList)) {
|
||||||
if (SystemConstants.MAP_ROOT.equals(nextMapName)) {
|
throw new SnailJobMapReduceException("The task list can not empty {}", nextTaskName);
|
||||||
throw new SnailJobMapReduceException("The Next mapName can not be {}", SystemConstants.MAP_ROOT);
|
}
|
||||||
|
|
||||||
|
// taskName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败)
|
||||||
|
if (SystemConstants.MAP_ROOT.equals(nextTaskName)) {
|
||||||
|
throw new SnailJobMapReduceException("The Next taskName can not be {}", SystemConstants.MAP_ROOT);
|
||||||
}
|
}
|
||||||
|
|
||||||
JobContext jobContext = JobContextManager.getJobContext();
|
JobContext jobContext = JobContextManager.getJobContext();
|
||||||
@ -56,17 +61,18 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
|
|||||||
MapTaskRequest mapTaskRequest = new MapTaskRequest();
|
MapTaskRequest mapTaskRequest = new MapTaskRequest();
|
||||||
mapTaskRequest.setJobId(jobContext.getJobId());
|
mapTaskRequest.setJobId(jobContext.getJobId());
|
||||||
mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId());
|
mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId());
|
||||||
mapTaskRequest.setMapName(nextMapName);
|
mapTaskRequest.setTaskName(nextTaskName);
|
||||||
mapTaskRequest.setSubTask(taskList);
|
mapTaskRequest.setSubTask(taskList);
|
||||||
mapTaskRequest.setParentId(jobContext.getTaskId());
|
mapTaskRequest.setParentId(jobContext.getTaskId());
|
||||||
|
|
||||||
// 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常)
|
// 2. 同步发送请求
|
||||||
Result<Boolean> booleanResult = CLIENT.batchReportMapTask(mapTaskRequest);
|
Result<Boolean> result = CLIENT.batchReportMapTask(mapTaskRequest);
|
||||||
|
if (result.getData()) {
|
||||||
if (booleanResult.getData()) {
|
SnailJobLog.LOCAL.info("Map task create successfully!. taskName:[{}] TaskId:[{}] ", nextTaskName, jobContext.getTaskId());
|
||||||
log.info("[Map-{}] map task[name={},num={}] successfully!", jobContext.getTaskId(), nextMapName, taskList.size());
|
|
||||||
} else {
|
} else {
|
||||||
throw new SnailJobMapReduceException("map failed for task: " + nextMapName);
|
throw new SnailJobMapReduceException("map failed for task: " + nextTaskName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return ExecuteResult.success();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ import com.aizuda.snailjob.client.job.core.dto.MapArgs;
|
|||||||
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
|
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
|
||||||
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
|
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
|
||||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
||||||
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
|
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
|
||||||
import com.aizuda.snailjob.common.core.model.JobContext;
|
import com.aizuda.snailjob.common.core.model.JobContext;
|
||||||
|
|
||||||
@ -20,12 +21,13 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor {
|
|||||||
@Override
|
@Override
|
||||||
public ExecuteResult doJobExecute(final JobArgs jobArgs) {
|
public ExecuteResult doJobExecute(final JobArgs jobArgs) {
|
||||||
JobContext jobContext = JobContextManager.getJobContext();
|
JobContext jobContext = JobContextManager.getJobContext();
|
||||||
if (jobContext.getMrStage().equals("MAP")) {
|
if (jobContext.getMrStage().equals(MapReduceStageEnum.MAP.getStage())) {
|
||||||
return super.doJobExecute(jobArgs);
|
return super.doJobExecute(jobArgs);
|
||||||
} else if(jobContext.getMrStage().equals("REDUCE")) {
|
} else if(jobContext.getMrStage().equals(MapReduceStageEnum.REDUCE.getStage())) {
|
||||||
ReduceArgs reduceArgs = (ReduceArgs) jobArgs;
|
ReduceArgs reduceArgs = (ReduceArgs) jobArgs;
|
||||||
return doReduceExecute(reduceArgs);
|
return doReduceExecute(reduceArgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new SnailJobMapReduceException("非法的MapReduceStage");
|
throw new SnailJobMapReduceException("非法的MapReduceStage");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,9 +44,9 @@ public class DispatchJobRequest {
|
|||||||
/**
|
/**
|
||||||
* 任务名称
|
* 任务名称
|
||||||
*/
|
*/
|
||||||
private String mapName;
|
private String taskName;
|
||||||
|
|
||||||
private String mrStage;
|
private Integer mrStage;
|
||||||
|
|
||||||
private String argsStr;
|
private String argsStr;
|
||||||
|
|
||||||
|
@ -27,8 +27,8 @@ public class MapTaskRequest {
|
|||||||
|
|
||||||
private Long workflowNodeId;
|
private Long workflowNodeId;
|
||||||
|
|
||||||
@NotBlank(message = "mapName 不能为空")
|
@NotBlank(message = "taskName 不能为空")
|
||||||
private String mapName;
|
private String taskName;
|
||||||
|
|
||||||
@NotEmpty(message = "subTask 不能为空")
|
@NotEmpty(message = "subTask 不能为空")
|
||||||
private List<Object> subTask;
|
private List<Object> subTask;
|
||||||
|
@ -1,11 +1,35 @@
|
|||||||
package com.aizuda.snailjob.common.core.enums;
|
package com.aizuda.snailjob.common.core.enums;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author: opensnail
|
* @author: opensnail
|
||||||
* @date : 2024-06-12
|
* @date : 2024-06-12
|
||||||
* @since : sj_1.1.0
|
* @since : sj_1.1.0
|
||||||
*/
|
*/
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
public enum MapReduceStageEnum {
|
public enum MapReduceStageEnum {
|
||||||
MAP,
|
MAP(1),
|
||||||
REDUCE
|
REDUCE(2),
|
||||||
|
MERGE_REDUCE(3);
|
||||||
|
|
||||||
|
private final int stage;
|
||||||
|
|
||||||
|
public static MapReduceStageEnum ofStage(Integer stage) {
|
||||||
|
if (Objects.isNull(stage)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (MapReduceStageEnum value : MapReduceStageEnum.values()) {
|
||||||
|
if (value.getStage() == stage) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,7 +57,7 @@ public class JobContext {
|
|||||||
/**
|
/**
|
||||||
* Map名称
|
* Map名称
|
||||||
*/
|
*/
|
||||||
private String mapName;
|
private String taskName;
|
||||||
|
|
||||||
private String mrStage;
|
private Integer mrStage;
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,7 @@ public final class MapContext {
|
|||||||
/**
|
/**
|
||||||
* Map名称
|
* Map名称
|
||||||
*/
|
*/
|
||||||
private String mapName;
|
private String taskName;
|
||||||
|
|
||||||
private Long jobId;
|
private Long jobId;
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.annotation.TableId;
|
|||||||
import com.baomidou.mybatisplus.annotation.TableName;
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.io.Serial;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
@ -19,6 +20,7 @@ import java.time.LocalDateTime;
|
|||||||
@TableName("sj_job_task")
|
@TableName("sj_job_task")
|
||||||
public class JobTask implements Serializable {
|
public class JobTask implements Serializable {
|
||||||
|
|
||||||
|
@Serial
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -37,6 +39,11 @@ public class JobTask implements Serializable {
|
|||||||
*/
|
*/
|
||||||
private String groupName;
|
private String groupName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 任务名称
|
||||||
|
*/
|
||||||
|
private String taskName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 任务信息id
|
* 任务信息id
|
||||||
*/
|
*/
|
||||||
@ -83,6 +90,17 @@ public class JobTask implements Serializable {
|
|||||||
*/
|
*/
|
||||||
private Integer argsType;
|
private Integer argsType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 叶子节点(0:非叶子节点 1:叶子节点)
|
||||||
|
*/
|
||||||
|
private Integer leaf;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 动态分片使用
|
||||||
|
* 1:map 2:reduce 3:mergeReduce
|
||||||
|
*/
|
||||||
|
private Integer mrStage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 扩展字段
|
* 扩展字段
|
||||||
*/
|
*/
|
||||||
|
@ -40,5 +40,6 @@ public class JobExecutorResultDTO {
|
|||||||
|
|
||||||
private Integer jobOperationReason;
|
private Integer jobOperationReason;
|
||||||
|
|
||||||
|
private Integer isLeaf;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,37 +0,0 @@
|
|||||||
package com.aizuda.snailjob.server.job.task.dto;
|
|
||||||
|
|
||||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
|
||||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
|
||||||
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
|
||||||
import lombok.Data;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author: opensnail
|
|
||||||
* @date : 2024-06-12
|
|
||||||
* @since : sj_1.1.0
|
|
||||||
*/
|
|
||||||
@Data
|
|
||||||
public class JobTaskExtAttrsDTO {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 任务名称(目前只有map reduce使用)
|
|
||||||
*/
|
|
||||||
private String mapName;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* see: {@link JobTaskTypeEnum}
|
|
||||||
*/
|
|
||||||
private Integer taskType;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 当前任务处于map reduce的哪个阶段
|
|
||||||
* see: {@link MapReduceStageEnum}
|
|
||||||
*/
|
|
||||||
private String mrStage;
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return JsonUtil.toJsonString(this);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,5 +1,6 @@
|
|||||||
package com.aizuda.snailjob.server.job.task.dto;
|
package com.aizuda.snailjob.server.job.task.dto;
|
||||||
|
|
||||||
|
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
|
|
||||||
@ -31,14 +32,15 @@ public class RealJobExecutorDTO extends BaseDTO {
|
|||||||
private String argsType;
|
private String argsType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MAP名称
|
* 任务名称
|
||||||
*/
|
*/
|
||||||
private String mapName;
|
private String taskName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MAP名称
|
* 动态分片的阶段
|
||||||
|
* {@link MapReduceStageEnum}
|
||||||
*/
|
*/
|
||||||
private String mrState;
|
private Integer mrStage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 扩展字段
|
* 扩展字段
|
||||||
|
@ -104,7 +104,9 @@ public interface JobTaskConverter {
|
|||||||
@Mapping(source = "jobTask.argsStr", target = "argsStr"),
|
@Mapping(source = "jobTask.argsStr", target = "argsStr"),
|
||||||
@Mapping(source = "jobTask.argsType", target = "argsType"),
|
@Mapping(source = "jobTask.argsType", target = "argsType"),
|
||||||
@Mapping(source = "jobTask.extAttrs", target = "extAttrs"),
|
@Mapping(source = "jobTask.extAttrs", target = "extAttrs"),
|
||||||
@Mapping(source = "jobTask.namespaceId", target = "namespaceId")
|
@Mapping(source = "jobTask.namespaceId", target = "namespaceId"),
|
||||||
|
@Mapping(source = "jobTask.taskName", target = "taskName"),
|
||||||
|
@Mapping(source = "jobTask.mrStage", target = "mrStage")
|
||||||
})
|
})
|
||||||
RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask);
|
RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask);
|
||||||
|
|
||||||
|
@ -3,10 +3,8 @@ package com.aizuda.snailjob.server.job.task.support.callback;
|
|||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
|
||||||
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
||||||
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
|
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
|
||||||
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
|
|
||||||
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
|
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
|
||||||
import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum;
|
import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum;
|
||||||
import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler;
|
import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler;
|
||||||
@ -53,12 +51,7 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
|
|||||||
realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1);
|
realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1);
|
||||||
realJobExecutor.setRetry(Boolean.TRUE);
|
realJobExecutor.setRetry(Boolean.TRUE);
|
||||||
realJobExecutor.setRetryScene(context.getRetryScene());
|
realJobExecutor.setRetryScene(context.getRetryScene());
|
||||||
String extAttrs = jobTask.getExtAttrs();
|
realJobExecutor.setTaskName(jobTask.getTaskName());
|
||||||
// TODO 待优化
|
|
||||||
if (StrUtil.isNotBlank(extAttrs)) {
|
|
||||||
JobTaskExtAttrsDTO extAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class);
|
|
||||||
realJobExecutor.setMapName(extAttrsDTO.getMapName());
|
|
||||||
}
|
|
||||||
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
|
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
|
||||||
actorRef.tell(realJobExecutor, actorRef);
|
actorRef.tell(realJobExecutor, actorRef);
|
||||||
return;
|
return;
|
||||||
|
@ -2,17 +2,25 @@ package com.aizuda.snailjob.server.job.task.support.callback;
|
|||||||
|
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import cn.hutool.core.lang.Assert;
|
||||||
import cn.hutool.core.util.RandomUtil;
|
import cn.hutool.core.util.RandomUtil;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||||
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
||||||
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
|
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
|
||||||
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
|
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
|
||||||
|
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||||
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
|
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
|
||||||
import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO;
|
import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO;
|
||||||
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
|
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -21,8 +29,9 @@ import java.util.Set;
|
|||||||
* @since : sj_1.1.0
|
* @since : sj_1.1.0
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@RequiredArgsConstructor
|
||||||
public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandler {
|
public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandler {
|
||||||
|
private final JobTaskMapper jobTaskMapper;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public JobTaskTypeEnum getTaskInstanceType() {
|
public JobTaskTypeEnum getTaskInstanceType() {
|
||||||
@ -31,13 +40,16 @@ public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandle
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doCallback(final ClientCallbackContext context) {
|
protected void doCallback(final ClientCallbackContext context) {
|
||||||
|
JobTask jobTask = jobTaskMapper.selectOne(new LambdaQueryWrapper<JobTask>()
|
||||||
|
.eq(JobTask::getId, context.getTaskId()));
|
||||||
|
Assert.notNull(jobTask, () -> new SnailJobServerException("job task is null"));
|
||||||
|
|
||||||
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
|
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
|
||||||
jobExecutorResultDTO.setTaskId(context.getTaskId());
|
jobExecutorResultDTO.setTaskId(context.getTaskId());
|
||||||
jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage());
|
jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage());
|
||||||
jobExecutorResultDTO.setResult(context.getExecuteResult().getResult());
|
jobExecutorResultDTO.setResult(context.getExecuteResult().getResult());
|
||||||
jobExecutorResultDTO.setTaskType(getTaskInstanceType().getType());
|
jobExecutorResultDTO.setTaskType(getTaskInstanceType().getType());
|
||||||
|
jobExecutorResultDTO.setIsLeaf(jobTask.getLeaf());
|
||||||
ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
|
ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
|
||||||
actorRef.tell(jobExecutorResultDTO, actorRef);
|
actorRef.tell(jobExecutorResultDTO, actorRef);
|
||||||
}
|
}
|
||||||
@ -46,7 +58,7 @@ public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandle
|
|||||||
protected String chooseNewClient(ClientCallbackContext context) {
|
protected String chooseNewClient(ClientCallbackContext context) {
|
||||||
Set<RegisterNodeInfo> nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
|
Set<RegisterNodeInfo> nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
|
||||||
if (CollUtil.isEmpty(nodes)) {
|
if (CollUtil.isEmpty(nodes)) {
|
||||||
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
|
SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,10 +135,10 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
|
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
|
||||||
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
|
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
|
||||||
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
|
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
|
||||||
instanceGenerateContext.setMapName(SystemConstants.MAP_ROOT);
|
instanceGenerateContext.setTaskName(SystemConstants.MAP_ROOT);
|
||||||
instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY));
|
instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY));
|
||||||
// TODO 此处需要判断任务类型
|
// TODO 此处需要判断任务类型
|
||||||
instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP);
|
instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP.getStage());
|
||||||
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
|
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
|
||||||
if (CollUtil.isEmpty(taskList)) {
|
if (CollUtil.isEmpty(taskList)) {
|
||||||
SnailJobLog.LOCAL.warn("Generate job task is empty, taskBatchId:[{}]", taskExecute.getTaskBatchId());
|
SnailJobLog.LOCAL.warn("Generate job task is empty, taskBatchId:[{}]", taskExecute.getTaskBatchId());
|
||||||
@ -181,8 +181,6 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
context.setJobId(job.getId());
|
context.setJobId(job.getId());
|
||||||
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
|
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
|
||||||
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
|
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
|
||||||
context.setMapName(SystemConstants.MAP_ROOT);
|
|
||||||
context.setMrStage(MapReduceStageEnum.MAP.name());
|
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.dispatch;
|
|||||||
import akka.actor.AbstractActor;
|
import akka.actor.AbstractActor;
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
||||||
@ -57,6 +58,12 @@ public class JobExecutorResultActor extends AbstractActor {
|
|||||||
Assert.isTrue(1 == jobTaskMapper.update(jobTask,
|
Assert.isTrue(1 == jobTaskMapper.update(jobTask,
|
||||||
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
|
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
|
||||||
() -> new SnailJobServerException("更新任务实例失败"));
|
() -> new SnailJobServerException("更新任务实例失败"));
|
||||||
|
|
||||||
|
// 除MAP和MAP_REDUCE 任务之外,其他任务都是叶子节点
|
||||||
|
if (Objects.nonNull(result.getIsLeaf()) && StatusEnum.NO.getStatus().equals(result.getIsLeaf())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 先尝试完成,若已完成则不需要通过获取分布式锁来完成
|
// 先尝试完成,若已完成则不需要通过获取分布式锁来完成
|
||||||
boolean tryCompleteAndStop = tryCompleteAndStop(result);
|
boolean tryCompleteAndStop = tryCompleteAndStop(result);
|
||||||
if (!tryCompleteAndStop) {
|
if (!tryCompleteAndStop) {
|
||||||
|
@ -3,10 +3,11 @@ package com.aizuda.snailjob.server.job.task.support.dispatch;
|
|||||||
import akka.actor.AbstractActor;
|
import akka.actor.AbstractActor;
|
||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
||||||
|
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
|
||||||
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
|
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
|
||||||
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
|
||||||
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
|
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
|
||||||
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
|
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
|
||||||
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
|
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
|
||||||
@ -16,8 +17,11 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat
|
|||||||
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
|
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
|
||||||
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
|
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
|
||||||
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
|
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
import org.springframework.context.annotation.Scope;
|
import org.springframework.context.annotation.Scope;
|
||||||
@ -41,18 +45,19 @@ public class ReduceActor extends AbstractActor {
|
|||||||
private static final String KEY = "job_generate_reduce_{0}_{1}";
|
private static final String KEY = "job_generate_reduce_{0}_{1}";
|
||||||
private final DistributedLockHandler distributedLockHandler;
|
private final DistributedLockHandler distributedLockHandler;
|
||||||
private final JobMapper jobMapper;
|
private final JobMapper jobMapper;
|
||||||
|
private final JobTaskMapper jobTaskMapper;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
|
return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
|
||||||
|
SnailJobLog.LOCAL.info("执行Reduce, [{}]", JsonUtil.toJsonString(reduceTask));
|
||||||
try {
|
try {
|
||||||
distributedLockHandler.lockWithDisposableAndRetry(() -> {
|
distributedLockHandler.lockWithDisposableAndRetry(() -> {
|
||||||
doReduce(reduceTask);
|
doReduce(reduceTask);
|
||||||
}, MessageFormat.format(KEY, reduceTask.getTaskBatchId(),
|
}, MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId()),
|
||||||
reduceTask.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
|
Duration.ofSeconds(1),
|
||||||
|
Duration.ofSeconds(1),
|
||||||
|
3);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e);
|
SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e);
|
||||||
}
|
}
|
||||||
@ -62,13 +67,29 @@ public class ReduceActor extends AbstractActor {
|
|||||||
|
|
||||||
private void doReduce(final ReduceTaskDTO reduceTask) {
|
private void doReduce(final ReduceTaskDTO reduceTask) {
|
||||||
|
|
||||||
|
List<JobTask> jobTasks = jobTaskMapper.selectList(new PageDTO<>(1, 1),
|
||||||
|
new LambdaQueryWrapper<JobTask>()
|
||||||
|
.select(JobTask::getId)
|
||||||
|
.eq(JobTask::getTaskBatchId, reduceTask.getTaskBatchId())
|
||||||
|
.eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage())
|
||||||
|
);
|
||||||
|
|
||||||
|
if (CollUtil.isNotEmpty(jobTasks)) {
|
||||||
|
// 说明已经创建了reduce任务了
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
Job job = jobMapper.selectById(reduceTask.getJobId());
|
Job job = jobMapper.selectById(reduceTask.getJobId());
|
||||||
|
// 非MAP_REDUCE任务不处理
|
||||||
|
if (JobTaskTypeEnum.MAP_REDUCE.getType() != job.getTaskType()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 创建reduce任务
|
// 创建reduce任务
|
||||||
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType());
|
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType());
|
||||||
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
|
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
|
||||||
context.setTaskBatchId(reduceTask.getTaskBatchId());
|
context.setTaskBatchId(reduceTask.getTaskBatchId());
|
||||||
context.setMrStage(MapReduceStageEnum.REDUCE);
|
context.setMrStage(MapReduceStageEnum.REDUCE.getStage());
|
||||||
List<JobTask> taskList = taskInstance.generate(context);
|
List<JobTask> taskList = taskInstance.generate(context);
|
||||||
if (CollUtil.isEmpty(taskList)) {
|
if (CollUtil.isEmpty(taskList)) {
|
||||||
SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId());
|
SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId());
|
||||||
@ -82,13 +103,12 @@ public class ReduceActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static JobExecutorContext buildJobExecutorContext(ReduceTaskDTO reduceTask, Job job,
|
private static JobExecutorContext buildJobExecutorContext(ReduceTaskDTO reduceTask, Job job,
|
||||||
List<JobTask> taskList) {
|
List<JobTask> taskList) {
|
||||||
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
|
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
|
||||||
context.setTaskList(taskList);
|
context.setTaskList(taskList);
|
||||||
context.setTaskBatchId(reduceTask.getTaskBatchId());
|
context.setTaskBatchId(reduceTask.getTaskBatchId());
|
||||||
context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId());
|
context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId());
|
||||||
context.setWorkflowNodeId(reduceTask.getWorkflowNodeId());
|
context.setWorkflowNodeId(reduceTask.getWorkflowNodeId());
|
||||||
context.setMrStage(MapReduceStageEnum.REDUCE.name());
|
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -90,8 +90,4 @@ public class JobExecutorContext {
|
|||||||
|
|
||||||
private Long workflowNodeId;
|
private Long workflowNodeId;
|
||||||
|
|
||||||
private String mapName;
|
|
||||||
|
|
||||||
private String mrStage;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -39,12 +39,12 @@ public class JobTaskGenerateContext {
|
|||||||
/**
|
/**
|
||||||
* 任务名称
|
* 任务名称
|
||||||
*/
|
*/
|
||||||
private String mapName;
|
private String taskName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 动态分片的阶段
|
* 动态分片的阶段
|
||||||
*/
|
*/
|
||||||
private MapReduceStageEnum mrStage;
|
private Integer mrStage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 父任务id
|
* 父任务id
|
||||||
|
@ -5,6 +5,8 @@ import cn.hutool.core.lang.Assert;
|
|||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
||||||
|
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||||
import com.aizuda.snailjob.common.core.util.StreamUtils;
|
import com.aizuda.snailjob.common.core.util.StreamUtils;
|
||||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||||
@ -12,24 +14,20 @@ import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
|
|||||||
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
|
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
|
||||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||||
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
|
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
|
||||||
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
|
|
||||||
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
|
||||||
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
|
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
|
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.transaction.TransactionStatus;
|
import org.springframework.transaction.TransactionStatus;
|
||||||
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 生成Map Reduce任务
|
* 生成Map Reduce任务
|
||||||
@ -54,15 +52,18 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
protected List<JobTask> doGenerate(final JobTaskGenerateContext context) {
|
protected List<JobTask> doGenerate(final JobTaskGenerateContext context) {
|
||||||
// TODO 若没有客户端节点JobTask是否需要创建????
|
// TODO 若没有客户端节点JobTask是否需要创建????
|
||||||
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(),
|
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(),
|
||||||
context.getNamespaceId());
|
context.getNamespaceId());
|
||||||
if (CollUtil.isEmpty(serverNodes)) {
|
if (CollUtil.isEmpty(serverNodes)) {
|
||||||
SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
|
SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
|
||||||
return Lists.newArrayList();
|
return Lists.newArrayList();
|
||||||
}
|
}
|
||||||
|
|
||||||
List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
|
List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
|
||||||
|
MapReduceStageEnum mapReduceStageEnum = MapReduceStageEnum.ofStage(context.getMrStage());
|
||||||
|
Assert.notNull(mapReduceStageEnum, () -> new SnailJobServerException("Map reduce stage is not existed"));
|
||||||
|
|
||||||
switch (context.getMrStage()) {
|
// todo 待优化
|
||||||
|
switch (mapReduceStageEnum) {
|
||||||
case MAP -> {
|
case MAP -> {
|
||||||
// MAP任务
|
// MAP任务
|
||||||
return createMapJobTasks(context, nodeInfoList, serverNodes);
|
return createMapJobTasks(context, nodeInfoList, serverNodes);
|
||||||
@ -76,37 +77,24 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private List<JobTask> createReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList,
|
private List<JobTask> createReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList,
|
||||||
Set<RegisterNodeInfo> serverNodes) {
|
Set<RegisterNodeInfo> serverNodes) {
|
||||||
|
|
||||||
// TODO reduce阶段的并行度
|
// TODO reduce阶段的并行度
|
||||||
int reduceParallel = 10;
|
int reduceParallel = 2;
|
||||||
|
|
||||||
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
||||||
.select(JobTask::getResultMessage, JobTask::getExtAttrs)
|
.select(JobTask::getResultMessage)
|
||||||
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
|
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
|
||||||
|
.eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage())
|
||||||
// 若存在已经生成的reduce任务不需要重新生成
|
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
|
||||||
boolean existedReduce = jobTasks.stream()
|
);
|
||||||
.filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs()))
|
|
||||||
.map(jobTask -> JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class))
|
|
||||||
.anyMatch(jobTaskExtAttrsDTO -> MapReduceStageEnum.REDUCE.name().equals(jobTaskExtAttrsDTO.getMrStage()));
|
|
||||||
if (existedReduce) {
|
|
||||||
SnailJobLog.LOCAL.warn("The reduce task already exists. taskBatchId:[{}]", context.getTaskBatchId());
|
|
||||||
return Lists.newArrayList();
|
|
||||||
}
|
|
||||||
|
|
||||||
// 这里需要判断是否是map
|
// 这里需要判断是否是map
|
||||||
List<String> allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage);
|
List<String> allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage);
|
||||||
|
|
||||||
List<List<String>> partition = Lists.partition(allMapJobTasks, reduceParallel);
|
List<List<String>> partition = Lists.partition(allMapJobTasks, reduceParallel);
|
||||||
|
|
||||||
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = new JobTaskExtAttrsDTO();
|
|
||||||
jobTaskExtAttrsDTO.setMapName(context.getMapName());
|
|
||||||
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
|
|
||||||
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.REDUCE.name());
|
|
||||||
|
|
||||||
jobTasks = new ArrayList<>(partition.size());
|
jobTasks = new ArrayList<>(partition.size());
|
||||||
|
|
||||||
final List<JobTask> finalJobTasks = jobTasks;
|
final List<JobTask> finalJobTasks = jobTasks;
|
||||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
@ -120,9 +108,11 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
jobTask.setArgsStr(JsonUtil.toJsonString(partition.get(index)));
|
jobTask.setArgsStr(JsonUtil.toJsonString(partition.get(index)));
|
||||||
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
||||||
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
|
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
|
||||||
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
|
jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage());
|
||||||
|
jobTask.setTaskName("REDUCE_TASK");
|
||||||
|
// TODO 改批量插入
|
||||||
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
|
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
|
||||||
() -> new SnailJobServerException("新增任务实例失败"));
|
() -> new SnailJobServerException("新增任务实例失败"));
|
||||||
finalJobTasks.add(jobTask);
|
finalJobTasks.add(jobTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -131,18 +121,22 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
return finalJobTasks;
|
return finalJobTasks;
|
||||||
}
|
}
|
||||||
|
|
||||||
private @Nullable List<JobTask> createMapJobTasks(final JobTaskGenerateContext context,
|
@NotNull
|
||||||
final List<RegisterNodeInfo> nodeInfoList, final Set<RegisterNodeInfo> serverNodes) {
|
private List<JobTask> createMapJobTasks(final JobTaskGenerateContext context,
|
||||||
|
final List<RegisterNodeInfo> nodeInfoList, final Set<RegisterNodeInfo> serverNodes) {
|
||||||
List<?> mapSubTask = context.getMapSubTask();
|
List<?> mapSubTask = context.getMapSubTask();
|
||||||
if (CollUtil.isEmpty(mapSubTask)) {
|
if (CollUtil.isEmpty(mapSubTask)) {
|
||||||
SnailJobLog.LOCAL.warn("Map sub task is empty. TaskBatchId:[{}]", context.getTaskBatchId());
|
SnailJobLog.LOCAL.warn("Map sub task is empty. TaskBatchId:[{}]", context.getTaskBatchId());
|
||||||
return Lists.newArrayList();
|
return Lists.newArrayList();
|
||||||
}
|
}
|
||||||
|
|
||||||
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = new JobTaskExtAttrsDTO();
|
// 判定父节点是不是叶子节点,若是则不更新否则更新为非叶子节点
|
||||||
jobTaskExtAttrsDTO.setMapName(context.getMapName());
|
List<JobTask> parentJobTasks = jobTaskMapper.selectList(new PageDTO<>(1, 1),
|
||||||
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
|
new LambdaQueryWrapper<JobTask>().select(JobTask::getId)
|
||||||
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.MAP.name());
|
.eq(JobTask::getId, context.getParentId())
|
||||||
|
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
|
||||||
|
);
|
||||||
|
|
||||||
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
|
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
|
||||||
|
|
||||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||||
@ -157,12 +151,24 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
jobTask.setArgsType(context.getArgsType());
|
jobTask.setArgsType(context.getArgsType());
|
||||||
jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index)));
|
jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index)));
|
||||||
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
||||||
|
jobTask.setMrStage(MapReduceStageEnum.MAP.getStage());
|
||||||
|
jobTask.setTaskName(context.getTaskName());
|
||||||
|
jobTask.setLeaf(StatusEnum.YES.getStatus());
|
||||||
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
|
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
|
||||||
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
|
// TODO 改批量插入
|
||||||
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
|
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
|
||||||
() -> new SnailJobServerException("新增任务实例失败"));
|
() -> new SnailJobServerException("新增任务实例失败"));
|
||||||
jobTasks.add(jobTask);
|
jobTasks.add(jobTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 更新父节点的为非叶子节点
|
||||||
|
if (CollUtil.isNotEmpty(parentJobTasks)) {
|
||||||
|
JobTask parentJobTask = new JobTask();
|
||||||
|
parentJobTask.setId(context.getParentId());
|
||||||
|
parentJobTask.setLeaf(StatusEnum.NO.getStatus());
|
||||||
|
jobTaskMapper.updateById(parentJobTask);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -16,12 +16,7 @@ import com.aizuda.snailjob.server.common.dto.DistributeInstance;
|
|||||||
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
|
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
|
||||||
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
|
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
|
||||||
import com.aizuda.snailjob.server.common.util.DateUtils;
|
import com.aizuda.snailjob.server.common.util.DateUtils;
|
||||||
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
|
import com.aizuda.snailjob.server.job.task.dto.*;
|
||||||
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
|
|
||||||
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
|
|
||||||
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
|
|
||||||
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
|
||||||
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
|
||||||
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
|
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
|
||||||
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
|
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
|
||||||
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
|
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
|
||||||
@ -48,6 +43,8 @@ import java.util.Map;
|
|||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.MAP;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author: opensnail
|
* @author: opensnail
|
||||||
* @date : 2023-10-10 16:50
|
* @date : 2023-10-10 16:50
|
||||||
@ -62,29 +59,24 @@ public class JobTaskBatchHandler {
|
|||||||
private final WorkflowBatchHandler workflowBatchHandler;
|
private final WorkflowBatchHandler workflowBatchHandler;
|
||||||
private final GroupConfigMapper groupConfigMapper;
|
private final GroupConfigMapper groupConfigMapper;
|
||||||
|
|
||||||
|
|
||||||
@Transactional
|
@Transactional
|
||||||
public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) {
|
public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) {
|
||||||
|
|
||||||
List<JobTask> jobTasks = jobTaskMapper.selectList(
|
List<JobTask> jobTasks = jobTaskMapper.selectList(
|
||||||
new LambdaQueryWrapper<JobTask>()
|
new LambdaQueryWrapper<JobTask>()
|
||||||
.select(JobTask::getTaskStatus, JobTask::getExtAttrs)
|
.select(JobTask::getTaskStatus, JobTask::getMrStage)
|
||||||
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
|
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
|
||||||
|
|
||||||
|
if (CollUtil.isEmpty(jobTasks) ||
|
||||||
|
jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
|
|
||||||
JobTaskBatch jobTaskBatch = new JobTaskBatch();
|
JobTaskBatch jobTaskBatch = new JobTaskBatch();
|
||||||
jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
|
jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
|
||||||
|
|
||||||
if (CollUtil.isEmpty(jobTasks)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<Integer, Long> statusCountMap = jobTasks.stream()
|
Map<Integer, Long> statusCountMap = jobTasks.stream()
|
||||||
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
|
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
|
||||||
|
|
||||||
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
|
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
|
||||||
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
|
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
|
||||||
@ -95,8 +87,10 @@ public class JobTaskBatchHandler {
|
|||||||
} else if (stopCount > 0) {
|
} else if (stopCount > 0) {
|
||||||
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
|
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
|
||||||
} else {
|
} else {
|
||||||
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
|
// todo 调试完成删除
|
||||||
|
SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
|
||||||
|
|
||||||
|
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
|
||||||
if (needReduceTask(completeJobBatchDTO, jobTasks)) {
|
if (needReduceTask(completeJobBatchDTO, jobTasks)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -115,9 +109,9 @@ public class JobTaskBatchHandler {
|
|||||||
|
|
||||||
jobTaskBatch.setUpdateDt(LocalDateTime.now());
|
jobTaskBatch.setUpdateDt(LocalDateTime.now());
|
||||||
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
|
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
|
||||||
new LambdaUpdateWrapper<JobTaskBatch>()
|
new LambdaUpdateWrapper<JobTaskBatch>()
|
||||||
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
|
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
|
||||||
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
|
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
|
||||||
);
|
);
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -126,26 +120,19 @@ public class JobTaskBatchHandler {
|
|||||||
* 若需要执行reduce则返回false 不需要更新批次状态, 否则需要更新批次状态
|
* 若需要执行reduce则返回false 不需要更新批次状态, 否则需要更新批次状态
|
||||||
*
|
*
|
||||||
* @param completeJobBatchDTO 需要执行批次完成所需的参数信息
|
* @param completeJobBatchDTO 需要执行批次完成所需的参数信息
|
||||||
* @param jobTasks 任务项列表
|
* @param jobTasks 任务项列表
|
||||||
* @return true-需要reduce false-不需要reduce
|
* @return true-需要reduce false-不需要reduce
|
||||||
*/
|
*/
|
||||||
private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List<JobTask> jobTasks) {
|
private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List<JobTask> jobTasks) {
|
||||||
// 判断是否是mapreduce任务
|
// 判断是否是mapreduce任务
|
||||||
// todo 此处待优化
|
if (isAllMapTask(jobTasks)) {
|
||||||
JobTask firstJobTask = jobTasks.get(0);
|
// 开启reduce阶段
|
||||||
String extAttrs = firstJobTask.getExtAttrs();
|
try {
|
||||||
if (StrUtil.isNotBlank(extAttrs)) {
|
ActorRef actorRef = ActorGenerator.jobReduceActor();
|
||||||
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class);
|
actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
|
||||||
Integer taskType = jobTaskExtAttrsDTO.getTaskType();
|
return true;
|
||||||
if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) {
|
} catch (Exception e) {
|
||||||
// 开启reduce阶段
|
SnailJobLog.LOCAL.error("tell reduce actor error", e);
|
||||||
try {
|
|
||||||
ActorRef actorRef = ActorGenerator.jobReduceActor();
|
|
||||||
actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
|
|
||||||
return true;
|
|
||||||
} catch (Exception e) {
|
|
||||||
SnailJobLog.LOCAL.error("tell reduce actor error", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -153,17 +140,9 @@ public class JobTaskBatchHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static boolean isAllMapTask(final List<JobTask> jobTasks) {
|
private static boolean isAllMapTask(final List<JobTask> jobTasks) {
|
||||||
return jobTasks.size() == jobTasks.stream()
|
return jobTasks.size() == jobTasks.stream()
|
||||||
.filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs()))
|
.filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage())
|
||||||
.map(jobTask -> JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class))
|
.count();
|
||||||
.filter(jobTaskExtAttrsDTO -> {
|
|
||||||
String mrStage = jobTaskExtAttrsDTO.getMrStage();
|
|
||||||
if (StrUtil.isNotBlank(mrStage) && MapReduceStageEnum.MAP.name().equals(mrStage)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}).count();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -174,21 +153,21 @@ public class JobTaskBatchHandler {
|
|||||||
*/
|
*/
|
||||||
public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
|
public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
|
||||||
if (Objects.isNull(job)
|
if (Objects.isNull(job)
|
||||||
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
||||||
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
||||||
|| JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
|| JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
||||||
// 是否是常驻任务
|
// 是否是常驻任务
|
||||||
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
|
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
|
||||||
// 防止任务已经分配到其他节点导致的任务重复执行
|
// 防止任务已经分配到其他节点导致的任务重复执行
|
||||||
|| !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
|
|| !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
|
||||||
) {
|
) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
long count = groupConfigMapper.selectCount(new LambdaQueryWrapper<GroupConfig>()
|
long count = groupConfigMapper.selectCount(new LambdaQueryWrapper<GroupConfig>()
|
||||||
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
|
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
|
||||||
.eq(GroupConfig::getGroupName, job.getGroupName())
|
.eq(GroupConfig::getGroupName, job.getGroupName())
|
||||||
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
|
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -215,7 +194,7 @@ public class JobTaskBatchHandler {
|
|||||||
Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000);
|
Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000);
|
||||||
|
|
||||||
log.debug("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds,
|
log.debug("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds,
|
||||||
DateUtils.toNowMilli() % 1000);
|
DateUtils.toNowMilli() % 1000);
|
||||||
job.setNextTriggerAt(nextTriggerAt);
|
job.setNextTriggerAt(nextTriggerAt);
|
||||||
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
|
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
|
||||||
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
|
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
|
||||||
|
@ -71,7 +71,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
|
|||||||
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest);
|
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest);
|
||||||
context.setGroupName(HttpHeaderUtil.getGroupName(headers));
|
context.setGroupName(HttpHeaderUtil.getGroupName(headers));
|
||||||
context.setNamespaceId(HttpHeaderUtil.getNamespace(headers));
|
context.setNamespaceId(HttpHeaderUtil.getNamespace(headers));
|
||||||
context.setMrStage(MapReduceStageEnum.MAP);
|
context.setMrStage(MapReduceStageEnum.MAP.getStage());
|
||||||
context.setMapSubTask(mapTaskRequest.getSubTask());
|
context.setMapSubTask(mapTaskRequest.getSubTask());
|
||||||
List<JobTask> taskList = taskInstance.generate(context);
|
List<JobTask> taskList = taskInstance.generate(context);
|
||||||
if (CollUtil.isEmpty(taskList)) {
|
if (CollUtil.isEmpty(taskList)) {
|
||||||
@ -107,8 +107,6 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
|
|||||||
context.setTaskBatchId(mapTaskRequest.getTaskBatchId());
|
context.setTaskBatchId(mapTaskRequest.getTaskBatchId());
|
||||||
context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId());
|
context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId());
|
||||||
context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId());
|
context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId());
|
||||||
context.setMapName(mapTaskRequest.getMapName());
|
|
||||||
context.setMrStage(MapReduceStageEnum.MAP.name());
|
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user