!83 fix(sj_1.2.0-beta1):修复Map/MapReduce重试问题,及手动暂停相关逻辑
* fix(sj_1.2.0-beta1):修复Map/MapReduce重试问题,及手动暂停相关逻辑。
This commit is contained in:
		
							parent
							
								
									ab15e0b220
								
							
						
					
					
						commit
						ad054b8999
					
				@ -183,6 +183,15 @@ public class WorkflowBatchHandler {
 | 
			
		||||
            operationReason = JobOperationReasonEnum.JOB_OVERLAY.getReason();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // 关闭已经触发的任务
 | 
			
		||||
        List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
 | 
			
		||||
                .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE)
 | 
			
		||||
                .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId));
 | 
			
		||||
 | 
			
		||||
        if (CollUtil.isEmpty(jobTaskBatches)) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        WorkflowTaskBatch workflowTaskBatch = new WorkflowTaskBatch();
 | 
			
		||||
        workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
 | 
			
		||||
        workflowTaskBatch.setOperationReason(operationReason);
 | 
			
		||||
@ -193,15 +202,6 @@ public class WorkflowBatchHandler {
 | 
			
		||||
                        workflowTaskBatchId));
 | 
			
		||||
        SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId));
 | 
			
		||||
 | 
			
		||||
        // 关闭已经触发的任务
 | 
			
		||||
        List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
 | 
			
		||||
                .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE)
 | 
			
		||||
                .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId));
 | 
			
		||||
 | 
			
		||||
        if (CollUtil.isEmpty(jobTaskBatches)) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        List<Job> jobs = jobMapper.selectBatchIds(StreamUtils.toSet(jobTaskBatches, JobTaskBatch::getJobId));
 | 
			
		||||
 | 
			
		||||
        Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
 | 
			
		||||
 | 
			
		||||
@ -60,6 +60,9 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
 | 
			
		||||
 | 
			
		||||
        if (context.isNeedUpdateTaskStatus()) {
 | 
			
		||||
            for (final JobTask jobTask : jobTasks) {
 | 
			
		||||
                if (jobTask.getTaskStatus() == JobTaskStatusEnum.SUCCESS.getStatus()){
 | 
			
		||||
                    continue;
 | 
			
		||||
                }
 | 
			
		||||
                JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(jobTask);
 | 
			
		||||
                jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.STOP.getStatus());
 | 
			
		||||
                jobExecutorResultDTO.setMessage("任务停止成功");
 | 
			
		||||
 | 
			
		||||
@ -93,7 +93,9 @@ public class JobHandler {
 | 
			
		||||
        String wfContext = getWfContext(workflowTaskBatchId);
 | 
			
		||||
 | 
			
		||||
        for (JobTask jobTask : jobTasks) {
 | 
			
		||||
            if (jobTask.getTaskStatus() == JobTaskStatusEnum.RUNNING.getStatus()) {
 | 
			
		||||
            // 增加Map及MapReduce重试任务的状态判断,防止重复执行
 | 
			
		||||
            if (jobTask.getTaskStatus() == JobTaskStatusEnum.RUNNING.getStatus()
 | 
			
		||||
                    || jobTask.getTaskStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) {
 | 
			
		||||
                continue;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user