diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java
index 3025ab44..84c89245 100644
--- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java
+++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java
@@ -1,5 +1,6 @@
package com.aizuda.snailjob.client.job.core.client;
+import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.annotation.SnailEndPoint;
import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
@@ -19,12 +20,15 @@ import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.model.Result;
+import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
+import com.google.common.collect.Maps;
import jakarta.validation.Valid;
import org.springframework.validation.annotation.Validated;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH;
@@ -116,6 +120,18 @@ public class JobEndPoint {
jobContext.setRetryScene(dispatchJob.getRetryScene());
jobContext.setTaskName(dispatchJob.getTaskName());
jobContext.setMrStage(dispatchJob.getMrStage());
+
+ String wfContext = dispatchJob.getWfContext();
+ if (StrUtil.isNotBlank(wfContext)) {
+ try {
+ jobContext.setWfContext(JsonUtil.parseConcurrentHashMap(wfContext));
+ } catch (Exception e) {
+ SnailJobLog.REMOTE.warn("workflow context parse error", e);
+ }
+ } else {
+ jobContext.setWfContext(Maps.newConcurrentMap());
+ }
+
return jobContext;
}
diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java
index d183bc64..e7523b19 100644
--- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java
+++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java
@@ -1,7 +1,11 @@
package com.aizuda.snailjob.client.job.core.dto;
+import cn.hutool.core.util.StrUtil;
import lombok.Data;
+import java.util.Map;
+import java.util.Objects;
+
/**
* @author: opensnail
* @date : 2023-10-18 16:53
@@ -15,4 +19,15 @@ public class JobArgs {
private String executorInfo;
private Long taskBatchId;
+
+ private Map<String, Object> wfContext;
+
+ public void appendContext(String key, Object value) {
+ if (Objects.isNull(wfContext) || StrUtil.isBlank(key) || Objects.isNull(value)) {
+ return;
+ }
+
+ wfContext.put(key, value);
+ }
+
}
diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java
index 22bbd034..d1e28161 100644
--- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java
+++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java
@@ -1,5 +1,6 @@
package com.aizuda.snailjob.client.job.core.executor;
+import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
import com.aizuda.snailjob.client.job.core.IJobExecutor;
import com.aizuda.snailjob.client.job.core.cache.FutureCache;
@@ -12,6 +13,7 @@ import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.model.JobContext;
+import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
@@ -60,6 +62,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
jobArgs = buildJobArgs(jobContext);
}
+ jobArgs.setWfContext(jobContext.getWfContext());
+
try {
// 初始化调度信息(日志上报LogUtil)
initLogContext(jobContext);
@@ -117,6 +121,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
jobArgs.setArgsStr(jobContext.getArgsStr());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
+ jobArgs.setWfContext(jobContext.getWfContext());
return jobArgs;
}
diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java
index 92385214..1d0641df 100644
--- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java
+++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java
@@ -1,5 +1,6 @@
package com.aizuda.snailjob.client.job.core.executor;
+import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DatePattern;
import com.aizuda.snailjob.client.common.cache.GroupVersionCache;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
@@ -32,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
@@ -160,6 +162,12 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
dispatchJobRequest.setTaskStatus(status);
dispatchJobRequest.setRetry(jobContext.isRetry());
dispatchJobRequest.setRetryScene(jobContext.getRetryScene());
+ // 传递上下文
+ Map<String, Object> wfContext = jobContext.getWfContext();
+ if (CollUtil.isNotEmpty(wfContext)) {
+ dispatchJobRequest.setWfContext(JsonUtil.toJsonString(wfContext));
+ }
+
return dispatchJobRequest;
}
diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java
index 48cccb93..d3b5cb56 100644
--- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java
+++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java
@@ -73,5 +73,5 @@ public class DispatchJobRequest {
/**
* 工作流上下文
*/
- private String wkContext;
+ private String wfContext;
}
diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java
index f3d723de..701d4fa2 100644
--- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java
+++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java
@@ -44,5 +44,5 @@ public class DispatchJobResultRequest {
/**
* 工作流上下文
*/
- private String wkContext;
+ private String wfContext;
}
diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java
index 5b976dc1..e9ee837d 100644
--- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java
+++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java
@@ -3,6 +3,7 @@ package com.aizuda.snailjob.common.core.model;
import lombok.Data;
import java.util.List;
+import java.util.Map;
/**
* @author: opensnail
@@ -59,5 +60,15 @@ public class JobContext {
*/
private String taskName;
+ /**
+ * 动态分片所处的阶段
+ */
private Integer mrStage;
+
+ /**
+ * 工作流全局上下文
+ */
+ private Map<String, Object> wfContext;
+
+
}
diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/util/JsonUtil.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/util/JsonUtil.java
index 80911376..d91c7b99 100644
--- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/util/JsonUtil.java
+++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/util/JsonUtil.java
@@ -17,6 +17,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* @author: byteblogs
@@ -69,6 +70,16 @@ public class JsonUtil {
return JsonMapper.toJavaObject(jsonString, HashMap.class);
}
+ /**
+ * 将JSON字符串转ConcurrentHashMap 对象
+ *
+ * @param jsonString
+ * @return
+ */
+ public static <K, V> Map<K, V> parseConcurrentHashMap(String jsonString) {
+ return JsonMapper.toJavaObject(jsonString, ConcurrentHashMap.class);
+ }
+
/**
* 将JSON字符串转Map 对象
*
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java
index fa1a47f1..620e144c 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java
@@ -102,5 +102,5 @@ public class RealJobExecutorDTO extends BaseDTO {
/**
* 工作流上下文
*/
- private String wkContext;
+ private String wfContext;
}
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java
index 0b9069b2..f9805265 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/JobTaskConverter.java
@@ -106,7 +106,8 @@ public interface JobTaskConverter {
@Mapping(source = "jobTask.extAttrs", target = "extAttrs"),
@Mapping(source = "jobTask.namespaceId", target = "namespaceId"),
@Mapping(source = "jobTask.taskName", target = "taskName"),
- @Mapping(source = "jobTask.mrStage", target = "mrStage")
+ @Mapping(source = "jobTask.mrStage", target = "mrStage"),
+ @Mapping(source = "context.wfContext", target = "wfContext")
})
RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask);
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java
index ed3f4f7e..178df6ee 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java
@@ -50,5 +50,5 @@ public class ClientCallbackContext {
/**
* 工作流上下文
*/
- private String wkContext;
+ private String wfContext;
}
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java
index 47f795fa..e7b5118f 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java
@@ -198,7 +198,7 @@ public class JobExecutorActor extends AbstractActor {
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
if (Objects.nonNull(workflowTaskBatch)) {
- context.setWkContext(workflowTaskBatch.getWfContext());
+ context.setWfContext(workflowTaskBatch.getWfContext());
}
return context;
}
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java
index 483fe726..3710f644 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java
@@ -41,7 +41,6 @@ public class JobExecutorResultActor extends AbstractActor {
private static final String KEY = "job_complete_{0}_{1}";
private final JobTaskMapper jobTaskMapper;
private final JobTaskBatchHandler jobTaskBatchHandler;
- private final WorkflowBatchHandler workflowBatchHandler;
private final DistributedLockHandler distributedLockHandler;
@Override
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java
index 157a88fd..fdb2dbb7 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java
@@ -118,7 +118,7 @@ public class WorkflowExecutorActor extends AbstractActor {
// 添加父节点,为了判断父节点的处理状态
List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
- JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason)
+ JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason, JobTaskBatch::getId)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId,
Sets.union(brotherNode, Sets.newHashSet(taskExecute.getParentId())))
@@ -167,6 +167,10 @@ public class WorkflowExecutorActor extends AbstractActor {
List<Job> jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId));
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
+ // TODO 合并job task的结果到全局上下文中
+ workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch,
+ StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId));
+
// 只会条件节点会使用
Object evaluationResult = null;
for (WorkflowNode workflowNode : workflowNodes) {
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java
index 8a1e6734..14e8c982 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/JobExecutorContext.java
@@ -93,6 +93,6 @@ public class JobExecutorContext {
/**
* 工作流上下文
*/
- private String wkContext;
+ private String wfContext;
}
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java
index 06a23f81..2bb1d608 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java
@@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum;
import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
+import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.WorkflowExecutor;
@@ -56,6 +57,9 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
@Override
public void execute(WorkflowExecutorContext context) {
+
+ // 若多个兄弟节点的情况下,同时处理完成则每个节点都有可能来执行后继节点,
+ // 因此这里这里添加分布式锁
distributedLockHandler.lockWithDisposableAndRetry(
() -> {
long total = 0;
@@ -69,6 +73,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
if (CollUtil.isNotEmpty(jobTaskBatches)) {
total = jobTaskBatches.size();
+ // ToDo
JobTaskBatch jobTaskBatch = jobTaskBatches.get(0);
if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) {
context.setEvaluationResult(Boolean.FALSE);
@@ -89,9 +94,6 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
return;
}
- // 合并job task的结果到全局上下文中
- workflowBatchHandler.mergeAllWorkflowContext(context.getWorkflowTaskBatchId(), context.getTaskBatchId());
-
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java
index 8c941e54..f64613a2 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java
@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*;
/**
@@ -59,13 +60,23 @@ public class JobTaskBatchHandler {
@Transactional
public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) {
+ // 幂等处理
+ Long countJobTaskBatch = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
+ .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
+ .in(JobTaskBatch::getTaskBatchStatus, COMPLETED)
+ );
+ if (countJobTaskBatch > 0) {
+ // 批次已经完成了,不需要重复更新
+ return true;
+ }
+
List<JobTask> jobTasks = jobTaskMapper.selectList(
- new LambdaQueryWrapper<JobTask>()
- .select(JobTask::getTaskStatus, JobTask::getMrStage)
- .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
+ new LambdaQueryWrapper<JobTask>()
+ .select(JobTask::getTaskStatus, JobTask::getMrStage)
+ .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
if (CollUtil.isEmpty(jobTasks) ||
- jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
+ jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return false;
}
@@ -73,7 +84,7 @@ public class JobTaskBatchHandler {
jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
Map<Integer, Long> statusCountMap = jobTasks.stream()
- .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
+ .collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
@@ -85,7 +96,8 @@ public class JobTaskBatchHandler {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
} else {
// todo 调试完成删除
- SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
+ SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(),
+ JsonUtil.toJsonString(jobTasks));
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
if (needReduceTask(completeJobBatchDTO, jobTasks)) {
@@ -107,9 +119,9 @@ public class JobTaskBatchHandler {
jobTaskBatch.setUpdateDt(LocalDateTime.now());
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
- new LambdaUpdateWrapper<JobTaskBatch>()
- .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
- .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
+ new LambdaUpdateWrapper<JobTaskBatch>()
+ .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
+ .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
);
}
@@ -149,14 +161,15 @@ public class JobTaskBatchHandler {
private static boolean isAllMapTask(final List<JobTask> jobTasks) {
return jobTasks.size() == jobTasks.stream()
- .filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage())
- .count();
+ .filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage())
+ .count();
}
private static boolean isALeastOneReduceTask(final List<JobTask> jobTasks) {
return jobTasks.stream()
- .filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && REDUCE.getStage() == jobTask.getMrStage())
- .count() > 1;
+ .filter(
+ jobTask -> Objects.nonNull(jobTask.getMrStage()) && REDUCE.getStage() == jobTask.getMrStage())
+ .count() > 1;
}
/**
@@ -167,21 +180,21 @@ public class JobTaskBatchHandler {
*/
public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job)
- || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
- || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
- || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
- // 是否是常驻任务
- || Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
- // 防止任务已经分配到其他节点导致的任务重复执行
- || !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
+ || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
+ || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
+ || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
+ // 是否是常驻任务
+ || Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
+ // 防止任务已经分配到其他节点导致的任务重复执行
+ || !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
) {
return;
}
long count = groupConfigMapper.selectCount(new LambdaQueryWrapper<GroupConfig>()
- .eq(GroupConfig::getNamespaceId, job.getNamespaceId())
- .eq(GroupConfig::getGroupName, job.getGroupName())
- .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
+ .eq(GroupConfig::getNamespaceId, job.getNamespaceId())
+ .eq(GroupConfig::getGroupName, job.getGroupName())
+ .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
if (count == 0) {
return;
}
@@ -208,7 +221,7 @@ public class JobTaskBatchHandler {
Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000);
log.debug("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds,
- DateUtils.toNowMilli() % 1000);
+ DateUtils.toNowMilli() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java
index 85d781b8..3d2de476 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java
@@ -61,7 +61,7 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.NOT_C
@Component
@RequiredArgsConstructor
public class WorkflowBatchHandler {
- private static final String KEY = "update_wf_context_{}";
+ private static final String KEY = "update_wf_context_{0}";
private final DistributedLockHandler distributedLockHandler;
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
@@ -320,8 +320,14 @@ public class WorkflowBatchHandler {
}
}
- public void mergeWorkflowContextAndRetry(Long workflowTaskBatchId, String waitMergeContext) {
- if (StrUtil.isBlank(waitMergeContext) || Objects.isNull(workflowTaskBatchId)) {
+ /**
+ * 合并工作流上下文若合并失败先自旋3次1.5s, 若失败了升级到悲观锁
+ *
+ * @param workflowTaskBatch 工作流批次
+ * @param taskBatchIds 批次列表
+ */
+ public void mergeWorkflowContextAndRetry(WorkflowTaskBatch workflowTaskBatch, Set<Long> taskBatchIds) {
+ if (CollUtil.isEmpty(taskBatchIds)) {
return;
}
@@ -349,42 +355,62 @@ public class WorkflowBatchHandler {
}).build();
try {
- retryer.call(() -> mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class)));
+ retryer.call(() -> mergeAllWorkflowContext(workflowTaskBatch, taskBatchIds));
} catch (Exception e) {
- SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] waitMergeContext:[{}]",
- workflowTaskBatchId, waitMergeContext, e);
+ SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] taskBatchIds:[{}]",
+ workflowTaskBatch.getId(), taskBatchIds, e);
if (e.getClass().isAssignableFrom(RetryException.class)) {
// 如果自旋失败,就使用悲观锁
distributedLockHandler.lockWithDisposableAndRetry(() -> {
- mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class));
- }, MessageFormat.format(KEY, workflowTaskBatchId), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
+ mergeAllWorkflowContext(workflowTaskBatch, taskBatchIds);
+ }, MessageFormat.format(KEY, workflowTaskBatch.getId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
}
}
}
- public boolean mergeAllWorkflowContext(Long workflowTaskBatchId, Long taskBatchId) {
+ public boolean mergeAllWorkflowContext(WorkflowTaskBatch workflowTaskBatch, Set<Long> taskBatchIds) {
+ if (CollUtil.isEmpty(taskBatchIds)) {
+ return true;
+ }
+
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getWfContext, JobTask::getId)
- .eq(JobTask::getTaskBatchId, taskBatchId));
+ .in(JobTask::getTaskBatchId, taskBatchIds)
+ );
if (CollUtil.isEmpty(jobTasks)) {
return true;
}
Set<Map<String, Object>> maps = jobTasks.stream().map(r -> {
try {
- return JsonUtil.parseHashMap(r.getWfContext(), Object.class);
+ if (StrUtil.isNotBlank(r.getWfContext())) {
+ return JsonUtil.parseHashMap(r.getWfContext(), Object.class);
+ }
} catch (Exception e) {
- SnailJobLog.LOCAL.warn("taskId:[{}] result value 不是一个json对象. result:[{}]", r.getId(), r.getResultMessage());
+ SnailJobLog.LOCAL.warn("taskId:[{}] result value is not a JSON object. result:[{}]", r.getId(), r.getResultMessage());
}
return new HashMap<String, Object>();
}).collect(Collectors.toSet());
- Map<String, Object> mergeMap = Maps.newHashMap();
+ Map<String, Object> mergeMap;
+ if (StrUtil.isBlank(workflowTaskBatch.getWfContext())) {
+ mergeMap = Maps.newHashMap();
+ } else {
+ mergeMap = JsonUtil.parseHashMap(workflowTaskBatch.getWfContext());
+ }
+
for (Map<String, Object> map : maps) {
mergeMaps(mergeMap, map);
}
- return mergeWorkflowContext(workflowTaskBatchId, mergeMap);
+ WorkflowTaskBatch waitUpdateWorkflowTaskBatch = new WorkflowTaskBatch();
+ waitUpdateWorkflowTaskBatch.setId(workflowTaskBatch.getId());
+ waitUpdateWorkflowTaskBatch.setWfContext(JsonUtil.toJsonString(mergeMap));
+ waitUpdateWorkflowTaskBatch.setVersion(1);
+ return 1 == workflowTaskBatchMapper.update(waitUpdateWorkflowTaskBatch, new LambdaQueryWrapper<WorkflowTaskBatch>()
+ .eq(WorkflowTaskBatch::getId, workflowTaskBatch.getId())
+ .eq(WorkflowTaskBatch::getVersion, workflowTaskBatch.getVersion())
+ );
}
/**