fix(sj_1.1.0): 完成map任务的创建和调度

This commit is contained in:
opensnail 2024-06-13 23:46:23 +08:00
parent 100cd1271b
commit 4e8bd301d2
16 changed files with 195 additions and 23 deletions

View File

@ -65,7 +65,7 @@ public class JobEndPoint {
if (IJobExecutor.class.isAssignableFrom(executor.getClass())) {
if (JobTaskTypeEnum.MAP.getType() == jobContext.getTaskType()) {
jobExecutor = (AbstractMapExecutor) executor;
} else if (JobTaskTypeEnum.MAP_REDUCE.getType() == jobContext.getTaskId()) {
} else if (JobTaskTypeEnum.MAP_REDUCE.getType() == jobContext.getTaskType()) {
jobExecutor = (AbstractMapReduceExecutor) executor;
} else {
jobExecutor = (AbstractJobExecutor) executor;
@ -115,6 +115,7 @@ public class JobEndPoint {
jobContext.setWorkflowTaskBatchId(dispatchJob.getWorkflowTaskBatchId());
jobContext.setRetry(dispatchJob.isRetry());
jobContext.setRetryScene(dispatchJob.getRetryScene());
jobContext.setMapName(dispatchJob.getMapName());
return jobContext;
}

View File

@ -0,0 +1,16 @@
package com.aizuda.snailjob.client.job.core.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
 * Arguments handed to a map-reduce job executor.
 *
 * <p>Extends {@link JobArgs} with the name of the map stage the current
 * task belongs to, as propagated from the server's dispatch request.
 *
 * @author: opensnail
 * @date : 2024-06-13
 * @since : sj_1.1.0
 */
@EqualsAndHashCode(callSuper = true)
@Data
public class MapReduceArgs extends JobArgs {
// Name of the map stage this task belongs to (the root stage is named "ROOT_TASK").
private String mapName;
}

View File

@ -5,6 +5,7 @@ import com.aizuda.snailjob.client.job.core.IJobExecutor;
import com.aizuda.snailjob.client.job.core.cache.FutureCache;
import com.aizuda.snailjob.client.job.core.cache.ThreadPoolCache;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ShardingJobArgs;
import com.aizuda.snailjob.client.job.core.log.JobLogMeta;
import com.aizuda.snailjob.client.job.core.timer.StopTaskTimerTask;
@ -47,6 +48,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
JobArgs jobArgs;
if (jobContext.getTaskType() == JobTaskTypeEnum.SHARDING.getType()) {
jobArgs = buildShardingJobArgs(jobContext);
} else if (jobContext.getTaskType() == JobTaskTypeEnum.MAP_REDUCE.getType()) {
jobArgs = buildMapReduceJobArgs(jobContext);
} else {
jobArgs = buildJobArgs(jobContext);
}
@ -73,6 +76,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
logMeta.setJobId(jobContext.getJobId());
logMeta.setTaskBatchId(jobContext.getTaskBatchId());
SnailJobLogManager.initLogInfo(logMeta, LogTypeEnum.JOB);
JobContextManager.setJobContext(jobContext);
}
private static JobArgs buildJobArgs(JobContext jobContext) {
@ -92,5 +96,14 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
return jobArgs;
}
// Builds the executor-facing argument object for MAP_REDUCE tasks, copying
// the generic fields plus the map-stage name and batch id from the dispatch
// context. Mirrors buildJobArgs/buildShardingJobArgs for the other task types.
private static JobArgs buildMapReduceJobArgs(JobContext jobContext) {
MapReduceArgs jobArgs = new MapReduceArgs();
jobArgs.setArgsStr(jobContext.getArgsStr());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setMapName(jobContext.getMapName());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
return jobArgs;
}
protected abstract ExecuteResult doJobExecute(JobArgs jobArgs);
}

View File

@ -1,11 +1,16 @@
package com.aizuda.snailjob.client.job.core.executor;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.client.job.core.IJobExecutor;
import com.aizuda.snailjob.client.job.core.client.JobNettyClient;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.model.MapContext;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.Result;
import lombok.extern.slf4j.Slf4j;
@ -23,36 +28,45 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, NettyResult>newBuilder()
.client(JobNettyClient.class)
.async(Boolean.FALSE)
.build();
public void doMapExecute(JobContext jobContext) {
@Override
protected ExecuteResult doJobExecute(final JobArgs jobArgs) {
// NOTE(review): AbstractJobExecutor only builds a MapReduceArgs instance when
// the task type is MAP_REDUCE; a MAP-type task routed through this executor
// would receive a plain JobArgs and this cast would throw ClassCastException —
// confirm MAP tasks never reach this path, or build MapReduceArgs for them too.
MapReduceArgs mapReduceArgs = (MapReduceArgs) jobArgs;
return doJobExecute(mapReduceArgs);
}
List<Object> taskList = jobContext.getTaskList();
String mapName = jobContext.getMapName();
if (CollectionUtils.isEmpty(taskList)) {
public abstract ExecuteResult doJobExecute(MapReduceArgs mapReduceArgs);
public void doMapExecute(List<Object> taskList, String nextMapName) {
if (CollectionUtils.isEmpty(taskList) || StrUtil.isBlank(nextMapName)) {
return;
}
// mapName 任务命名和根任务名或者最终任务名称一致导致的问题无限生成子任务或者直接失败
if ("ROOT_TASK".equals(mapName) || "ROOT_TASK".equals(mapName)) {
log.warn("[Map-{}] illegal map task name : {}! please do not use 'OMS_ROOT_TASK' or 'OMS_LAST_TASK' as map task name. as a precaution, it will be renamed 'X-{}' automatically.", jobContext.getTaskId(), mapName, mapName);
mapName = "X-" + mapName;
if ("ROOT_TASK".equals(nextMapName)) {
throw new SnailJobMapReduceException("NextMapName can not be ROOT_TASK");
}
JobContext jobContext = JobContextManager.getJobContext();
// 1. 构造请求
MapTaskRequest mapTaskRequest = new MapTaskRequest();
mapTaskRequest.setJobId(jobContext.getJobId());
mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId());
mapTaskRequest.setTaskName(mapName);
mapTaskRequest.setMapName(nextMapName);
mapTaskRequest.setSubTask(taskList);
mapTaskRequest.setParentId(jobContext.getTaskId());
// 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常
Result<Boolean> booleanResult = CLIENT.batchReportMapTask(mapTaskRequest);
if (booleanResult.getData()) {
log.info("[Map-{}] map task[name={},num={}] successfully!", jobContext.getTaskId(), mapName, taskList.size());
log.info("[Map-{}] map task[name={},num={}] successfully!", jobContext.getTaskId(), nextMapName, taskList.size());
} else {
throw new SnailJobMapReduceException("map failed for task: " + mapName);
throw new SnailJobMapReduceException("map failed for task: " + nextMapName);
}
}
}

View File

@ -0,0 +1,26 @@
package com.aizuda.snailjob.client.job.core.executor;
import com.aizuda.snailjob.common.core.model.JobContext;
/**
 * Holds the {@link JobContext} of the job task executing on the current thread
 * in a {@link ThreadLocal}, so executor code can reach dispatch metadata
 * without threading it through every call.
 *
 * <p>NOTE(review): callers are responsible for invoking
 * {@link #removeJobContext()} when the task finishes; on pooled threads a
 * stale context would otherwise leak into the next task executed there.
 *
 * @author: opensnail
 * @date : 2024-06-13
 * @since : sj_1.1.0
 */
public final class JobContextManager {

    private static final ThreadLocal<JobContext> JOB_CONTEXT_LOCAL = new ThreadLocal<>();

    /** Utility class: static methods only, no instances. */
    private JobContextManager() {
    }

    /** Binds the given context to the current thread. */
    public static void setJobContext(JobContext jobContext) {
        JOB_CONTEXT_LOCAL.set(jobContext);
    }

    /**
     * @return the context bound to the current thread, or {@code null} if
     *         none has been set (or it was already removed)
     */
    public static JobContext getJobContext() {
        return JOB_CONTEXT_LOCAL.get();
    }

    /** Clears the current thread's binding; call when task execution ends. */
    public static void removeJobContext() {
        JOB_CONTEXT_LOCAL.remove();
    }
}

View File

@ -44,7 +44,7 @@ public class DispatchJobRequest {
/**
* 任务名称
*/
private String taskName;
private String mapName;
private String argsStr;

View File

@ -27,8 +27,8 @@ public class MapTaskRequest {
private Long workflowNodeId;
@NotBlank(message = "taskName 不能为空")
private String taskName;
@NotBlank(message = "mapName 不能为空")
private String mapName;
@NotEmpty(message = "subTask 不能为空")
private List<Object> subTask;

View File

@ -0,0 +1,30 @@
package com.aizuda.snailjob.common.core.model;
import lombok.Data;
import java.util.List;
/**
 * Context describing one map-stage fan-out: the sub-task payloads to create
 * and the identifiers of the job execution that produced them.
 *
 * NOTE(review): this type is imported by AbstractMapExecutor but not visibly
 * used there in this commit — confirm it is still needed.
 *
 * @author: opensnail
 * @date : 2024-06-13
 * @since : sj_1.1.0
 */
@Data
public final class MapContext {
/**
 * List of map sub-task payloads to fan out.
 */
private List<Object> taskList;
/**
 * Name of the map stage these sub-tasks belong to.
 */
private String mapName;
// Identifiers of the originating job execution.
private Long jobId;
private Long taskBatchId;
// Presumably the id of the task that produced this fan-out — TODO confirm.
private Long taskId;
}

View File

@ -32,11 +32,6 @@ public class JobTask extends CreateUpdateDt {
*/
private String groupName;
/**
* 任务名称
*/
private String taskName;
/**
* 任务信息id
*/

View File

@ -33,7 +33,7 @@ public class RealJobExecutorDTO extends BaseDTO {
/**
* 任务名称
*/
private String taskName;
private String mapName;
/**
* 扩展字段

View File

@ -3,8 +3,10 @@ package com.aizuda.snailjob.server.job.task.support.callback;
import akka.actor.ActorRef;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.snailjob.server.job.task.enums.JobRetrySceneEnum;
import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler;
@ -51,6 +53,12 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1);
realJobExecutor.setRetry(Boolean.TRUE);
realJobExecutor.setRetryScene(context.getRetryScene());
String extAttrs = jobTask.getExtAttrs();
// TODO 待优化
if (StrUtil.isNotBlank(extAttrs)) {
JobTaskExtAttrsDTO extAttrsDTO = JsonUtil.parseObject(extAttrs, JobTaskExtAttrsDTO.class);
realJobExecutor.setMapName(extAttrsDTO.getMapName());
}
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
return;

View File

@ -0,0 +1,56 @@
package com.aizuda.snailjob.server.job.task.support.callback;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Set;
/**
 * Client-callback handler for MAP_REDUCE jobs: forwards the client's execution
 * result to the result actor and, on retry, chooses a replacement client node.
 *
 * @author: opensnail
 * @date : 2024-06-13 21:22x
 * @since : sj_1.1.0
 */
@Component
@Slf4j
public class MapReduceClientCallbackHandler extends AbstractClientCallbackHandler {

    @Override
    public JobTaskTypeEnum getTaskInstanceType() {
        return JobTaskTypeEnum.MAP_REDUCE;
    }

    @Override
    protected void doCallback(final ClientCallbackContext context) {
        // Map the callback context onto the result DTO, enrich it with the
        // client's reported outcome, then hand it to the result actor.
        JobExecutorResultDTO resultDto = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
        resultDto.setTaskType(getTaskInstanceType().getType());
        resultDto.setTaskId(context.getTaskId());
        resultDto.setResult(context.getExecuteResult().getResult());
        resultDto.setMessage(context.getExecuteResult().getMessage());
        ActorRef resultActor = ActorGenerator.jobTaskExecutorResultActor();
        resultActor.tell(resultDto, resultActor);
    }

    @Override
    protected String chooseNewClient(ClientCallbackContext context) {
        // On retry, re-route to a random currently-registered client of the group.
        Set<RegisterNodeInfo> aliveNodes =
                CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
        if (CollUtil.isNotEmpty(aliveNodes)) {
            RegisterNodeInfo picked = RandomUtil.randomEle(aliveNodes.toArray(new RegisterNodeInfo[0]));
            return ClientInfoUtils.generate(picked);
        }
        log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
        return null;
    }
}

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
@ -36,6 +37,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
@ -133,6 +135,7 @@ public class JobExecutorActor extends AbstractActor {
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
instanceGenerateContext.setMapName("ROOT_TASK");
instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY));
// TODO 此处需要判断任务类型
instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP);
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
@ -176,6 +179,7 @@ public class JobExecutorActor extends AbstractActor {
context.setJobId(job.getId());
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
context.setMapName("ROOT_TASK");
return context;
}

View File

@ -90,4 +90,6 @@ public class JobExecutorContext {
private Long workflowNodeId;
private String mapName;
}

View File

@ -45,4 +45,9 @@ public class JobTaskGenerateContext {
* 动态分片的阶段
*/
private MapReduceStageEnum mrStage;
/**
* 父任务id
*/
private Long parentId;
}

View File

@ -58,7 +58,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
@Override
public String doHandler(final String content, final UrlQuery query, final HttpHeaders headers) {
SnailJobLog.LOCAL.debug("map task Request. content:[{}]", content);
SnailJobLog.LOCAL.info("map task Request. content:[{}]", content);
String groupName = HttpHeaderUtil.getGroupName(headers);
String namespace = HttpHeaderUtil.getNamespace(headers);
@ -72,10 +72,11 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
context.setGroupName(HttpHeaderUtil.getGroupName(headers));
context.setNamespaceId(HttpHeaderUtil.getNamespace(headers));
context.setMrStage(MapReduceStageEnum.MAP);
context.setMapSubTask(mapTaskRequest.getSubTask());
List<JobTask> taskList = taskInstance.generate(context);
if (CollUtil.isEmpty(taskList)) {
return JsonUtil.toJsonString(
new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.TRUE, retryRequest.getReqId()));
new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.FALSE, retryRequest.getReqId()));
}
Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
@ -86,7 +87,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
if (Objects.isNull(job)) {
return JsonUtil.toJsonString(
new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.TRUE,
new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.FALSE,
retryRequest.getReqId()));
}
@ -106,6 +107,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
context.setTaskBatchId(mapTaskRequest.getTaskBatchId());
context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId());
context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId());
context.setMapName(mapTaskRequest.getMapName());
return context;
}