feat: 3.1.0

1.  工作流等待所有节点完成才执行子节点
This commit is contained in:
byteblogs168 2024-02-01 19:29:13 +08:00
parent 28c4a8a49c
commit b7ede1ccf0
18 changed files with 343 additions and 92 deletions

View File

@ -16,6 +16,8 @@
<properties>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>

View File

@ -16,6 +16,8 @@
<properties>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>

View File

@ -1,4 +0,0 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.aizuda.easy.retry.common.core.CommonCoreConfigure

View File

@ -1,57 +0,0 @@
package com.aizuda.easy.retry.common.log.lang;
import cn.hutool.core.util.ArrayUtil;
import java.io.Serializable;
/**
* {@link SecurityManager} 方式获取调用者
*
* @author wodeyangzipingpingwuqi
*/
@Deprecated
/**
 * Resolves calling classes via the deprecated {@link SecurityManager#getClassContext()} API.
 *
 * <p>Each lookup method invokes {@code getClassContext()} itself (rather than delegating
 * to another method of this class) so that the stack depth seen by the offset math is stable.
 *
 * @author wodeyangzipingpingwuqi
 */
@Deprecated
public class SecurityManagerCaller extends SecurityManager implements Caller, Serializable {

    private static final long serialVersionUID = 1L;

    /** Frames to skip so that depth arithmetic starts at the direct caller. */
    private static final int OFFSET = 1;

    @Override
    public Class<?> getCaller() {
        final Class<?>[] stack = getClassContext();
        final int index = OFFSET + 1;
        if (stack == null || stack.length <= index) {
            return null;
        }
        return stack[index];
    }

    @Override
    public Class<?> getCallerCaller() {
        final Class<?>[] stack = getClassContext();
        final int index = OFFSET + 2;
        if (stack == null || stack.length <= index) {
            return null;
        }
        return stack[index];
    }

    @Override
    public Class<?> getCaller(int depth) {
        final Class<?>[] stack = getClassContext();
        final int index = OFFSET + depth;
        if (stack == null || stack.length <= index) {
            return null;
        }
        return stack[index];
    }

    @Override
    public boolean isCalledBy(Class<?> clazz) {
        final Class<?>[] stack = getClassContext();
        if (stack == null) {
            return false;
        }
        for (final Class<?> frame : stack) {
            if (frame.equals(clazz)) {
                return true;
            }
        }
        return false;
    }
}

View File

@ -16,6 +16,8 @@
<properties>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>

View File

@ -16,6 +16,8 @@
<properties>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<modules>

View File

@ -16,7 +16,8 @@
<properties>
<java.version>17</java.version>
<mysql.version>8.0.33</mysql.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
@ -26,9 +27,8 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>

View File

@ -16,6 +16,8 @@
<properties>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<modules>

View File

