feat(sj_1.0.1): complete the reduce task

This commit is contained in:
opensnail 2024-06-15 23:44:24 +08:00
parent 3fab4805c2
commit 5f50e31c2a
26 changed files with 234 additions and 212 deletions

View File

@ -21,15 +21,10 @@ import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.Valid;
import jakarta.validation.Validation;
import jakarta.validation.Validator;
import jakarta.validation.ValidatorFactory;
import org.springframework.validation.annotation.Validated;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH;
@ -119,7 +114,7 @@ public class JobEndPoint {
jobContext.setWorkflowTaskBatchId(dispatchJob.getWorkflowTaskBatchId());
jobContext.setRetry(dispatchJob.isRetry());
jobContext.setRetryScene(dispatchJob.getRetryScene());
jobContext.setMapName(dispatchJob.getMapName());
jobContext.setTaskName(dispatchJob.getTaskName());
jobContext.setMrStage(dispatchJob.getMrStage());
return jobContext;
}

View File

@ -13,6 +13,6 @@ import lombok.EqualsAndHashCode;
@Data
public class MapArgs extends JobArgs {
private String mapName;
private String taskName;
}

View File

@ -14,7 +14,7 @@ import java.util.List;
@Data
public class MapReduceArgs extends JobArgs {
private String mapName;
private String taskName;
private List<MapArgs> mapArgsList;
}

View File

@ -50,7 +50,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
jobArgs = buildShardingJobArgs(jobContext);
} else if (Lists.newArrayList(JobTaskTypeEnum.MAP_REDUCE.getType(), JobTaskTypeEnum.MAP.getType())
.contains(jobContext.getTaskType())) {
if (MapReduceStageEnum.MAP.name().equals(jobContext.getMrStage())) {
if (MapReduceStageEnum.MAP.getStage() == jobContext.getMrStage()) {
jobArgs = buildMapJobArgs(jobContext);
} else {
jobArgs = buildReduceJobArgs(jobContext);
@ -106,7 +106,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
MapArgs jobArgs = new MapArgs();
jobArgs.setArgsStr(jobContext.getArgsStr());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setMapName(jobContext.getMapName());
jobArgs.setTaskName(jobContext.getTaskName());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
return jobArgs;
}

View File

@ -13,6 +13,7 @@ import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
@ -39,15 +40,19 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
public abstract ExecuteResult doJobMapExecute(MapArgs mapArgs);
public void doMapExecute(List<Object> taskList, String nextMapName) {
public ExecuteResult doMap(List<Object> taskList, String nextTaskName) {
if (CollectionUtils.isEmpty(taskList) || StrUtil.isBlank(nextMapName)) {
return;
if (StrUtil.isBlank(nextTaskName)) {
throw new SnailJobMapReduceException("The next task name can not blank or null {}", nextTaskName);
}
// If the mapName matches the root task name or the final task name, child tasks are generated endlessly or the job fails outright
if (SystemConstants.MAP_ROOT.equals(nextMapName)) {
throw new SnailJobMapReduceException("The next mapName can not be {}", SystemConstants.MAP_ROOT);
if (CollectionUtils.isEmpty(taskList)) {
throw new SnailJobMapReduceException("The task list can not empty {}", nextTaskName);
}
// If the taskName matches the root task name or the final task name, child tasks are generated endlessly or the job fails outright
if (SystemConstants.MAP_ROOT.equals(nextTaskName)) {
throw new SnailJobMapReduceException("The next taskName can not be {}", SystemConstants.MAP_ROOT);
}
JobContext jobContext = JobContextManager.getJobContext();
@ -56,17 +61,18 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
MapTaskRequest mapTaskRequest = new MapTaskRequest();
mapTaskRequest.setJobId(jobContext.getJobId());
mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId());
mapTaskRequest.setMapName(nextMapName);
mapTaskRequest.setTaskName(nextTaskName);
mapTaskRequest.setSubTask(taskList);
mapTaskRequest.setParentId(jobContext.getTaskId());
// 2. Send the request reliably; the task must not be lost, so use the ask method and throw on failure
Result<Boolean> booleanResult = CLIENT.batchReportMapTask(mapTaskRequest);
if (booleanResult.getData()) {
log.info("[Map-{}] map task[name={},num={}] successfully!", jobContext.getTaskId(), nextMapName, taskList.size());
// 2. 同步发送请求
Result<Boolean> result = CLIENT.batchReportMapTask(mapTaskRequest);
if (result.getData()) {
SnailJobLog.LOCAL.info("Map task create successfully!. taskName:[{}] TaskId:[{}] ", nextTaskName, jobContext.getTaskId());
} else {
throw new SnailJobMapReduceException("map failed for task: " + nextMapName);
throw new SnailJobMapReduceException("map failed for task: " + nextTaskName);
}
return ExecuteResult.success();
}
}
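
With the reworked contract above, doMap(...) now validates the fan-out (non-blank next task name, non-empty sub-task list, name different from MAP_ROOT) and returns an ExecuteResult instead of void. A minimal sketch of a subclass, assuming only the API visible in this diff (RangeMapExecutor and the range strings are hypothetical):

import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import java.util.List;

public class RangeMapExecutor extends AbstractMapExecutor {

    @Override
    public ExecuteResult doJobMapExecute(MapArgs mapArgs) {
        if (SystemConstants.MAP_ROOT.equals(mapArgs.getTaskName())) {
            // Root stage: fan out. doMap throws SnailJobMapReduceException
            // if the list is empty or the next name is blank / MAP_ROOT.
            return doMap(List.of("0-100", "100-200", "200-300"), "RANGE_TASK");
        }
        // Leaf stage: process the single partition carried in argsStr.
        System.out.println("processing range: " + mapArgs.getArgsStr());
        return ExecuteResult.success();
    }
}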

View File

@ -5,6 +5,7 @@ import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
import com.aizuda.snailjob.common.core.model.JobContext;
@ -20,12 +21,13 @@ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor {
@Override
public ExecuteResult doJobExecute(final JobArgs jobArgs) {
JobContext jobContext = JobContextManager.getJobContext();
if (jobContext.getMrStage().equals("MAP")) {
if (jobContext.getMrStage().equals(MapReduceStageEnum.MAP.getStage())) {
return super.doJobExecute(jobArgs);
} else if(jobContext.getMrStage().equals("REDUCE")) {
} else if(jobContext.getMrStage().equals(MapReduceStageEnum.REDUCE.getStage())) {
ReduceArgs reduceArgs = (ReduceArgs) jobArgs;
return doReduceExecute(reduceArgs);
}
throw new SnailJobMapReduceException("非法的MapReduceStage");
}

View File

@ -44,9 +44,9 @@ public class DispatchJobRequest {
/**
* Task name
*/
private String mapName;
private String taskName;
private String mrStage;
private Integer mrStage;
private String argsStr;

View File

@ -27,8 +27,8 @@ public class MapTaskRequest {
private Long workflowNodeId;
@NotBlank(message = "mapName 不能为空")
private String mapName;
@NotBlank(message = "taskName 不能为空")
private String taskName;
@NotEmpty(message = "subTask 不能为空")
private List<Object> subTask;

View File

@ -1,11 +1,35 @@
package com.aizuda.snailjob.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Objects;
/**
* @author: opensnail
* @date : 2024-06-12
* @since : sj_1.1.0
*/
@Getter
@AllArgsConstructor
public enum MapReduceStageEnum {
MAP,
REDUCE
MAP(1),
REDUCE(2),
MERGE_REDUCE(3);
private final int stage;
public static MapReduceStageEnum ofStage(Integer stage) {
if (Objects.isNull(stage)) {
return null;
}
for (MapReduceStageEnum value : MapReduceStageEnum.values()) {
if (value.getStage() == stage) {
return value;
}
}
return null;
}
}
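
The stage markers are now persisted as int codes instead of enum names, and ofStage(...) is the tolerant reverse lookup (null in, null out; null for unknown codes). A quick hypothetical round-trip, not part of this commit:

import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;

public class StageDemo {
    public static void main(String[] args) {
        Integer raw = 2; // e.g. read back from the sj_job_task.mr_stage column
        MapReduceStageEnum stage = MapReduceStageEnum.ofStage(raw);
        if (stage == null) {
            throw new IllegalArgumentException("unknown mr_stage: " + raw);
        }
        switch (stage) {
            case MAP -> System.out.println("map stage");
            case REDUCE -> System.out.println("reduce stage");
            case MERGE_REDUCE -> System.out.println("merge reduce stage");
        }
    }
}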

View File

@ -57,7 +57,7 @@ public class JobContext {
/**
* Map name
*/
private String mapName;
private String taskName;
private String mrStage;
private Integer mrStage;
}

View File

@ -20,7 +20,7 @@ public final class MapContext {
/**
* Map name
*/
private String mapName;
private String taskName;
private Long jobId;

View File

@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
@ -19,6 +20,7 @@ import java.time.LocalDateTime;
@TableName("sj_job_task")
public class JobTask implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
@ -37,6 +39,11 @@ public class JobTask implements Serializable {
*/
private String groupName;
/**
* Task name
*/
private String taskName;
/**
* Job info id
*/
@ -83,6 +90,17 @@ public class JobTask implements Serializable {
*/
private Integer argsType;
/**
* Leaf node (0: non-leaf node, 1: leaf node)
*/
private Integer leaf;
/**
* Used by dynamic sharding
* 1: map 2: reduce 3: mergeReduce
*/
private Integer mrStage;
/**
* Extension field
*/

View File

@ -40,5 +40,6 @@ public class JobExecutorResultDTO {
private Integer jobOperationReason;
private Integer isLeaf;
}

View File

@ -1,37 +0,0 @@
package com.aizuda.snailjob.server.job.task.dto;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import lombok.Data;
/**
* @author: opensnail
* @date : 2024-06-12
* @since : sj_1.1.0
*/
@Data
public class JobTaskExtAttrsDTO {
/**
* Task name (currently only used by map reduce)
*/
private String mapName;
/**
* see: {@link JobTaskTypeEnum}
*/
private Integer taskType;
/**
* Which stage of map reduce the current task is in
* see: {@link MapReduceStageEnum}
*/
private String mrStage;
@Override
public String toString() {
return JsonUtil.toJsonString(this);
}
}
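
Deleting this DTO is the core of the refactor: the task name and mrStage move out of the JSON extAttrs blob into dedicated sj_job_task columns, so consumers no longer parse JSON on every read. A before/after sketch (the helper name is hypothetical):

// Before: JobTaskExtAttrsDTO ext = JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class);
//         String stageName = ext.getMrStage(); // stage stored as a name, e.g. "MAP"
static Integer mrStageOf(JobTask jobTask) {
    return jobTask.getMrStage(); // now an int code, see MapReduceStageEnum
}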

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.job.task.dto;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -31,14 +32,15 @@ public class RealJobExecutorDTO extends BaseDTO {
private String argsType;
/**
* MAP name
* Task name
*/
private String mapName;
private String taskName;
/**
* MAP name
* Stage of dynamic sharding
* {@link MapReduceStageEnum}
*/
private String mrState;
private Integer mrStage;
/**
* Extension field

View File

@ -104,7 +104,9 @@ public interface JobTaskConverter {
@Mapping(source = "jobTask.argsStr", target = "argsStr"),
@Mapping(source = "jobTask.argsType", target = "argsType"),
@Mapping(source = "jobTask.extAttrs", target = "extAttrs"),
@Mapping(source = "jobTask.namespaceId", target = "namespaceId")
@Mapping(source = "jobTask.namespaceId", target = "namespaceId"),
@Mapping(source = "jobTask.taskName", target = "taskName"),
@Mapping(source = "jobTask.mrStage", target = "mrStage")
})
RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask);

View File

@ -3,10 +3,8 @@ package com.aizuda.snailjob.server.job.task.support.callback;
import akka.actor.ActorRef;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum;
import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler;
@ -53,12 +51,7 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1);
realJobExecutor.setRetry(Boolean.TRUE);
realJobExecutor.setRetryScene(context.getRetryScene());
String extAttrs = jobTask.getExtAttrs();
// TODO 待优化
if (StrUtil.isNotBlank(extAttrs)) {
JobTaskExtAttrsDTO extAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class);
realJobExecutor.setMapName(extAttrsDTO.getMapName());
}
realJobExecutor.setTaskName(jobTask.getTaskName());
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
return;

View File

@ -2,17 +2,25 @@ package com.aizuda.snailjob.server.job.task.support.callback;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.RandomUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import lombok.extern.slf4j.Slf4j;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.Set;
/**
@ -21,8 +29,9 @@ import java.util.Set;
* @since : sj_1.1.0
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandler {
private final JobTaskMapper jobTaskMapper;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
@ -31,13 +40,16 @@ public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandle
@Override
protected void doCallback(final ClientCallbackContext context) {
JobTask jobTask = jobTaskMapper.selectOne(new LambdaQueryWrapper<JobTask>()
.eq(JobTask::getId, context.getTaskId()));
Assert.notNull(jobTask, () -> new SnailJobServerException("job task is null"));
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
jobExecutorResultDTO.setTaskId(context.getTaskId());
jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage());
jobExecutorResultDTO.setResult(context.getExecuteResult().getResult());
jobExecutorResultDTO.setTaskType(getTaskInstanceType().getType());
jobExecutorResultDTO.setIsLeaf(jobTask.getLeaf());
ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
actorRef.tell(jobExecutorResultDTO, actorRef);
}
@ -46,7 +58,7 @@ public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandle
protected String chooseNewClient(ClientCallbackContext context) {
Set<RegisterNodeInfo> nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
if (CollUtil.isEmpty(nodes)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return null;
}

View File

@ -135,10 +135,10 @@ public class JobExecutorActor extends AbstractActor {
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
instanceGenerateContext.setMapName(SystemConstants.MAP_ROOT);
instanceGenerateContext.setTaskName(SystemConstants.MAP_ROOT);
instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY));
// TODO the task type needs to be checked here
instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP);
instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP.getStage());
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
if (CollUtil.isEmpty(taskList)) {
SnailJobLog.LOCAL.warn("Generate job task is empty, taskBatchId:[{}]", taskExecute.getTaskBatchId());
@ -181,8 +181,6 @@ public class JobExecutorActor extends AbstractActor {
context.setJobId(job.getId());
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
context.setMapName(SystemConstants.MAP_ROOT);
context.setMrStage(MapReduceStageEnum.MAP.name());
return context;
}

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
@ -57,6 +58,12 @@ public class JobExecutorResultActor extends AbstractActor {
Assert.isTrue(1 == jobTaskMapper.update(jobTask,
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
() -> new SnailJobServerException("更新任务实例失败"));
// All tasks other than MAP and MAP_REDUCE are leaf nodes
if (Objects.nonNull(result.getIsLeaf()) && StatusEnum.NO.getStatus().equals(result.getIsLeaf())) {
return;
}
// Try to complete first; if already completed, there is no need to acquire the distributed lock
boolean tryCompleteAndStop = tryCompleteAndStop(result);
if (!tryCompleteAndStop) {
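
The early return above is what keeps non-leaf results from closing the batch: once a MAP task fans out children, its leaf flag is flipped to 0, and only leaf results may drive batch completion (and, for MAP_REDUCE, trigger the reduce stage). The guard in isolation, as an illustrative helper (values per the JobTask.leaf javadoc in this commit):

static boolean shortCircuitsNonLeaf(JobExecutorResultDTO result) {
    // 0 = non-leaf: the task spawned children, so its own result must not
    // complete the batch; the actor returns early in that case.
    return Objects.nonNull(result.getIsLeaf())
            && StatusEnum.NO.getStatus().equals(result.getIsLeaf());
}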

View File

@ -3,10 +3,11 @@ package com.aizuda.snailjob.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
@ -16,8 +17,11 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
@ -41,18 +45,19 @@ public class ReduceActor extends AbstractActor {
private static final String KEY = "job_generate_reduce_{0}_{1}";
private final DistributedLockHandler distributedLockHandler;
private final JobMapper jobMapper;
private final JobTaskMapper jobTaskMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
SnailJobLog.LOCAL.info("执行Reduce, [{}]", JsonUtil.toJsonString(reduceTask));
try {
distributedLockHandler.lockWithDisposableAndRetry(() -> {
doReduce(reduceTask);
}, MessageFormat.format(KEY, reduceTask.getTaskBatchId(),
reduceTask.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
doReduce(reduceTask);
}, MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId()),
Duration.ofSeconds(1),
Duration.ofSeconds(1),
3);
} catch (Exception e) {
SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e);
}
@ -62,13 +67,29 @@ public class ReduceActor extends AbstractActor {
private void doReduce(final ReduceTaskDTO reduceTask) {
List<JobTask> jobTasks = jobTaskMapper.selectList(new PageDTO<>(1, 1),
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getId)
.eq(JobTask::getTaskBatchId, reduceTask.getTaskBatchId())
.eq(JobTask::getMrStage, MapReduceStageEnum.REDUCE.getStage())
);
if (CollUtil.isNotEmpty(jobTasks)) {
// The reduce task has already been created
return;
}
Job job = jobMapper.selectById(reduceTask.getJobId());
// Do not process non-MAP_REDUCE tasks
if (JobTaskTypeEnum.MAP_REDUCE.getType() != job.getTaskType()) {
return;
}
// Create the reduce task
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType());
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
context.setTaskBatchId(reduceTask.getTaskBatchId());
context.setMrStage(MapReduceStageEnum.REDUCE);
context.setMrStage(MapReduceStageEnum.REDUCE.getStage());
List<JobTask> taskList = taskInstance.generate(context);
if (CollUtil.isEmpty(taskList)) {
SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId());
@ -82,13 +103,12 @@ public class ReduceActor extends AbstractActor {
}
private static JobExecutorContext buildJobExecutorContext(ReduceTaskDTO reduceTask, Job job,
List<JobTask> taskList) {
List<JobTask> taskList) {
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
context.setTaskList(taskList);
context.setTaskBatchId(reduceTask.getTaskBatchId());
context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId());
context.setWorkflowNodeId(reduceTask.getWorkflowNodeId());
context.setMrStage(MapReduceStageEnum.REDUCE.name());
return context;
}
}
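
Reduce generation is idempotent twice over: the disposable distributed lock keyed per batch/job pair, plus the pre-check for an existing REDUCE-stage task. One subtlety in the lock key: MessageFormat formats numeric arguments with locale grouping separators, so large ids render with commas. The key is still deterministic for a given pair, so deduplication works; a quick standalone check:

import java.text.MessageFormat;

public class LockKeyDemo {
    public static void main(String[] args) {
        // MessageFormat applies number grouping (locale-dependent): 123456 -> "123,456"
        String key = MessageFormat.format("job_generate_reduce_{0}_{1}", 123456L, 42L);
        System.out.println(key); // e.g. job_generate_reduce_123,456_42
    }
}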

View File

@ -90,8 +90,4 @@ public class JobExecutorContext {
private Long workflowNodeId;
private String mapName;
private String mrStage;
}

View File

@ -39,12 +39,12 @@ public class JobTaskGenerateContext {
/**
* Task name
*/
private String mapName;
private String taskName;
/**
* Stage of dynamic sharding
*/
private MapReduceStageEnum mrStage;
private Integer mrStage;
/**
* Parent task id

View File

@ -5,6 +5,8 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -12,24 +14,20 @@ import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.*;
/**
* Generates Map Reduce tasks
@ -54,15 +52,18 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
protected List<JobTask> doGenerate(final JobTaskGenerateContext context) {
// TODO should JobTasks be created when there are no client nodes?
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(),
context.getNamespaceId());
context.getNamespaceId());
if (CollUtil.isEmpty(serverNodes)) {
SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
MapReduceStageEnum mapReduceStageEnum = MapReduceStageEnum.ofStage(context.getMrStage());
Assert.notNull(mapReduceStageEnum, () -> new SnailJobServerException("Map reduce stage does not exist"));
switch (context.getMrStage()) {
// TODO to be optimized
switch (mapReduceStageEnum) {
case MAP -> {
// MAP task
return createMapJobTasks(context, nodeInfoList, serverNodes);
@ -76,37 +77,24 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
}
private List<JobTask> createReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList,
Set<RegisterNodeInfo> serverNodes) {
Set<RegisterNodeInfo> serverNodes) {
// TODO parallelism of the reduce stage
int reduceParallel = 10;
int reduceParallel = 2;
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage, JobTask::getExtAttrs)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
// If a reduce task has already been generated, there is no need to regenerate it
boolean existedReduce = jobTasks.stream()
.filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs()))
.map(jobTask -> JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class))
.anyMatch(jobTaskExtAttrsDTO -> MapReduceStageEnum.REDUCE.name().equals(jobTaskExtAttrsDTO.getMrStage()));
if (existedReduce) {
SnailJobLog.LOCAL.warn("The reduce task already exists. taskBatchId:[{}]", context.getTaskBatchId());
return Lists.newArrayList();
}
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
.eq(JobTask::getMrStage, MapReduceStageEnum.MAP.getStage())
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
);
// Here we need to check whether the tasks are map tasks
List<String> allMapJobTasks = StreamUtils.toList(jobTasks, JobTask::getResultMessage);
List<List<String>> partition = Lists.partition(allMapJobTasks, reduceParallel);
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = new JobTaskExtAttrsDTO();
jobTaskExtAttrsDTO.setMapName(context.getMapName());
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.REDUCE.name());
jobTasks = new ArrayList<>(partition.size());
final List<JobTask> finalJobTasks = jobTasks;
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
@ -120,9 +108,11 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTask.setArgsStr(JsonUtil.toJsonString(partition.get(index)));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage());
jobTask.setTaskName("REDUCE_TASK");
// TODO switch to batch insert
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
() -> new SnailJobServerException("新增任务实例失败"));
finalJobTasks.add(jobTask);
}
}
@ -131,18 +121,22 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
return finalJobTasks;
}
private @Nullable List<JobTask> createMapJobTasks(final JobTaskGenerateContext context,
final List<RegisterNodeInfo> nodeInfoList, final Set<RegisterNodeInfo> serverNodes) {
@NotNull
private List<JobTask> createMapJobTasks(final JobTaskGenerateContext context,
final List<RegisterNodeInfo> nodeInfoList, final Set<RegisterNodeInfo> serverNodes) {
List<?> mapSubTask = context.getMapSubTask();
if (CollUtil.isEmpty(mapSubTask)) {
SnailJobLog.LOCAL.warn("Map sub task is empty. TaskBatchId:[{}]", context.getTaskBatchId());
return Lists.newArrayList();
}
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = new JobTaskExtAttrsDTO();
jobTaskExtAttrsDTO.setMapName(context.getMapName());
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.MAP.name());
// Check whether the parent node is still a leaf node; if it is, it will be updated to a non-leaf node below
List<JobTask> parentJobTasks = jobTaskMapper.selectList(new PageDTO<>(1, 1),
new LambdaQueryWrapper<JobTask>().select(JobTask::getId)
.eq(JobTask::getId, context.getParentId())
.eq(JobTask::getLeaf, StatusEnum.YES.getStatus())
);
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@ -157,12 +151,24 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index)));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setMrStage(MapReduceStageEnum.MAP.getStage());
jobTask.setTaskName(context.getTaskName());
jobTask.setLeaf(StatusEnum.YES.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
// TODO switch to batch insert
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
() -> new SnailJobServerException("新增任务实例失败"));
jobTasks.add(jobTask);
}
// Update the parent node to a non-leaf node
if (CollUtil.isNotEmpty(parentJobTasks)) {
JobTask parentJobTask = new JobTask();
parentJobTask.setId(context.getParentId());
parentJobTask.setLeaf(StatusEnum.NO.getStatus());
jobTaskMapper.updateById(parentJobTask);
}
}
});
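
Note how the reduce fan-in is sized: Guava's Lists.partition(list, size) returns consecutive chunks of at most size elements, so reduceParallel = 2 means two map results per reduce task (roughly n/2 tasks in total), not two reduce tasks overall. A standalone illustration:

import com.google.common.collect.Lists;
import java.util.List;

public class PartitionDemo {
    public static void main(String[] args) {
        List<String> mapResults = List.of("r1", "r2", "r3", "r4", "r5");
        // Chunks of size 2: [[r1, r2], [r3, r4], [r5]] -> three reduce tasks
        List<List<String>> partition = Lists.partition(mapResults, 2);
        System.out.println(partition.size()); // 3
    }
}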

View File

@ -16,12 +16,7 @@ import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.dto.*;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
@ -48,6 +43,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.MAP;
/**
* @author: opensnail
* @date : 2023-10-10 16:50
@ -62,29 +59,24 @@ public class JobTaskBatchHandler {
private final WorkflowBatchHandler workflowBatchHandler;
private final GroupConfigMapper groupConfigMapper;
@Transactional
public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) {
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getTaskStatus, JobTask::getExtAttrs)
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getTaskStatus, JobTask::getMrStage)
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
if (CollUtil.isEmpty(jobTasks) ||
jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return false;
}
SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
if (CollUtil.isEmpty(jobTasks)) {
return false;
}
if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return false;
}
Map<Integer, Long> statusCountMap = jobTasks.stream()
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
@ -95,8 +87,10 @@ public class JobTaskBatchHandler {
} else if (stopCount > 0) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
} else {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
// TODO remove once debugging is finished
SnailJobLog.LOCAL.info("Attempting to complete the batch. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
if (needReduceTask(completeJobBatchDTO, jobTasks)) {
return false;
}
@ -115,9 +109,9 @@ public class JobTaskBatchHandler {
jobTaskBatch.setUpdateDt(LocalDateTime.now());
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
);
}
@ -126,26 +120,19 @@ public class JobTaskBatchHandler {
* If a reduce phase still needs to run, the batch status is not updated; otherwise the batch status gets updated
*
* @param completeJobBatchDTO parameters required to complete the batch
* @param jobTasks list of job task items
* @param jobTasks list of job task items
* @return true if a reduce is needed, false otherwise
*/
private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List<JobTask> jobTasks) {
// Determine whether this is a map reduce task
// TODO to be optimized
JobTask firstJobTask = jobTasks.get(0);
String extAttrs = firstJobTask.getExtAttrs();
if (StrUtil.isNotBlank(extAttrs)) {
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class);
Integer taskType = jobTaskExtAttrsDTO.getTaskType();
if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) {
// Start the reduce stage
try {
ActorRef actorRef = ActorGenerator.jobReduceActor();
actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
return true;
} catch (Exception e) {
SnailJobLog.LOCAL.error("tell reduce actor error", e);
}
if (isAllMapTask(jobTasks)) {
// Start the reduce stage
try {
ActorRef actorRef = ActorGenerator.jobReduceActor();
actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
return true;
} catch (Exception e) {
SnailJobLog.LOCAL.error("tell reduce actor error", e);
}
}
@ -153,17 +140,9 @@ public class JobTaskBatchHandler {
}
private static boolean isAllMapTask(final List<JobTask> jobTasks) {
return jobTasks.size() == jobTasks.stream()
.filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs()))
.map(jobTask -> JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class))
.filter(jobTaskExtAttrsDTO -> {
String mrStage = jobTaskExtAttrsDTO.getMrStage();
if (StrUtil.isNotBlank(mrStage) && MapReduceStageEnum.MAP.name().equals(mrStage)) {
return true;
}
return false;
}).count();
return jobTasks.size() == jobTasks.stream()
.filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage())
.count();
}
/**
@ -174,21 +153,21 @@ public class JobTaskBatchHandler {
*/
public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job)
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
// Whether this is a resident task
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
// Prevent duplicate execution after the task has been assigned to another node
|| !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
// Whether this is a resident task
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
// Prevent duplicate execution after the task has been assigned to another node
|| !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
) {
return;
}
long count = groupConfigMapper.selectCount(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
.eq(GroupConfig::getGroupName, job.getGroupName())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
.eq(GroupConfig::getGroupName, job.getGroupName())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
if (count == 0) {
return;
}
@ -215,7 +194,7 @@ public class JobTaskBatchHandler {
Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000);
log.debug("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds,
DateUtils.toNowMilli() % 1000);
DateUtils.toNowMilli() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
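
With mrStage as a plain column, isAllMapTask earlier in this file reduces to a stream count instead of per-task JSON parsing. An equivalent allMatch formulation, shown only as a sketch (size equals the filtered count exactly when every element matches; MAP is the statically imported MapReduceStageEnum.MAP):

private static boolean isAllMapTask(final List<JobTask> jobTasks) {
    return jobTasks.stream()
            .allMatch(jobTask -> Objects.nonNull(jobTask.getMrStage())
                    && MAP.getStage() == jobTask.getMrStage());
}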

View File

@ -71,7 +71,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest);
context.setGroupName(HttpHeaderUtil.getGroupName(headers));
context.setNamespaceId(HttpHeaderUtil.getNamespace(headers));
context.setMrStage(MapReduceStageEnum.MAP);
context.setMrStage(MapReduceStageEnum.MAP.getStage());
context.setMapSubTask(mapTaskRequest.getSubTask());
List<JobTask> taskList = taskInstance.generate(context);
if (CollUtil.isEmpty(taskList)) {
@ -107,8 +107,6 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
context.setTaskBatchId(mapTaskRequest.getTaskBatchId());
context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId());
context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId());
context.setMapName(mapTaskRequest.getMapName());
context.setMrStage(MapReduceStageEnum.MAP.name());
return context;
}