diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java index c3083ed64..604a65ff4 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java @@ -41,6 +41,11 @@ public class DispatchJobRequest { @NotNull(message = "executorTimeout 不能为空") private Integer executorTimeout; + /** + * 任务名称 + */ + private String taskName; + private String argsStr; private Integer shardingTotal; diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java new file mode 100644 index 000000000..7f9480a2e --- /dev/null +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java @@ -0,0 +1,36 @@ +package com.aizuda.snailjob.client.model.request; + +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +import java.util.List; + +/** + * @author: opensnail + * @date : 2024-06-12 15:10 + */ +@Data +public class MapTaskRequest { + + @NotNull(message = "jobId 不能为空") + private Long jobId; + + @NotNull(message = "taskBatchId 不能为空") + private Long taskBatchId; + + @NotNull(message = "parentId 不能为空") + private Long parentId; + + private Long workflowTaskBatchId; + + private Long workflowNodeId; + + @NotBlank(message = "taskName 不能为空") + private String taskName; + + @NotEmpty(message = "subTask 不能为空") + private List subTask; + +} diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java index d7dfd04c6..bcfde1538 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java @@ -80,6 +80,16 @@ public interface SystemConstants { */ String JOB_STOP = "/job/stop/v1"; + /** + * 生成同步MAP任务 + */ + String JOB_MAP_TASK = "/job/map/task/v1"; + + /** + * 执行REDUCE任务 + */ + String JOB_REDUCE_TASK = "/job/reduce/task/v1"; + /** * 同步配置 */ diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskTypeEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskTypeEnum.java index 3dd9cc037..6446283fe 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskTypeEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskTypeEnum.java @@ -15,7 +15,9 @@ public enum JobTaskTypeEnum { CLUSTER(1), BROADCAST(2), - SHARDING(3); + SHARDING(3), + MAP_REDUCE(4), + ; private final int type; diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java index a5490561a..e9c1bcc5b 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/JobTask.java @@ -37,6 +37,11 @@ public class JobTask implements Serializable { */ private String groupName; + /** + * 任务名称 + */ + private String taskName; + /** * 任务信息id */ diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/HttpHeaderUtil.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/HttpHeaderUtil.java new file mode 100644 index 000000000..4b38625bd --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/util/HttpHeaderUtil.java @@ -0,0 +1,21 @@ +package com.aizuda.snailjob.server.common.util; + +import com.aizuda.snailjob.common.core.enums.HeadersEnum; +import io.netty.handler.codec.http.HttpHeaders; + +/** + * @author: opensnail + * @date : 2024-06-12 + * @since : sj_1.1.0 + */ +public final class HttpHeaderUtil { + + public static String getGroupName(HttpHeaders headers) { + return headers.getAsString(HeadersEnum.GROUP_NAME.getKey()); + } + + public static String getNamespace(HttpHeaders headers) { + return headers.getAsString(HeadersEnum.NAMESPACE.getKey()); + } + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java index c56efc283..bec7f42fa 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java @@ -2,12 +2,14 @@ package com.aizuda.snailjob.server.job.task.client; import com.aizuda.snailjob.client.model.StopJobDTO; import com.aizuda.snailjob.client.model.request.DispatchJobRequest; +import com.aizuda.snailjob.client.model.request.MapTaskRequest; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.server.common.rpc.client.RequestMethod; import com.aizuda.snailjob.server.common.rpc.client.annotation.Body; import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_MAP_TASK; import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_STOP; /** @@ -25,4 +27,7 @@ public interface JobRpcClient { @Mapping(path = JOB_DISPATCH, method = RequestMethod.POST) Result dispatch(@Body DispatchJobRequest dispatchJobRequest); + @Mapping(path = JOB_MAP_TASK, method = RequestMethod.POST) + Result mapTask(@Body MapTaskRequest mapTaskRequest); + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobTaskExtAttrsDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobTaskExtAttrsDTO.java new file mode 100644 index 000000000..3122bdb62 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobTaskExtAttrsDTO.java @@ -0,0 +1,37 @@ +package com.aizuda.snailjob.server.job.task.dto; + +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum; +import lombok.Data; + +/** + * @author: opensnail + * @date : 2024-06-12 + * @since : sj_1.1.0 + */ +@Data +public class JobTaskExtAttrsDTO { + + /** + * 任务名称(目前只有map reduce使用) + */ + private String mapName; + + /** + * see: {@link JobTaskTypeEnum} + */ + private Integer taskType; + + /** + * 当前任务处于map reduce的哪个阶段 + * see: {@link MapReduceStageEnum} + */ + private String mrStage; + + + @Override + public String toString() { + return JsonUtil.toJsonString(this); + } +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java index 7378c5543..cfffeffbc 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java @@ -30,6 +30,11 @@ public class RealJobExecutorDTO extends BaseDTO { */ private String argsType; + /** + * 任务名称 + */ + private String taskName; + /** * 扩展字段 */ diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/MapReduceStageEnum.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/MapReduceStageEnum.java new file mode 100644 index 000000000..2665a83e1 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/enums/MapReduceStageEnum.java @@ -0,0 +1,11 @@ +package com.aizuda.snailjob.server.job.task.enums; + +/** + * @author: opensnail + * @date : 2024-06-12 + * @since : sj_1.1.0 + */ +public enum MapReduceStageEnum { + MAP, + REDUCE +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java index f6b311569..badabe3c6 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support; import com.aizuda.snailjob.client.model.request.DispatchJobRequest; import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest; +import com.aizuda.snailjob.client.model.request.MapTaskRequest; import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO; import com.aizuda.snailjob.server.job.task.dto.*; @@ -61,6 +62,8 @@ public interface JobTaskConverter { ) JobTaskGenerateContext toJobTaskInstanceGenerateContext(Job job); + JobTaskGenerateContext toJobTaskInstanceGenerateContext(MapTaskRequest request); + JobTask toJobTaskInstance(JobTaskGenerateContext context); BlockStrategyContext toBlockStrategyContext(JobTaskPrepareDTO prepareDTO); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 9dc66287b..efd964d0e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -131,6 +131,7 @@ public class JobExecutorActor extends AbstractActor { JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType()); JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId()); + instanceGenerateContext.setTaskName("ROOT_TASK"); List taskList = taskInstance.generate(instanceGenerateContext); if (CollUtil.isEmpty(taskList)) { return; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java index 33e014204..2da23f21e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java @@ -17,11 +17,9 @@ import java.util.List; * @date 2023-10-03 22:12:40 * @since 2.4.0 */ -@Slf4j @Component public class ClusterJobExecutor extends AbstractJobExecutor { - @Override public JobTaskTypeEnum getTaskInstanceType() { return JobTaskTypeEnum.CLUSTER; diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapReduceJobExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapReduceJobExecutor.java new file mode 100644 index 000000000..72f920520 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapReduceJobExecutor.java @@ -0,0 +1,39 @@ +package com.aizuda.snailjob.server.job.task.support.executor.job; + +import akka.actor.ActorRef; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.server.common.akka.ActorGenerator; +import com.aizuda.snailjob.server.common.util.ClientInfoUtils; +import com.aizuda.snailjob.server.job.task.dto.RealJobExecutorDTO; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author: opensnail + * @date : 2024-06-12 + * @since : sj_1.1.0 + */ +@Component +public class MapReduceJobExecutor extends AbstractJobExecutor { + + @Override + public JobTaskTypeEnum getTaskInstanceType() { + return JobTaskTypeEnum.MAP_REDUCE; + } + + @Override + protected void doExecute(final JobExecutorContext context) { + List taskList = context.getTaskList(); + for (int i = 0; i < taskList.size(); i++) { + JobTask jobTask = taskList.get(i); + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask); + realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + actorRef.tell(realJobExecutor, actorRef); + } + } + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java index 56a71c0a8..934113c24 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java @@ -2,6 +2,8 @@ package com.aizuda.snailjob.server.job.task.support.generator.task; import lombok.Data; +import java.util.List; + /** * @author opensnail * @date 2023-10-02 13:02:57 @@ -27,4 +29,14 @@ public class JobTaskGenerateContext { * 参数类型 text/json */ private Integer argsType; + + /** + * 动态分片的Map任务 + */ + private List mapSubTask; + + /** + * 任务名称 + */ + private String taskName; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java new file mode 100644 index 000000000..c0a45e7d5 --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/MapReduceTaskGenerator.java @@ -0,0 +1,78 @@ +package com.aizuda.snailjob.server.job.task.support.generator.task; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; +import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.snailjob.server.common.util.ClientInfoUtils; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * 生成Map任务 + * + * @author: opensnail + * @date : 2024-06-12 + * @since : sj_1.1.0 + */ +@Component +@RequiredArgsConstructor +public class MapReduceTaskGenerator extends AbstractJobTaskGenerator { + + private final JobTaskMapper jobTaskMapper; + @Override + public JobTaskTypeEnum getTaskInstanceType() { + return JobTaskTypeEnum.MAP_REDUCE; + } + + @Override + protected List doGenerate(final JobTaskGenerateContext context) { + + Set serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()); + if (CollUtil.isEmpty(serverNodes)) { + SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); + return Lists.newArrayList(); + } + + List mapSubTask = context.getMapSubTask(); + if (CollUtil.isEmpty(mapSubTask)) { + return Lists.newArrayList(); + } + + List nodeInfoList = new ArrayList<>(serverNodes); + List jobTasks = new ArrayList<>(mapSubTask.size()); + for (int index = 0; index < mapSubTask.size(); index++) { + RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size()); + // 新增任务实例 + JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); + jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo)); + jobTask.setArgsType(context.getArgsType()); + jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index))); + jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); + jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); + Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败")); + jobTasks.add(jobTask); + } + + return jobTasks; + } + + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index 0fc516445..929746316 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -1,19 +1,25 @@ package com.aizuda.snailjob.server.job.task.support.handler; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.server.common.WaitStrategy; import com.aizuda.snailjob.server.common.dto.DistributeInstance; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO; +import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO; import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; +import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; @@ -47,6 +53,7 @@ import java.util.stream.Collectors; @RequiredArgsConstructor @Slf4j public class JobTaskBatchHandler { + private final JobTaskMapper jobTaskMapper; private final JobTaskBatchMapper jobTaskBatchMapper; private final WorkflowBatchHandler workflowBatchHandler; @@ -57,9 +64,9 @@ public class JobTaskBatchHandler { public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) { List jobTasks = jobTaskMapper.selectList( - new LambdaQueryWrapper() - .select(JobTask::getTaskStatus, JobTask::getResultMessage) - .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); + new LambdaQueryWrapper() + .select(JobTask::getTaskStatus, JobTask::getResultMessage) + .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); JobTaskBatch jobTaskBatch = new JobTaskBatch(); jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId()); @@ -73,7 +80,7 @@ public class JobTaskBatchHandler { } Map statusCountMap = jobTasks.stream() - .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting())); + .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting())); long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); @@ -85,6 +92,20 @@ public class JobTaskBatchHandler { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus()); } else { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); + + // 判断是否是mapreduce任务 + JobTask firstJobTask = jobTasks.get(0); + String extAttrs = firstJobTask.getExtAttrs(); + if (StrUtil.isNotBlank(extAttrs)) { + JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(firstJobTask.getExtAttrs(), + JobTaskExtAttrsDTO.class); + Integer taskType = jobTaskExtAttrsDTO.getTaskType(); + if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType) { + if (isAllMapTask(jobTasks)) { + // TODO 开启reduce阶段 + } + } + } } if (Objects.nonNull(completeJobBatchDTO.getJobOperationReason())) { @@ -100,36 +121,50 @@ public class JobTaskBatchHandler { jobTaskBatch.setUpdateDt(LocalDateTime.now()); return 1 == jobTaskBatchMapper.update(jobTaskBatch, - new LambdaUpdateWrapper() - .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) - .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) + new LambdaUpdateWrapper() + .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) ); } + private static boolean isAllMapTask(final List jobTasks) { + return jobTasks.size() == jobTasks.stream() + .filter(jobTask -> StrUtil.isNotBlank(jobTask.getExtAttrs())) + .map(jobTask -> JsonUtil.parseObject(jobTask.getExtAttrs(), JobTaskExtAttrsDTO.class)) + .filter(jobTaskExtAttrsDTO -> { + String mrStage = jobTaskExtAttrsDTO.getMrStage(); + if (StrUtil.isNotBlank(mrStage) && MapReduceStageEnum.MAP.name().equals(mrStage)) { + return true; + } + + return false; + }).count(); + } + /** * 开启常驻任务 * - * @param job 定时任务配置信息 + * @param job 定时任务配置信息 * @param taskExecuteDTO 任务执行新 */ public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { if (Objects.isNull(job) - || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - // 是否是常驻任务 - || Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) - // 防止任务已经分配到其他节点导致的任务重复执行 - || !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex()) + || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + // 是否是常驻任务 + || Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) + // 防止任务已经分配到其他节点导致的任务重复执行 + || !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex()) ) { return; } long count = groupConfigMapper.selectCount(new LambdaQueryWrapper() - .eq(GroupConfig::getNamespaceId, job.getNamespaceId()) - .eq(GroupConfig::getGroupName, job.getGroupName()) - .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())); + .eq(GroupConfig::getNamespaceId, job.getNamespaceId()) + .eq(GroupConfig::getGroupName, job.getGroupName()) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())); if (count == 0) { return; } @@ -155,7 +190,8 @@ public class JobTaskBatchHandler { Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000); - log.debug("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds, DateUtils.toNowMilli() % 1000); + log.debug("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds, + DateUtils.toNowMilli() % 1000); job.setNextTriggerAt(nextTriggerAt); JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration); ResidentTaskCache.refresh(job.getId(), nextTriggerAt); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java new file mode 100644 index 000000000..83ad91e2c --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java @@ -0,0 +1,110 @@ +package com.aizuda.snailjob.server.job.task.support.request; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.net.url.UrlQuery; +import com.aizuda.snailjob.client.model.request.MapTaskRequest; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; +import com.aizuda.snailjob.server.common.util.HttpHeaderUtil; +import com.aizuda.snailjob.server.job.task.support.JobExecutor; +import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext; +import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory; +import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext; +import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator; +import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Objects; + +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_MAP_TASK; + +/** + * 动态分片客户端生成map任务 + * + * @author: opensnail + * @date : 2024-06-12 + * @since : sj_1.1.0 + */ +@Component +@RequiredArgsConstructor +public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { + + private final JobMapper jobMapper; + + @Override + public boolean supports(final String path) { + return JOB_MAP_TASK.equals(path); + } + + @Override + public HttpMethod method() { + return HttpMethod.POST; + } + + @Override + public String doHandler(final String content, final UrlQuery query, final HttpHeaders headers) { + SnailJobLog.LOCAL.debug("map task Request. content:[{}]", content); + String groupName = HttpHeaderUtil.getGroupName(headers); + String namespace = HttpHeaderUtil.getNamespace(headers); + + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + Object[] args = retryRequest.getArgs(); + MapTaskRequest mapTaskRequest = JsonUtil.parseObject(JsonUtil.toJsonString(args[0]), MapTaskRequest.class); + + // 创建map任务 + JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType()); + JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest); + context.setGroupName(HttpHeaderUtil.getGroupName(headers)); + context.setNamespaceId(HttpHeaderUtil.getNamespace(headers)); + List taskList = taskInstance.generate(context); + if (CollUtil.isEmpty(taskList)) { + return JsonUtil.toJsonString( + new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.TRUE, retryRequest.getReqId())); + } + + Job job = jobMapper.selectOne(new LambdaQueryWrapper() + .eq(Job::getId, mapTaskRequest.getJobId()) + .eq(Job::getGroupName, groupName) + .eq(Job::getNamespaceId, namespace) + ); + + if (Objects.isNull(job)) { + return JsonUtil.toJsonString( + new NettyResult(StatusEnum.NO.getStatus(), "Job config not existed", Boolean.TRUE, + retryRequest.getReqId())); + } + + // 执行任务 + JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(JobTaskTypeEnum.MAP_REDUCE.getType()); + jobExecutor.execute(buildJobExecutorContext(mapTaskRequest, job, taskList)); + + return JsonUtil.toJsonString( + new NettyResult(StatusEnum.YES.getStatus(), "Report Map Task Processed Successfully", Boolean.TRUE, + retryRequest.getReqId())); + } + + private static JobExecutorContext buildJobExecutorContext(MapTaskRequest mapTaskRequest, Job job, + List taskList) { + JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); + context.setTaskList(taskList); + context.setTaskBatchId(mapTaskRequest.getTaskBatchId()); + context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId()); + context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId()); + return context; + } + +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java index 53051b131..83a579fd8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java @@ -25,7 +25,6 @@ import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH * @date 2023-09-30 23:01:58 * @since 2.4.0 */ -@Slf4j @Component public class ReportDispatchResultPostHttpRequestHandler extends PostHttpRequestHandler {