diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java
index 8418f276..ee076dcc 100644
--- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java
+++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java
@@ -46,6 +46,8 @@ public class DispatchJobRequest {
      */
     private String mapName;
 
+    private String mrStage;
+
     private String argsStr;
 
     private Integer shardingTotal;
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java
index e56f6243..9eb50a1b 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java
@@ -11,6 +11,7 @@ import lombok.Data;
 @Data
 public class CompleteJobBatchDTO {
 
+    private Long jobId;
     private Long workflowNodeId;
     private Long workflowTaskBatchId;
     private Long taskBatchId;
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java
index a09fb2a2..9f7e6639 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java
@@ -31,10 +31,15 @@ public class RealJobExecutorDTO extends BaseDTO {
     private String argsType;
 
     /**
-     * 任务名称
+     * MAP名称
      */
     private String mapName;
 
+    /**
+     * MAP名称
+     */
+    private String mrState;
+
     /**
      * 扩展字段
      */
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java
index bfbab20e..72b65165 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java
@@ -182,6 +182,7 @@ public class JobExecutorActor extends AbstractActor {
         context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
         context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
         context.setMapName(SystemConstants.MAP_ROOT);
+        context.setMrStage(MapReduceStageEnum.MAP.name());
         return context;
     }
 
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java
index 21945663..54e70aa0 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java
@@ -18,6 +18,7 @@ import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
 import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
 import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@@ -35,20 +36,17 @@ import java.util.Objects;
  */
 @Component(ActorGenerator.JOB_EXECUTOR_RESULT_ACTOR)
 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-@Slf4j
+@RequiredArgsConstructor
 public class JobExecutorResultActor extends AbstractActor {
     private static final String KEY = "job_complete_{0}_{1}";
-    @Autowired
-    private JobTaskMapper jobTaskMapper;
-    @Autowired
-    private JobTaskBatchHandler jobTaskBatchHandler;
-    @Autowired
-    private DistributedLockHandler distributedLockHandler;
+    private final JobTaskMapper jobTaskMapper;
+    private final JobTaskBatchHandler jobTaskBatchHandler;
+    private final DistributedLockHandler distributedLockHandler;
 
     @Override
     public Receive createReceive() {
         return receiveBuilder().match(JobExecutorResultDTO.class, result -> {
-            log.debug("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
+            SnailJobLog.LOCAL.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
             try {
                 JobTask jobTask = new JobTask();
                 jobTask.setTaskStatus(result.getTaskStatus());
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java
index 7990cfc0..2e87c4c6 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java
@@ -57,9 +57,11 @@ public class ReduceActor extends AbstractActor {
         // 创建reduce任务
         JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType());
         JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
+        context.setTaskBatchId(reduceTask.getTaskBatchId());
         context.setMrStage(MapReduceStageEnum.REDUCE);
         List<JobTask> taskList = taskInstance.generate(context);
         if (CollUtil.isEmpty(taskList)) {
+            SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId());
             return;
         }
 
@@ -76,6 +78,7 @@ public class ReduceActor extends AbstractActor {
         context.setTaskBatchId(reduceTask.getTaskBatchId());
         context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId());
         context.setWorkflowNodeId(reduceTask.getWorkflowNodeId());
+        context.setMrStage(MapReduceStageEnum.REDUCE.name());
         return context;
     }
 }
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java
index 6a7db868..5cd27e97 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java
@@ -92,4 +92,6 @@ public class JobExecutorContext {
 
     private String mapName;
 
+    private String mrStage;
+
 }
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java
index de234c8c..45b02b9a 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java
@@ -70,9 +70,10 @@ public class JobTaskBatchHandler {
 
         List<JobTask> jobTasks = jobTaskMapper.selectList(
             new LambdaQueryWrapper<JobTask>()
-                .select(JobTask::getTaskStatus, JobTask::getResultMessage)
+                .select(JobTask::getTaskStatus, JobTask::getResultMessage, JobTask::getExtAttrs)
                 .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
 
+        SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
         JobTaskBatch jobTaskBatch = new JobTaskBatch();
         jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
 
@@ -106,15 +107,14 @@ public class JobTaskBatchHandler {
                 JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(firstJobTask.getExtAttrs(),
                     JobTaskExtAttrsDTO.class);
                 Integer taskType = jobTaskExtAttrsDTO.getTaskType();
-                if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType) {
-                    if (isAllMapTask(jobTasks)) {
-                        // 开启reduce阶段
-                        try {
-                            ActorRef actorRef = ActorGenerator.jobReduceActor();
-                            actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
-                        } catch (Exception e) {
-                            SnailJobLog.LOCAL.error("tell reduce actor error", e);
-                        }
+                if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) {
+                    // 开启reduce阶段
+                    try {
+                        ActorRef actorRef = ActorGenerator.jobReduceActor();
+                        actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
+                        return false;
+                    } catch (Exception e) {
+                        SnailJobLog.LOCAL.error("tell reduce actor error", e);
                     }
                 }
             }
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java
index f3fcb9df..295c892a 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java
@@ -108,6 +108,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
         context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId());
         context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId());
         context.setMapName(mapTaskRequest.getMapName());
+        context.setMrStage(MapReduceStageEnum.MAP.name());
         return context;
     }