feat: 2.6.0

1. 修复任务和回调节点关闭后继节点无法运行问题
This commit is contained in:
byteblogs168 2024-01-19 10:53:59 +08:00
parent e9067eda6b
commit e383abacad
25 changed files with 205 additions and 150 deletions

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.client.common.appender;
import com.aizuda.easy.retry.client.common.report.AsyncReportLog;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import org.apache.log4j.MDC;
import org.apache.logging.log4j.core.Filter;
@ -32,11 +32,11 @@ public class EasyRetryLog4j2Appender extends AbstractAppender {
public void append(LogEvent event) {
// Not job context
if (Objects.isNull(ThreadLocalLogUtil.getContext()) || Objects.isNull(MDC.get(LogFieldConstant.MDC_REMOTE))) {
if (Objects.isNull(ThreadLocalLogUtil.getContext()) || Objects.isNull(MDC.get(LogFieldConstants.MDC_REMOTE))) {
return;
}
MDC.remove(LogFieldConstant.MDC_REMOTE);
MDC.remove(LogFieldConstants.MDC_REMOTE);
LogContentDTO logContentDTO = new LogContentDTO();
logContentDTO.addTimeStamp(event.getTimeMillis());
logContentDTO.addLevelField(event.getLevel().name());

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.client.common.appender;
import com.aizuda.easy.retry.client.common.report.AsyncReportLog;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.MDC;
@ -28,11 +28,11 @@ public class EasyRetryLog4jAppender extends AppenderSkeleton {
protected void append(LoggingEvent event) {
// Not job context
if (Objects.isNull(ThreadLocalLogUtil.getContext()) || Objects.isNull(MDC.get(LogFieldConstant.MDC_REMOTE))) {
if (Objects.isNull(ThreadLocalLogUtil.getContext()) || Objects.isNull(MDC.get(LogFieldConstants.MDC_REMOTE))) {
return;
}
MDC.remove(LogFieldConstant.MDC_REMOTE);
MDC.remove(LogFieldConstants.MDC_REMOTE);
LogContentDTO logContentDTO = new LogContentDTO();
logContentDTO.addTimeStamp(event.getTimeStamp());
logContentDTO.addLevelField(event.getLevel().toString());

View File

@ -9,7 +9,7 @@ import ch.qos.logback.core.UnsynchronizedAppenderBase;
import com.aizuda.easy.retry.client.common.report.AsyncReportLog;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import org.slf4j.MDC;
@ -33,11 +33,11 @@ public class EasyRetryLogbackAppender<E> extends UnsynchronizedAppenderBase<E> {
// Not job context
if (!(eventObject instanceof LoggingEvent)
|| Objects.isNull(ThreadLocalLogUtil.getContext())
|| Objects.isNull(MDC.get(LogFieldConstant.MDC_REMOTE))) {
|| Objects.isNull(MDC.get(LogFieldConstants.MDC_REMOTE))) {
return;
}
MDC.remove(LogFieldConstant.MDC_REMOTE);
MDC.remove(LogFieldConstants.MDC_REMOTE);
LogContentDTO logContentDTO = new LogContentDTO();
// Prepare processing

View File

@ -19,20 +19,20 @@ import java.util.List;
public enum JobOperationReasonEnum {
NONE(0, StrUtil.EMPTY),
EXECUTE_TIMEOUT(1, "任务执行超时"),
TASK_EXECUTION_TIMEOUT(1, "任务执行超时"),
NOT_CLIENT(2, "无客户端节点"),
JOB_CLOSED(3, "JOB已关闭"),
JOB_DISCARD(4, "任务丢弃"),
JOB_OVERLAY(5, "任务被覆盖"),
NOT_EXECUTE_TASK(6, "无可执行任务项"),
TASK_EXECUTE_ERROR(7, "任务执行期间发生非预期异常"),
NOT_EXECUTION_TASK(6, "无可执行任务项"),
TASK_EXECUTION_ERROR(7, "任务执行期间发生非预期异常"),
MANNER_STOP(8, "手动停止"),
WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(9, "条件节点执行异常"),
WORKFLOW_CONDITION_NODE_EXECUTION_ERROR(9, "条件节点执行异常"),
JOB_TASK_INTERRUPTED(10, "任务中断"),
WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR(11, "回调节点执行异常"),
WORKFLOW_NODE_NO_OPERATION_REQUIRED(12, "无需处理"),
WORKFLOW_NODE_EXECUTOR_ERROR_SKIP(13, "节点处理失败并跳过"),
WORKFLOW_DECISION_FOR_FALSE(14, "判定未通过"),
WORKFLOW_CALLBACK_NODE_EXECUTION_ERROR(11, "回调节点执行异常"),
WORKFLOW_NODE_NO_REQUIRED(12, "无需处理"),
WORKFLOW_NODE_CLOSED_SKIP_EXECUTION(13, "节点关闭跳过执行"),
WORKFLOW_DECISION_FAILED(14, "判定未通过"),
;
@ -43,8 +43,8 @@ public enum JobOperationReasonEnum {
/**
* 工作流后续节点跳过执行配置
*/
public static final List<Integer> WORKFLOW_SUCCESSOR_SKIP_EXECUTE = Arrays.asList(
WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason(), WORKFLOW_DECISION_FOR_FALSE.getReason(),
WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.reason);
public static final List<Integer> WORKFLOW_SUCCESSOR_SKIP_EXECUTION = Arrays.asList(
WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason(),
WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.reason);
}

View File

@ -5,10 +5,9 @@ package com.aizuda.easy.retry.common.log.constant;
* @date 2023-12-27
* @since 2.6.0
*/
public interface LogFieldConstant {
public interface LogFieldConstants {
String MDC_REMOTE = "remote";
String TIME = "time";
String TIME_STAMP = "time_stamp";
String THREAD = "thread";

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.common.log.dialect.log4j;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.log.dialect.AbstractLog;
import com.aizuda.easy.retry.common.log.factory.LogFactory;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.MDC;
@ -117,7 +117,7 @@ public class Log4jLog extends AbstractLog {
if (logger.isEnabledFor(log4jLevel)) {
if (remote) {
MDC.put(LogFieldConstant.MDC_REMOTE, remote.toString());
MDC.put(LogFieldConstants.MDC_REMOTE, remote.toString());
}
if (t == null) {
t = LogFactory.extractThrowable(arguments);

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.common.log.dialect.log4j2;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.log.dialect.AbstractLog;
import com.aizuda.easy.retry.common.log.factory.LogFactory;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import org.apache.log4j.MDC;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@ -135,7 +135,7 @@ public class Log4j2Log extends AbstractLog {
if (this.logger.isEnabled(level)) {
if (remote) {
MDC.put(LogFieldConstant.MDC_REMOTE, remote.toString());
MDC.put(LogFieldConstants.MDC_REMOTE, remote.toString());
}
if (this.logger instanceof AbstractLogger) {
((AbstractLogger) this.logger).logIfEnabled(fqcn, level, null, StrUtil.format(msgTemplate, arguments), t);

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.common.log.dialect.slf4j;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.log.dialect.AbstractLog;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.log.level.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -220,7 +220,7 @@ public class Slf4jLog extends AbstractLog {
private void setContextMap(Boolean remote) {
if (remote) {
Map<String, String> map = new LinkedHashMap<>();
map.put(LogFieldConstant.MDC_REMOTE, remote.toString());
map.put(LogFieldConstants.MDC_REMOTE, remote.toString());
MDC.setContextMap(map);
}
}

View File

@ -1,6 +1,6 @@
package com.aizuda.easy.retry.common.log.dto;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import java.util.ArrayList;
import java.util.List;
@ -37,37 +37,37 @@ public class LogContentDTO {
}
public void addTimeField(String time) {
this.addField(LogFieldConstant.TIME, time);
this.addField(LogFieldConstants.TIME, time);
}
public void addTimeStamp(Long timeStamp) {
this.addField(LogFieldConstant.TIME_STAMP, String.valueOf(timeStamp));
this.addField(LogFieldConstants.TIME_STAMP, String.valueOf(timeStamp));
}
public Long getTimeStamp() {
return Long.parseLong(fieldList.stream().filter(taskLogFieldDTO -> !Objects.isNull(taskLogFieldDTO.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue))
.get(LogFieldConstant.TIME_STAMP));
.get(LogFieldConstants.TIME_STAMP));
}
public void addLevelField(String level) {
this.addField(LogFieldConstant.LEVEL, level);
this.addField(LogFieldConstants.LEVEL, level);
}
public void addThreadField(String thread) {
this.addField(LogFieldConstant.THREAD, thread);
this.addField(LogFieldConstants.THREAD, thread);
}
public void addMessageField(String message) {
this.addField(LogFieldConstant.MESSAGE, message);
this.addField(LogFieldConstants.MESSAGE, message);
}
public void addLocationField(String location) {
this.addField(LogFieldConstant.LOCATION, location);
this.addField(LogFieldConstants.LOCATION, location);
}
public void addThrowableField(String throwable) {
this.addField(LogFieldConstant.THROWABLE, throwable);
this.addField(LogFieldConstants.THROWABLE, throwable);
}
}

View File

@ -4,6 +4,7 @@ import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.client.model.request.DispatchJobResultRequest;
import com.aizuda.easy.retry.server.job.task.dto.*;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext;
@ -32,15 +33,22 @@ public interface JobTaskConverter {
JobTaskConverter INSTANCE = Mappers.getMapper(JobTaskConverter.class);
@Mappings(
@Mapping(source = "id", target = "jobId")
@Mapping(source = "id", target = "jobId")
)
JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTaskDTO job);
@Mappings(
@Mapping(source = "id", target = "jobId")
@Mapping(source = "id", target = "jobId")
)
JobTaskPrepareDTO toJobTaskPrepare(Job job);
@Mappings({
@Mapping(source = "job.id", target = "jobId"),
@Mapping(source = "job.namespaceId", target = "namespaceId"),
@Mapping(source = "job.groupName", target = "groupName")
})
JobTaskPrepareDTO toJobTaskPrepare(Job job, WorkflowExecutorContext context);
JobTaskBatchGeneratorContext toJobTaskGeneratorContext(JobTaskPrepareDTO jobTaskPrepareDTO);
JobTaskBatchGeneratorContext toJobTaskGeneratorContext(BlockStrategyContext context);
@ -79,14 +87,14 @@ public interface JobTaskConverter {
DispatchJobRequest toDispatchJobRequest(RealJobExecutorDTO realJobExecutorDTO);
@Mappings({
@Mapping(source = "jobTask.groupName", target = "groupName"),
@Mapping(source = "jobTask.jobId", target = "jobId"),
@Mapping(source = "jobTask.taskBatchId", target = "taskBatchId"),
@Mapping(source = "jobTask.id", target = "taskId"),
@Mapping(source = "jobTask.argsStr", target = "argsStr"),
@Mapping(source = "jobTask.argsType", target = "argsType"),
@Mapping(source = "jobTask.extAttrs", target = "extAttrs"),
@Mapping(source = "jobTask.namespaceId", target = "namespaceId")
@Mapping(source = "jobTask.groupName", target = "groupName"),
@Mapping(source = "jobTask.jobId", target = "jobId"),
@Mapping(source = "jobTask.taskBatchId", target = "taskBatchId"),
@Mapping(source = "jobTask.id", target = "taskId"),
@Mapping(source = "jobTask.argsStr", target = "argsStr"),
@Mapping(source = "jobTask.argsType", target = "argsType"),
@Mapping(source = "jobTask.extAttrs", target = "extAttrs"),
@Mapping(source = "jobTask.namespaceId", target = "namespaceId")
})
RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask);
@ -95,7 +103,7 @@ public interface JobTaskConverter {
JobExecutorResultDTO toJobExecutorResultDTO(ClientCallbackContext context);
@Mappings(
@Mapping(source = "id", target = "taskId")
@Mapping(source = "id", target = "taskId")
)
JobExecutorResultDTO toJobExecutorResultDTO(JobTask jobTask);

View File

@ -8,7 +8,7 @@ import ch.qos.logback.classic.spi.ThrowableProxyUtil;
import ch.qos.logback.core.CoreConstants;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.dto.TaskLogFieldDTO;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
@ -36,11 +36,11 @@ public class EasyRetryServerLogbackAppender<E> extends UnsynchronizedAppenderBas
// Not job context
if (!(eventObject instanceof LoggingEvent) || Objects.isNull(
MDC.getMDCAdapter().get(LogFieldConstant.MDC_REMOTE))) {
MDC.getMDCAdapter().get(LogFieldConstants.MDC_REMOTE))) {
return;
}
MDC.getMDCAdapter().remove(LogFieldConstant.MDC_REMOTE);
MDC.getMDCAdapter().remove(LogFieldConstants.MDC_REMOTE);
// Prepare processing
((LoggingEvent) eventObject).prepareForDeferredProcessing();
LoggingEvent event = (LoggingEvent) eventObject;

View File

@ -79,7 +79,7 @@ public class JobExecutorActor extends AbstractActor {
} catch (Exception e) {
EasyRetryLog.LOCAL.error("job executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason());
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
SpringContext.CONTEXT.publishEvent(new JobTaskFailAlarmEvent(taskExecute.getTaskBatchId()));
} finally {
getContext().stop(getSelf());

View File

@ -35,7 +35,6 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -66,7 +65,7 @@ public class WorkflowExecutorActor extends AbstractActor {
doExecutor(taskExecute);
} catch (Exception e) {
EasyRetryLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason());
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
// TODO 发送通知
} finally {
getContext().stop(getSelf());
@ -108,7 +107,7 @@ public class WorkflowExecutorActor extends AbstractActor {
parentJobTaskBatchList.stream()
.map(JobTaskBatch::getOperationReason)
.filter(Objects::nonNull)
.anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTE::contains)) {
.anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)) {
workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
return;
}

View File

@ -8,7 +8,6 @@ import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
@ -17,7 +16,6 @@ import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatc
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
@ -33,13 +31,10 @@ import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import java.io.IOException;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* @author xiaowoniu
@ -98,19 +93,6 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
}
protected boolean preValidate(WorkflowExecutorContext context) {
// 无需处理
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
generatorContext.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason());
generatorContext.setJobId(context.getJobId());
generatorContext.setTaskExecutorScene(context.getTaskExecutorScene());
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
return false;
}
return doPreValidate(context);
}
@ -162,6 +144,20 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
return jobTask;
}
public void generate(WorkflowExecutorContext context) {
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.YES.getStatus())) {
return;
}
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
generatorContext.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
generatorContext.setJobId(context.getJobId());
generatorContext.setTaskExecutorScene(context.getTaskExecutorScene());
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
}
@Override
public void afterPropertiesSet() {
WorkflowExecutorFactory.registerJobExecutor(getWorkflowNodeType(), this);

View File

@ -5,6 +5,7 @@ import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
@ -36,6 +37,7 @@ import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
@ -65,12 +67,29 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
@Override
protected void doExecute(WorkflowExecutorContext context) {
CallbackConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), CallbackConfig.class);
int taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason();
int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
// 初始化上下状态
context.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
context.setOperationReason(JobOperationReasonEnum.NONE.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus());
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
} else {
invokeCallback(context);
}
// 执行下一个节点
workflowTaskExecutor(context);
}
private void invokeCallback(WorkflowExecutorContext context) {
CallbackConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), CallbackConfig.class);
String message = StrUtil.EMPTY;
String result = null;
HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.set(SECRET, decisionConfig.getSecret());
@ -79,28 +98,29 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT);
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage, JobTask::getClientInfo)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
.select(JobTask::getResultMessage, JobTask::getClientInfo)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
List<CallbackParamsDTO> callbackParamsList = WorkflowTaskConverter.INSTANCE.toCallbackParamsDTO(jobTasks);
context.setTaskResult(JsonUtil.toJsonString(callbackParamsList));
String result = null;
try {
Map<String, String> uriVariables = new HashMap<>();
uriVariables.put(SECRET, decisionConfig.getSecret());
ResponseEntity<String> response = buildRetryer(decisionConfig).call(
() -> restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST,
new HttpEntity<>(callbackParamsList, requestHeaders), String.class, uriVariables));
() -> restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST,
new HttpEntity<>(callbackParamsList, requestHeaders), String.class, uriVariables));
result = response.getBody();
EasyRetryLog.LOCAL.info("回调结果. webHook:[{}],结果: [{}]", decisionConfig.getWebhook(), result);
} catch (Exception e) {
EasyRetryLog.LOCAL.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(),
context.getTaskResult(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
context.getTaskResult(), e);
context.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTION_ERROR.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
Throwable throwable = e;
if (e.getClass().isAssignableFrom(RetryException.class)) {
@ -111,13 +131,9 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
message = throwable.getMessage();
}
workflowTaskExecutor(context);
context.setTaskBatchStatus(taskBatchStatus);
context.setOperationReason(operationReason);
context.setJobTaskStatus(jobTaskStatus);
context.setEvaluationResult(result);
context.setLogMessage(message);
}
private static Retryer<ResponseEntity<String>> buildRetryer(CallbackConfig decisionConfig) {
@ -137,7 +153,6 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
return retryer;
}
@Override
protected boolean doPreValidate(WorkflowExecutorContext context) {
return true;

View File

@ -61,7 +61,7 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
// 多个条件节点直接是或的关系只要一个成功其他节点就取消且是无需处理状态
taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
jobTaskStatus = JobTaskStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason();
operationReason = JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason();
} else {
DecisionConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), DecisionConfig.class);
if (StatusEnum.NO.getStatus().equals(decisionConfig.getDefaultDecision())) {
@ -100,12 +100,12 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
context.setTaskResult(JsonUtil.toJsonString(taskResult));
result = Optional.ofNullable(tempResult).orElse(Boolean.FALSE);
if (!result) {
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FOR_FALSE.getReason();
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
}
} catch (Exception e) {
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getTaskResult(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
message = e.getMessage();
}
@ -146,7 +146,7 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
logMetaDTO.setJobId(SystemConstants.DECISION_JOB_ID);
logMetaDTO.setTaskId(jobTask.getId());
if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()
|| JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason() == context.getOperationReason()) {
|| JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() == context.getOperationReason()) {
EasyRetryLog.REMOTE.info("workflowNodeId:[{}]决策完成. 上下文:[{}] 决策结果:[{}] <|>{}<|>",
context.getWorkflowNodeId(), context.getTaskResult(), context.getEvaluationResult(), logMetaDTO);
} else {

View File

@ -1,15 +1,24 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* @author xiaowoniu
* @date 2023-12-24 08:09:14
@ -19,6 +28,8 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
private final JobTaskBatchGenerator jobTaskBatchGenerator;
@Override
public WorkflowNodeTypeEnum getWorkflowNodeType() {
return WorkflowNodeTypeEnum.JOB_TASK;
@ -41,13 +52,32 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
@Override
protected void doExecute(WorkflowExecutorContext context) {
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
generatorContext.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
generatorContext.setJobId(context.getJobId());
generatorContext.setTaskExecutorScene(context.getTaskExecutorScene());
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
// 执行下一个节点
workflowTaskExecutor(context);
} else {
invokeJobTask(context);
}
}
private static void invokeJobTask(final WorkflowExecutorContext context) {
// 生成任务批次
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob(), context);
// jobTaskPrepare.setTaskExecutorScene(context.getTaskExecutorScene());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 1000);
jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId());
// jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
// jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
// jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId());
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
actorRef.tell(jobTaskPrepare, actorRef);

