feat(sj_1.1.0): 客户端支持reduce调度

This commit is contained in:
opensnail 2024-06-15 00:09:15 +08:00
parent 14497be6b9
commit 262b6fa029
15 changed files with 80 additions and 27 deletions

View File

@ -116,6 +116,7 @@ public class JobEndPoint {
jobContext.setRetry(dispatchJob.isRetry());
jobContext.setRetryScene(dispatchJob.getRetryScene());
jobContext.setMapName(dispatchJob.getMapName());
jobContext.setMrStage(dispatchJob.getMrStage());
return jobContext;
}

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.client.job.core.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* Task执行结果
@ -8,12 +9,10 @@ import lombok.Data;
* @author: opensnail
* @date : 2024-06-12 13:59
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class MapArgs {
public class MapArgs extends JobArgs {
private String mapName;
private Boolean success;
private String result;
}

View File

@ -0,0 +1,24 @@
package com.aizuda.snailjob.client.job.core.dto;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.List;
/**
* Task执行结果
*
* @author: opensnail
* @date : 2024-06-12 13:59
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class ReduceArgs extends JobArgs {
private List<?> mapResult;
public List<?> getMapResult() {
return JsonUtil.parseList(getArgsStr(), List.class);
}
}

View File

@ -4,16 +4,16 @@ import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
import com.aizuda.snailjob.client.job.core.IJobExecutor;
import com.aizuda.snailjob.client.job.core.cache.FutureCache;
import com.aizuda.snailjob.client.job.core.cache.ThreadPoolCache;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ShardingJobArgs;
import com.aizuda.snailjob.client.job.core.dto.*;
import com.aizuda.snailjob.client.job.core.log.JobLogMeta;
import com.aizuda.snailjob.client.job.core.timer.StopTaskTimerTask;
import com.aizuda.snailjob.client.job.core.timer.TimerManager;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -48,8 +48,14 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
JobArgs jobArgs;
if (jobContext.getTaskType() == JobTaskTypeEnum.SHARDING.getType()) {
jobArgs = buildShardingJobArgs(jobContext);
} else if (jobContext.getTaskType() == JobTaskTypeEnum.MAP_REDUCE.getType()) {
jobArgs = buildMapReduceJobArgs(jobContext);
} else if (Lists.newArrayList(JobTaskTypeEnum.MAP_REDUCE.getType(), JobTaskTypeEnum.MAP.getType())
.contains(jobContext.getTaskType())) {
if (MapReduceStageEnum.MAP.name().equals(jobContext.getMrStage())) {
jobArgs = buildMapJobArgs(jobContext);
} else {
jobArgs = buildReduceJobArgs(jobContext);
}
} else {
jobArgs = buildJobArgs(jobContext);
}
@ -96,8 +102,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
return jobArgs;
}
private static JobArgs buildMapReduceJobArgs(JobContext jobContext) {
MapReduceArgs jobArgs = new MapReduceArgs();
private static JobArgs buildMapJobArgs(JobContext jobContext) {
MapArgs jobArgs = new MapArgs();
jobArgs.setArgsStr(jobContext.getArgsStr());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setMapName(jobContext.getMapName());
@ -105,5 +111,13 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
return jobArgs;
}
private static JobArgs buildReduceJobArgs(JobContext jobContext) {
ReduceArgs jobArgs = new ReduceArgs();
jobArgs.setArgsStr(jobContext.getArgsStr());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
return jobArgs;
}
protected abstract ExecuteResult doJobExecute(JobArgs jobArgs);
}

View File

@ -5,7 +5,7 @@ import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.client.job.core.IJobExecutor;
import com.aizuda.snailjob.client.job.core.client.JobNettyClient;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
@ -33,11 +33,11 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
@Override
protected ExecuteResult doJobExecute(final JobArgs jobArgs) {
MapReduceArgs mapReduceArgs = (MapReduceArgs) jobArgs;
return this.doJobExecute(mapReduceArgs);
MapArgs mapArgs = (MapArgs) jobArgs;
return this.doJobMapExecute(mapArgs);
}
public abstract ExecuteResult doJobExecute(MapReduceArgs mapReduceArgs);
public abstract ExecuteResult doJobMapExecute(MapArgs mapArgs);
public void doMapExecute(List<Object> taskList, String nextMapName) {

View File

@ -1,7 +1,11 @@
package com.aizuda.snailjob.client.job.core.executor;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
import com.aizuda.snailjob.common.core.model.JobContext;
import java.util.List;
@ -13,5 +17,17 @@ import java.util.List;
*/
public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor {
protected abstract ExecuteResult doReduceExecute(JobContext jobContext, List<MapArgs> mapArgsList);
@Override
public ExecuteResult doJobExecute(final JobArgs jobArgs) {
JobContext jobContext = JobContextManager.getJobContext();
if (jobContext.getMrStage().equals("MAP")) {
return super.doJobExecute(jobArgs);
} else if(jobContext.getMrStage().equals("REDUCE")) {
ReduceArgs reduceArgs = (ReduceArgs) jobArgs;
return doReduceExecute(reduceArgs);
}
throw new SnailJobMapReduceException("非法的MapReduceStage");
}
protected abstract ExecuteResult doReduceExecute(ReduceArgs reduceArgs);
}

View File

@ -58,4 +58,6 @@ public class JobContext {
* Map名称
*/
private String mapName;
private String mrStage;
}

View File

@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.job.task.dto;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import lombok.Data;
/**

View File

@ -18,7 +18,7 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;

View File

@ -6,7 +6,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
@ -16,7 +16,6 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import lombok.RequiredArgsConstructor;

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.generator.task;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import lombok.Data;
import java.util.List;

View File

@ -13,7 +13,7 @@ import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;

View File

@ -9,7 +9,6 @@ import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
@ -20,10 +19,9 @@ import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;

View File

@ -12,7 +12,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.util.HttpHeaderUtil;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;