feat: 2.6.0

1. 修复任务和回调节点关闭后继节点无法运行问题
This commit is contained in:
byteblogs168 2024-01-19 10:53:59 +08:00
parent c20a63a7cc
commit bbecc0a4e8
25 changed files with 205 additions and 150 deletions

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.client.common.appender;
import com.aizuda.easy.retry.client.common.report.AsyncReportLog; import com.aizuda.easy.retry.client.common.report.AsyncReportLog;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil; import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO; import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant; import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.context.SpringContext;
import org.apache.log4j.MDC; import org.apache.log4j.MDC;
import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.Filter;
@ -32,11 +32,11 @@ public class EasyRetryLog4j2Appender extends AbstractAppender {
public void append(LogEvent event) { public void append(LogEvent event) {
// Not job context // Not job context
if (Objects.isNull(ThreadLocalLogUtil.getContext()) || Objects.isNull(MDC.get(LogFieldConstant.MDC_REMOTE))) { if (Objects.isNull(ThreadLocalLogUtil.getContext()) || Objects.isNull(MDC.get(LogFieldConstants.MDC_REMOTE))) {
return; return;
} }
MDC.remove(LogFieldConstant.MDC_REMOTE); MDC.remove(LogFieldConstants.MDC_REMOTE);
LogContentDTO logContentDTO = new LogContentDTO(); LogContentDTO logContentDTO = new LogContentDTO();
logContentDTO.addTimeStamp(event.getTimeMillis()); logContentDTO.addTimeStamp(event.getTimeMillis());
logContentDTO.addLevelField(event.getLevel().name()); logContentDTO.addLevelField(event.getLevel().name());

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.client.common.appender;
import com.aizuda.easy.retry.client.common.report.AsyncReportLog; import com.aizuda.easy.retry.client.common.report.AsyncReportLog;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil; import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO; import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant; import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.context.SpringContext;
import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.MDC; import org.apache.log4j.MDC;
@ -28,11 +28,11 @@ public class EasyRetryLog4jAppender extends AppenderSkeleton {
protected void append(LoggingEvent event) { protected void append(LoggingEvent event) {
// Not job context // Not job context
if (Objects.isNull(ThreadLocalLogUtil.getContext()) || Objects.isNull(MDC.get(LogFieldConstant.MDC_REMOTE))) { if (Objects.isNull(ThreadLocalLogUtil.getContext()) || Objects.isNull(MDC.get(LogFieldConstants.MDC_REMOTE))) {
return; return;
} }
MDC.remove(LogFieldConstant.MDC_REMOTE); MDC.remove(LogFieldConstants.MDC_REMOTE);
LogContentDTO logContentDTO = new LogContentDTO(); LogContentDTO logContentDTO = new LogContentDTO();
logContentDTO.addTimeStamp(event.getTimeStamp()); logContentDTO.addTimeStamp(event.getTimeStamp());
logContentDTO.addLevelField(event.getLevel().toString()); logContentDTO.addLevelField(event.getLevel().toString());

View File

@ -9,7 +9,7 @@ import ch.qos.logback.core.UnsynchronizedAppenderBase;
import com.aizuda.easy.retry.client.common.report.AsyncReportLog; import com.aizuda.easy.retry.client.common.report.AsyncReportLog;
import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil; import com.aizuda.easy.retry.client.common.util.ThreadLocalLogUtil;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO; import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant; import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.context.SpringContext;
import org.slf4j.MDC; import org.slf4j.MDC;
@ -33,11 +33,11 @@ public class EasyRetryLogbackAppender<E> extends UnsynchronizedAppenderBase<E> {
// Not job context // Not job context
if (!(eventObject instanceof LoggingEvent) if (!(eventObject instanceof LoggingEvent)
|| Objects.isNull(ThreadLocalLogUtil.getContext()) || Objects.isNull(ThreadLocalLogUtil.getContext())
|| Objects.isNull(MDC.get(LogFieldConstant.MDC_REMOTE))) { || Objects.isNull(MDC.get(LogFieldConstants.MDC_REMOTE))) {
return; return;
} }
MDC.remove(LogFieldConstant.MDC_REMOTE); MDC.remove(LogFieldConstants.MDC_REMOTE);
LogContentDTO logContentDTO = new LogContentDTO(); LogContentDTO logContentDTO = new LogContentDTO();
// Prepare processing // Prepare processing

View File

@ -19,20 +19,20 @@ import java.util.List;
public enum JobOperationReasonEnum { public enum JobOperationReasonEnum {
NONE(0, StrUtil.EMPTY), NONE(0, StrUtil.EMPTY),
EXECUTE_TIMEOUT(1, "任务执行超时"), TASK_EXECUTION_TIMEOUT(1, "任务执行超时"),
NOT_CLIENT(2, "无客户端节点"), NOT_CLIENT(2, "无客户端节点"),
JOB_CLOSED(3, "JOB已关闭"), JOB_CLOSED(3, "JOB已关闭"),
JOB_DISCARD(4, "任务丢弃"), JOB_DISCARD(4, "任务丢弃"),
JOB_OVERLAY(5, "任务被覆盖"), JOB_OVERLAY(5, "任务被覆盖"),
NOT_EXECUTE_TASK(6, "无可执行任务项"), NOT_EXECUTION_TASK(6, "无可执行任务项"),
TASK_EXECUTE_ERROR(7, "任务执行期间发生非预期异常"), TASK_EXECUTION_ERROR(7, "任务执行期间发生非预期异常"),
MANNER_STOP(8, "手动停止"), MANNER_STOP(8, "手动停止"),
WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(9, "条件节点执行异常"), WORKFLOW_CONDITION_NODE_EXECUTION_ERROR(9, "条件节点执行异常"),
JOB_TASK_INTERRUPTED(10, "任务中断"), JOB_TASK_INTERRUPTED(10, "任务中断"),
WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR(11, "回调节点执行异常"), WORKFLOW_CALLBACK_NODE_EXECUTION_ERROR(11, "回调节点执行异常"),
WORKFLOW_NODE_NO_OPERATION_REQUIRED(12, "无需处理"), WORKFLOW_NODE_NO_REQUIRED(12, "无需处理"),
WORKFLOW_NODE_EXECUTOR_ERROR_SKIP(13, "节点处理失败并跳过"), WORKFLOW_NODE_CLOSED_SKIP_EXECUTION(13, "节点关闭跳过执行"),
WORKFLOW_DECISION_FOR_FALSE(14, "判定未通过"), WORKFLOW_DECISION_FAILED(14, "判定未通过"),
; ;
@ -43,8 +43,8 @@ public enum JobOperationReasonEnum {
/** /**
* 工作流后续节点跳过执行配置 * 工作流后续节点跳过执行配置
*/ */
public static final List<Integer> WORKFLOW_SUCCESSOR_SKIP_EXECUTE = Arrays.asList( public static final List<Integer> WORKFLOW_SUCCESSOR_SKIP_EXECUTION = Arrays.asList(
WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason(), WORKFLOW_DECISION_FOR_FALSE.getReason(), WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason(),
WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.reason); WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.reason);
} }

View File

@ -5,10 +5,9 @@ package com.aizuda.easy.retry.common.log.constant;
* @date 2023-12-27 * @date 2023-12-27
* @since 2.6.0 * @since 2.6.0
*/ */
public interface LogFieldConstant { public interface LogFieldConstants {
String MDC_REMOTE = "remote"; String MDC_REMOTE = "remote";
String TIME = "time"; String TIME = "time";
String TIME_STAMP = "time_stamp"; String TIME_STAMP = "time_stamp";
String THREAD = "thread"; String THREAD = "thread";

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.common.log.dialect.log4j;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.log.dialect.AbstractLog; import com.aizuda.easy.retry.common.log.dialect.AbstractLog;
import com.aizuda.easy.retry.common.log.factory.LogFactory; import com.aizuda.easy.retry.common.log.factory.LogFactory;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant; import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.log4j.MDC; import org.apache.log4j.MDC;
@ -117,7 +117,7 @@ public class Log4jLog extends AbstractLog {
if (logger.isEnabledFor(log4jLevel)) { if (logger.isEnabledFor(log4jLevel)) {
if (remote) { if (remote) {
MDC.put(LogFieldConstant.MDC_REMOTE, remote.toString()); MDC.put(LogFieldConstants.MDC_REMOTE, remote.toString());
} }
if (t == null) { if (t == null) {
t = LogFactory.extractThrowable(arguments); t = LogFactory.extractThrowable(arguments);

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.common.log.dialect.log4j2;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.log.dialect.AbstractLog; import com.aizuda.easy.retry.common.log.dialect.AbstractLog;
import com.aizuda.easy.retry.common.log.factory.LogFactory; import com.aizuda.easy.retry.common.log.factory.LogFactory;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant; import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import org.apache.log4j.MDC; import org.apache.log4j.MDC;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -135,7 +135,7 @@ public class Log4j2Log extends AbstractLog {
if (this.logger.isEnabled(level)) { if (this.logger.isEnabled(level)) {
if (remote) { if (remote) {
MDC.put(LogFieldConstant.MDC_REMOTE, remote.toString()); MDC.put(LogFieldConstants.MDC_REMOTE, remote.toString());
} }
if (this.logger instanceof AbstractLogger) { if (this.logger instanceof AbstractLogger) {
((AbstractLogger) this.logger).logIfEnabled(fqcn, level, null, StrUtil.format(msgTemplate, arguments), t); ((AbstractLogger) this.logger).logIfEnabled(fqcn, level, null, StrUtil.format(msgTemplate, arguments), t);

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.common.log.dialect.slf4j;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.log.dialect.AbstractLog; import com.aizuda.easy.retry.common.log.dialect.AbstractLog;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant; import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.log.level.Level; import com.aizuda.easy.retry.common.log.level.Level;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -220,7 +220,7 @@ public class Slf4jLog extends AbstractLog {
private void setContextMap(Boolean remote) { private void setContextMap(Boolean remote) {
if (remote) { if (remote) {
Map<String, String> map = new LinkedHashMap<>(); Map<String, String> map = new LinkedHashMap<>();
map.put(LogFieldConstant.MDC_REMOTE, remote.toString()); map.put(LogFieldConstants.MDC_REMOTE, remote.toString());
MDC.setContextMap(map); MDC.setContextMap(map);
} }
} }

View File

@ -1,6 +1,6 @@
package com.aizuda.easy.retry.common.log.dto; package com.aizuda.easy.retry.common.log.dto;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant; import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -37,37 +37,37 @@ public class LogContentDTO {
} }
public void addTimeField(String time) { public void addTimeField(String time) {
this.addField(LogFieldConstant.TIME, time); this.addField(LogFieldConstants.TIME, time);
} }
public void addTimeStamp(Long timeStamp) { public void addTimeStamp(Long timeStamp) {
this.addField(LogFieldConstant.TIME_STAMP, String.valueOf(timeStamp)); this.addField(LogFieldConstants.TIME_STAMP, String.valueOf(timeStamp));
} }
public Long getTimeStamp() { public Long getTimeStamp() {
return Long.parseLong(fieldList.stream().filter(taskLogFieldDTO -> !Objects.isNull(taskLogFieldDTO.getValue())) return Long.parseLong(fieldList.stream().filter(taskLogFieldDTO -> !Objects.isNull(taskLogFieldDTO.getValue()))
.collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue)) .collect(Collectors.toMap(TaskLogFieldDTO::getName, TaskLogFieldDTO::getValue))
.get(LogFieldConstant.TIME_STAMP)); .get(LogFieldConstants.TIME_STAMP));
} }
public void addLevelField(String level) { public void addLevelField(String level) {
this.addField(LogFieldConstant.LEVEL, level); this.addField(LogFieldConstants.LEVEL, level);
} }
public void addThreadField(String thread) { public void addThreadField(String thread) {
this.addField(LogFieldConstant.THREAD, thread); this.addField(LogFieldConstants.THREAD, thread);
} }
public void addMessageField(String message) { public void addMessageField(String message) {
this.addField(LogFieldConstant.MESSAGE, message); this.addField(LogFieldConstants.MESSAGE, message);
} }
public void addLocationField(String location) { public void addLocationField(String location) {
this.addField(LogFieldConstant.LOCATION, location); this.addField(LogFieldConstants.LOCATION, location);
} }
public void addThrowableField(String throwable) { public void addThrowableField(String throwable) {
this.addField(LogFieldConstant.THROWABLE, throwable); this.addField(LogFieldConstants.THROWABLE, throwable);
} }
} }

View File

@ -4,6 +4,7 @@ import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.client.model.request.DispatchJobResultRequest; import com.aizuda.easy.retry.client.model.request.DispatchJobResultRequest;
import com.aizuda.easy.retry.server.job.task.dto.*; import com.aizuda.easy.retry.server.job.task.dto.*;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategyContext; import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.task.JobTaskGenerateContext; import com.aizuda.easy.retry.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext; import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext;
@ -32,15 +33,22 @@ public interface JobTaskConverter {
JobTaskConverter INSTANCE = Mappers.getMapper(JobTaskConverter.class); JobTaskConverter INSTANCE = Mappers.getMapper(JobTaskConverter.class);
@Mappings( @Mappings(
@Mapping(source = "id", target = "jobId") @Mapping(source = "id", target = "jobId")
) )
JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTaskDTO job); JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTaskDTO job);
@Mappings( @Mappings(
@Mapping(source = "id", target = "jobId") @Mapping(source = "id", target = "jobId")
) )
JobTaskPrepareDTO toJobTaskPrepare(Job job); JobTaskPrepareDTO toJobTaskPrepare(Job job);
@Mappings({
@Mapping(source = "job.id", target = "jobId"),
@Mapping(source = "job.namespaceId", target = "namespaceId"),
@Mapping(source = "job.groupName", target = "groupName")
})
JobTaskPrepareDTO toJobTaskPrepare(Job job, WorkflowExecutorContext context);
JobTaskBatchGeneratorContext toJobTaskGeneratorContext(JobTaskPrepareDTO jobTaskPrepareDTO); JobTaskBatchGeneratorContext toJobTaskGeneratorContext(JobTaskPrepareDTO jobTaskPrepareDTO);
JobTaskBatchGeneratorContext toJobTaskGeneratorContext(BlockStrategyContext context); JobTaskBatchGeneratorContext toJobTaskGeneratorContext(BlockStrategyContext context);
@ -79,14 +87,14 @@ public interface JobTaskConverter {
DispatchJobRequest toDispatchJobRequest(RealJobExecutorDTO realJobExecutorDTO); DispatchJobRequest toDispatchJobRequest(RealJobExecutorDTO realJobExecutorDTO);
@Mappings({ @Mappings({
@Mapping(source = "jobTask.groupName", target = "groupName"), @Mapping(source = "jobTask.groupName", target = "groupName"),
@Mapping(source = "jobTask.jobId", target = "jobId"), @Mapping(source = "jobTask.jobId", target = "jobId"),
@Mapping(source = "jobTask.taskBatchId", target = "taskBatchId"), @Mapping(source = "jobTask.taskBatchId", target = "taskBatchId"),
@Mapping(source = "jobTask.id", target = "taskId"), @Mapping(source = "jobTask.id", target = "taskId"),
@Mapping(source = "jobTask.argsStr", target = "argsStr"), @Mapping(source = "jobTask.argsStr", target = "argsStr"),
@Mapping(source = "jobTask.argsType", target = "argsType"), @Mapping(source = "jobTask.argsType", target = "argsType"),
@Mapping(source = "jobTask.extAttrs", target = "extAttrs"), @Mapping(source = "jobTask.extAttrs", target = "extAttrs"),
@Mapping(source = "jobTask.namespaceId", target = "namespaceId") @Mapping(source = "jobTask.namespaceId", target = "namespaceId")
}) })
RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask); RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask);
@ -95,7 +103,7 @@ public interface JobTaskConverter {
JobExecutorResultDTO toJobExecutorResultDTO(ClientCallbackContext context); JobExecutorResultDTO toJobExecutorResultDTO(ClientCallbackContext context);
@Mappings( @Mappings(
@Mapping(source = "id", target = "taskId") @Mapping(source = "id", target = "taskId")
) )
JobExecutorResultDTO toJobExecutorResultDTO(JobTask jobTask); JobExecutorResultDTO toJobExecutorResultDTO(JobTask jobTask);

View File

@ -8,7 +8,7 @@ import ch.qos.logback.classic.spi.ThrowableProxyUtil;
import ch.qos.logback.core.CoreConstants; import ch.qos.logback.core.CoreConstants;
import ch.qos.logback.core.UnsynchronizedAppenderBase; import ch.qos.logback.core.UnsynchronizedAppenderBase;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.log.constant.LogFieldConstant; import com.aizuda.easy.retry.common.log.constant.LogFieldConstants;
import com.aizuda.easy.retry.common.log.dto.LogContentDTO; import com.aizuda.easy.retry.common.log.dto.LogContentDTO;
import com.aizuda.easy.retry.common.log.dto.TaskLogFieldDTO; import com.aizuda.easy.retry.common.log.dto.TaskLogFieldDTO;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
@ -36,11 +36,11 @@ public class EasyRetryServerLogbackAppender<E> extends UnsynchronizedAppenderBas
// Not job context // Not job context
if (!(eventObject instanceof LoggingEvent) || Objects.isNull( if (!(eventObject instanceof LoggingEvent) || Objects.isNull(
MDC.getMDCAdapter().get(LogFieldConstant.MDC_REMOTE))) { MDC.getMDCAdapter().get(LogFieldConstants.MDC_REMOTE))) {
return; return;
} }
MDC.getMDCAdapter().remove(LogFieldConstant.MDC_REMOTE); MDC.getMDCAdapter().remove(LogFieldConstants.MDC_REMOTE);
// Prepare processing // Prepare processing
((LoggingEvent) eventObject).prepareForDeferredProcessing(); ((LoggingEvent) eventObject).prepareForDeferredProcessing();
LoggingEvent event = (LoggingEvent) eventObject; LoggingEvent event = (LoggingEvent) eventObject;

View File

@ -79,7 +79,7 @@ public class JobExecutorActor extends AbstractActor {
} catch (Exception e) { } catch (Exception e) {
EasyRetryLog.LOCAL.error("job executor exception. [{}]", taskExecute, e); EasyRetryLog.LOCAL.error("job executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason()); handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
SpringContext.CONTEXT.publishEvent(new JobTaskFailAlarmEvent(taskExecute.getTaskBatchId())); SpringContext.CONTEXT.publishEvent(new JobTaskFailAlarmEvent(taskExecute.getTaskBatchId()));
} finally { } finally {
getContext().stop(getSelf()); getContext().stop(getSelf());

View File

@ -35,7 +35,6 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -66,7 +65,7 @@ public class WorkflowExecutorActor extends AbstractActor {
doExecutor(taskExecute); doExecutor(taskExecute);
} catch (Exception e) { } catch (Exception e) {
EasyRetryLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e); EasyRetryLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason()); handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
// TODO 发送通知 // TODO 发送通知
} finally { } finally {
getContext().stop(getSelf()); getContext().stop(getSelf());
@ -108,7 +107,7 @@ public class WorkflowExecutorActor extends AbstractActor {
parentJobTaskBatchList.stream() parentJobTaskBatchList.stream()
.map(JobTaskBatch::getOperationReason) .map(JobTaskBatch::getOperationReason)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTE::contains)) { .anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)) {
workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch); workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
return; return;
} }

View File

@ -8,7 +8,6 @@ import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor; import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
@ -17,7 +16,6 @@ import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatc
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.handler.DistributedLockHandler; import com.aizuda.easy.retry.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
@ -33,13 +31,10 @@ import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
import java.io.IOException;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.time.Duration; import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit;
/** /**
* @author xiaowoniu * @author xiaowoniu
@ -98,19 +93,6 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
} }
protected boolean preValidate(WorkflowExecutorContext context) { protected boolean preValidate(WorkflowExecutorContext context) {
// 无需处理
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
generatorContext.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason());
generatorContext.setJobId(context.getJobId());
generatorContext.setTaskExecutorScene(context.getTaskExecutorScene());
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
return false;
}
return doPreValidate(context); return doPreValidate(context);
} }
@ -162,6 +144,20 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
return jobTask; return jobTask;
} }
public void generate(WorkflowExecutorContext context) {
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.YES.getStatus())) {
return;
}
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
generatorContext.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
generatorContext.setJobId(context.getJobId());
generatorContext.setTaskExecutorScene(context.getTaskExecutorScene());
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
}
@Override @Override
public void afterPropertiesSet() { public void afterPropertiesSet() {
WorkflowExecutorFactory.registerJobExecutor(getWorkflowNodeType(), this); WorkflowExecutorFactory.registerJobExecutor(getWorkflowNodeType(), this);

View File

@ -5,6 +5,7 @@ import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.common.log.EasyRetryLog;
@ -36,6 +37,7 @@ import org.springframework.web.client.RestTemplate;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -65,12 +67,29 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
@Override @Override
protected void doExecute(WorkflowExecutorContext context) { protected void doExecute(WorkflowExecutorContext context) {
CallbackConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), CallbackConfig.class); // 初始化上下状态
int taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus(); context.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
int operationReason = JobOperationReasonEnum.NONE.getReason(); context.setOperationReason(JobOperationReasonEnum.NONE.getReason());
int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus(); context.setJobTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus());
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
} else {
invokeCallback(context);
}
// 执行下一个节点
workflowTaskExecutor(context);
}
private void invokeCallback(WorkflowExecutorContext context) {
CallbackConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), CallbackConfig.class);
String message = StrUtil.EMPTY; String message = StrUtil.EMPTY;
String result = null;
HttpHeaders requestHeaders = new HttpHeaders(); HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.set(SECRET, decisionConfig.getSecret()); requestHeaders.set(SECRET, decisionConfig.getSecret());
@ -79,28 +98,29 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT); requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT);
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>() List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage, JobTask::getClientInfo) .select(JobTask::getResultMessage, JobTask::getClientInfo)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())); .eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
List<CallbackParamsDTO> callbackParamsList = WorkflowTaskConverter.INSTANCE.toCallbackParamsDTO(jobTasks); List<CallbackParamsDTO> callbackParamsList = WorkflowTaskConverter.INSTANCE.toCallbackParamsDTO(jobTasks);
context.setTaskResult(JsonUtil.toJsonString(callbackParamsList)); context.setTaskResult(JsonUtil.toJsonString(callbackParamsList));
String result = null;
try { try {
Map<String, String> uriVariables = new HashMap<>(); Map<String, String> uriVariables = new HashMap<>();
uriVariables.put(SECRET, decisionConfig.getSecret()); uriVariables.put(SECRET, decisionConfig.getSecret());
ResponseEntity<String> response = buildRetryer(decisionConfig).call( ResponseEntity<String> response = buildRetryer(decisionConfig).call(
() -> restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST, () -> restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST,
new HttpEntity<>(callbackParamsList, requestHeaders), String.class, uriVariables)); new HttpEntity<>(callbackParamsList, requestHeaders), String.class, uriVariables));
result = response.getBody(); result = response.getBody();
EasyRetryLog.LOCAL.info("回调结果. webHook:[{}],结果: [{}]", decisionConfig.getWebhook(), result); EasyRetryLog.LOCAL.info("回调结果. webHook:[{}],结果: [{}]", decisionConfig.getWebhook(), result);
} catch (Exception e) { } catch (Exception e) {
EasyRetryLog.LOCAL.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), EasyRetryLog.LOCAL.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(),
context.getTaskResult(), e); context.getTaskResult(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR.getReason(); context.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus(); context.setOperationReason(JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTION_ERROR.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
Throwable throwable = e; Throwable throwable = e;
if (e.getClass().isAssignableFrom(RetryException.class)) { if (e.getClass().isAssignableFrom(RetryException.class)) {
@ -111,13 +131,9 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
message = throwable.getMessage(); message = throwable.getMessage();
} }
workflowTaskExecutor(context);
context.setTaskBatchStatus(taskBatchStatus);
context.setOperationReason(operationReason);
context.setJobTaskStatus(jobTaskStatus);
context.setEvaluationResult(result); context.setEvaluationResult(result);
context.setLogMessage(message); context.setLogMessage(message);
} }
private static Retryer<ResponseEntity<String>> buildRetryer(CallbackConfig decisionConfig) { private static Retryer<ResponseEntity<String>> buildRetryer(CallbackConfig decisionConfig) {
@ -137,7 +153,6 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
return retryer; return retryer;
} }
@Override @Override
protected boolean doPreValidate(WorkflowExecutorContext context) { protected boolean doPreValidate(WorkflowExecutorContext context) {
return true; return true;

View File

@ -61,7 +61,7 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
// 多个条件节点直接是或的关系只要一个成功其他节点就取消且是无需处理状态 // 多个条件节点直接是或的关系只要一个成功其他节点就取消且是无需处理状态
taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
jobTaskStatus = JobTaskStatusEnum.CANCEL.getStatus(); jobTaskStatus = JobTaskStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason(); operationReason = JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason();
} else { } else {
DecisionConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), DecisionConfig.class); DecisionConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), DecisionConfig.class);
if (StatusEnum.NO.getStatus().equals(decisionConfig.getDefaultDecision())) { if (StatusEnum.NO.getStatus().equals(decisionConfig.getDefaultDecision())) {
@ -100,12 +100,12 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
context.setTaskResult(JsonUtil.toJsonString(taskResult)); context.setTaskResult(JsonUtil.toJsonString(taskResult));
result = Optional.ofNullable(tempResult).orElse(Boolean.FALSE); result = Optional.ofNullable(tempResult).orElse(Boolean.FALSE);
if (!result) { if (!result) {
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FOR_FALSE.getReason(); operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
} }
} catch (Exception e) { } catch (Exception e) {
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getTaskResult(), e); log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getTaskResult(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason(); operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus(); jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
message = e.getMessage(); message = e.getMessage();
} }
@ -146,7 +146,7 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
logMetaDTO.setJobId(SystemConstants.DECISION_JOB_ID); logMetaDTO.setJobId(SystemConstants.DECISION_JOB_ID);
logMetaDTO.setTaskId(jobTask.getId()); logMetaDTO.setTaskId(jobTask.getId());
if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus() if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()
|| JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason() == context.getOperationReason()) { || JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() == context.getOperationReason()) {
EasyRetryLog.REMOTE.info("workflowNodeId:[{}]决策完成. 上下文:[{}] 决策结果:[{}] <|>{}<|>", EasyRetryLog.REMOTE.info("workflowNodeId:[{}]决策完成. 上下文:[{}] 决策结果:[{}] <|>{}<|>",
context.getWorkflowNodeId(), context.getTaskResult(), context.getEvaluationResult(), logMetaDTO); context.getWorkflowNodeId(), context.getTaskResult(), context.getEvaluationResult(), logMetaDTO);
} else { } else {

View File

@ -1,15 +1,24 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow; package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Objects;
/** /**
* @author xiaowoniu * @author xiaowoniu
* @date 2023-12-24 08:09:14 * @date 2023-12-24 08:09:14
@ -19,6 +28,8 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor @RequiredArgsConstructor
public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor { public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
private final JobTaskBatchGenerator jobTaskBatchGenerator;
@Override @Override
public WorkflowNodeTypeEnum getWorkflowNodeType() { public WorkflowNodeTypeEnum getWorkflowNodeType() {
return WorkflowNodeTypeEnum.JOB_TASK; return WorkflowNodeTypeEnum.JOB_TASK;
@ -41,13 +52,32 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
@Override @Override
protected void doExecute(WorkflowExecutorContext context) { protected void doExecute(WorkflowExecutorContext context) {
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
generatorContext.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
generatorContext.setJobId(context.getJobId());
generatorContext.setTaskExecutorScene(context.getTaskExecutorScene());
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
// 执行下一个节点
workflowTaskExecutor(context);
} else {
invokeJobTask(context);
}
}
private static void invokeJobTask(final WorkflowExecutorContext context) {
// 生成任务批次 // 生成任务批次
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob()); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob(), context);
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType()); // jobTaskPrepare.setTaskExecutorScene(context.getTaskExecutorScene());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 1000); jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 1000);
jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId()); // jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); // jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId()); // jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId());
// 执行预处理阶段 // 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
actorRef.tell(jobTaskPrepare, actorRef); actorRef.tell(jobTaskPrepare, actorRef);

View File

@ -5,7 +5,6 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
@ -19,11 +18,9 @@ import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -105,7 +102,8 @@ public class WorkflowBatchHandler {
for (JobTaskBatch jobTaskBatch : jobTaskBatchList) { for (JobTaskBatch jobTaskBatch : jobTaskBatchList) {
if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(jobTaskBatch.getTaskBatchStatus())) { if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(jobTaskBatch.getTaskBatchStatus())) {
// 只要叶子节点不是无需处理的都是失败 // 只要叶子节点不是无需处理的都是失败
if (JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason() != jobTaskBatch.getOperationReason()) { if (JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() != jobTaskBatch.getOperationReason()
&& JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason() != jobTaskBatch.getOperationReason()) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
} }
} }
@ -133,7 +131,7 @@ public class WorkflowBatchHandler {
for (JobTaskBatch jobTaskBatch : jobTaskBatchList) { for (JobTaskBatch jobTaskBatch : jobTaskBatchList) {
// 只要是无需处理的说明后面的子节点都不需要处理了isNeedProcess为false // 只要是无需处理的说明后面的子节点都不需要处理了isNeedProcess为false
if (JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTE.contains(jobTaskBatch.getOperationReason())) { if (JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) {
isNeedProcess = false; isNeedProcess = false;
continue; continue;
} }
@ -229,7 +227,7 @@ public class WorkflowBatchHandler {
// 判定条件节点是否已经执行完成 // 判定条件节点是否已经执行完成
JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(parentId); JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(parentId);
if (Objects.nonNull(parentJobTaskBatch) && if (Objects.nonNull(parentJobTaskBatch) &&
JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason() JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason()
== parentJobTaskBatch.getOperationReason()) { == parentJobTaskBatch.getOperationReason()) {
return; return;
} }

View File

@ -59,7 +59,7 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
// 超时停止任务 // 超时停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(prepare.getTaskType()); JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(prepare.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(prepare); TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(prepare);
stopJobContext.setJobOperationReason(JobOperationReasonEnum.EXECUTE_TIMEOUT.getReason()); stopJobContext.setJobOperationReason(JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext); instanceInterrupt.stop(stopJobContext);
} }

View File

@ -54,7 +54,7 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle
log.info("任务执行超时.workflowTaskBatchId:[{}] delay:[{}] executorTimeout:[{}]", log.info("任务执行超时.workflowTaskBatchId:[{}] delay:[{}] executorTimeout:[{}]",
prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout())); prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout()));
// 超时停止任务 // 超时停止任务
workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.EXECUTE_TIMEOUT.getReason()); workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason());
} }
} }

View File

@ -40,8 +40,6 @@ import com.google.common.collect.Sets;
import com.google.common.graph.MutableGraph; import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.MDC;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -178,7 +176,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
// 只为前端展示提供 // 只为前端展示提供
nodeInfo.setTaskBatchStatus(NOT_HANDLE_STATUS); nodeInfo.setTaskBatchStatus(NOT_HANDLE_STATUS);
jobBatchResponseVO.setTaskBatchStatus(NOT_HANDLE_STATUS); jobBatchResponseVO.setTaskBatchStatus(NOT_HANDLE_STATUS);
jobBatchResponseVO.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason()); jobBatchResponseVO.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason());
nodeInfo.setJobBatchList(Lists.newArrayList(jobBatchResponseVO)); nodeInfo.setJobBatchList(Lists.newArrayList(jobBatchResponseVO));
} }
try { try {
@ -203,7 +201,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
} }
private static boolean isNoOperation(JobTaskBatch i) { private static boolean isNoOperation(JobTaskBatch i) {
return JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTE.contains(i.getOperationReason()) return JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(i.getOperationReason())
|| i.getTaskBatchStatus() == JobTaskBatchStatusEnum.STOP.getStatus(); || i.getTaskBatchStatus() == JobTaskBatchStatusEnum.STOP.getStatus();
} }

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -5,8 +5,8 @@
<meta charset="UTF-8" /> <meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Easy Retry</title> <title>Easy Retry</title>
<script type="module" crossorigin src="./assets/n2RgmoEm.js"></script> <script type="module" crossorigin src="./assets/gfVhN6lL.js"></script>
<link rel="stylesheet" crossorigin href="./assets/lzdWuqGY.css"> <link rel="stylesheet" crossorigin href="./assets/-o3uFske.css">
</head> </head>
<body> <body>

View File

@ -7,32 +7,32 @@
title="日志详情" title="日志详情"
@cancel="onCancel"> @cancel="onCancel">
<div class="log"> <div class="log">
<div class="scroller"> <table class="scroller">
<div class="gutters"> <tbody>
<div style="margin-top: 4px"></div> <tr v-for="(log, index) in logList" :key="index">
<div v-for="(log, index) in logList" :key="index"> <td class="index">
<div class="gutter-element">{{ index + 1 }}</div> {{ index + 1 }}
<div style="height: 25px"></div> </td>
</div> <td>
</div> <div class="content">
<div class="content"> <div class="line">
<div class="line" v-for="log in logList" :key="log.time_stamp"> <div class="flex">
<div class="flex"> <div class="text" style="color: #2db7f5">{{ timestampToDate(log.time_stamp) }}</div>
<div class="text" style="color: #2db7f5">{{ timestampToDate(log.time_stamp) }}</div> <div class="text" :style="{ color: LevelEnum[log.level].color }">
<div class="text" :style="{ color: LevelEnum[log.level].color }"> {{ log.level.length === 4 ? log.level + ' ' : log.level }}
{{ log.level.length === 4 ? log.level + '\t' : log.level }} </div>
<div class="text" style="color: #00a3a3">[{{ log.thread }}]</div>
<div class="text" style="color: #a771bf; font-weight: 500">{{ log.location }}</div>
<div class="text">:</div>
</div>
<div class="text" style="font-size: 16px">{{ log.message }}</div>
<div class="text" style="font-size: 16px">{{ log.throwable }}</div>
</div> </div>
<div class="text" style="color: #00a3a3">[{{ log.thread }}]</div>
<div class="text" style="color: #a771bf; font-weight: 500">{{ log.location }}</div>
<div class="text">:</div>
</div> </div>
<div class="text" style="font-size: 16px">{{ log.message }}</div> </td>
</div> </tr>
<div style="text-align: center; width: 100vw"> </tbody>
<a-spin :indicator="indicator" :spinning="!finished"/> </table>
</div>
</div>
</div>
</div> </div>
</a-modal> </a-modal>
</template> </template>
@ -179,6 +179,19 @@ export default {
position: relative; position: relative;
z-index: 0; z-index: 0;
.index{
width: 32px;
min-width: 32px;
height: 100%;
background-color: #1e1f22;
color: #7d8799;
text-align: center;
vertical-align: top;
padding-top: 4px;
font-size: 16px;
z-index: 200;
}
.gutters { .gutters {
min-height: 100%; min-height: 100%;
position: sticky; position: sticky;
@ -211,13 +224,13 @@ export default {
tab-size: 4; tab-size: 4;
caret-color: transparent !important; caret-color: transparent !important;
margin: 0; margin: 0;
flex-grow: 2; //flex-grow: 2;
flex-shrink: 0; //flex-shrink: 0;
// display: block; // display: block;
white-space: pre; white-space: pre;
word-wrap: normal; //word-wrap: normal;
box-sizing: border-box; //box-sizing: border-box;
min-height: 100%; //min-height: 100%;
padding: 4px 8px; padding: 4px 8px;
outline: none; outline: none;
color: #bcbec4; color: #bcbec4;
@ -238,7 +251,6 @@ export default {
.text { .text {
font-size: 16px; font-size: 16px;
height: 25px;
} }
} }
} }