diff --git a/easy-retry-common/easy-retry-common-client-api/pom.xml b/easy-retry-common/easy-retry-common-client-api/pom.xml
index 543e6e70f..809748096 100644
--- a/easy-retry-common/easy-retry-common-client-api/pom.xml
+++ b/easy-retry-common/easy-retry-common-client-api/pom.xml
@@ -16,6 +16,8 @@
17
+ 17
+ 17
diff --git a/easy-retry-common/easy-retry-common-client-api/src/main/resources/application.properties b/easy-retry-common/easy-retry-common-client-api/src/main/resources/application.properties
deleted file mode 100644
index 8b1378917..000000000
--- a/easy-retry-common/easy-retry-common-client-api/src/main/resources/application.properties
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/easy-retry-common/easy-retry-common-core/pom.xml b/easy-retry-common/easy-retry-common-core/pom.xml
index f6bad9ee3..1291e5166 100644
--- a/easy-retry-common/easy-retry-common-core/pom.xml
+++ b/easy-retry-common/easy-retry-common-core/pom.xml
@@ -16,6 +16,8 @@
17
+ 17
+ 17
diff --git a/easy-retry-common/easy-retry-common-core/src/main/resources/META-INF/spring.factories b/easy-retry-common/easy-retry-common-core/src/main/resources/META-INF/spring.factories
deleted file mode 100644
index b01f06049..000000000
--- a/easy-retry-common/easy-retry-common-core/src/main/resources/META-INF/spring.factories
+++ /dev/null
@@ -1,4 +0,0 @@
-org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
- com.aizuda.easy.retry.common.core.CommonCoreConfigure
-
-
diff --git a/easy-retry-common/easy-retry-common-log/src/main/java/com/aizuda/easy/retry/common/log/lang/SecurityManagerCaller.java b/easy-retry-common/easy-retry-common-log/src/main/java/com/aizuda/easy/retry/common/log/lang/SecurityManagerCaller.java
deleted file mode 100644
index 129d0473d..000000000
--- a/easy-retry-common/easy-retry-common-log/src/main/java/com/aizuda/easy/retry/common/log/lang/SecurityManagerCaller.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.aizuda.easy.retry.common.log.lang;
-
-import cn.hutool.core.util.ArrayUtil;
-
-import java.io.Serializable;
-
-/**
- * {@link SecurityManager} 方式获取调用者
- *
- * @author wodeyangzipingpingwuqi
- */
-@Deprecated
-public class SecurityManagerCaller extends SecurityManager implements Caller, Serializable {
- private static final long serialVersionUID = 1L;
-
- private static final int OFFSET = 1;
-
- @Override
- public Class> getCaller() {
- final Class>[] context = getClassContext();
- if (null != context && (OFFSET + 1) < context.length) {
- return context[OFFSET + 1];
- }
- return null;
- }
-
- @Override
- public Class> getCallerCaller() {
- final Class>[] context = getClassContext();
- if (null != context && (OFFSET + 2) < context.length) {
- return context[OFFSET + 2];
- }
- return null;
- }
-
- @Override
- public Class> getCaller(int depth) {
- final Class>[] context = getClassContext();
- if (null != context && (OFFSET + depth) < context.length) {
- return context[OFFSET + depth];
- }
- return null;
- }
-
- @Override
- public boolean isCalledBy(Class> clazz) {
- final Class>[] classes = getClassContext();
- if(ArrayUtil.isNotEmpty(classes)) {
- for (Class> contextClass : classes) {
- if (contextClass.equals(clazz)) {
- return true;
- }
- }
- }
- return false;
- }
-}
diff --git a/easy-retry-common/easy-retry-common-server-api/pom.xml b/easy-retry-common/easy-retry-common-server-api/pom.xml
index 6ba8cbfad..4cb70bef7 100644
--- a/easy-retry-common/easy-retry-common-server-api/pom.xml
+++ b/easy-retry-common/easy-retry-common-server-api/pom.xml
@@ -16,6 +16,8 @@
17
+ 17
+ 17
diff --git a/easy-retry-common/easy-retry-common-server-api/src/main/resources/application.properties b/easy-retry-common/easy-retry-common-server-api/src/main/resources/application.properties
deleted file mode 100644
index 8b1378917..000000000
--- a/easy-retry-common/easy-retry-common-server-api/src/main/resources/application.properties
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/easy-retry-common/pom.xml b/easy-retry-common/pom.xml
index 496270c08..5c29527ad 100644
--- a/easy-retry-common/pom.xml
+++ b/easy-retry-common/pom.xml
@@ -16,6 +16,8 @@
17
+ 17
+ 17
diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/pom.xml b/easy-retry-datasource/easy-retry-mysql-datasource/pom.xml
index da6fa3221..90a97f3ba 100644
--- a/easy-retry-datasource/easy-retry-mysql-datasource/pom.xml
+++ b/easy-retry-datasource/easy-retry-mysql-datasource/pom.xml
@@ -16,7 +16,8 @@
17
- 8.0.33
+ 17
+ 17
@@ -26,9 +27,8 @@
true
- mysql
- mysql-connector-java
- ${mysql.version}
+ com.mysql
+ mysql-connector-j
com.aizuda
diff --git a/easy-retry-datasource/pom.xml b/easy-retry-datasource/pom.xml
index 985fcf0cc..a1bf94bc1 100644
--- a/easy-retry-datasource/pom.xml
+++ b/easy-retry-datasource/pom.xml
@@ -16,6 +16,8 @@
17
+ 17
+ 17
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/RestTemplateConfig.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/RestTemplateConfig.java
index 1048be2b8..961e6df6b 100644
--- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/RestTemplateConfig.java
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/RestTemplateConfig.java
@@ -1,12 +1,12 @@
package com.aizuda.easy.retry.server.common.config;
import com.aizuda.easy.retry.server.common.client.RequestInterceptor;
+import com.aizuda.easy.retry.server.common.okhttp.OkHttp3ClientHttpRequestFactory;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
-import org.springframework.http.client.OkHttp3ClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.TimeUnit;
@@ -23,7 +23,6 @@ public class RestTemplateConfig {
return new RestTemplate(factory);
}
-
@Bean
public ClientHttpRequestFactory okHttp3ClientHttpRequestFactory() {
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/okhttp/OkHttp3ClientHttpRequest.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/okhttp/OkHttp3ClientHttpRequest.java
new file mode 100644
index 000000000..6e9ef7763
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/okhttp/OkHttp3ClientHttpRequest.java
@@ -0,0 +1,137 @@
+package com.aizuda.easy.retry.server.common.okhttp;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okio.BufferedSink;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.StreamingHttpOutputMessage.Body;
+import org.springframework.http.client.AbstractClientHttpRequest;
+import org.springframework.http.client.ClientHttpRequest;
+import org.springframework.http.client.ClientHttpResponse;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+import org.springframework.util.FastByteArrayOutputStream;
+import org.springframework.util.StringUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+
+class OkHttp3ClientHttpRequest extends AbstractClientHttpRequest {
+
+ private final OkHttpClient client;
+
+ private final URI uri;
+
+ private final HttpMethod method;
+ @Nullable
+ private Body body;
+
+ @Nullable
+ private FastByteArrayOutputStream bodyStream;
+
+ public OkHttp3ClientHttpRequest(OkHttpClient client, URI uri, HttpMethod method) {
+ this.client = client;
+ this.uri = uri;
+ this.method = method;
+ }
+
+
+ @Override
+ public HttpMethod getMethod() {
+ return this.method;
+ }
+
+ @Override
+ public URI getURI() {
+ return this.uri;
+ }
+
+ @Override
+ protected OutputStream getBodyInternal(final HttpHeaders headers) throws IOException {
+ Assert.state(this.body == null, "Invoke either getBody or setBody; not both");
+
+ if (this.bodyStream == null) {
+ this.bodyStream = new FastByteArrayOutputStream(1024);
+ }
+ return this.bodyStream;
+ }
+
+ @NotNull
+ @Override
+ protected ClientHttpResponse executeInternal(final HttpHeaders headers) throws IOException {
+
+ if (this.body == null && this.bodyStream != null) {
+ this.body = outputStream -> this.bodyStream.writeTo(outputStream);
+ }
+
+ RequestBody requestBody;
+ if (body != null) {
+ requestBody = new BodyRequestBody(headers, body);
+ }
+ else if (okhttp3.internal.http.HttpMethod.requiresRequestBody(getMethod().name())) {
+ String header = headers.getFirst(HttpHeaders.CONTENT_TYPE);
+ MediaType contentType = (header != null) ? MediaType.parse(header) : null;
+ requestBody = RequestBody.create(contentType, new byte[0]);
+ }
+ else {
+ requestBody = null;
+ }
+ Request.Builder builder = new Request.Builder()
+ .url(this.uri.toURL());
+ builder.method(this.method.name(), requestBody);
+ headers.forEach((headerName, headerValues) -> {
+ for (String headerValue : headerValues) {
+ builder.addHeader(headerName, headerValue);
+ }
+ });
+ Request request = builder.build();
+ return new OkHttp3ClientHttpResponse(this.client.newCall(request).execute());
+ }
+
+ private static class BodyRequestBody extends RequestBody {
+
+ private final HttpHeaders headers;
+
+ private final Body body;
+
+
+ public BodyRequestBody(HttpHeaders headers, Body body) {
+ this.headers = headers;
+ this.body = body;
+ }
+
+ @Override
+ public long contentLength() {
+ return this.headers.getContentLength();
+ }
+
+ @Nullable
+ @Override
+ public MediaType contentType() {
+ String contentType = this.headers.getFirst(HttpHeaders.CONTENT_TYPE);
+ if (StringUtils.hasText(contentType)) {
+ return MediaType.parse(contentType);
+ }
+ else {
+ return null;
+ }
+ }
+
+ @Override
+ public void writeTo(BufferedSink sink) throws IOException {
+ this.body.writeTo(sink.outputStream());
+ }
+
+ @Override
+ public boolean isOneShot() {
+ return !this.body.repeatable();
+ }
+ }
+
+
+}
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/okhttp/OkHttp3ClientHttpRequestFactory.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/okhttp/OkHttp3ClientHttpRequestFactory.java
new file mode 100644
index 000000000..8c7e438ee
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/okhttp/OkHttp3ClientHttpRequestFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2002-2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.aizuda.easy.retry.server.common.okhttp;
+
+import okhttp3.Cache;
+import okhttp3.OkHttpClient;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.client.ClientHttpRequest;
+import org.springframework.http.client.ClientHttpRequestFactory;
+import org.springframework.util.Assert;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+public class OkHttp3ClientHttpRequestFactory implements ClientHttpRequestFactory, DisposableBean {
+
+ private OkHttpClient client;
+
+ public OkHttp3ClientHttpRequestFactory(OkHttpClient client) {
+ Assert.notNull(client, "OkHttpClient must not be null");
+ this.client = client;
+ }
+
+ @Override
+ public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) {
+ return new OkHttp3ClientHttpRequest(this.client, uri, httpMethod);
+ }
+
+ @Override
+ public void destroy() throws IOException {
+ // Clean up the client if we created it in the constructor
+ Cache cache = this.client.cache();
+ if (cache != null) {
+ cache.close();
+ }
+ this.client.dispatcher().executorService().shutdown();
+ this.client.connectionPool().evictAll();
+ }
+
+}
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/okhttp/OkHttp3ClientHttpResponse.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/okhttp/OkHttp3ClientHttpResponse.java
new file mode 100644
index 000000000..5161c41d2
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/okhttp/OkHttp3ClientHttpResponse.java
@@ -0,0 +1,67 @@
+package com.aizuda.easy.retry.server.common.okhttp;
+
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatusCode;
+import org.springframework.http.client.ClientHttpResponse;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+class OkHttp3ClientHttpResponse implements ClientHttpResponse {
+
+ private final Response response;
+
+ @Nullable
+ private volatile HttpHeaders headers;
+
+
+ public OkHttp3ClientHttpResponse(Response response) {
+ Assert.notNull(response, "Response must not be null");
+ this.response = response;
+ }
+
+
+ @Override
+ public HttpStatusCode getStatusCode() throws IOException {
+ return HttpStatusCode.valueOf(this.response.code());
+ }
+
+ @Override
+ public String getStatusText() {
+ return this.response.message();
+ }
+
+ @Override
+ public InputStream getBody() throws IOException {
+ ResponseBody body = this.response.body();
+ return (body != null ? body.byteStream() : InputStream.nullInputStream());
+ }
+
+ @Override
+ public HttpHeaders getHeaders() {
+ HttpHeaders headers = this.headers;
+ if (headers == null) {
+ headers = new HttpHeaders();
+ for (String headerName : this.response.headers().names()) {
+ for (String headerValue : this.response.headers(headerName)) {
+ headers.add(headerName, headerValue);
+ }
+ }
+ this.headers = headers;
+ }
+ return headers;
+ }
+
+ @Override
+ public void close() {
+ ResponseBody body = this.response.body();
+ if (body != null) {
+ body.close();
+ }
+ }
+
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/MutableGraphCache.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/MutableGraphCache.java
index 1051540a7..e67515753 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/MutableGraphCache.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/cache/MutableGraphCache.java
@@ -76,6 +76,11 @@ public class MutableGraphCache {
return descendants;
}
+ public static Set getBrotherNode(MutableGraph graph, Long nodeId) {
+ Set predecessors = graph.predecessors(nodeId);
+ return graph.successors(predecessors.stream().findFirst().get());
+ }
+
private static void getAllDescendantsHelper(MutableGraph graph, Long parentId, Set descendants) {
Set successors = graph.successors(parentId);
descendants.addAll(successors);
@@ -84,4 +89,5 @@ public class MutableGraphCache {
getAllDescendantsHelper(graph, successor, descendants);
}
}
+
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java
index 9550f58ea..b7b6c88b7 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java
@@ -53,6 +53,7 @@ import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class WorkflowExecutorActor extends AbstractActor {
+
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final WorkflowNodeMapper workflowNodeMapper;
private final JobMapper jobMapper;
@@ -69,7 +70,8 @@ public class WorkflowExecutorActor extends AbstractActor {
} catch (Exception e) {
EasyRetryLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e);
- handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
+ handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(),
+ JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
// TODO 发送通知
} finally {
getContext().stop(getSelf());
@@ -95,37 +97,41 @@ public class WorkflowExecutorActor extends AbstractActor {
return;
}
+ Set brotherNode = MutableGraphCache.getBrotherNode(graph, taskExecute.getParentId());
// 添加父节点,为了判断父节点的处理状态
List allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper()
- .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
- JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason)
- .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
- .in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
+ .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
+ JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason)
+ .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
+ .in(JobTaskBatch::getWorkflowNodeId,
+ Sets.union(successors, Sets.newHashSet(taskExecute.getParentId(), brotherNode)))
);
List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper()
- .in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
- .orderByAsc(WorkflowNode::getPriorityLevel));
+ .in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
+ .orderByAsc(WorkflowNode::getPriorityLevel));
- Map> jobTaskBatchMap = allJobTaskBatchList.stream().collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
- Map workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i));
+ Map> jobTaskBatchMap = allJobTaskBatchList.stream()
+ .collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
+ Map workflowNodeMap = workflowNodes.stream()
+ .collect(Collectors.toMap(WorkflowNode::getId, i -> i));
List parentJobTaskBatchList = jobTaskBatchMap.get(taskExecute.getParentId());
// 如果父节点是无需处理则不再继续执行
if (!CollectionUtils.isEmpty(parentJobTaskBatchList) &&
- parentJobTaskBatchList.stream()
- .map(JobTaskBatch::getOperationReason)
- .filter(Objects::nonNull)
- .anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)) {
+ parentJobTaskBatchList.stream()
+ .map(JobTaskBatch::getOperationReason)
+ .filter(Objects::nonNull)
+ .anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)) {
workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
return;
}
// 失败策略处理
if (!CollectionUtils.isEmpty(parentJobTaskBatchList)
- && parentJobTaskBatchList.stream()
- .map(JobTaskBatch::getTaskBatchStatus)
- .anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) {
+ && parentJobTaskBatchList.stream()
+ .map(JobTaskBatch::getTaskBatchStatus)
+ .anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) {
// 判断是否继续处理,根据失败策略
WorkflowNode workflowNode = workflowNodeMap.get(taskExecute.getParentId());
// 失败了阻塞策略
@@ -134,11 +140,17 @@ public class WorkflowExecutorActor extends AbstractActor {
}
}
+ if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap)) {
+ return;
+ }
+
// 去掉父节点
- workflowNodes = workflowNodes.stream().filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect(
+ workflowNodes = workflowNodes.stream()
+ .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect(
Collectors.toList());
- List jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
+ List jobs = jobMapper.selectBatchIds(
+ workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
// 只会条件节点会使用
@@ -169,6 +181,31 @@ public class WorkflowExecutorActor extends AbstractActor {
}
+ private boolean brotherNodeIsComplete(WorkflowNodeTaskExecuteDTO taskExecute, Set brotherNode,
+ Map> jobTaskBatchMap) {
+
+ if (SystemConstants.ROOT.equals(taskExecute.getParentId())) {
+ return Boolean.TRUE;
+ }
+
+ // 判断所有节点是否都完成
+ for (final Long nodeId : brotherNode) {
+ List jobTaskBatches = jobTaskBatchMap.get(nodeId);
+ // 说明此节点未执行, 继续等待执行完成
+ if (CollectionUtils.isEmpty(jobTaskBatches)) {
+ return Boolean.FALSE;
+ }
+
+ boolean isCompleted = jobTaskBatches.stream().anyMatch(
+ jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
+ if (isCompleted) {
+ return Boolean.FALSE;
+ }
+ }
+
+ return Boolean.TRUE;
+ }
+
private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
@@ -178,7 +215,7 @@ public class WorkflowExecutorActor extends AbstractActor {
jobTaskBatch.setOperationReason(operationReason);
jobTaskBatch.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch),
- () -> new EasyRetryServerException("更新任务失败"));
+ () -> new EasyRetryServerException("更新任务失败"));
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java
index c84ad8789..8a76acfa5 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java
@@ -61,7 +61,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
@Override
public void execute(WorkflowExecutorContext context) {
- distributedLockHandler.lockWithDisposable(
+ distributedLockHandler.lockWithDisposableAndRetry(
() -> {
long total = 0;
// 条件节点存在并发问题,需要特殊处理
@@ -109,7 +109,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
}
});
}, MessageFormat.format(KEY, context.getWorkflowTaskBatchId(), context.getWorkflowNodeId()),
- Duration.ofSeconds(10));
+ Duration.ofSeconds(10), Duration.ofSeconds(3), 16);
}
diff --git a/easy-retry-server/pom.xml b/easy-retry-server/pom.xml
index 036bc274c..6e8f9dd2d 100644
--- a/easy-retry-server/pom.xml
+++ b/easy-retry-server/pom.xml
@@ -27,7 +27,6 @@
1.5.3.Final
2.6.21
4.4.0
- 5.0.0-alpha.11
0.9.16
32.0.0-jre
@@ -83,8 +82,10 @@
com.squareup.okhttp3
- okhttp
+ okhttp-bom
${okhttp.version}
+ pom
+ import
org.mapstruct