From e98946580ef0ac8c4b65a28c79892d980cfa7b04 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Wed, 8 May 2024 09:34:53 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.0.0):=20=E5=B7=A5=E4=BD=9C=E6=B5=81?= =?UTF-8?q?=E6=8E=A5=E5=85=A5=E5=91=8A=E8=AD=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/common/window/SlidingWindow.java | 6 +++--- .../dataobject/WorkflowBatchResponseDO.java | 5 +++++ .../mapper/WorkflowTaskBatchMapper.java | 2 ++ .../template/mapper/WorkflowTaskBatchMapper.xml | 9 ++++++++- .../server/common/AlarmInfoConverter.java | 3 ++- .../server/common/alarm/AbstractAlarm.java | 2 -- .../server/common/alarm/AbstractJobAlarm.java | 3 --- .../server/common/alarm/AbstractRetryAlarm.java | 3 --- .../server/common/dto/WorkflowAlarmInfo.java | 5 +++++ .../listener/WorkflowTaskFailAlarmListener.java | 17 ++++++++++++++--- .../support/dispatch/WorkflowExecutorActor.java | 2 +- .../workflow/CallbackWorkflowExecutor.java | 3 +++ .../workflow/DecisionWorkflowExecutor.java | 3 +++ .../support/handler/WorkflowBatchHandler.java | 4 ++++ .../workflow/RunningWorkflowPrepareHandler.java | 3 +++ 15 files changed, 53 insertions(+), 17 deletions(-) diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/window/SlidingWindow.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/window/SlidingWindow.java index 0dd9d9bf..26d9c053 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/window/SlidingWindow.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/window/SlidingWindow.java @@ -102,7 +102,7 @@ public class SlidingWindow { ConcurrentLinkedQueue list = new ConcurrentLinkedQueue<>(); list.add(data); - SnailJobLog.LOCAL.info("添加新数据 [{}] [{}] size:[{}]", windowPeriod, Thread.currentThread().getName(), list.size()); + SnailJobLog.LOCAL.debug("添加新数据 [{}] [{}] size:[{}]", windowPeriod, Thread.currentThread().getName(), list.size()); saveData.put(windowPeriod, list); // 扫描n-1个窗口,是否过期,过期则删除 @@ -213,7 +213,7 @@ public class SlidingWindow { LocalDateTime currentTime = LocalDateTime.now().minus(duration * 2, chronoUnit); if (windowPeriod.isBefore(currentTime)) { - SnailJobLog.LOCAL.info("删除过期窗口 windowPeriod:[{}] currentTime:[{}]", windowPeriod, currentTime); + SnailJobLog.LOCAL.debug("删除过期窗口 windowPeriod:[{}] currentTime:[{}]", windowPeriod, currentTime); saveData.remove(windowPeriod); } @@ -287,7 +287,7 @@ public class SlidingWindow { removeInvalidWindow(windowPeriod); if (windowPeriod.isBefore(condition)) { - SnailJobLog.LOCAL.info("到达时间窗口期 [{}] [{}]", windowPeriod, JsonUtil.toJsonString(saveData)); + SnailJobLog.LOCAL.debug("到达时间窗口期 [{}] [{}]", windowPeriod, JsonUtil.toJsonString(saveData)); doHandlerListener(windowPeriod); } } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/WorkflowBatchResponseDO.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/WorkflowBatchResponseDO.java index 12553d9f..efb4de07 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/WorkflowBatchResponseDO.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/dataobject/WorkflowBatchResponseDO.java @@ -14,6 +14,11 @@ public class WorkflowBatchResponseDO { private Long id; + /** + * 命名空间id + */ + private String namespaceId; + /** * 组名称 */ diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/WorkflowTaskBatchMapper.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/WorkflowTaskBatchMapper.java index 61ac2ad3..eeb357e5 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/WorkflowTaskBatchMapper.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/mapper/WorkflowTaskBatchMapper.java @@ -24,4 +24,6 @@ import java.util.List; public interface WorkflowTaskBatchMapper extends BaseMapper { List selectWorkflowBatchPageList(PageDTO pageDTO, @Param("ew") Wrapper wrapper); + + List selectWorkflowBatchPageList(@Param("ew") Wrapper wrapper); } diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/WorkflowTaskBatchMapper.xml b/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/WorkflowTaskBatchMapper.xml index 6433b7fc..bbd73b6b 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/WorkflowTaskBatchMapper.xml +++ b/snail-job-datasource/snail-job-datasource-template/src/main/resources/template/mapper/WorkflowTaskBatchMapper.xml @@ -26,5 +26,12 @@ JOIN sj_workflow b ON a.workflow_id = b.id ${ew.customSqlSegment} - + diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/AlarmInfoConverter.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/AlarmInfoConverter.java index d892940e..a9f99428 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/AlarmInfoConverter.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/AlarmInfoConverter.java @@ -7,6 +7,7 @@ import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo; import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.WorkflowBatchResponseDO; import com.aizuda.snailjob.template.datasource.persistence.po.JobNotifyConfig; import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; @@ -62,6 +63,6 @@ public interface AlarmInfoConverter { List toJobAlarmInfos(List jobBatchResponse); - List toWorkflowAlarmInfos(List jobBatchResponse); + List toWorkflowAlarmInfos(List workflowBatchResponses); } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java index 90fadf77..7a42077a 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractAlarm.java @@ -29,10 +29,8 @@ import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationListener; import org.springframework.scheduling.TaskScheduler; import org.springframework.transaction.event.TransactionPhase; -import org.springframework.transaction.event.TransactionalApplicationListener; import org.springframework.transaction.event.TransactionalEventListener; import org.springframework.util.CollectionUtils; diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java index 3b335e15..46bec3fb 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractJobAlarm.java @@ -1,13 +1,10 @@ package com.aizuda.snailjob.server.common.alarm; import com.aizuda.snailjob.server.common.dto.JobAlarmInfo; -import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; -import com.aizuda.snailjob.server.common.enums.SystemModeEnum; import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.Triple; import org.springframework.context.ApplicationEvent; -import java.text.MessageFormat; import java.util.List; import java.util.Map; import java.util.Set; diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java index fac087b2..55c84c30 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/alarm/AbstractRetryAlarm.java @@ -1,13 +1,10 @@ package com.aizuda.snailjob.server.common.alarm; -import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; -import com.aizuda.snailjob.server.common.enums.SystemModeEnum; import com.aizuda.snailjob.server.common.triple.ImmutableTriple; import com.aizuda.snailjob.server.common.triple.Triple; import org.springframework.context.ApplicationEvent; -import java.text.MessageFormat; import java.util.List; import java.util.Map; import java.util.Set; diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/WorkflowAlarmInfo.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/WorkflowAlarmInfo.java index 59a3fa21..d92bf359 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/WorkflowAlarmInfo.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/WorkflowAlarmInfo.java @@ -24,4 +24,9 @@ public class WorkflowAlarmInfo extends AlarmInfo { */ private Long workflowId; + /** + * 操作原因 + */ + private Integer operationReason; + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/listener/WorkflowTaskFailAlarmListener.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/listener/WorkflowTaskFailAlarmListener.java index 2fb7ff2d..cbd636ed 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/listener/WorkflowTaskFailAlarmListener.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/alarm/listener/WorkflowTaskFailAlarmListener.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.alarm.listener; import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum; +import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.util.EnvironmentUtils; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.AlarmInfoConverter; @@ -11,8 +12,11 @@ import com.aizuda.snailjob.server.common.dto.WorkflowAlarmInfo; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; +import com.aizuda.snailjob.template.datasource.persistence.dataobject.WorkflowBatchResponseDO; import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @@ -41,6 +45,7 @@ public class WorkflowTaskFailAlarmListener extends AbstractWorkflowAlarm 空间ID:{} \s > 组名称:{} \s > 工作流名称:{} \s + > 失败原因:{} \s > 时间:{}; """; @@ -53,14 +58,19 @@ public class WorkflowTaskFailAlarmListener extends AbstractWorkflowAlarm jobTaskBatchIds = Lists.newArrayList(workflowTaskBatchId); - queue.drainTo(jobTaskBatchIds, 200); - List workflowTaskBatches = workflowTaskBatchMapper.selectBatchIds(jobTaskBatchIds); + List workflowTaskBatchIds = Lists.newArrayList(workflowTaskBatchId); + queue.drainTo(workflowTaskBatchIds, 200); + + QueryWrapper wrapper = new QueryWrapper() + .in("a.id", workflowTaskBatchIds).eq("a.deleted", 0); + List workflowTaskBatches = workflowTaskBatchMapper.selectWorkflowBatchPageList(wrapper); return AlarmInfoConverter.INSTANCE.toWorkflowAlarmInfos(workflowTaskBatches); } @Override protected AlarmContext buildAlarmContext(WorkflowAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) { + String desc = JobOperationReasonEnum.getByReason(alarmDTO.getOperationReason()).getDesc(); + // 预警 return AlarmContext.build() .text(MESSAGES_FORMATTER, @@ -68,6 +78,7 @@ public class WorkflowTaskFailAlarmListener extends AbstractWorkflowAlarm new SnailJobServerException("停止工作流批次失败. id:[{}]", workflowTaskBatchId)); + SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(workflowTaskBatchId)); // 关闭已经触发的任务 List jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java index eb2369c0..51c4d749 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/prepare/workflow/RunningWorkflowPrepareHandler.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.server.job.task.support.prepare.workflow; +import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; @@ -7,6 +8,7 @@ import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO; import com.aizuda.snailjob.server.job.task.support.BlockStrategy; import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter; +import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.block.workflow.WorkflowBlockStrategyContext; import com.aizuda.snailjob.server.job.task.support.block.workflow.WorkflowBlockStrategyFactory; import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; @@ -56,6 +58,7 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle prepare.getWorkflowTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout())); // 超时停止任务 workflowBatchHandler.stop(prepare.getWorkflowTaskBatchId(), JobOperationReasonEnum.TASK_EXECUTION_TIMEOUT.getReason()); + SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(prepare.getWorkflowTaskBatchId())); } }