fix:(1.2.0-beta1): Refactor result handling logic

opensnail 2024-09-04 19:27:16 +08:00
parent 6ca192e69e
commit acaa2e9668
14 changed files with 524 additions and 165 deletions

View File

@@ -18,5 +18,6 @@ public class CompleteJobBatchDTO {
private Integer jobOperationReason;
private Object result;
private Integer taskType;
private boolean isRetry;
}

View File

@@ -44,4 +44,6 @@ public class JobExecutorResultDTO {
private String wfContext;
private boolean isRetry;
}

View File

@@ -0,0 +1,17 @@
package com.aizuda.snailjob.server.job.task.support;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.snailjob.server.job.task.support.result.job.JobExecutorResultContext;
/**
* @author: opensnail
* @date : 2024-09-04
* @since :1.2.0
*/
public interface JobExecutorResultHandler {
JobTaskTypeEnum getTaskInstanceType();
void handleResult(JobExecutorResultContext context);
}
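
The interface above is the core of the refactor: batch-completion logic that previously lived in one method is split into one handler per JobTaskTypeEnum. Below is a minimal standalone sketch of the same strategy-style dispatch, using simplified stand-in types (TaskType, ResultHandler, and Dispatcher are illustrative names, not part of this commit):

import java.util.List;

enum TaskType { CLUSTER, BROADCAST, MAP, MAP_REDUCE, SHARDING }

interface ResultHandler {
    TaskType taskType();
    void handle(long taskBatchId);
}

class Dispatcher {
    private final List<ResultHandler> handlers; // injected, e.g. by Spring

    Dispatcher(List<ResultHandler> handlers) {
        this.handlers = handlers;
    }

    // Pick the first handler whose declared type matches, mirroring the loop
    // added to JobTaskBatchHandler#handleResult later in this commit.
    void dispatch(TaskType type, long taskBatchId) {
        for (ResultHandler handler : handlers) {
            if (handler.taskType() == type) {
                handler.handle(taskBatchId);
                break;
            }
        }
    }
}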

View File

@@ -13,6 +13,7 @@ import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorConte
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.snailjob.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.snailjob.server.job.task.support.result.job.JobExecutorResultContext;
import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.snailjob.server.model.dto.LogTaskDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
@@ -70,6 +71,8 @@ public interface JobTaskConverter {
TaskStopJobContext toStopJobContext(BlockStrategyContext context);
TaskStopJobContext toStopJobContext(JobExecutorResultContext context);
TaskStopJobContext toStopJobContext(JobExecutorResultDTO context);
@Mappings(
@@ -140,4 +143,8 @@ public interface JobTaskConverter {
JobLogMessage toJobLogMessage(JobLogMessage jobLogMessage);
ReduceTaskDTO toReduceTaskDTO(CompleteJobBatchDTO jobBatchDTO);
ReduceTaskDTO toReduceTaskDTO(JobExecutorResultContext context);
JobExecutorResultContext toJobExecutorResultContext(CompleteJobBatchDTO completeJobBatchDTO);
}

View File

@@ -2,7 +2,6 @@ package com.aizuda.snailjob.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@@ -11,12 +10,8 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -62,7 +57,6 @@ public class JobExecutorResultActor extends AbstractActor {
} else {
jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult()));
}
}
Assert.isTrue(1 == jobTaskMapper.update(jobTask,
@@ -95,19 +89,6 @@
private boolean tryCompleteAndStop(JobExecutorResultDTO result) {
CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result);
boolean complete = jobTaskBatchHandler.complete(completeJobBatchDTO);
if (complete) {
// Try to stop the task
// Cluster tasks are shut down by the client itself
if (result.getTaskType() != JobTaskTypeEnum.CLUSTER.getType()) {
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result);
stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE);
stopJobContext.setForceStop(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
}
}
return complete;
return jobTaskBatchHandler.handleResult(completeJobBatchDTO);
}
}
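
Note how the deleted branch (the `taskType != CLUSTER` guard around the stop call) disappears rather than moving: stopping is now a protected hook on the handler, and ClusterJobExecutorHandler simply overrides it with an empty body, as shown further down. A tiny sketch of that substitution, with illustrative names:

// StopAware and ClusterStop are illustrative stand-ins, not commit code.
interface StopAware {
    // Default behaviour: force-stop the batch on the client.
    default void stop(long taskBatchId) {
        System.out.println("force-stop batch " + taskBatchId);
    }
}

class ClusterStop implements StopAware {
    @Override
    public void stop(long taskBatchId) {
        // Intentionally empty: cluster tasks are shut down by the client
        // itself, which is what the removed `taskType != CLUSTER` check
        // used to express.
    }
}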

View File

@@ -1,45 +1,34 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.*;
import com.aizuda.snailjob.server.job.task.support.JobExecutorResultHandler;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.snailjob.server.job.task.support.result.job.JobExecutorResultContext;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*;
@@ -53,148 +42,37 @@ import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*;
@Slf4j
public class JobTaskBatchHandler {
private final JobTaskMapper jobTaskMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final WorkflowBatchHandler workflowBatchHandler;
private final GroupConfigMapper groupConfigMapper;
private final List<JobExecutorResultHandler> resultHandlerList;
@Transactional
public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) {
public boolean handleResult(CompleteJobBatchDTO completeJobBatchDTO) {
Assert.notNull(completeJobBatchDTO.getTaskType(), () -> new SnailJobServerException("taskType cannot be null"));
// Idempotency check
Long countJobTaskBatch = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
// Idempotency check for non-retry traffic
if (!completeJobBatchDTO.isRetry()) {
// Idempotency check
Long countJobTaskBatch = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, COMPLETED)
);
if (countJobTaskBatch > 0) {
// The batch has already completed; no need to update it again
return true;
}
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getTaskStatus, JobTask::getMrStage)
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
if (CollUtil.isEmpty(jobTasks) ||
jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return false;
}
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
Map<Integer, Long> statusCountMap = jobTasks.stream()
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
if (failCount > 0) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(completeJobBatchDTO.getTaskBatchId()));
} else if (stopCount > 0) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
} else {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
if (needReduceTask(completeJobBatchDTO, jobTasks)
&& JobTaskTypeEnum.MAP_REDUCE.getType() == completeJobBatchDTO.getTaskType()) {
// Interrupt batch completion here; a reduce task needs to be started
return false;
);
if (countJobTaskBatch > 0) {
// The batch has already completed; no need to update it again
return true;
}
}
if (Objects.nonNull(completeJobBatchDTO.getJobOperationReason())) {
jobTaskBatch.setOperationReason(completeJobBatchDTO.getJobOperationReason());
}
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId());
workflowBatchHandler.openNextNode(taskExecuteDTO);
jobTaskBatch.setUpdateDt(LocalDateTime.now());
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
);
}
/**
* Returns true if a reduce task needs to be started, in which case the batch
* status must not be updated yet; otherwise the batch status should be updated.
*
* @param completeJobBatchDTO parameters required to complete the batch
* @param jobTasks            list of job task items
* @return true - reduce needed, false - not needed
*/
private boolean needReduceTask(final CompleteJobBatchDTO completeJobBatchDTO, final List<JobTask> jobTasks) {
Integer mrStage = null;
int reduceCount = 0;
int mapCount = 0;
for (final JobTask jobTask : jobTasks) {
if (Objects.isNull(jobTask.getMrStage())) {
continue;
}
// A MERGE_REDUCE task already exists; no need to create one
if (MERGE_REDUCE.getStage() == jobTask.getMrStage()) {
return false;
}
// Count REDUCE tasks
if (REDUCE.getStage() == jobTask.getMrStage()) {
reduceCount++;
continue;
}
// Count MAP tasks
if (MAP.getStage() == jobTask.getMrStage()) {
mapCount++;
JobExecutorResultContext context = JobTaskConverter.INSTANCE.toJobExecutorResultContext(completeJobBatchDTO);
for (final JobExecutorResultHandler jobExecutorResultHandler : resultHandlerList) {
if (completeJobBatchDTO.getTaskType().equals(jobExecutorResultHandler.getTaskInstanceType().getType())) {
jobExecutorResultHandler.handleResult(context);
break;
}
}
// If there is more than one reduce task, start a merge-reduce task
if (reduceCount > 1) {
mrStage = MERGE_REDUCE.getStage();
} else if (mapCount == jobTasks.size()) {
// If all tasks are MAP tasks, start the reduce stage
mrStage = REDUCE.getStage();
} else {
// Neither MAP nor REDUCE: some other task type, so just return
return false;
}
// Start the reduce or merge-reduce stage
try {
ReduceTaskDTO reduceTaskDTO = JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO);
reduceTaskDTO.setMrStage(mrStage);
ActorRef actorRef = ActorGenerator.jobReduceActor();
actorRef.tell(reduceTaskDTO, actorRef);
return true;
} catch (Exception e) {
SnailJobLog.LOCAL.error("tell reduce actor error", e);
}
return false;
}
private static boolean isAllMapTask(final List<JobTask> jobTasks) {
return jobTasks.size() == jobTasks.stream()
.filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage())
.count();
}
private static boolean isALeastOneReduceTask(final List<JobTask> jobTasks) {
return jobTasks.stream()
.filter(
jobTask -> Objects.nonNull(jobTask.getMrStage()) && REDUCE.getStage() == jobTask.getMrStage())
.count() > 1;
// The result counts as handled if the batch status was updated successfully or a reduce task needs to be started
return context.isTaskBatchComplete() || context.isCreateReduceTask();
}
/**

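Two behavioural points in the rewritten handleResult are easy to miss. First, the idempotency short-circuit now applies only to non-retry traffic, so a retried execution can re-drive completion of a batch that has already reached COMPLETED. Second, the return value treats "batch updated" and "reduce task spawned" as equally handled. A compact sketch of the retry-aware guard, where the in-memory set is a stand-in for the selectCount lookup against JobTaskBatch:

import java.util.HashSet;
import java.util.Set;

class IdempotencyGuard {
    // Stand-in for the database query on completed batches.
    private final Set<Long> completedBatches = new HashSet<>();

    void markCompleted(long taskBatchId) {
        completedBatches.add(taskBatchId);
    }

    // True means the call can be skipped: the batch already completed and
    // this is normal traffic. Retry traffic always falls through to the
    // handler dispatch.
    boolean shortCircuit(long taskBatchId, boolean isRetry) {
        return !isRetry && completedBatches.contains(taskBatchId);
    }
}
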
View File

@@ -49,7 +49,7 @@ public class RunningJobPrepareHandler extends AbstractJobPrepareHandler {
JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE;
CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.completeJobBatchDTO(prepare);
completeJobBatchDTO.setJobOperationReason(jobOperationReasonEnum.getReason());
if (jobTaskBatchHandler.complete(completeJobBatchDTO)) {
if (jobTaskBatchHandler.handleResult(completeJobBatchDTO)) {
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
} else {
// 计算超时时间

View File

@@ -0,0 +1,125 @@
package com.aizuda.snailjob.server.job.task.support.result.job;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobExecutorResultHandler;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.RequiredArgsConstructor;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @author: opensnail
* @date : 2024-09-04
* @since :1.2.0
*/
@RequiredArgsConstructor
public abstract class AbstractJobExecutorResultHandler implements JobExecutorResultHandler {
private final JobTaskMapper jobTaskMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final WorkflowBatchHandler workflowBatchHandler;
private final GroupConfigMapper groupConfigMapper;
@Override
public void handleResult(final JobExecutorResultContext context) {
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getTaskStatus, JobTask::getMrStage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
if (CollUtil.isEmpty(jobTasks) ||
jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return;
}
Map<Integer, Long> statusCountMap = jobTasks.stream()
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
int taskBatchStatus;
if (failCount > 0) {
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
SnailSpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(context.getTaskBatchId()));
doHandleFail(context);
} else if (stopCount > 0) {
taskBatchStatus = JobTaskBatchStatusEnum.STOP.getStatus();
doHandleStop(context);
} else {
taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus();
doHandleSuccess(context);
}
// Trigger the next workflow node
openNextWorkflowNode(context);
boolean res = updateStatus(context, taskBatchStatus);
context.setTaskBatchComplete(res);
if (res) {
// Stop the task on the client
stop(context);
}
}
protected void openNextWorkflowNode(final JobExecutorResultContext context) {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(context.getTaskBatchId());
workflowBatchHandler.openNextNode(taskExecuteDTO);
}
protected boolean updateStatus(final JobExecutorResultContext context, final Integer taskBatchStatus) {
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(context.getTaskBatchId());
jobTaskBatch.setTaskBatchStatus(taskBatchStatus);
jobTaskBatch.setUpdateDt(LocalDateTime.now());
if (Objects.nonNull(context.getJobOperationReason())) {
jobTaskBatch.setOperationReason(context.getJobOperationReason());
}
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, context.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
);
}
protected void stop(JobExecutorResultContext context) {
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(getTaskInstanceType().getType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(context);
stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE);
stopJobContext.setForceStop(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
}
protected abstract void doHandleSuccess(final JobExecutorResultContext context);
protected abstract void doHandleStop(final JobExecutorResultContext context);
protected abstract void doHandleFail(final JobExecutorResultContext context);
}
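
handleResult above is a classic template method: the base class fixes the sequence (aggregate task statuses, run the status-specific hook, open the next workflow node, update the batch, stop clients), while subclasses override only the hooks they care about, plus updateStatus and stop when the default is wrong for their task type. A condensed sketch of that shape, with illustrative names:

// ResultTemplate is an illustrative reduction of AbstractJobExecutorResultHandler.
abstract class ResultTemplate {
    final void handle(long taskBatchId, long failCount, long stopCount) {
        if (failCount > 0) {
            onFail(taskBatchId);
        } else if (stopCount > 0) {
            onStop(taskBatchId);
        } else {
            onSuccess(taskBatchId);
        }
        // Shared tail: persist the batch status, then stop the clients.
        if (updateStatus(taskBatchId)) {
            stopClients(taskBatchId);
        }
    }

    // Overridable defaults, mirroring updateStatus/stop in the abstract handler.
    protected boolean updateStatus(long taskBatchId) { return true; }
    protected void stopClients(long taskBatchId) { }

    protected abstract void onSuccess(long taskBatchId);
    protected abstract void onStop(long taskBatchId);
    protected abstract void onFail(long taskBatchId);
}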

View File

@@ -0,0 +1,45 @@
package com.aizuda.snailjob.server.job.task.support.result.job;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import org.springframework.stereotype.Component;
/**
* @author: opensnail
* @date : 2024-09-04
* @since :1.2.0
*/
@Component
public class BroadcastJobExecutorHandler extends AbstractJobExecutorResultHandler {
public BroadcastJobExecutorHandler(
final JobTaskMapper jobTaskMapper,
final JobTaskBatchMapper jobTaskBatchMapper,
final WorkflowBatchHandler workflowBatchHandler,
final GroupConfigMapper groupConfigMapper) {
super(jobTaskMapper, jobTaskBatchMapper, workflowBatchHandler, groupConfigMapper);
}
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.BROADCAST;
}
@Override
protected void doHandleSuccess(final JobExecutorResultContext context) {
}
@Override
protected void doHandleStop(final JobExecutorResultContext context) {
}
@Override
protected void doHandleFail(final JobExecutorResultContext context) {
}
}

View File

@@ -0,0 +1,52 @@
package com.aizuda.snailjob.server.job.task.support.result.job;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import org.springframework.stereotype.Component;
/**
* @author: opensnail
* @date : 2024-09-04
* @since :1.2.0
*/
@Component
public class ClusterJobExecutorHandler extends AbstractJobExecutorResultHandler {
public ClusterJobExecutorHandler(
final JobTaskMapper jobTaskMapper,
final JobTaskBatchMapper jobTaskBatchMapper,
final WorkflowBatchHandler workflowBatchHandler,
final GroupConfigMapper groupConfigMapper) {
super(jobTaskMapper, jobTaskBatchMapper, workflowBatchHandler, groupConfigMapper);
}
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.CLUSTER;
}
@Override
protected void doHandleSuccess(final JobExecutorResultContext context) {
}
@Override
protected void doHandleStop(final JobExecutorResultContext context) {
}
@Override
protected void doHandleFail(final JobExecutorResultContext context) {
}
@Override
protected void stop(final JobExecutorResultContext context) {
}
}

View File

@@ -0,0 +1,35 @@
package com.aizuda.snailjob.server.job.task.support.result.job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import lombok.Data;
import java.util.List;
/**
* @author: opensnail
* @date : 2024-09-04
* @since :1.2.0
*/
@Data
public class JobExecutorResultContext {
private Long jobId;
private Long workflowNodeId;
private Long workflowTaskBatchId;
private Long taskBatchId;
private Integer jobOperationReason;
private Integer taskType;
private boolean isRetry;
private List<JobTask> jobTaskList;
/**
* Whether a reduce task needs to be created
*/
private boolean createReduceTask;
/**
* Whether the batch was updated to a completed status
*/
private boolean taskBatchComplete;
}

View File

@@ -0,0 +1,45 @@
package com.aizuda.snailjob.server.job.task.support.result.job;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import org.springframework.stereotype.Component;
/**
* @author: opensnail
* @date : 2024-09-04
* @since :1.2.0
*/
@Component
public class MapJobExecutorHandler extends AbstractJobExecutorResultHandler {
public MapJobExecutorHandler(
final JobTaskMapper jobTaskMapper,
final JobTaskBatchMapper jobTaskBatchMapper,
final WorkflowBatchHandler workflowBatchHandler,
final GroupConfigMapper groupConfigMapper) {
super(jobTaskMapper, jobTaskBatchMapper, workflowBatchHandler, groupConfigMapper);
}
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.MAP;
}
@Override
protected void doHandleSuccess(final JobExecutorResultContext context) {
}
@Override
protected void doHandleStop(final JobExecutorResultContext context) {
}
@Override
protected void doHandleFail(final JobExecutorResultContext context) {
}
}

View File

@@ -0,0 +1,126 @@
package com.aizuda.snailjob.server.job.task.support.result.job;
import akka.actor.ActorRef;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import org.springframework.stereotype.Component;
import java.util.Objects;
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.MAP;
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.MERGE_REDUCE;
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.REDUCE;
/**
* @author: opensnail
* @date : 2024-09-04
* @since :1.2.0
*/
@Component
public class MapReduceJobExecutorHandler extends AbstractJobExecutorResultHandler {
public MapReduceJobExecutorHandler(
final JobTaskMapper jobTaskMapper,
final JobTaskBatchMapper jobTaskBatchMapper,
final WorkflowBatchHandler workflowBatchHandler,
final GroupConfigMapper groupConfigMapper) {
super(jobTaskMapper, jobTaskBatchMapper, workflowBatchHandler, groupConfigMapper);
}
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.MAP_REDUCE;
}
@Override
protected void doHandleSuccess(final JobExecutorResultContext context) {
// Determine whether a reduce task needs to be created
context.setCreateReduceTask(needReduceTask(context));
}
@Override
protected void doHandleStop(final JobExecutorResultContext context) {
}
@Override
protected void doHandleFail(final JobExecutorResultContext context) {
}
@Override
protected boolean updateStatus(final JobExecutorResultContext context, final Integer status) {
if (context.isCreateReduceTask()) {
// Interrupt batch completion here; a reduce task needs to be started
return false;
}
return super.updateStatus(context, status);
}
/**
* Returns true if a reduce task needs to be started, in which case the batch
* status must not be updated yet; otherwise the batch status should be updated.
*
* @param context parameters required to complete the batch
* @return true - reduce needed, false - not needed
*/
private boolean needReduceTask(final JobExecutorResultContext context) {
int mrStage;
int reduceCount = 0;
int mapCount = 0;
for (final JobTask jobTask : context.getJobTaskList()) {
if (Objects.isNull(jobTask.getMrStage())) {
continue;
}
// A MERGE_REDUCE task already exists; no need to create one
if (MERGE_REDUCE.getStage() == jobTask.getMrStage()) {
return false;
}
// Count REDUCE tasks
if (REDUCE.getStage() == jobTask.getMrStage()) {
reduceCount++;
continue;
}
// Count MAP tasks
if (MAP.getStage() == jobTask.getMrStage()) {
mapCount++;
}
}
// If there is more than one reduce task, start a merge-reduce task
if (reduceCount > 1) {
mrStage = MERGE_REDUCE.getStage();
} else if (mapCount == context.getJobTaskList().size()) {
// If all tasks are MAP tasks, start the reduce stage
mrStage = REDUCE.getStage();
} else {
// Neither MAP nor REDUCE: some other task type, so just return
return false;
}
// Start the reduce or merge-reduce stage
try {
ReduceTaskDTO reduceTaskDTO = JobTaskConverter.INSTANCE.toReduceTaskDTO(context);
reduceTaskDTO.setMrStage(mrStage);
ActorRef actorRef = ActorGenerator.jobReduceActor();
actorRef.tell(reduceTaskDTO, actorRef);
return true;
} catch (Exception e) {
SnailJobLog.LOCAL.error("tell reduce actor error", e);
}
return false;
}
}
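
The decision table inside needReduceTask is worth spelling out: an existing MERGE_REDUCE task means nothing to do; more than one finished REDUCE task triggers the merge-reduce stage; an all-MAP batch triggers the reduce stage; any other mix leaves the batch to complete normally. A standalone walk-through of that logic with the stages inlined (MrStage and ReduceDecision are illustrative stand-ins for MapReduceStageEnum and the private method):

import java.util.List;

enum MrStage { MAP, REDUCE, MERGE_REDUCE }

class ReduceDecision {
    // Returns the next stage to start, or null when no reduce task is needed.
    static MrStage next(List<MrStage> stages) {
        int reduceCount = 0;
        int mapCount = 0;
        for (MrStage stage : stages) {
            if (stage == MrStage.MERGE_REDUCE) {
                return null; // a merge-reduce task already exists
            }
            if (stage == MrStage.REDUCE) reduceCount++;
            if (stage == MrStage.MAP)    mapCount++;
        }
        if (reduceCount > 1)           return MrStage.MERGE_REDUCE;
        if (mapCount == stages.size()) return MrStage.REDUCE;
        return null; // mixed or non-map-reduce tasks: complete the batch as usual
    }

    public static void main(String[] args) {
        System.out.println(next(List.of(MrStage.MAP, MrStage.MAP)));       // REDUCE
        System.out.println(next(List.of(MrStage.REDUCE, MrStage.REDUCE))); // MERGE_REDUCE
        System.out.println(next(List.of(MrStage.MAP, MrStage.REDUCE)));    // null
    }
}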

View File

@@ -0,0 +1,45 @@
package com.aizuda.snailjob.server.job.task.support.result.job;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import org.springframework.stereotype.Component;
/**
* @author: opensnail
* @date : 2024-09-04
* @since :1.2.0
*/
@Component
public class ShardingJobExecutorHandler extends AbstractJobExecutorResultHandler {
public ShardingJobExecutorHandler(
final JobTaskMapper jobTaskMapper,
final JobTaskBatchMapper jobTaskBatchMapper,
final WorkflowBatchHandler workflowBatchHandler,
final GroupConfigMapper groupConfigMapper) {
super(jobTaskMapper, jobTaskBatchMapper, workflowBatchHandler, groupConfigMapper);
}
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.SHARDING;
}
@Override
protected void doHandleSuccess(final JobExecutorResultContext context) {
}
@Override
protected void doHandleStop(final JobExecutorResultContext context) {
}
@Override
protected void doHandleFail(final JobExecutorResultContext context) {
}
}