fix(sj_1.1.0): map main flow completed

opensnail 2024-06-12 18:00:38 +08:00
parent 7e392a2c86
commit d3699cec2f
19 changed files with 436 additions and 23 deletions

View File

@@ -41,6 +41,11 @@ public class DispatchJobRequest {
@NotNull(message = "executorTimeout must not be null")
private Integer executorTimeout;
/**
* Task name
*/
private String taskName;
private String argsStr;
private Integer shardingTotal;

View File

@@ -0,0 +1,36 @@
package com.aizuda.snailjob.client.model.request;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import java.util.List;
/**
* @author: opensnail
* @date : 2024-06-12 15:10
*/
@Data
public class MapTaskRequest {
@NotNull(message = "jobId 不能为空")
private Long jobId;
@NotNull(message = "taskBatchId 不能为空")
private Long taskBatchId;
@NotNull(message = "parentId 不能为空")
private Long parentId;
private Long workflowTaskBatchId;
private Long workflowNodeId;
@NotBlank(message = "taskName 不能为空")
private String taskName;
@NotEmpty(message = "subTask 不能为空")
private List<Object> subTask;
}
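A minimal sketch of how a client-side map executor might populate this request before reporting sub-tasks to the server; all values (jobId, taskBatchId, parentId, the task name) are illustrative, not part of this commit:

MapTaskRequest request = new MapTaskRequest();
request.setJobId(1L);               // from the dispatched job context (illustrative)
request.setTaskBatchId(100L);       // batch these sub-tasks belong to
request.setParentId(0L);            // id of the map task that produced them
request.setTaskName("MONTH_MAP_TASK");   // hypothetical map stage name
request.setSubTask(List.<Object>of("2024-01", "2024-02", "2024-03"));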

View File

@@ -80,6 +80,16 @@ public interface SystemConstants {
*/
String JOB_STOP = "/job/stop/v1";
/**
* Generate synchronous MAP tasks
*/
String JOB_MAP_TASK = "/job/map/task/v1";
/**
* Execute REDUCE tasks
*/
String JOB_REDUCE_TASK = "/job/reduce/task/v1";
/**
* Synchronize configuration
*/

View File

@@ -15,7 +15,9 @@ public enum JobTaskTypeEnum {
CLUSTER(1),
BROADCAST(2),
SHARDING(3),
MAP_REDUCE(4),
;
private final int type;

View File

@@ -37,6 +37,11 @@ public class JobTask implements Serializable {
*/
private String groupName;
/**
* Task name
*/
private String taskName;
/**
* Job info id
*/

View File

@@ -0,0 +1,21 @@
package com.aizuda.snailjob.server.common.util;
import com.aizuda.snailjob.common.core.enums.HeadersEnum;
import io.netty.handler.codec.http.HttpHeaders;
/**
* @author: opensnail
* @date : 2024-06-12
* @since : sj_1.1.0
*/
public final class HttpHeaderUtil {
public static String getGroupName(HttpHeaders headers) {
return headers.getAsString(HeadersEnum.GROUP_NAME.getKey());
}
public static String getNamespace(HttpHeaders headers) {
return headers.getAsString(HeadersEnum.NAMESPACE.getKey());
}
}

View File

@@ -2,12 +2,14 @@ package com.aizuda.snailjob.server.job.task.client;
import com.aizuda.snailjob.client.model.StopJobDTO;
import com.aizuda.snailjob.client.model.request.DispatchJobRequest;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.server.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.server.common.rpc.client.annotation.Body;
import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_MAP_TASK;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_STOP;
/**
@@ -25,4 +27,7 @@ public interface JobRpcClient {
@Mapping(path = JOB_DISPATCH, method = RequestMethod.POST)
Result<Boolean> dispatch(@Body DispatchJobRequest dispatchJobRequest);
@Mapping(path = JOB_MAP_TASK, method = RequestMethod.POST)
Result<Boolean> mapTask(@Body MapTaskRequest mapTaskRequest);
}
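Given an already-built JobRpcClient proxy (how the proxy is constructed is outside this diff), forwarding generated sub-tasks is a single call; the variables and the error handling below are a sketch, not committed behavior:

// Illustrative only: jobRpcClient and mapTaskRequest are assumed to exist.
Result<Boolean> result = jobRpcClient.mapTask(mapTaskRequest);
if (Boolean.FALSE.equals(result.getData())) {
    // hypothetical handling: surface the failure for this batch
    SnailJobLog.LOCAL.error("map task call failed. jobId:[{}]", mapTaskRequest.getJobId());
}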

View File

@@ -0,0 +1,37 @@
package com.aizuda.snailjob.server.job.task.dto;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import lombok.Data;
/**
* @author: opensnail
* @date : 2024-06-12
* @since : sj_1.1.0
*/
@Data
public class JobTaskExtAttrsDTO {
/**
* Task name (currently only used by map-reduce)
*/
private String mapName;
/**
* see: {@link JobTaskTypeEnum}
*/
private Integer taskType;
/**
* Which map-reduce stage the current task is in
* see: {@link MapReduceStageEnum}
*/
private String mrStage;
@Override
public String toString() {
return JsonUtil.toJsonString(this);
}
}
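A sketch of how a MAP-stage task row would carry these attributes (the mapName value is made up, and jobTask stands for an existing JobTask row); toString() delegates to JsonUtil, so the DTO serializes straight into JobTask.extAttrs:

JobTaskExtAttrsDTO extAttrs = new JobTaskExtAttrsDTO();
extAttrs.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());  // 4
extAttrs.setMrStage(MapReduceStageEnum.MAP.name());          // "MAP"
extAttrs.setMapName("MONTH_MAP_TASK");                       // hypothetical stage name
// yields JSON like: {"mapName":"MONTH_MAP_TASK","taskType":4,"mrStage":"MAP"}
jobTask.setExtAttrs(extAttrs.toString());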

View File

@@ -30,6 +30,11 @@ public class RealJobExecutorDTO extends BaseDTO {
*/
private String argsType;
/**
* Task name
*/
private String taskName;
/**
* Extended fields
*/

View File

@@ -0,0 +1,11 @@
package com.aizuda.snailjob.server.job.task.enums;
/**
* @author: opensnail
* @date : 2024-06-12
* @since : sj_1.1.0
*/
public enum MapReduceStageEnum {
MAP,
REDUCE
}

View File

@@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support;
import com.aizuda.snailjob.client.model.request.DispatchJobRequest;
import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.job.task.dto.*;
@@ -61,6 +62,8 @@ public interface JobTaskConverter {
)
JobTaskGenerateContext toJobTaskInstanceGenerateContext(Job job);
JobTaskGenerateContext toJobTaskInstanceGenerateContext(MapTaskRequest request);
JobTask toJobTaskInstance(JobTaskGenerateContext context);
BlockStrategyContext toBlockStrategyContext(JobTaskPrepareDTO prepareDTO);

View File

@@ -131,6 +131,7 @@ public class JobExecutorActor extends AbstractActor {
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
instanceGenerateContext.setTaskName("ROOT_TASK");
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
if (CollUtil.isEmpty(taskList)) {
return;

View File

@@ -17,11 +17,9 @@ import java.util.List;
* @date 2023-10-03 22:12:40
* @since 2.4.0
*/
@Slf4j
@Component
public class ClusterJobExecutor extends AbstractJobExecutor {
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.CLUSTER;

View File

@@ -0,0 +1,39 @@
package com.aizuda.snailjob.server.job.task.support.executor.job;
import akka.actor.ActorRef;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author: opensnail
* @date : 2024-06-12
* @since : sj_1.1.0
*/
@Component
public class MapReduceJobExecutor extends AbstractJobExecutor {
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.MAP_REDUCE;
}
@Override
protected void doExecute(final JobExecutorContext context) {
List<JobTask> taskList = context.getTaskList();
// Each JobTask was already pinned to a client node by the task generator;
// hand every task to the real-executor actor for dispatch.
for (final JobTask jobTask : taskList) {
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
}
}
}

View File

@@ -2,6 +2,8 @@ package com.aizuda.snailjob.server.job.task.support.generator.task;
import lombok.Data;
import java.util.List;
/**
* @author opensnail
* @date 2023-10-02 13:02:57
@@ -27,4 +29,14 @@ public class JobTaskGenerateContext {
* Argument type: text/json
*/
private Integer argsType;
/**
* Map sub-tasks for dynamic sharding
*/
private List<?> mapSubTask;
/**
* Task name
*/
private String taskName;
}

View File

@@ -0,0 +1,78 @@
package com.aizuda.snailjob.server.job.task.support.generator.task;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* Generate map tasks
*
* @author: opensnail
* @date : 2024-06-12
* @since : sj_1.1.0
*/
@Component
@RequiredArgsConstructor
public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
private final JobTaskMapper jobTaskMapper;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.MAP_REDUCE;
}
@Override
protected List<JobTask> doGenerate(final JobTaskGenerateContext context) {
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
if (CollUtil.isEmpty(serverNodes)) {
SnailJobLog.LOCAL.error("No available client nodes. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
List<?> mapSubTask = context.getMapSubTask();
if (CollUtil.isEmpty(mapSubTask)) {
return Lists.newArrayList();
}
List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
for (int index = 0; index < mapSubTask.size(); index++) {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
// Create a new task instance
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index)));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("Failed to insert task instance"));
jobTasks.add(jobTask);
}
return jobTasks;
}
}
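A hedged sketch of driving this generator directly; in the real flow the context is converted from a MapTaskRequest (see MapTaskPostHttpRequestHandler below), and all values here are illustrative:

JobTaskGenerator generator = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType());
JobTaskGenerateContext ctx = new JobTaskGenerateContext();
ctx.setGroupName("example_group");   // hypothetical group
ctx.setNamespaceId("dev");           // hypothetical namespace
ctx.setJobId(1L);
ctx.setTaskBatchId(100L);
ctx.setTaskName("MONTH_MAP_TASK");   // hypothetical map stage name
ctx.setMapSubTask(List.of("2024-01", "2024-02", "2024-03"));
// one RUNNING JobTask per sub-task, round-robined across registered client nodes
List<JobTask> tasks = generator.generate(ctx);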

View File

@@ -1,19 +1,25 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
@@ -47,6 +53,7 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor
@Slf4j
public class JobTaskBatchHandler {
private final JobTaskMapper jobTaskMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final WorkflowBatchHandler workflowBatchHandler;
@@ -57,9 +64,9 @@ public class JobTaskBatchHandler {
public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) {
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>()
// extAttrs must be selected as well, otherwise the map-reduce check below never sees it
.select(JobTask::getTaskStatus, JobTask::getResultMessage, JobTask::getExtAttrs)
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
@@ -73,7 +80,7 @@
}
Map<Integer, Long> statusCountMap = jobTasks.stream()
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
@@ -85,6 +92,20 @@
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
} else {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
// Check whether this is a map-reduce task
JobTask firstJobTask = jobTasks.get(0);
String extAttrs = firstJobTask.getExtAttrs();
if (StrUtil.isNotBlank(extAttrs)) {
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(firstJobTask.getExtAttrs(),
JobTaskExtAttrsDTO.class);
Integer taskType = jobTaskExtAttrsDTO.getTaskType();
if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType) {
if (isAllMapTask(jobTasks)) {
// TODO start the reduce stage
}
}
}
}
if (Objects.nonNull(completeJobBatchDTO.getJobOperationReason())) {
@@ -100,36 +121,50 @@
jobTaskBatch.setUpdateDt(LocalDateTime.now());
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
);
}
private static boolean isAllMapTask(final List<JobTask> jobTasks) {
return jobTasks.size() == jobTasks.stream()
.filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs()))
.map(jobTask -> JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class))
.filter(extAttrs -> MapReduceStageEnum.MAP.name().equals(extAttrs.getMrStage()))
.count();
}
}
/**
* Start a resident task
*
* @param job scheduled job configuration
* @param taskExecuteDTO task execution info
*/
public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job)
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
// is this a resident task?
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
// prevent duplicate execution when the task has already been assigned to another node
|| !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
) {
return;
}
long count = groupConfigMapper.selectCount(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
.eq(GroupConfig::getGroupName, job.getGroupName())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
if (count == 0) {
return;
}
@@ -155,7 +190,8 @@
Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000);
log.debug("Resident task monitor. [{}] time delta:[{}] remainder:[{}]", duration, milliseconds,
DateUtils.toNowMilli() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
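The TODO in complete() leaves the reduce kick-off unimplemented in this commit. One possible shape, purely illustrative and not the author's design: collect the MAP results, tag a new task with the REDUCE stage, and dispatch it through the JOB_REDUCE_TASK path already declared in SystemConstants:

// Hypothetical sketch only; every name not in this diff is an assumption.
private void startReduceStage(final List<JobTask> mapTasks) {
    List<String> mapResults = mapTasks.stream()
        .map(JobTask::getResultMessage)
        .collect(Collectors.toList());
    JobTaskExtAttrsDTO extAttrs = new JobTaskExtAttrsDTO();
    extAttrs.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
    extAttrs.setMrStage(MapReduceStageEnum.REDUCE.name());
    // ... then generate a JobTask whose argsStr carries mapResults and
    // dispatch it to a client node via JOB_REDUCE_TASK.
}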

View File

@@ -0,0 +1,110 @@
package com.aizuda.snailjob.server.job.task.support.request;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.util.HttpHeaderUtil;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_MAP_TASK;
/**
* Client-side generation of map tasks for dynamic sharding
*
* @author: opensnail
* @date : 2024-06-12
* @since : sj_1.1.0
*/
@Component
@RequiredArgsConstructor
public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
private final JobMapper jobMapper;
@Override
public boolean supports(final String path) {
return JOB_MAP_TASK.equals(path);
}
@Override
public HttpMethod method() {
return HttpMethod.POST;
}
@Override
public String doHandler(final String content, final UrlQuery query, final HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Map task request. content:[{}]", content);
String groupName = HttpHeaderUtil.getGroupName(headers);
String namespace = HttpHeaderUtil.getNamespace(headers);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
Object[] args = retryRequest.getArgs();
MapTaskRequest mapTaskRequest = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), MapTaskRequest.class);
// Create the map tasks
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType());
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest);
context.setGroupName(groupName);
context.setNamespaceId(namespace);
List<JobTask> taskList = taskInstance.generate(context);
if (CollUtil.isEmpty(taskList)) {
return JsonUtil.toJsonString(
new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.TRUE, retryRequest.getReqId()));
}
Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
.eq(Job::getId, mapTaskRequest.getJobId())
.eq(Job::getGroupName, groupName)
.eq(Job::getNamespaceId, namespace)
);
if (Objects.isNull(job)) {
return JsonUtil.toJsonString(
new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.TRUE,
retryRequest.getReqId()));
}
// Execute the tasks
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(JobTaskTypeEnum.MAP_REDUCE.getType());
jobExecutor.execute(buildJobExecutorContext(mapTaskRequest, job, taskList));
return JsonUtil.toJsonString(
new NettyResult(StatusEnum.YES.getStatus(), "Report Map Task Processed Successfully", Boolean.TRUE,
retryRequest.getReqId()));
}
private static JobExecutorContext buildJobExecutorContext(MapTaskRequest mapTaskRequest, Job job,
List<JobTask> taskList) {
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
context.setTaskList(taskList);
context.setTaskBatchId(mapTaskRequest.getTaskBatchId());
context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId());
context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId());
return context;
}
}
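For reference, doHandler() expects a SnailJobRequest envelope whose args[0] deserializes into MapTaskRequest; group and namespace travel in the GROUP_NAME / NAMESPACE headers, not the body. A hedged way to picture the payload (all IDs made up):

MapTaskRequest body = new MapTaskRequest();
body.setJobId(1L);
body.setTaskBatchId(100L);
body.setParentId(0L);
body.setTaskName("MONTH_MAP_TASK");       // hypothetical stage name
body.setSubTask(List.<Object>of("2024-01", "2024-02"));
// Assumption: SnailJobRequest exposes Lombok-style setters for reqId/args;
// the handler only relies on getArgs()[0] and getReqId().
SnailJobRequest envelope = new SnailJobRequest();
envelope.setReqId(1L);
envelope.setArgs(new Object[]{body});
String content = JsonUtil.toJsonString(envelope);  // what doHandler() receives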

View File

@@ -25,7 +25,6 @@ import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH
* @date 2023-09-30 23:01:58
* @since 2.4.0
*/
@Slf4j
@Component
public class ReportDispatchResultPostHttpRequestHandler extends PostHttpRequestHandler {