Pre Merge pull request !83 from Srzou/1.2.0-beta1
This commit is contained in:
commit
60fd79cb86
@ -183,6 +183,15 @@ public class WorkflowBatchHandler {
|
|||||||
operationReason = JobOperationReasonEnum.JOB_OVERLAY.getReason();
|
operationReason = JobOperationReasonEnum.JOB_OVERLAY.getReason();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 关闭已经触发的任务
|
||||||
|
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
||||||
|
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE)
|
||||||
|
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId));
|
||||||
|
|
||||||
|
if (CollUtil.isEmpty(jobTaskBatches)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
WorkflowTaskBatch workflowTaskBatch = new WorkflowTaskBatch();
|
WorkflowTaskBatch workflowTaskBatch = new WorkflowTaskBatch();
|
||||||
workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
|
workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
|
||||||
workflowTaskBatch.setOperationReason(operationReason);
|
workflowTaskBatch.setOperationReason(operationReason);
|
||||||
@ -193,15 +202,6 @@ public class WorkflowBatchHandler {
|
|||||||
workflowTaskBatchId));
|
workflowTaskBatchId));
|
||||||
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId));
|
SnailSpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId));
|
||||||
|
|
||||||
// 关闭已经触发的任务
|
|
||||||
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
|
||||||
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE)
|
|
||||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId));
|
|
||||||
|
|
||||||
if (CollUtil.isEmpty(jobTaskBatches)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<Job> jobs = jobMapper.selectBatchIds(StreamUtils.toSet(jobTaskBatches, JobTaskBatch::getJobId));
|
List<Job> jobs = jobMapper.selectBatchIds(StreamUtils.toSet(jobTaskBatches, JobTaskBatch::getJobId));
|
||||||
|
|
||||||
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
|
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
|
||||||
|
@ -60,6 +60,9 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
|
|||||||
|
|
||||||
if (context.isNeedUpdateTaskStatus()) {
|
if (context.isNeedUpdateTaskStatus()) {
|
||||||
for (final JobTask jobTask : jobTasks) {
|
for (final JobTask jobTask : jobTasks) {
|
||||||
|
if (jobTask.getTaskStatus() == JobTaskStatusEnum.SUCCESS.getStatus()){
|
||||||
|
continue;
|
||||||
|
}
|
||||||
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(jobTask);
|
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(jobTask);
|
||||||
jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.STOP.getStatus());
|
jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.STOP.getStatus());
|
||||||
jobExecutorResultDTO.setMessage("任务停止成功");
|
jobExecutorResultDTO.setMessage("任务停止成功");
|
||||||
|
@ -93,7 +93,9 @@ public class JobHandler {
|
|||||||
String wfContext = getWfContext(workflowTaskBatchId);
|
String wfContext = getWfContext(workflowTaskBatchId);
|
||||||
|
|
||||||
for (JobTask jobTask : jobTasks) {
|
for (JobTask jobTask : jobTasks) {
|
||||||
if (jobTask.getTaskStatus() == JobTaskStatusEnum.RUNNING.getStatus()) {
|
// 增加Map及MapReduce重试任务的状态判断,防止重复执行
|
||||||
|
if (jobTask.getTaskStatus() == JobTaskStatusEnum.RUNNING.getStatus()
|
||||||
|
|| jobTask.getTaskStatus() == JobTaskStatusEnum.SUCCESS.getStatus()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user