View File

@ -5,7 +5,6 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
@ -19,11 +18,9 @@ import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
@ -105,7 +102,8 @@ public class WorkflowBatchHandler {
for (JobTaskBatch jobTaskBatch : jobTaskBatchList) {
if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(jobTaskBatch.getTaskBatchStatus())) {
// 只要叶子节点不是无需处理的都是失败
if (JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason() != jobTaskBatch.getOperationReason()) {
if (JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() != jobTaskBatch.getOperationReason()
&& JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason() != jobTaskBatch.getOperationReason()) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
}
}
@ -133,7 +131,7 @@ public class WorkflowBatchHandler {
for (JobTaskBatch jobTaskBatch : jobTaskBatchList) {
// 只要是无需处理的说明后面的子节点都不需要处理了isNeedProcess为false
if (JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTE.contains(jobTaskBatch.getOperationReason())) {
if (JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) {
isNeedProcess = false;
continue;
}
@ -229,7 +227,7 @@ public class WorkflowBatchHandler {
// 判定条件节点是否已经执行完成
JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(parentId);
if (Objects.nonNull(parentJobTaskBatch) &&
JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason()
JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason()
== parentJobTaskBatch.getOperationReason()) {
return;
}

View File

@ -59,7 +59,7 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
// 超时停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(prepare.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(prepare);
stopJobContext.setJobOperationReason(JobOperationReasonEnum.EXECUTE_TIMEOUT.getReason());
stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
}

View File

@ -54,7 +54,7 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle
log.info("任务执行超时.workflowTaskBatchId:[{}] delay:[{}] executorTimeout:[{}]",
prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout()));
// 超时停止任务
workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.EXECUTE_TIMEOUT.getReason());
workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
}
}

