fix(sj_1.1.0): Optimize workflow global context logic

opensnail 2024-06-18 18:38:01 +08:00
parent 36ba663fbf
commit fda554fc46
18 changed files with 161 additions and 50 deletions

View File

@@ -1,5 +1,6 @@
package com.aizuda.snailjob.client.job.core.client;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.annotation.SnailEndPoint;
import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
@@ -19,12 +20,15 @@ import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
import com.google.common.collect.Maps;
import jakarta.validation.Valid;
import org.springframework.validation.annotation.Validated;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH;
@@ -116,6 +120,18 @@ public class JobEndPoint {
jobContext.setRetryScene(dispatchJob.getRetryScene());
jobContext.setTaskName(dispatchJob.getTaskName());
jobContext.setMrStage(dispatchJob.getMrStage());
String wfContext = dispatchJob.getWfContext();
if (StrUtil.isNotBlank(wfContext)) {
try {
jobContext.setWfContext(JsonUtil.parseConcurrentHashMap(wfContext));
} catch (Exception e) {
SnailJobLog.REMOTE.warn("workflow context parse error", e);
}
} else {
jobContext.setWfContext(Maps.newConcurrentMap());
}
return jobContext;
}
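The guard above makes the dispatched context optional: a blank string yields a fresh concurrent map, while unparseable JSON is only logged, leaving the context unset. A standalone sketch of the same pattern, using plain Jackson in place of the repo's JsonUtil (the class and names here are illustrative, not the project's wiring):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WfContextParseSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @SuppressWarnings("unchecked")
    static Map<String, Object> parseWfContext(String wfContext) {
        if (wfContext == null || wfContext.isBlank()) {
            return new ConcurrentHashMap<>(); // nothing dispatched: start empty
        }
        try {
            // nested JSON objects deserialize to generic Maps, not typed beans
            return MAPPER.readValue(wfContext, ConcurrentHashMap.class);
        } catch (Exception e) {
            // the hunk above merely logs here, leaving the context null;
            // returning an empty map instead is a slightly safer variant
            return new ConcurrentHashMap<>();
        }
    }
}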

View File

@@ -1,7 +1,11 @@
package com.aizuda.snailjob.client.job.core.dto;
import cn.hutool.core.util.StrUtil;
import lombok.Data;
import java.util.Map;
import java.util.Objects;
/**
* @author: opensnail
* @date : 2023-10-18 16:53
@@ -15,4 +19,15 @@ public class JobArgs {
private String executorInfo;
private Long taskBatchId;
private Map<String, Object> wfContext;
public void appendContext(String key, Object value) {
if (Objects.isNull(wfContext) || StrUtil.isBlank(key) || Objects.isNull(value)) {
return;
}
wfContext.put(key, value);
}
}
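The guard means a blank key, a null value, or a never-initialized context map are all silently ignored (which also keeps the backing ConcurrentHashMap from throwing on nulls). A usage sketch from the executor side (the executor class is illustrative and the ExecuteResult.success() factory is assumed):

import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;

public class OrderCountJobExecutor {

    public ExecuteResult jobExecute(JobArgs jobArgs) {
        // read a value published by an upstream workflow node, if any
        Object previous = jobArgs.getWfContext().get("orderCount");
        // publish a value for downstream nodes; dropped if key is blank or value is null
        jobArgs.appendContext("orderCount", 42);
        return ExecuteResult.success();
    }
}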

View File

@@ -1,5 +1,6 @@
package com.aizuda.snailjob.client.job.core.executor;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
import com.aizuda.snailjob.client.job.core.IJobExecutor;
import com.aizuda.snailjob.client.job.core.cache.FutureCache;
@@ -12,6 +13,7 @@ import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
@@ -60,6 +62,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
jobArgs = buildJobArgs(jobContext);
}
jobArgs.setWfContext(jobContext.getWfContext());
try {
// Initialize dispatch info (LogUtil for log reporting)
initLogContext(jobContext);
@@ -117,6 +121,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
jobArgs.setArgsStr(jobContext.getArgsStr());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
jobArgs.setWfContext(jobContext.getWfContext());
return jobArgs;
}

View File

@@ -1,5 +1,6 @@
package com.aizuda.snailjob.client.job.core.executor;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DatePattern;
import com.aizuda.snailjob.client.common.cache.GroupVersionCache;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
@@ -32,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
@@ -160,6 +162,12 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
dispatchJobRequest.setTaskStatus(status);
dispatchJobRequest.setRetry(jobContext.isRetry());
dispatchJobRequest.setRetryScene(jobContext.getRetryScene());
// Pass the workflow context back with the result
Map<String, Object> wfContext = jobContext.getWfContext();
if (CollUtil.isNotEmpty(wfContext)) {
dispatchJobRequest.setWfContext(JsonUtil.toJsonString(wfContext));
}
return dispatchJobRequest;
}
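With this change the context makes a full round trip as a JSON string: the server sends it in RealJobExecutorDTO.wfContext, the client parses it into JobContext (see JobEndPoint above), and appended entries travel back here. Note the value is double-encoded on the wire, a JSON string field inside the JSON request body; an illustrative result payload (values invented):

// Illustrative payload shape only; field values are invented.
String body = "{ \"taskBatchId\": 101, \"wfContext\": \"{\\\"orderCount\\\":42}\" }";

The CollUtil.isNotEmpty guard means a node that never touched the context reports null rather than "{}", so empty maps are not serialized, stored, and re-merged for nothing.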

View File

@@ -73,5 +73,5 @@ public class DispatchJobRequest {
/**
* Workflow context
*/
private String wkContext;
private String wfContext;
}

View File

@@ -44,5 +44,5 @@ public class DispatchJobResultRequest {
/**
* Workflow context
*/
private String wkContext;
private String wfContext;
}

View File

@@ -3,6 +3,7 @@ package com.aizuda.snailjob.common.core.model;
import lombok.Data;
import java.util.List;
import java.util.Map;
/**
* @author: opensnail
@@ -59,5 +60,15 @@ public class JobContext {
*/
private String taskName;
/**
* Stage the dynamic sharding (MapReduce) is currently in
*/
private Integer mrStage;
/**
* Workflow global context
*/
private Map<String, Object> wfContext;
}

View File

@@ -17,6 +17,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author: byteblogs
@@ -69,6 +70,16 @@ public class JsonUtil {
return JsonMapper.toJavaObject(jsonString, HashMap.class);
}
/**
* Convert a JSON string into a ConcurrentHashMap
*
* @param jsonString the JSON string to parse
* @return the parsed map
*/
public static <K, V> Map<K, V> parseConcurrentHashMap(String jsonString) {
return JsonMapper.toJavaObject(jsonString, ConcurrentHashMap.class);
}
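The generics here are a caller-side convenience only: the raw ConcurrentHashMap.class erases K and V, so nothing validates the runtime types, and nested JSON objects come back as generic maps rather than beans. Illustrative usage:

// K and V are unchecked casts; nested objects deserialize to plain Maps.
Map<String, Object> ctx = JsonUtil.parseConcurrentHashMap("{\"a\":1,\"b\":{\"c\":2}}");
Object a = ctx.get("a"); // Integer 1
Object b = ctx.get("b"); // a nested Map (e.g. LinkedHashMap), not a typed bean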
/**
* Convert a JSON string into a Map
*

View File

@@ -102,5 +102,5 @@ public class RealJobExecutorDTO extends BaseDTO {
/**
* Workflow context
*/
private String wkContext;
private String wfContext;
}

View File

@@ -106,7 +106,8 @@ public interface JobTaskConverter {
@Mapping(source = "jobTask.extAttrs", target = "extAttrs"),
@Mapping(source = "jobTask.namespaceId", target = "namespaceId"),
@Mapping(source = "jobTask.taskName", target = "taskName"),
@Mapping(source = "jobTask.mrStage", target = "mrStage")
@Mapping(source = "jobTask.mrStage", target = "mrStage"),
@Mapping(source = "context.wfContext", target = "wfContext")
})
RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask);
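The added @Mapping sources wfContext from the first method parameter (context) rather than from jobTask; a hand-written equivalent of what MapStruct generates for that one field (illustrative):

// Hand-written equivalent of the added mapping line:
realJobExecutorDTO.setWfContext(context.getWfContext()); // source is the JobExecutorContext parameter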

View File

@@ -50,5 +50,5 @@ public class ClientCallbackContext {
/**
* Workflow context
*/
private String wkContext;
private String wfContext;
}

View File

@@ -198,7 +198,7 @@ public class JobExecutorActor extends AbstractActor {
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
if (Objects.nonNull(workflowTaskBatch)) {
context.setWkContext(workflowTaskBatch.getWfContext());
context.setWfContext(workflowTaskBatch.getWfContext());
}
return context;
}

View File

@@ -41,7 +41,6 @@ public class JobExecutorResultActor extends AbstractActor {
private static final String KEY = "job_complete_{0}_{1}";
private final JobTaskMapper jobTaskMapper;
private final JobTaskBatchHandler jobTaskBatchHandler;
private final WorkflowBatchHandler workflowBatchHandler;
private final DistributedLockHandler distributedLockHandler;
@Override

View File

@@ -118,7 +118,7 @@ public class WorkflowExecutorActor extends AbstractActor {
// Include the parent node so its processing status can be checked
List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason)
JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason, JobTaskBatch::getId)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId,
Sets.union(brotherNode, Sets.newHashSet(taskExecute.getParentId())))
@@ -167,6 +167,10 @@
List<Job> jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId));
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
// Merge the job task results into the global context
workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch,
StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId));
// Only used by condition nodes
Object evaluationResult = null;
for (WorkflowNode workflowNode : workflowNodes) {

View File

@@ -93,6 +93,6 @@ public class JobExecutorContext {
/**
* Workflow context
*/
private String wkContext;
private String wfContext;
}

View File

@@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobArgsTypeEnum;
import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.WorkflowExecutor;
@@ -56,6 +57,9 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
@Override
public void execute(WorkflowExecutorContext context) {
// If several sibling nodes finish at the same time, each of them may try to execute the successor node,
// so a distributed lock is taken here
distributedLockHandler.lockWithDisposableAndRetry(
() -> {
long total = 0;
@@ -69,6 +73,7 @@
if (CollUtil.isNotEmpty(jobTaskBatches)) {
total = jobTaskBatches.size();
// TODO
JobTaskBatch jobTaskBatch = jobTaskBatches.get(0);
if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) {
context.setEvaluationResult(Boolean.FALSE);
@@ -89,9 +94,6 @@
return;
}
// Merge the job task results into the global context
workflowBatchHandler.mergeAllWorkflowContext(context.getWorkflowTaskBatchId(), context.getTaskBatchId());
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {

View File

@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;
import static com.aizuda.snailjob.common.core.enums.MapReduceStageEnum.*;
/**
@@ -59,13 +60,23 @@ public class JobTaskBatchHandler {
@Transactional
public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) {
// Idempotency check
Long countJobTaskBatch = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, COMPLETED)
);
if (countJobTaskBatch > 0) {
// The batch has already completed; skip the duplicate update
return true;
}
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getTaskStatus, JobTask::getMrStage)
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
if (CollUtil.isEmpty(jobTasks) ||
jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return false;
}
@@ -73,7 +84,7 @@
jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
Map<Integer, Long> statusCountMap = jobTasks.stream()
.collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
@@ -85,7 +96,8 @@
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
} else {
// TODO: remove after debugging
SnailJobLog.LOCAL.info("Attempting to complete the task batch. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
if (needReduceTask(completeJobBatchDTO, jobTasks)) {
@@ -107,9 +119,9 @@
jobTaskBatch.setUpdateDt(LocalDateTime.now());
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
);
}
@@ -149,14 +161,15 @@
private static boolean isAllMapTask(final List<JobTask> jobTasks) {
return jobTasks.size() == jobTasks.stream()
.filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MAP.getStage() == jobTask.getMrStage())
.count();
}
private static boolean isALeastOneReduceTask(final List<JobTask> jobTasks) {
return jobTasks.stream()
.filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && REDUCE.getStage() == jobTask.getMrStage())
.count() > 1;
}
/**
@@ -167,21 +180,21 @@
*/
public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job)
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
// whether this is a resident task
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
// prevent duplicate execution when the task was already assigned to another node
|| !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())
) {
return;
}
long count = groupConfigMapper.selectCount(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
.eq(GroupConfig::getGroupName, job.getGroupName())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()));
if (count == 0) {
return;
}
@@ -208,7 +221,7 @@
Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000);
log.debug("Resident task monitor. [{}] task time difference:[{}] remainder:[{}]", duration, milliseconds, DateUtils.toNowMilli() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);

View File

@@ -61,7 +61,7 @@ import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.NOT_C
@Component
@RequiredArgsConstructor
public class WorkflowBatchHandler {
private static final String KEY = "update_wf_context_{}";
private static final String KEY = "update_wf_context_{0}";
private final DistributedLockHandler distributedLockHandler;
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
@@ -320,8 +320,14 @@
}
}
public void mergeWorkflowContextAndRetry(Long workflowTaskBatchId, String waitMergeContext) {
if (StrUtil.isBlank(waitMergeContext) || Objects.isNull(workflowTaskBatchId)) {
/**
* Merge the workflow context. On failure, first spin-retry 3 times (about 1.5s); if that still fails, escalate to a pessimistic lock.
*
* @param workflowTaskBatch the workflow task batch
* @param taskBatchIds the task batch ids to merge
*/
public void mergeWorkflowContextAndRetry(WorkflowTaskBatch workflowTaskBatch, Set<Long> taskBatchIds) {
if (CollUtil.isEmpty(taskBatchIds)) {
return;
}
@@ -349,42 +355,62 @@
}).build();
try {
retryer.call(() -> mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class)));
retryer.call(() -> mergeAllWorkflowContext(workflowTaskBatch, taskBatchIds));
} catch (Exception e) {
SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] waitMergeContext:[{}]",
workflowTaskBatchId, waitMergeContext, e);
SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] taskBatchIds:[{}]",
workflowTaskBatch.getId(), taskBatchIds, e);
if (e.getClass().isAssignableFrom(RetryException.class)) {
// If the spin-retry fails, fall back to a pessimistic lock
distributedLockHandler.lockWithDisposableAndRetry(() -> {
mergeWorkflowContext(workflowTaskBatchId, JsonUtil.parseHashMap(waitMergeContext, Object.class));
}, MessageFormat.format(KEY, workflowTaskBatchId), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
mergeAllWorkflowContext(workflowTaskBatch, taskBatchIds);
}, MessageFormat.format(KEY, workflowTaskBatch.getId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
}
}
}
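The retryer used above is built earlier in this method, outside the hunk. Assuming the guava-retrying style API (the RetryException caught above comes from that library), a configuration matching the javadoc (three spin attempts with short fixed waits, retrying while the optimistic merge loses its version CAS) would look roughly like this sketch:

import com.github.rholder.retry.*;
import java.util.concurrent.TimeUnit;

// Sketch only; the real builder is not shown in this hunk.
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
        .retryIfResult(Boolean.FALSE::equals) // merge returned false: CAS lost, spin again
        .retryIfException()                   // transient errors also trigger a retry
        .withWaitStrategy(WaitStrategies.fixedWait(500, TimeUnit.MILLISECONDS))
        .withStopStrategy(StopStrategies.stopAfterAttempt(3))
        .build();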
public boolean mergeAllWorkflowContext(Long workflowTaskBatchId, Long taskBatchId) {
public boolean mergeAllWorkflowContext(WorkflowTaskBatch workflowTaskBatch, Set<Long> taskBatchIds) {
if (CollUtil.isEmpty(taskBatchIds)) {
return true;
}
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getWfContext, JobTask::getId)
.eq(JobTask::getTaskBatchId, taskBatchId));
.in(JobTask::getTaskBatchId, taskBatchIds)
);
if (CollUtil.isEmpty(jobTasks)) {
return true;
}
Set<Map<String, Object>> maps = jobTasks.stream().map(r -> {
try {
return JsonUtil.parseHashMap(r.getWfContext(), Object.class);
if (StrUtil.isNotBlank(r.getWfContext())) {
return JsonUtil.parseHashMap(r.getWfContext(), Object.class);
}
} catch (Exception e) {
SnailJobLog.LOCAL.warn("taskId:[{}] result value 不是一个json对象. result:[{}]", r.getId(), r.getResultMessage());
SnailJobLog.LOCAL.warn("taskId:[{}] result value is not a JSON object. result:[{}]", r.getId(), r.getResultMessage());
}
return new HashMap<String, Object>();
}).collect(Collectors.toSet());
Map<String, Object> mergeMap = Maps.newHashMap();
Map<String, Object> mergeMap;
if (StrUtil.isBlank(workflowTaskBatch.getWfContext())) {
mergeMap = Maps.newHashMap();
} else {
mergeMap = JsonUtil.parseHashMap(workflowTaskBatch.getWfContext());
}
for (Map<String, Object> map : maps) {
mergeMaps(mergeMap, map);
}
return mergeWorkflowContext(workflowTaskBatchId, mergeMap);
WorkflowTaskBatch waitUpdateWorkflowTaskBatch = new WorkflowTaskBatch();
waitUpdateWorkflowTaskBatch.setId(workflowTaskBatch.getId());
waitUpdateWorkflowTaskBatch.setWfContext(JsonUtil.toJsonString(mergeMap));
waitUpdateWorkflowTaskBatch.setVersion(1);
return 1 == workflowTaskBatchMapper.update(waitUpdateWorkflowTaskBatch, new LambdaQueryWrapper<WorkflowTaskBatch>()
.eq(WorkflowTaskBatch::getId, workflowTaskBatch.getId())
.eq(WorkflowTaskBatch::getVersion, workflowTaskBatch.getVersion())
);
}
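The closing update is an optimistic compare-and-set on the version column: it only matches while version still equals the value read with the batch, so a concurrent merge leaves it updating zero rows, which is the failure the spin-retry above reacts to. A condensed sketch of that loop (helper names are hypothetical; whether the new version value comes from the literal setVersion(1) here or from a mapper-side increment is outside this hunk):

// Hypothetical condensed equivalent of retryer.call(mergeAllWorkflowContext):
boolean mergeWithCas(Long batchId, Set<Long> taskBatchIds) {
    for (int attempt = 0; attempt < 3; attempt++) {
        // re-read the latest context and version on every attempt
        WorkflowTaskBatch current = workflowTaskBatchMapper.selectById(batchId);
        String merged = mergeContexts(current, taskBatchIds); // hypothetical helper
        int rows = casUpdate(batchId, current.getVersion(), merged); // UPDATE ... WHERE id = ? AND version = ?
        if (rows == 1) {
            return true; // our merge won
        }
        // 0 rows: another node bumped the version; loop re-reads and retries
    }
    return false; // caller escalates to the distributed pessimistic lock
}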
/**