@ -1,12 +1,12 @@
package com.aizuda.easy.retry.server.common.config;
import com.aizuda.easy.retry.server.common.client.RequestInterceptor;
import com.aizuda.easy.retry.server.common.okhttp.OkHttp3ClientHttpRequestFactory;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.OkHttp3ClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.TimeUnit;
@ -23,7 +23,6 @@ public class RestTemplateConfig {
return new RestTemplate(factory);
}
@Bean
public ClientHttpRequestFactory okHttp3ClientHttpRequestFactory() {

View File

@ -0,0 +1,137 @@
package com.aizuda.easy.retry.server.common.okhttp;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.BufferedSink;
import org.jetbrains.annotations.NotNull;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.StreamingHttpOutputMessage.Body;
import org.springframework.http.client.AbstractClientHttpRequest;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.FastByteArrayOutputStream;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
/**
 * {@link org.springframework.http.client.ClientHttpRequest} backed by OkHttp.
 * The request body is buffered in memory ({@link FastByteArrayOutputStream}) until
 * {@link #executeInternal} converts it into an OkHttp {@link RequestBody}.
 * Replacement for the Spring-provided OkHttp request type removed in newer Spring versions.
 */
class OkHttp3ClientHttpRequest extends AbstractClientHttpRequest {
// Shared client supplied by the factory; one Call is created per execution.
private final OkHttpClient client;
private final URI uri;
private final HttpMethod method;
// Streaming body; set lazily from bodyStream when the request executes.
@Nullable
private Body body;
// In-memory buffer callers write the request body into via getBodyInternal().
@Nullable
private FastByteArrayOutputStream bodyStream;
public OkHttp3ClientHttpRequest(OkHttpClient client, URI uri, HttpMethod method) {
this.client = client;
this.uri = uri;
this.method = method;
}
@Override
public HttpMethod getMethod() {
return this.method;
}
@Override
public URI getURI() {
return this.uri;
}
/**
 * Returns the buffer that collects the request body.
 * Only one of the body/bodyStream mechanisms may be used per request.
 */
@Override
protected OutputStream getBodyInternal(final HttpHeaders headers) throws IOException {
Assert.state(this.body == null, "Invoke either getBody or setBody; not both");
if (this.bodyStream == null) {
this.bodyStream = new FastByteArrayOutputStream(1024);
}
return this.bodyStream;
}
/**
 * Builds and synchronously executes the OkHttp request, adapting the
 * buffered body and the Spring headers onto an OkHttp {@link Request}.
 */
@NotNull
@Override
protected ClientHttpResponse executeInternal(final HttpHeaders headers) throws IOException {
// Promote the buffered bytes (if any) to a streaming Body.
if (this.body == null && this.bodyStream != null) {
this.body = outputStream -> this.bodyStream.writeTo(outputStream);
}
RequestBody requestBody;
if (body != null) {
requestBody = new BodyRequestBody(headers, body);
}
// NOTE(review): okhttp3.internal.http.HttpMethod is an internal OkHttp API
// with no compatibility guarantee — confirm it survives OkHttp upgrades.
else if (okhttp3.internal.http.HttpMethod.requiresRequestBody(getMethod().name())) {
String header = headers.getFirst(HttpHeaders.CONTENT_TYPE);
MediaType contentType = (header != null) ? MediaType.parse(header) : null;
// Methods like POST require a body in OkHttp; send an empty one.
requestBody = RequestBody.create(contentType, new byte[0]);
}
else {
requestBody = null;
}
Request.Builder builder = new Request.Builder()
.url(this.uri.toURL());
builder.method(this.method.name(), requestBody);
// Copy every header value, preserving multi-valued headers.
headers.forEach((headerName, headerValues) -> {
for (String headerValue : headerValues) {
builder.addHeader(headerName, headerValue);
}
});
Request request = builder.build();
// Synchronous execution; the response wrapper owns closing the body.
return new OkHttp3ClientHttpResponse(this.client.newCall(request).execute());
}
/**
 * Adapts a Spring {@link Body} plus its {@link HttpHeaders} to an OkHttp
 * {@link RequestBody} (content length/type are taken from the headers).
 */
private static class BodyRequestBody extends RequestBody {
private final HttpHeaders headers;
private final Body body;
public BodyRequestBody(HttpHeaders headers, Body body) {
this.headers = headers;
this.body = body;
}
@Override
public long contentLength() {
return this.headers.getContentLength();
}
@Nullable
@Override
public MediaType contentType() {
String contentType = this.headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasText(contentType)) {
return MediaType.parse(contentType);
}
else {
return null;
}
}
@Override
public void writeTo(BufferedSink sink) throws IOException {
this.body.writeTo(sink.outputStream());
}
// A non-repeatable body may only be written once (e.g. disallows retries).
@Override
public boolean isOneShot() {
return !this.body.repeatable();
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aizuda.easy.retry.server.common.okhttp;
import okhttp3.Cache;
import okhttp3.OkHttpClient;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.util.Assert;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
 * {@link ClientHttpRequestFactory} that creates {@link OkHttp3ClientHttpRequest}s
 * from a shared {@link OkHttpClient}. Replacement for the Spring-provided factory
 * removed in newer Spring versions.
 */
public class OkHttp3ClientHttpRequestFactory implements ClientHttpRequestFactory, DisposableBean {

    /** Injected client; immutable after construction. */
    private final OkHttpClient client;

    /**
     * Creates a factory around the given client.
     *
     * @param client the OkHttp client used to execute requests; must not be {@code null}
     */
    public OkHttp3ClientHttpRequestFactory(OkHttpClient client) {
        Assert.notNull(client, "OkHttpClient must not be null");
        this.client = client;
    }

    @Override
    public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) {
        return new OkHttp3ClientHttpRequest(this.client, uri, httpMethod);
    }

    /**
     * Releases the resources held by the underlying client: its cache (if configured),
     * the dispatcher's executor service and the connection pool.
     *
     * <p>NOTE(review): the client is injected rather than created here, yet destroy()
     * tears it down — this assumes the factory is the client's sole owner. Confirm no
     * other bean shares the same {@code OkHttpClient} instance.
     */
    @Override
    public void destroy() throws IOException {
        Cache cache = this.client.cache();
        if (cache != null) {
            cache.close();
        }
        this.client.dispatcher().executorService().shutdown();
        this.client.connectionPool().evictAll();
    }
}

View File

@ -0,0 +1,67 @@
package com.aizuda.easy.retry.server.common.okhttp;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import java.io.IOException;
import java.io.InputStream;
/**
 * Adapts an OkHttp {@link Response} to Spring's {@link ClientHttpResponse}.
 * Headers are copied lazily and cached; closing this response closes the
 * underlying OkHttp body.
 */
class OkHttp3ClientHttpResponse implements ClientHttpResponse {

    private final Response response;

    /** Lazily built copy of the response headers; volatile for safe publication. */
    @Nullable
    private volatile HttpHeaders headers;

    public OkHttp3ClientHttpResponse(Response response) {
        Assert.notNull(response, "Response must not be null");
        this.response = response;
    }

    @Override
    public HttpStatusCode getStatusCode() throws IOException {
        return HttpStatusCode.valueOf(this.response.code());
    }

    @Override
    public String getStatusText() {
        return this.response.message();
    }

    @Override
    public InputStream getBody() throws IOException {
        ResponseBody responseBody = this.response.body();
        if (responseBody == null) {
            // No body (e.g. cache-only or HEAD responses): expose an empty stream.
            return InputStream.nullInputStream();
        }
        return responseBody.byteStream();
    }

    @Override
    public HttpHeaders getHeaders() {
        // Copy once on first access; a racing duplicate copy is harmless since
        // both copies are identical and 'headers' is volatile.
        HttpHeaders cached = this.headers;
        if (cached != null) {
            return cached;
        }
        HttpHeaders copied = new HttpHeaders();
        for (String name : this.response.headers().names()) {
            this.response.headers(name).forEach(value -> copied.add(name, value));
        }
        this.headers = copied;
        return copied;
    }

    @Override
    public void close() {
        ResponseBody responseBody = this.response.body();
        if (responseBody != null) {
            responseBody.close();
        }
    }
}

View File

@ -76,6 +76,11 @@ public class MutableGraphCache {
return descendants;
}
/**
 * Returns the sibling ("brother") nodes of {@code nodeId}: all successors of its
 * first predecessor. The returned set includes {@code nodeId} itself.
 *
 * @param graph  the workflow DAG
 * @param nodeId the node whose siblings are requested
 * @return the sibling node ids, or an empty set when the node has no predecessor
 *         (e.g. the virtual root node)
 */
public static Set<Long> getBrotherNode(MutableGraph<Long> graph, Long nodeId) {
    Set<Long> predecessors = graph.predecessors(nodeId);
    // Fix: findFirst().get() on an empty predecessor set (the root node) threw
    // NoSuchElementException; an empty sibling set is the correct answer there.
    // NOTE(review): only the FIRST predecessor is consulted — siblings through
    // other parents are ignored. Confirm this matches the workflow semantics.
    return predecessors.stream()
            .findFirst()
            .map(graph::successors)
            .orElseGet(java.util.Collections::emptySet);
}
private static void getAllDescendantsHelper(MutableGraph<Long> graph, Long parentId, Set<Long> descendants) {
Set<Long> successors = graph.successors(parentId);
descendants.addAll(successors);
@ -84,4 +89,5 @@ public class MutableGraphCache {
getAllDescendantsHelper(graph, successor, descendants);
}
}
}

View File

@ -53,6 +53,7 @@ import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class WorkflowExecutorActor extends AbstractActor {
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final WorkflowNodeMapper workflowNodeMapper;
private final JobMapper jobMapper;
@ -69,7 +70,8 @@ public class WorkflowExecutorActor extends AbstractActor {
} catch (Exception e) {
EasyRetryLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(),
JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
// TODO 发送通知
} finally {
getContext().stop(getSelf());
@ -95,37 +97,41 @@ public class WorkflowExecutorActor extends AbstractActor {
return;
}
Set<Long> brotherNode = MutableGraphCache.getBrotherNode(graph, taskExecute.getParentId());
// 添加父节点为了判断父节点的处理状态
List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId,
Sets.union(successors, Sets.newHashSet(taskExecute.getParentId(), brotherNode)))
);
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
.orderByAsc(WorkflowNode::getPriorityLevel));
.in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
.orderByAsc(WorkflowNode::getPriorityLevel));
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = allJobTaskBatchList.stream().collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i));
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = allJobTaskBatchList.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream()
.collect(Collectors.toMap(WorkflowNode::getId, i -> i));
List<JobTaskBatch> parentJobTaskBatchList = jobTaskBatchMap.get(taskExecute.getParentId());
// 如果父节点是无需处理则不再继续执行
if (!CollectionUtils.isEmpty(parentJobTaskBatchList) &&
parentJobTaskBatchList.stream()
.map(JobTaskBatch::getOperationReason)
.filter(Objects::nonNull)
.anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)) {
parentJobTaskBatchList.stream()
.map(JobTaskBatch::getOperationReason)
.filter(Objects::nonNull)
.anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)) {
workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
return;
}
// 失败策略处理
if (!CollectionUtils.isEmpty(parentJobTaskBatchList)
&& parentJobTaskBatchList.stream()
.map(JobTaskBatch::getTaskBatchStatus)
.anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) {
&& parentJobTaskBatchList.stream()
.map(JobTaskBatch::getTaskBatchStatus)
.anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) {
// 判断是否继续处理根据失败策略
WorkflowNode workflowNode = workflowNodeMap.get(taskExecute.getParentId());
// 失败了阻塞策略
@ -134,11 +140,17 @@ public class WorkflowExecutorActor extends AbstractActor {
}
}
if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap)) {
return;
}
// 去掉父节点
workflowNodes = workflowNodes.stream().filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect(
workflowNodes = workflowNodes.stream()
.filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect(
Collectors.toList());
List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
List<Job> jobs = jobMapper.selectBatchIds(
workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
// 只有条件节点会使用
@ -169,6 +181,31 @@ public class WorkflowExecutorActor extends AbstractActor {
}
/**
 * Checks whether every sibling ("brother") node of the just-finished parent has
 * itself run to completion, so that the successor nodes may be scheduled.
 *
 * @param taskExecute     event describing the node that just finished
 * @param brotherNode     ids of all sibling nodes sharing the same parent
 * @param jobTaskBatchMap task batches grouped by workflow node id
 * @return {@code true} when all siblings are complete (or the parent is the
 *         virtual root node); {@code false} when any sibling is still pending
 */
private boolean brotherNodeIsComplete(WorkflowNodeTaskExecuteDTO taskExecute, Set<Long> brotherNode,
    Map<Long, List<JobTaskBatch>> jobTaskBatchMap) {

    // The root node has no siblings; nothing to wait for.
    if (SystemConstants.ROOT.equals(taskExecute.getParentId())) {
        return Boolean.TRUE;
    }

    // Wait until every sibling node has finished.
    for (final Long nodeId : brotherNode) {
        List<JobTaskBatch> jobTaskBatches = jobTaskBatchMap.get(nodeId);
        // No batch yet: the sibling has not executed, keep waiting.
        if (CollectionUtils.isEmpty(jobTaskBatches)) {
            return Boolean.FALSE;
        }
        // Fix: previous local was named 'isCompleted' but was true when the node
        // was NOT complete — inverted name, same (correct) logic.
        boolean anyNotComplete = jobTaskBatches.stream().anyMatch(
            jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
        if (anyNotComplete) {
            return Boolean.FALSE;
        }
    }
    return Boolean.TRUE;
}
private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
@ -178,7 +215,7 @@ public class WorkflowExecutorActor extends AbstractActor {
jobTaskBatch.setOperationReason(operationReason);
jobTaskBatch.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch),
() -> new EasyRetryServerException("更新任务失败"));
() -> new EasyRetryServerException("更新任务失败"));
}

View File

@ -61,7 +61,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
@Override
public void execute(WorkflowExecutorContext context) {
distributedLockHandler.lockWithDisposable(
distributedLockHandler.lockWithDisposableAndRetry(
() -> {
long total = 0;
// 条件节点存在并发问题需要特殊处理
@ -109,7 +109,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
}
});
}, MessageFormat.format(KEY, context.getWorkflowTaskBatchId(), context.getWorkflowNodeId()),
Duration.ofSeconds(10));
Duration.ofSeconds(10), Duration.ofSeconds(3), 16);
}

View File

@ -27,7 +27,6 @@
<org.mapstruct.version>1.5.3.Final</org.mapstruct.version>
<akka.version>2.6.21</akka.version>
<java-jwt.version>4.4.0</java-jwt.version>
<okhttp.version>5.0.0-alpha.11</okhttp.version>
<perf4j.version>0.9.16</perf4j.version>
<guava.version>32.0.0-jre</guava.version>
</properties>
@ -83,8 +82,10 @@
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<artifactId>okhttp-bom</artifactId>
<version>${okhttp.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>