View File

@ -40,8 +40,6 @@ import com.google.common.collect.Sets;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.MDC;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@ -178,7 +176,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
// 只为前端展示提供
nodeInfo.setTaskBatchStatus(NOT_HANDLE_STATUS);
jobBatchResponseVO.setTaskBatchStatus(NOT_HANDLE_STATUS);
jobBatchResponseVO.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason());
jobBatchResponseVO.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason());
nodeInfo.setJobBatchList(Lists.newArrayList(jobBatchResponseVO));
}
try {
@ -203,7 +201,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
}
private static boolean isNoOperation(JobTaskBatch i) {
return JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTE.contains(i.getOperationReason())
return JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(i.getOperationReason())
|| i.getTaskBatchStatus() == JobTaskBatchStatusEnum.STOP.getStatus();
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -5,8 +5,8 @@
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Easy Retry</title>
<script type="module" crossorigin src="./assets/n2RgmoEm.js"></script>
<link rel="stylesheet" crossorigin href="./assets/lzdWuqGY.css">
<script type="module" crossorigin src="./assets/gfVhN6lL.js"></script>
<link rel="stylesheet" crossorigin href="./assets/-o3uFske.css">
</head>
<body>

View File

@ -7,32 +7,32 @@
title="日志详情"
@cancel="onCancel">
<div class="log">
<div class="scroller">
<div class="gutters">
<div style="margin-top: 4px"></div>
<div v-for="(log, index) in logList" :key="index">
<div class="gutter-element">{{ index + 1 }}</div>
<div style="height: 25px"></div>
</div>
</div>
<div class="content">
<div class="line" v-for="log in logList" :key="log.time_stamp">
<div class="flex">
<div class="text" style="color: #2db7f5">{{ timestampToDate(log.time_stamp) }}</div>
<div class="text" :style="{ color: LevelEnum[log.level].color }">
{{ log.level.length === 4 ? log.level + '\t' : log.level }}
<table class="scroller">
<tbody>
<tr v-for="(log, index) in logList" :key="index">
<td class="index">
{{ index + 1 }}
</td>
<td>
<div class="content">
<div class="line">
<div class="flex">
<div class="text" style="color: #2db7f5">{{ timestampToDate(log.time_stamp) }}</div>
<div class="text" :style="{ color: LevelEnum[log.level].color }">
{{ log.level.length === 4 ? log.level + ' ' : log.level }}
</div>
<div class="text" style="color: #00a3a3">[{{ log.thread }}]</div>
<div class="text" style="color: #a771bf; font-weight: 500">{{ log.location }}</div>
<div class="text">:</div>
</div>
<div class="text" style="font-size: 16px">{{ log.message }}</div>
<div class="text" style="font-size: 16px">{{ log.throwable }}</div>
</div>
<div class="text" style="color: #00a3a3">[{{ log.thread }}]</div>
<div class="text" style="color: #a771bf; font-weight: 500">{{ log.location }}</div>
<div class="text">:</div>
</div>
<div class="text" style="font-size: 16px">{{ log.message }}</div>
</div>
<div style="text-align: center; width: 100vw">
<a-spin :indicator="indicator" :spinning="!finished"/>
</div>
</div>
</div>
</td>
</tr>
</tbody>
</table>
</div>
</a-modal>
</template>
@ -179,6 +179,19 @@ export default {
position: relative;
z-index: 0;
.index{
width: 32px;
min-width: 32px;
height: 100%;
background-color: #1e1f22;
color: #7d8799;
text-align: center;
vertical-align: top;
padding-top: 4px;
font-size: 16px;
z-index: 200;
}
.gutters {
min-height: 100%;
position: sticky;
@ -211,13 +224,13 @@ export default {
tab-size: 4;
caret-color: transparent !important;
margin: 0;
flex-grow: 2;
flex-shrink: 0;
//flex-grow: 2;
//flex-shrink: 0;
// display: block;
white-space: pre;
word-wrap: normal;
box-sizing: border-box;
min-height: 100%;
//word-wrap: normal;
//box-sizing: border-box;
//min-height: 100%;
padding: 4px 8px;
outline: none;
color: #bcbec4;
@ -238,7 +251,6 @@ export default {
.text {
font-size: 16px;
height: 25px;
}
}
}