Merge remote-tracking branch 'origin/dev' into dev
commit 92d64617e4

DirectoryV3.xml (new file, +22 lines)
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<trees>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java" title="工作流执行"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobTaskPrepareActor.java" title="调度任务准备阶段"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java" title="处理工作流中各种节点类型"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java" title="回调节点"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java" title="条件节点"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java" title="任务节点"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/WorkflowWorkflowExecutor.java" title="工作流节点"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow" title="工作流节点"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ClusterJobExecutor.java" title="集群任务"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/BroadcastTaskJobExecutor.java" title="广播任务"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapJobExecutor.java" title="Map任务"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/MapReduceJobExecutor.java" title="MapReduce任务"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/RequestClientActor.java" title="调用客户端执行任务"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/ShardingJobExecutor.java" title="分片任务"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job" title="任务执行"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/timer/WorkflowTimeoutCheckTask.java" title="工作流任务超时检查"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java" title="客户端执行任务完成回调"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClusterClientCallbackHandler.java" title="集群任务执行结果回调"/>
+    <tree path="/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java" title="任务执行结果处理"/>
+</trees>
@@ -81,11 +81,26 @@ public enum JobOperationReasonEnum {
      * 工作流决策未通过
      */
     WORKFLOW_DECISION_FAILED(14, "Judgment not passed"),
+    /**
+     * 手动调用
+     */
+    MANUAL_CALL(15, "Manual call"),
+    /**
+     * 由工作流中被调用
+     */
+    WORKFLOW_CALLED(16, "Called by workflow"),

     ;

+    /**
+     * 原因
+     */
     private final int reason;

+    /**
+     * 描述
+     */
     private final String desc;

     /**
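The new reason codes (15, 16) follow the existing int-plus-description pattern, and the codes are persisted as plain integers elsewhere in this change set (see `JobOperationReasonEnum.NONE.getReason()` in the JobExecutorActor hunk below), so a reverse lookup is a natural companion. A hedged sketch — the helper is illustrative and not part of this commit; only `getReason()` and `NONE` are taken from the diff:

// Illustrative helper, not part of the commit: map a persisted reason code back to the enum.
public static JobOperationReasonEnum ofReason(int reason) {
    for (JobOperationReasonEnum value : JobOperationReasonEnum.values()) {
        if (value.getReason() == reason) {
            return value;
        }
    }
    return JobOperationReasonEnum.NONE;
}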
@@ -51,7 +51,7 @@ public enum JobTaskBatchStatusEnum {
     public static final List<Integer> NOT_COMPLETE = Arrays.asList(WAITING.status, RUNNING.status);

     /**
-     * 任务完成 状态 包含 SUCCESS, FAIL, STOP, CANCEL
+     * 任务完成 状态 包含 SUCCESS 3, FAIL 4, STOP 5, CANCEL 6
      */
     public static final List<Integer> COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status, CANCEL.status);

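The two lists act as terminal / non-terminal filters over batch statuses. A hedged usage sketch — the helper itself is illustrative and not part of this commit; `getTaskBatchStatus()` on `JobTaskBatch` is the accessor used in other hunks of this diff:

// Illustrative check, not part of the commit: has this batch reached a terminal state?
// COMPLETED covers SUCCESS(3), FAIL(4), STOP(5), CANCEL(6); NOT_COMPLETE covers WAITING and RUNNING.
public static boolean isFinished(JobTaskBatch jobTaskBatch) {
    return JobTaskBatchStatusEnum.COMPLETED.contains(jobTaskBatch.getTaskBatchStatus());
}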
@@ -13,7 +13,13 @@ import lombok.Getter;
 @Getter
 public enum StatusEnum {

+    /**
+     * 0、关闭、1、开启
+     */
     NO(0),
+    /**
+     * 0、关闭、1、开启
+     */
     YES(1);

     private final Integer status;
@@ -261,7 +261,7 @@ public class ActorGenerator {

     /**
      * Job任务执行阶段actor
-     *
+     * @see com.aizuda.snailjob.server.job.task.support.dispatch.JobExecutorActor
      * @return actor 引用
      */
     public static ActorRef jobTaskExecutorActor() {
@@ -288,7 +288,7 @@ public class ActorGenerator {

     /**
      * Job任务执行结果actor
-     *
+     * @see com.aizuda.snailjob.server.job.task.support.dispatch.JobExecutorResultActor
      * @return actor 引用
      */
     public static ActorRef jobTaskExecutorResultActor() {
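The added `@see` tags link each ActorGenerator factory method to the actor class that consumes its messages. Dispatch in this codebase is fire-and-forget: obtain the `ActorRef` and `tell` it a DTO, exactly as the WorkflowBatchHandler hunk further down does for the workflow actor. A hedged sketch for the result actor — the wrapper method and the import are assumptions; `JobExecutorResultDTO` and the `tell` pattern come from other hunks of this diff:

import akka.actor.ActorRef;   // assumed import; ActorGenerator returns ActorRef in the hunks above

// Illustrative dispatch, not part of the commit: push a result message to the result actor.
private void tellJobExecutorResult(JobExecutorResultDTO resultDTO) {
    ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
    actorRef.tell(resultDTO, actorRef);   // handled by JobExecutorResultActor, per the @see tag above
}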
@@ -28,6 +28,8 @@ import java.time.Duration;
 import java.util.Objects;

 /**
+ * 客户端执行任务完成回调处理
+ *
  * @author opensnail
  * @date 2023-10-03 23:12:33
  * @since 2.4.0
@@ -128,6 +130,9 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHandler {

     }

+    /**
+     * 判定是否需要重试
+     */
     private boolean isNeedRetry(ClientCallbackContext context) {

         JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
@@ -18,6 +18,9 @@ public class ClientCallbackFactory {
         CACHE.put(taskInstanceType, callbackHandler);
     }

+    /**
+     * 根据任务类型获取任务执行器
+     */
     public static ClientCallbackHandler getClientCallback(Integer type) {
         return CACHE.get(JobTaskTypeEnum.valueOf(type));
     }
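The factory is a plain type-keyed registry: handlers are placed into `CACHE` keyed by `JobTaskTypeEnum`, and callers resolve them by the integer task type. A hedged lookup sketch — only `getClientCallback(...)` comes from the hunk above; the wrapper method and the `callback(...)` entry point are assumed names for illustration:

// Illustrative lookup, not part of the commit: resolve the handler registered for a task type
// and invoke it. callback(...) is an assumed name for the handler interface method.
private void dispatchClientCallback(Integer taskType, ClientCallbackContext context) {
    ClientCallbackHandler handler = ClientCallbackFactory.getClientCallback(taskType);
    if (handler != null) {
        handler.callback(context);
    }
}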
@@ -15,6 +15,8 @@ import org.springframework.stereotype.Component;
 import java.util.Objects;

 /**
+ * 集群任务执行结果回调处理
+ *
  * @author opensnail
  * @date 2023-10-03 23:12:12
  * @since 2.4.0
@@ -44,6 +46,10 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler {
         return ClientInfoUtils.generate(serverNode);
     }

+    /**
+     * 调用Job任务执行结果actor
+     * @see com.aizuda.snailjob.server.job.task.support.dispatch.JobExecutorResultActor
+     */
     @Override
     protected void doCallback(ClientCallbackContext context) {

@@ -110,6 +110,7 @@ public class JobExecutorActor extends AbstractActor {
         Job job = jobMapper.selectOne(queryWrapper.eq(Job::getId, taskExecute.getJobId()));
         int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
         try {
+            // 操作原因
             int operationReason = JobOperationReasonEnum.NONE.getReason();
             if (Objects.isNull(job)) {
                 taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
@@ -22,6 +22,8 @@ import org.springframework.stereotype.Component;
 import java.util.Objects;

 /**
+ * 任务执行结果处理
+ *
  * @author opensnail
  * @date 2023-10-05 17:16:35
  * @since 2.4.0
@@ -74,6 +76,9 @@ public class JobExecutorResultActor extends AbstractActor {

     }

+    /**
+     * 尝试完成任务
+     */
     private void tryCompleteAndStop(JobExecutorResultDTO jobExecutorResultDTO) {
         CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(jobExecutorResultDTO);
         jobTaskBatchHandler.handleResult(completeJobBatchDTO);
@@ -53,7 +53,7 @@ public class JobTaskPrepareActor extends AbstractActor {
     private void doPrepare(JobTaskPrepareDTO prepare) {
         LambdaQueryWrapper<JobTaskBatch> queryWrapper = new LambdaQueryWrapper<JobTaskBatch>()
                 .eq(JobTaskBatch::getJobId, prepare.getJobId())
-                .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE);
+                .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE);//未完成状态

         JobTaskExecutorSceneEnum jobTaskExecutorSceneEnum = JobTaskExecutorSceneEnum.get(
                 prepare.getTaskExecutorScene());
@@ -47,6 +47,8 @@ import java.util.stream.Collectors;
 import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;

 /**
+ * 工作流执行器
+ *
  * @author: xiaowoniu
  * @date : 2023-12-22 10:34
  * @since : 2.6.0
@@ -255,6 +257,9 @@ public class WorkflowExecutorActor extends AbstractActor {
         }
     }

+    /**
+     * 前置任务是否已完成
+     */
     private boolean arePredecessorsComplete(final WorkflowNodeTaskExecuteDTO taskExecute, Set<Long> predecessors,
             Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode,
             Map<Long, WorkflowNode> workflowNodeMap) {
@@ -303,6 +308,12 @@ public class WorkflowExecutorActor extends AbstractActor {
         return Boolean.TRUE;
     }

+    /**
+     * 更新工作流任务批次
+     *
+     * @param taskStatus      任务批次状态
+     * @param operationReason 操作原因
+     */
     private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, int operationReason) {

         WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
@@ -1,12 +1,19 @@
 package com.aizuda.snailjob.server.job.task.support.executor.workflow;

+import com.aizuda.snailjob.common.core.constant.SystemConstants;
 import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
 import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
 import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
 import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
+import com.aizuda.snailjob.common.log.SnailJobLog;
+import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
+import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
+import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
 import lombok.RequiredArgsConstructor;
 import org.springframework.stereotype.Component;

+import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
+
 @Component
 @RequiredArgsConstructor
 public class WorkflowWorkflowExecutor extends AbstractWorkflowExecutor {
@@ -23,6 +30,27 @@ public class WorkflowWorkflowExecutor extends AbstractWorkflowExecutor {

     @Override
     protected void afterExecute(WorkflowExecutorContext context) {
+        JobTaskBatch jobTaskBatch = generateJobTaskBatch(context);
+
+        JobTask jobTask = generateJobTask(context, jobTaskBatch);
+
+        JobLogMetaDTO jobLogMetaDTO = new JobLogMetaDTO();
+        jobLogMetaDTO.setNamespaceId(context.getNamespaceId());
+        jobLogMetaDTO.setGroupName(context.getGroupName());
+        jobLogMetaDTO.setTaskBatchId(jobTaskBatch.getId());
+        jobLogMetaDTO.setJobId(SystemConstants.WORKFLOW_JOB_ID);
+        jobLogMetaDTO.setTaskId(jobTask.getId());
+        if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) {
+            SnailJobLog.REMOTE.info("Node [{}] workflow success.\nworkflow params: {} \nworkflow result: [{}] <|>{}<|>", context.getWorkflowNodeId(), context.getWfContext(), context.getEvaluationResult(), jobLogMetaDTO);
+        } else if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.CANCEL.getStatus()) {
+            if (WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(context.getParentOperationReason())) {
+                SnailJobLog.REMOTE.warn("Node [{}] cancels workflow. Cancellation reason: Current task does not require processing <|>{}<|>", context.getWorkflowNodeId(), jobLogMetaDTO);
+            } else {
+                SnailJobLog.REMOTE.warn("Node [{}] cancels workflow. Cancellation reason: Task status is closed <|>{}<|>", context.getWorkflowNodeId(), jobLogMetaDTO);
+            }
+        } else {
+            SnailJobLog.REMOTE.error("Node [{}] fail to workflow.\nReason: {} <|>{}<|>", context.getWorkflowNodeId(), context.getLogMessage(), jobLogMetaDTO);
+        }
     }

     @Override
@@ -54,6 +54,9 @@ public class JobTaskBatchHandler {
     private final GroupConfigMapper groupConfigMapper;
     private final List<JobExecutorResultHandler> resultHandlerList;

+    /**
+     * 处理任务批次结果
+     */
     @Transactional
     public boolean handleResult(CompleteJobBatchDTO completeJobBatchDTO) {
         Assert.notNull(completeJobBatchDTO.getTaskType(), ()-> new SnailJobServerException("taskType can not be null"));
@@ -103,6 +103,9 @@ public class WorkflowBatchHandler {
         return complete(workflowTaskBatchId, null);
     }

+    /**
+     * 工作流执行完成
+     */
     public boolean complete(Long workflowTaskBatchId, WorkflowTaskBatch workflowTaskBatch) {
         workflowTaskBatch = Optional.ofNullable(workflowTaskBatch)
                 .orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
@@ -175,6 +178,9 @@ public class WorkflowBatchHandler {

     }

+    /**
+     * 修改工作流任务批次状态
+     */
     private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) {

         WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
@@ -312,6 +318,9 @@ public class WorkflowBatchHandler {
         }
     }

+    /**
+     * 开启下一个工作流节点
+     */
     public void openNextNode(WorkflowNodeTaskExecuteDTO taskExecuteDTO) {
         if (Objects.isNull(taskExecuteDTO.getParentId()) || Objects.isNull(taskExecuteDTO.getWorkflowTaskBatchId()) || Long.valueOf(0).equals(taskExecuteDTO.getWorkflowTaskBatchId())) {
             return;
@@ -330,12 +339,16 @@ public class WorkflowBatchHandler {
         }
     }

+    /**
+     * 通知工作流执行器
+     * @see com.aizuda.snailjob.server.job.task.support.dispatch.WorkflowExecutorActor
+     */
     private void tellWorkflowTaskExecutor(WorkflowNodeTaskExecuteDTO taskExecuteDTO) {
         try {
             ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
             actorRef.tell(taskExecuteDTO, actorRef);
         } catch (Exception e) {
-            SnailJobLog.LOCAL.error("Task scheduling execution failed", e);
+            SnailJobLog.LOCAL.error("Task scheduling execution failed", e); //任务调度执行失败
         }
     }

@@ -29,6 +29,9 @@ public class TerminalJobPrepareHandler extends AbstractJobPrepareHandler {
     @Autowired
     private JobTaskBatchGenerator jobTaskBatchGenerator;

+    /**
+     * 任务完成状态
+     */
     @Override
     public boolean matches(Integer status) {
         return COMPLETED.contains(status);
@@ -13,7 +13,7 @@ import java.text.MessageFormat;
 import java.time.Duration;

 /**
- * 处理处于{@link JobTaskBatchStatusEnum::WAIT}状态的任务
+ * 处理处于{@link JobTaskBatchStatusEnum::WAIT}待处理状态的任务
  *
  * @author opensnail
  * @date 2023-10-05 18:29:22
@@ -23,6 +23,8 @@ import java.util.Optional;
 import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.REPORT_JOB_DISPATCH_RESULT;

 /**
+ * 客户端执行完成后上报结果
+ *
  * @author opensnail
  * @date 2023-09-30 23:01:58
  * @since 2.4.0
|
@ -84,7 +84,7 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes
|
|||||||
doHandleSuccess(context);
|
doHandleSuccess(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 开启下一个工作流节点
|
// 开启下一个工作流节点(如果需要)
|
||||||
openNextWorkflowNode(context);
|
openNextWorkflowNode(context);
|
||||||
|
|
||||||
boolean res = updateStatus(context, taskBatchStatus);
|
boolean res = updateStatus(context, taskBatchStatus);
|
||||||
@@ -104,6 +104,9 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorResultHandler {
         workflowBatchHandler.openNextNode(taskExecuteDTO);
     }

+    /**
+     * 更新任务批次状态
+     */
     protected boolean updateStatus(final JobExecutorResultContext context, final Integer taskBatchStatus) {
         JobTaskBatch jobTaskBatch = new JobTaskBatch();
         jobTaskBatch.setId(context.getTaskBatchId());
@@ -135,6 +138,9 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorResultHandler {
         instanceInterrupt.stop(stopJobContext);
     }

+    /**
+     * 成功(除MapReduce任务外啥也没干)
+     */
     protected abstract void doHandleSuccess(final JobExecutorResultContext context);

     protected abstract void doHandleStop(final JobExecutorResultContext context);
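The abstract result handler follows a template-method pattern: `updateStatus` and `openNextWorkflowNode` are shared, while `doHandleSuccess` / `doHandleStop` are the per-task-type hooks (the new Javadoc notes that success is effectively a no-op for everything except MapReduce tasks). A hedged sketch of what a subclass could look like — the class name and method bodies are illustrative, and the real base class may declare further abstract hooks not visible in this hunk:

// Illustrative subclass, not part of the commit.
@Component
public class ClusterJobExecutorResultHandler extends AbstractJobExecutorResultHandler {

    @Override
    protected void doHandleSuccess(final JobExecutorResultContext context) {
        // Success hook: per the new Javadoc, nothing extra to do for non-MapReduce tasks.
    }

    @Override
    protected void doHandleStop(final JobExecutorResultContext context) {
        // Task-type-specific stop handling would go here.
    }
}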
@@ -18,6 +18,7 @@ import java.text.MessageFormat;
 import java.util.Objects;

 /**
+ * 工作流任务超时检查任务
  * @author opensnail
  * @date 2024-05-20 22:25:12
  * @since sj_1.0.0