fix(sj_1.1.0): 优化工作流全局上下文逻辑
This commit is contained in:
parent
0c29d86047
commit
aeaa73776f
@ -1,5 +1,6 @@
|
||||
package com.aizuda.snailjob.client.job.core.client;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.client.common.annotation.Mapping;
|
||||
import com.aizuda.snailjob.client.common.annotation.SnailEndPoint;
|
||||
import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
|
||||
@ -19,12 +20,15 @@ import com.aizuda.snailjob.common.core.context.SpringContext;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.model.JobContext;
|
||||
import com.aizuda.snailjob.common.core.model.Result;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
|
||||
import com.google.common.collect.Maps;
|
||||
import jakarta.validation.Valid;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH;
|
||||
@ -116,6 +120,18 @@ public class JobEndPoint {
|
||||
jobContext.setRetryScene(dispatchJob.getRetryScene());
|
||||
jobContext.setTaskName(dispatchJob.getTaskName());
|
||||
jobContext.setMrStage(dispatchJob.getMrStage());
|
||||
|
||||
String wfContext = dispatchJob.getWfContext();
|
||||
if (StrUtil.isNotBlank(wfContext)) {
|
||||
try {
|
||||
jobContext.setWfContext(JsonUtil.parseConcurrentHashMap(wfContext));
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.REMOTE.warn("workflow context parse error", e);
|
||||
}
|
||||
} else {
|
||||
jobContext.setWfContext(Maps.newConcurrentMap());
|
||||
}
|
||||
|
||||
return jobContext;
|
||||
}
|
||||
|
||||
|
@ -1,7 +1,11 @@
|
||||
package com.aizuda.snailjob.client.job.core.dto;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* @author: opensnail
|
||||
* @date : 2023-10-18 16:53
|
||||
@ -15,4 +19,15 @@ public class JobArgs {
|
||||
private String executorInfo;
|
||||
|
||||
private Long taskBatchId;
|
||||
|
||||
private Map<String, Object> wfContext;
|
||||
|
||||
public void appendContext(String key, Object value) {
|
||||
if (Objects.isNull(wfContext) || StrUtil.isBlank(key) || Objects.isNull(value)) {
|
||||
return;
|
||||
}
|
||||
|
||||
wfContext.put(key, value);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package com.aizuda.snailjob.client.job.core.executor;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
|
||||
import com.aizuda.snailjob.client.job.core.IJobExecutor;
|
||||
import com.aizuda.snailjob.client.job.core.cache.FutureCache;
|
||||
@ -12,6 +13,7 @@ import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
|
||||
import com.aizuda.snailjob.common.core.model.JobContext;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
@ -60,6 +62,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||
jobArgs = buildJobArgs(jobContext);
|
||||
}
|
||||
|
||||
jobArgs.setWfContext(jobContext.getWfContext());
|
||||
|
||||
try {
|
||||
// 初始化调度信息(日志上报LogUtil)
|
||||
initLogContext(jobContext);
|
||||
@ -117,6 +121,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||
jobArgs.setArgsStr(jobContext.getArgsStr());
|
||||
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
||||
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
|
||||
jobArgs.setWfContext(jobContext.getWfContext());
|
||||
return jobArgs;
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
package com.aizuda.snailjob.client.job.core.executor;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import com.aizuda.snailjob.client.common.cache.GroupVersionCache;
|
||||
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
|
||||
@ -32,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CancellationException;
|
||||
@ -160,6 +162,12 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
|
||||
dispatchJobRequest.setTaskStatus(status);
|
||||
dispatchJobRequest.setRetry(jobContext.isRetry());
|
||||
dispatchJobRequest.setRetryScene(jobContext.getRetryScene());
|
||||
// 传递上下文
|
||||
Map<String, Object> wfContext = jobContext.getWfContext();
|
||||
if (CollUtil.isNotEmpty(wfContext)) {
|
||||
dispatchJobRequest.setWfContext(JsonUtil.toJsonString(wfContext));
|
||||
}
|
||||
|
||||
return dispatchJobRequest;
|
||||
}
|
||||
|
||||
|
@ -73,5 +73,5 @@ public class DispatchJobRequest {
|
||||
/**
|
||||
* 工作流上下文
|
||||
*/
|
||||
private String wkContext;
|
||||
private String wfContext;
|
||||
}
|
||||
|
@ -44,5 +44,5 @@ public class DispatchJobResultRequest {
|
||||
/**
|
||||
* 工作流上下文
|
||||
*/
|
||||
private String wkContext;
|
||||
private String wfContext;
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package com.aizuda.snailjob.common.core.model;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author: opensnail
|
||||
@ -59,5 +60,15 @@ public class JobContext {
|
||||
*/
|
||||
private String taskName;
|
||||
|
||||
/**
|
||||
* 动态分片所处的阶段
|
||||
*/
|
||||
private Integer mrStage;
|
||||
|
||||
/**
|
||||
* 工作流全局上下文
|
||||
*/
|
||||
private Map<String, Object> wfContext;
|
||||
|
||||
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.YYYY_MM_DD;
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.YYYY_MM_DD_HH_MM_SS;
|
||||
@ -72,6 +73,16 @@ public class JsonUtil {
|
||||
return JsonMapper.toJavaObject(jsonString, HashMap.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将JSON字符串转ConcurrentHashMap 对象
|
||||
*
|
||||
* @param jsonString
|
||||
* @return
|
||||
*/
|
||||
public static <K, V> Map<K, V> parseConcurrentHashMap(String jsonString) {
|
||||
return JsonMapper.toJavaObject(jsonString, ConcurrentHashMap.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将JSON字符串转Map 对象
|
||||
*
|
||||
|
@ -102,5 +102,5 @@ public class RealJobExecutorDTO extends BaseDTO {
|
||||
/**
|
||||
* 工作流上下文
|
||||
*/
|
||||
private String wkContext;
|
||||
private String wfContext;
|
||||
}
|
||||
|
@ -106,7 +106,8 @@ public interface JobTaskConverter {
|
||||
@Mapping(source = "jobTask.extAttrs", target = "extAttrs"),
|
||||
@Mapping(source = "jobTask.namespaceId", target = "namespaceId"),
|
||||
@Mapping(source = "jobTask.taskName", target = "taskName"),
|
||||
@Mapping(source = "jobTask.mrStage", target = "mrStage")
|
||||
@Mapping(source = "jobTask.mrStage", target = "mrStage"),
|
||||
@Mapping(source = "context.wfContext", target = "wfContext")
|
||||
})
|
||||
RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask);
|
||||
|
||||
|
@ -50,5 +50,5 @@ public class ClientCallbackContext {
|
||||
/**
|
||||
* 工作流上下文
|
||||
*/
|
||||
private String wkContext;
|
||||
private String wfContext;
|
||||
}
|
||||
|
@ -198,7 +198,7 @@ public class JobExecutorActor extends AbstractActor {
|
||||
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
|
||||
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
|
||||
if (Objects.nonNull(workflowTaskBatch)) {
|
||||
context.setWkContext(workflowTaskBatch.getWfContext());
|
||||
context.setWfContext(workflowTaskBatch.getWfContext());
|
||||
}
|
||||
return context;
|
||||
}
|
||||
|
@ -41,7 +41,6 @@ public class JobExecutorResultActor extends AbstractActor {
|
||||
private static final String KEY = "job_complete_{0}_{1}";
|
||||
private final JobTaskMapper jobTaskMapper;
|
||||
private final JobTaskBatchHandler jobTaskBatchHandler;
|
||||
private final WorkflowBatchHandler workflowBatchHandler;
|
||||
private final DistributedLockHandler distributedLockHandler;
|
||||
|
||||
@Override
|
||||
|
@ -118,7 +118,7 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
// 添加父节点,为了判断父节点的处理状态
|
||||
List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
|
||||
JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason)
|
||||
JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason, JobTaskBatch::getId)
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
|
||||
.in(JobTaskBatch::getWorkflowNodeId,
|
||||
Sets.union(brotherNode, Sets.newHashSet(taskExecute.getParentId())))
|
||||
@ -167,6 +167,10 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
List<Job> jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId));
|
||||
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
|
||||
|
||||
// TODO 合并job task的结果到全局上下文中
|
||||
workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch,
|
||||
StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId));
|
||||
|
||||
// 只会条件节点会使用
|
||||
Object evaluationResult = null;
|
||||
for (WorkflowNode workflowNode : workflowNodes) {
|
||||
|
@ -93,6 +93,6 @@ public class JobExecutorContext {
|
||||
/**
|
||||
* 工作流上下文
|
||||
*/
|
||||
private String wkContext;
|
||||
private String wfContext;
|
||||
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
||||
import com.aizuda.snailjob.server.job.task.support.WorkflowExecutor;
|
||||
@ -56,6 +57,9 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
|
||||
|
||||
@Override
|
||||
public void execute(WorkflowExecutorContext context) {
|
||||
|
||||
// 若多个兄弟节点的情况下,同时处理完成则每个节点都有可能来执行后继节点,
|
||||
// 因此这里这里添加分布式锁
|
||||
distributedLockHandler.lockWithDisposableAndRetry(
|
||||
() -> {
|
||||
long total = 0;
|
||||
@ -69,6 +73,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
|
||||
|
||||
if (CollUtil.isNotEmpty(jobTaskBatches)) {
|
||||
total = jobTaskBatches.size();
|
||||
// ToDo
|
||||
JobTaskBatch jobTaskBatch = jobTaskBatches.get(0);
|
||||
if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) {
|
||||
context.setEvaluationResult(Boolean.FALSE);
|
||||
@ -89,9 +94,6 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
|
||||
return;
|
||||
}
|
||||
|
||||
// 合并job task的结果到全局上下文中
|
||||
workflowBatchHandler.mergeAllWorkflowContext(context.getWorkflowTaskBatchId(), context.getTaskBatchId());
|
||||
|
||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||
@Override
|
||||
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
||||
|
@ -40,6 +40,7 @@ import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
|
||||
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*;
|
||||
|
||||
/**
|
||||
@ -59,13 +60,23 @@ public class JobTaskBatchHandler {
|
||||
@Transactional
|
||||
public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) {
|
||||
|
||||
// 幂等处理
|
||||
Long countJobTaskBatch = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
|
||||
.in(JobTaskBatch::getTaskBatchStatus, COMPLETED)
|
||||
);
|
||||
if (countJobTaskBatch > 0) {
|
||||
// 批次已经完成了,不需要重复更新
|
||||
return true;
|
||||
}
|
||||
|
||||
List<JobTask> jobTasks = jobTaskMapper.selectList(
|
||||
new LambdaQueryWrapper<JobTask>()
|
||||
.select(JobTask::getTaskStatus, JobTask::getMrStage)
|
||||
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
|
||||
new LambdaQueryWrapper<JobTask>()
|
||||
.select(JobTask::getTaskStatus, JobTask::getMrStage)
|
||||
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
|
||||
|
||||
if (CollUtil.isEmpty(jobTasks) ||
|
||||
jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
|
||||
jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -73,7 +84,7 @@ public class JobTaskBatchHandler {
|
||||
jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
|
||||
|
||||
Map<Integer, Long> statusCountMap = jobTasks.stream()
|
||||
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
|
||||
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
|
||||
|
||||
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
|
||||
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
|
||||
@ -85,7 +96,8 @@ public class JobTaskBatchHandler {
|
||||
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
|
||||
} else {
|
||||
// todo 调试完成删除
|
||||
SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
|
||||
SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(),
|
||||
JsonUtil.toJsonString(jobTasks));
|
||||
|
||||
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
|
||||
if (needReduceTask(completeJobBatchDTO, jobTasks)) {
|
||||
@ -107,9 +119,9 @@ public class JobTaskBatchHandler {
|
||||
|
||||
jobTaskBatch.setUpdateDt(LocalDateTime.now());
|
||||
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
|
||||
new LambdaUpdateWrapper<JobTaskBatch>()
|
||||
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
|
||||
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
|
||||
new LambdaUpdateWrapper<JobTaskBatch>()
|
||||
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
|
||||
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
|
||||
);
|
||||
|
||||
}
|
||||
@ -149,14 +161,15 @@ public class JobTaskBatchHandler {
|
||||
|
||||
private static boolean isAllMapTask(final List<JobTask> jobTasks) {
|
||||
return jobTasks.size() == jobTasks.stream()
|
||||
.filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage())
|
||||
.count();
|
||||
.filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage())
|
||||
.count();
|
||||
}
|
||||
|
||||
private static boolean isALeastOneReduceTask(final List<JobTask> jobTasks) {
|
||||
return jobTasks.stream()
|
||||
.filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && REDUCE.getStage() == jobTask.getMrStage())
|
||||
.count() > 1;
|
||||
.filter(
|
||||
jobTask -> Objects.nonNull(jobTask.getMrStage()) && REDUCE.getStage() == jobTask.getMrStage())
|
||||
.count() > 1;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -167,21 +180,21 @@ public class JobTaskBatchHandler {
|
||||
*/
|
||||
public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
|
||||
if (Objects.isNull(job)
|
||||
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
||||
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
||||
|| JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
||||
// 是否是常驻任务
|
||||
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
|
||||
// 防止任务已经分配到其他节点导致的任务重复执行
|
||||
|| !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
|
||||
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
||||
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
||||
|| JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|
||||
// 是否是常驻任务
|
||||
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
|
||||
// 防止任务已经分配到其他节点导致的任务重复执行
|
||||
|| !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
long count = groupConfigMapper.selectCount(new LambdaQueryWrapper<GroupConfig>()
|
||||
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
|
||||
.eq(GroupConfig::getGroupName, job.getGroupName())
|
||||
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
|
||||
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
|
||||
.eq(GroupConfig::getGroupName, job.getGroupName())
|
||||
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
|
||||
if (count == 0) {
|
||||
return;
|
||||
}
|
||||
@ -208,7 +221,7 @@ public class JobTaskBatchHandler {
|
||||
Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000);
|
||||
|
||||
log.debug("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds,
|
||||
DateUtils.toNowMilli() % 1000);
|
||||
DateUtils.toNowMilli() % 1000);
|
||||
job.setNextTriggerAt(nextTriggerAt);
|
||||
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
|
||||
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
|
||||
|
@ -61,7 +61,7 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.NOT_C
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class WorkflowBatchHandler {
|
||||
private static final String KEY = "update_wf_context_{}";
|
||||
private static final String KEY = "update_wf_context_{0}";
|
||||
|
||||
private final DistributedLockHandler distributedLockHandler;
|
||||
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
||||
@ -320,8 +320,14 @@ public class WorkflowBatchHandler {
|
||||
}
|
||||
}
|
||||
|
||||
public void mergeWorkflowContextAndRetry(Long workflowTaskBatchId, String waitMergeContext) {
|
||||
if (StrUtil.isBlank(waitMergeContext) || Objects.isNull(workflowTaskBatchId)) {
|
||||
/**
|
||||
* 合并工作流上下文若合并失败先自旋3次1.5s, 若失败了升级到悲观锁
|
||||
*
|
||||
* @param workflowTaskBatch 工作流批次
|
||||
* @param taskBatchIds 批次列表
|
||||
*/
|
||||
public void mergeWorkflowContextAndRetry(WorkflowTaskBatch workflowTaskBatch, Set<Long> taskBatchIds) {
|
||||
if (CollUtil.isEmpty(taskBatchIds)) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -349,42 +355,62 @@ public class WorkflowBatchHandler {
|
||||
}).build();
|
||||
|
||||
try {
|
||||
retryer.call(() -> mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class)));
|
||||
retryer.call(() -> mergeAllWorkflowContext(workflowTaskBatch, taskBatchIds));
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] waitMergeContext:[{}]",
|
||||
workflowTaskBatchId, waitMergeContext, e);
|
||||
SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] taskBatchIds:[{}]",
|
||||
workflowTaskBatch.getId(), taskBatchIds, e);
|
||||
if (e.getClass().isAssignableFrom(RetryException.class)) {
|
||||
// 如果自旋失败,就使用悲观锁
|
||||
distributedLockHandler.lockWithDisposableAndRetry(() -> {
|
||||
mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class));
|
||||
}, MessageFormat.format(KEY, workflowTaskBatchId), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
|
||||
mergeAllWorkflowContext(workflowTaskBatch, taskBatchIds);
|
||||
}, MessageFormat.format(KEY, workflowTaskBatch.getId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean mergeAllWorkflowContext(Long workflowTaskBatchId, Long taskBatchId) {
|
||||
public boolean mergeAllWorkflowContext(WorkflowTaskBatch workflowTaskBatch, Set<Long> taskBatchIds) {
|
||||
if (CollUtil.isEmpty(taskBatchIds)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
||||
.select(JobTask::getWfContext, JobTask::getId)
|
||||
.eq(JobTask::getTaskBatchId, taskBatchId));
|
||||
.in(JobTask::getTaskBatchId, taskBatchIds)
|
||||
);
|
||||
if (CollUtil.isEmpty(jobTasks)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Set<Map<String, Object>> maps = jobTasks.stream().map(r -> {
|
||||
try {
|
||||
return JsonUtil.parseHashMap(r.getWfContext(), Object.class);
|
||||
if (StrUtil.isNotBlank(r.getWfContext())) {
|
||||
return JsonUtil.parseHashMap(r.getWfContext(), Object.class);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.LOCAL.warn("taskId:[{}] result value 不是一个json对象. result:[{}]", r.getId(), r.getResultMessage());
|
||||
SnailJobLog.LOCAL.warn("taskId:[{}] result value is not a JSON object. result:[{}]", r.getId(), r.getResultMessage());
|
||||
}
|
||||
return new HashMap<String, Object>();
|
||||
}).collect(Collectors.toSet());
|
||||
|
||||
Map<String, Object> mergeMap = Maps.newHashMap();
|
||||
Map<String, Object> mergeMap;
|
||||
if (StrUtil.isBlank(workflowTaskBatch.getWfContext())) {
|
||||
mergeMap = Maps.newHashMap();
|
||||
} else {
|
||||
mergeMap = JsonUtil.parseHashMap(workflowTaskBatch.getWfContext());
|
||||
}
|
||||
|
||||
for (Map<String, Object> map : maps) {
|
||||
mergeMaps(mergeMap, map);
|
||||
}
|
||||
|
||||
return mergeWorkflowContext(workflowTaskBatchId, mergeMap);
|
||||
WorkflowTaskBatch waitUpdateWorkflowTaskBatch = new WorkflowTaskBatch();
|
||||
waitUpdateWorkflowTaskBatch.setId(workflowTaskBatch.getId());
|
||||
waitUpdateWorkflowTaskBatch.setWfContext(JsonUtil.toJsonString(mergeMap));
|
||||
waitUpdateWorkflowTaskBatch.setVersion(1);
|
||||
return 1 == workflowTaskBatchMapper.update(waitUpdateWorkflowTaskBatch, new LambdaQueryWrapper<WorkflowTaskBatch>()
|
||||
.eq(WorkflowTaskBatch::getId, workflowTaskBatch.getId())
|
||||
.eq(WorkflowTaskBatch::getVersion, workflowTaskBatch.getVersion())
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user