From d5b5c1cef2b669bf926d8777324eda546f2cb6e0 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Wed, 8 May 2024 15:34:53 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.0.0):=20=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=8E=A5=E5=85=A5=E5=91=8A?= =?UTF-8?q?=E8=AD=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../snailjob/client/common/NettyClient.java | 2 +- .../common/cache/GroupVersionCache.java | 21 ++++-- .../common/client/SnailJobCommonEndPoint.java | 30 ++++++++ .../common/config/SnailJobProperties.java | 5 -- .../common/rpc/client/NettyChannel.java | 2 +- .../client/job/core/client/JobEndPoint.java | 7 +- .../core/executor/JobExecutorCallable.java | 23 ------ .../executor/JobExecutorFutureCallback.java | 70 ++++++++++++++++-- .../core/client/SnailRetryEndPoint.java | 19 +++-- .../intercepter/SnailRetryInterceptor.java | 42 ++++++----- .../client/core/report/AbstractReport.java | 6 +- .../client/core/report/AsyncReport.java | 2 - .../client/core/report/ReportListener.java | 14 ++-- .../client/core/report/SyncReport.java | 11 ++- .../strategy/AbstractRetryStrategies.java | 14 ++-- .../common/core/constant/SystemConstants.java | 36 ++++++++-- .../snailjob/server/model/dto/ConfigDTO.java | 11 ++- .../access/config/AbstractConfigAccess.java | 21 ++++-- .../server/common/client/CommonRpcClient.java | 23 ++++++ .../common/config/SystemProperties.java | 8 --- .../server/common}/dto/ConfigSyncTask.java | 2 +- .../handler/ConfigVersionSyncHandler.java | 10 +-- .../handler/ConfigHttpRequestHandler.java | 5 +- .../task/SnailJobServerJobTaskStarter.java | 2 - .../server/job/task/client/JobRpcClient.java | 7 +- .../task/SnailJobServerRetryTaskStarter.java | 2 - .../retry/task/client/RetryRpcClient.java | 13 ++-- .../ReportRetryInfoHttpRequestHandler.java | 1 + .../starter/dispatch/ConsumerBucketActor.java | 72 ++++++++++--------- .../src/main/resources/application.yml | 1 - .../service/impl/GroupConfigServiceImpl.java | 20 ++---- .../service/impl/SystemUserServiceImpl.java | 2 - 32 files changed, 313 insertions(+), 191 deletions(-) create mode 100644 snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/client/SnailJobCommonEndPoint.java delete mode 100644 snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorCallable.java create mode 100644 snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java rename snail-job-server/{snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task => snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common}/dto/ConfigSyncTask.java (82%) rename snail-job-server/{snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support => snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common}/handler/ConfigVersionSyncHandler.java (91%) diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/NettyClient.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/NettyClient.java index d19319af..a2d58097 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/NettyClient.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/NettyClient.java @@ -17,7 +17,7 @@ import com.aizuda.snailjob.client.common.rpc.client.RequestMethod; */ public interface NettyClient { - @Mapping(method = RequestMethod.GET, path = HTTP_PATH.CONFIG) + @Mapping(method = RequestMethod.GET, path = HTTP_PATH.SYNC_CONFIG) Result getConfig(Integer version); @Mapping(method = RequestMethod.GET, path = HTTP_PATH.BEAT) diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/cache/GroupVersionCache.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/cache/GroupVersionCache.java index 4ce9540e..1430c5a2 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/cache/GroupVersionCache.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/cache/GroupVersionCache.java @@ -8,16 +8,12 @@ import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.core.model.NettyResult; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.server.model.dto.ConfigDTO; -import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder; -import lombok.extern.slf4j.Slf4j; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Set; /** * @author: opensnail @@ -25,7 +21,6 @@ import java.util.Set; */ @Component @Order -@Slf4j public class GroupVersionCache implements Lifecycle { private static ConfigDTO CONFIG; @@ -60,10 +55,22 @@ public class GroupVersionCache implements Lifecycle { return SystemConstants.DEFAULT_DDL; } - public static ConfigDTO.Notify getNotifyAttribute(Integer notifyScene) { + public static ConfigDTO.Notify getRetryNotifyAttribute(Integer notifyScene) { List notifyList = CONFIG.getNotifyList(); for (ConfigDTO.Notify notify : notifyList) { - if (notify.getNotifyScene().equals(notifyScene)) { + if (notify.getRetryNotifyScene().equals(notifyScene)) { + return notify; + } + } + + return null; + } + + + public static ConfigDTO.Notify getJobNotifyAttribute(Integer notifyScene) { + List notifyList = CONFIG.getNotifyList(); + for (ConfigDTO.Notify notify : notifyList) { + if (notify.getJobNotifyScene().equals(notifyScene)) { return notify; } } diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/client/SnailJobCommonEndPoint.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/client/SnailJobCommonEndPoint.java new file mode 100644 index 00000000..335b9bd9 --- /dev/null +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/client/SnailJobCommonEndPoint.java @@ -0,0 +1,30 @@ +package com.aizuda.snailjob.client.common.client; + +import com.aizuda.snailjob.client.common.annotation.Mapping; +import com.aizuda.snailjob.client.common.annotation.SnailEndPoint; +import com.aizuda.snailjob.client.common.cache.GroupVersionCache; +import com.aizuda.snailjob.client.common.rpc.client.RequestMethod; +import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.server.model.dto.ConfigDTO; + +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.SYNC_CONFIG; + +/** + * SnailJob 通用EndPoint + * + * @author: opensnail + * @date : 2022-03-09 16:33 + */ +@SnailEndPoint +public class SnailJobCommonEndPoint { + + /** + * 同步版本 + */ + @Mapping(path = SYNC_CONFIG, method = RequestMethod.POST) + public Result syncVersion(ConfigDTO configDTO) { + GroupVersionCache.setConfig(configDTO); + return new Result(); + } + +} diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/config/SnailJobProperties.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/config/SnailJobProperties.java index dd2b96d3..c555629d 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/config/SnailJobProperties.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/config/SnailJobProperties.java @@ -172,11 +172,6 @@ public class SnailJobProperties { private SlidingWindowConfig reportSlidingWindow = new SlidingWindowConfig(); } - public static String getGroup() { - SnailJobProperties properties = SpringContext.getBean(SnailJobProperties.class); - return Objects.requireNonNull(properties).group; - } - /** * 邮件配置 */ diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyChannel.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyChannel.java index cfd6eddd..2e9b34bc 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyChannel.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyChannel.java @@ -174,7 +174,7 @@ public class NettyChannel { .set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()) .set(HeadersEnum.HOST_ID.getKey(), HOST_ID) .set(HeadersEnum.HOST_IP.getKey(), getClientHost()) - .set(HeadersEnum.GROUP_NAME.getKey(), SnailJobProperties.getGroup()) + .set(HeadersEnum.GROUP_NAME.getKey(), snailJobProperties.getGroup()) .set(HeadersEnum.HOST_PORT.getKey(), getClientPort()) .set(HeadersEnum.VERSION.getKey(), GroupVersionCache.getVersion()) .set(HeadersEnum.HOST.getKey(), serverConfig.getHost()) diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java index 8df8923b..52a85c13 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java @@ -29,6 +29,9 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_STOP; + /** * @author: opensnail * @date : 2023-09-27 16:33 @@ -36,7 +39,7 @@ import java.util.concurrent.ThreadPoolExecutor; @SnailEndPoint public class JobEndPoint { - @Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST) + @Mapping(path = JOB_DISPATCH, method = RequestMethod.POST) public Result dispatchJob(DispatchJobRequest dispatchJob) { ValidatorFactory vf = Validation.buildDefaultValidatorFactory(); @@ -116,7 +119,7 @@ public class JobEndPoint { return jobContext; } - @Mapping(path = "/job/stop/v1", method = RequestMethod.POST) + @Mapping(path = JOB_STOP, method = RequestMethod.POST) public Result stopJob(StopJobDTO interruptJob) { ValidatorFactory vf = Validation.buildDefaultValidatorFactory(); diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorCallable.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorCallable.java deleted file mode 100644 index e6f81852..00000000 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorCallable.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.aizuda.snailjob.client.job.core.executor; - -import com.aizuda.snailjob.client.model.ExecuteResult; - -import java.util.concurrent.Callable; - -/** - * @author opensnail - * @date 2023-10-08 22:48:44 - * @since 2.4.0 - */ -public class JobExecutorCallable implements Callable { - - public JobExecutorCallable(ExecuteResult executeResult) { - - } - - @Override - public ExecuteResult call() throws Exception { - - return null; - } -} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java index a5d2f16d..cd50f907 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java @@ -1,5 +1,8 @@ package com.aizuda.snailjob.client.job.core.executor; +import cn.hutool.core.date.DatePattern; +import com.aizuda.snailjob.client.common.cache.GroupVersionCache; +import com.aizuda.snailjob.client.common.config.SnailJobProperties; import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder; import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager; import com.aizuda.snailjob.client.job.core.cache.ThreadPoolCache; @@ -7,19 +10,30 @@ import com.aizuda.snailjob.client.job.core.client.JobNettyClient; import com.aizuda.snailjob.client.job.core.log.JobLogMeta; import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest; +import com.aizuda.snailjob.common.core.alarm.AlarmContext; +import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; +import com.aizuda.snailjob.common.core.context.SpringContext; +import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.util.EnvironmentUtils; +import com.aizuda.snailjob.common.core.util.NetUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.core.model.JobContext; import com.aizuda.snailjob.common.core.model.NettyResult; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.enums.LogTypeEnum; -import com.aizuda.snailjob.client.job.core.client.JobNettyClient; +import com.aizuda.snailjob.server.model.dto.ConfigDTO; +import com.aizuda.snailjob.server.model.dto.ConfigDTO.Notify.Recipient; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import lombok.extern.slf4j.Slf4j; +import java.time.LocalDateTime; +import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CancellationException; /** @@ -30,9 +44,20 @@ import java.util.concurrent.CancellationException; @Slf4j public class JobExecutorFutureCallback implements FutureCallback { + private static final String TEXT_MESSAGE_FORMATTER = """ + {}环境 执行结果上报异常 \s + > IP:{} \s + > 空间ID:{} \s + > 名称:{} \s + > 时间:{} \s + > 结果:{} \s + > 异常:{} \s + \s"""; + private static final JobNettyClient CLIENT = RequestBuilder.newBuilder() - .client(JobNettyClient.class) - .callback(nettyResult -> SnailJobLog.LOCAL.info("Job execute result report successfully requestId:[{}]", nettyResult.getRequestId())).build(); + .client(JobNettyClient.class) + .callback(nettyResult -> SnailJobLog.LOCAL.info("Job execute result report successfully requestId:[{}]", + nettyResult.getRequestId())).build(); private final JobContext jobContext; @@ -65,6 +90,7 @@ public class JobExecutorFutureCallback implements FutureCallback CLIENT.dispatchResult(buildDispatchJobResultRequest(result, taskStatus)); } catch (Exception e) { SnailJobLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e); + sendMessage(result, e); } finally { SnailJobLogManager.removeLogMeta(); stopThreadPool(); @@ -73,7 +99,7 @@ public class JobExecutorFutureCallback implements FutureCallback @Override public void onFailure(final Throwable t) { - + ExecuteResult failure = ExecuteResult.failure(); try { // 初始化调度信息(日志上报LogUtil) initLogContext(); @@ -81,7 +107,6 @@ public class JobExecutorFutureCallback implements FutureCallback // 上报执行失败 SnailJobLog.REMOTE.error("任务执行失败 taskBatchId:[{}]", jobContext.getTaskBatchId(), t); - ExecuteResult failure = ExecuteResult.failure(); if (t instanceof CancellationException) { failure.setMessage("任务被取消"); } else { @@ -93,6 +118,7 @@ public class JobExecutorFutureCallback implements FutureCallback ); } catch (Exception e) { SnailJobLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e); + sendMessage(failure, e); } finally { SnailJobLogManager.removeLogMeta(); stopThreadPool(); @@ -132,4 +158,38 @@ public class JobExecutorFutureCallback implements FutureCallback dispatchJobRequest.setRetryScene(jobContext.getRetryScene()); return dispatchJobRequest; } + + private void sendMessage(final ExecuteResult result, Exception e) { + + try { + SnailJobProperties snailJobProperties = SpringContext.getBean(SnailJobProperties.class); + if (Objects.isNull(snailJobProperties)) { + return; + } + ConfigDTO.Notify notify = GroupVersionCache.getJobNotifyAttribute( + JobNotifySceneEnum.JOB_CLIENT_ERROR.getNotifyScene()); + if (Objects.nonNull(notify)) { + List recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList()); + for (final Recipient recipient : recipients) { + AlarmContext context = AlarmContext.build() + .text(TEXT_MESSAGE_FORMATTER, + EnvironmentUtils.getActiveProfile(), + NetUtil.getLocalIpStr(), + snailJobProperties.getNamespace(), + snailJobProperties.getGroup(), + LocalDateTime.now().format(DatePattern.NORM_DATETIME_FORMATTER), + JsonUtil.toJsonString(result), + e.getMessage()) + .title("定时任务执行结果上报异常:[{}]", snailJobProperties.getGroup()) + .notifyAttribute(recipient.getNotifyAttribute()); + + Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType())).ifPresent(alarm -> alarm.asyncSendMessage(context)); + } + } + } catch (Exception e1) { + SnailJobLog.LOCAL.error("Client failed to send component exception alert.", e1); + } + + } + } diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java index 0979d902..94e0cc30 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/client/SnailRetryEndPoint.java @@ -22,6 +22,7 @@ import com.aizuda.snailjob.client.model.DispatchRetryDTO; import com.aizuda.snailjob.client.model.DispatchRetryResultDTO; import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO; import com.aizuda.snailjob.client.model.RetryCallbackDTO; +import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; @@ -46,6 +47,10 @@ import java.lang.reflect.Method; import java.util.Objects; import java.util.Set; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_CALLBACK; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_DISPATCH; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_GENERATE_IDEM_ID; + /** * 服务端调调用客户端进行重试流量下发、配置变更通知等操作 * @@ -62,7 +67,7 @@ public class SnailRetryEndPoint { /** * 服务端调度重试入口 */ - @Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST) + @Mapping(path = RETRY_DISPATCH, method = RequestMethod.POST) public Result dispatch(DispatchRetryDTO executeReqDto) { ValidatorFactory vf = Validation.buildDefaultValidatorFactory(); @@ -147,16 +152,8 @@ public class SnailRetryEndPoint { return new Result<>(executeRespDto); } - /** - * 同步版本 - */ - @Mapping(path = "/retry/sync/version/v1", method = RequestMethod.POST) - public Result syncVersion(ConfigDTO configDTO) { - GroupVersionCache.setConfig(configDTO); - return new Result(); - } - @Mapping(path = "/retry/callback/v1", method = RequestMethod.POST) + @Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST) public Result callback(RetryCallbackDTO callbackDTO) { ValidatorFactory vf = Validation.buildDefaultValidatorFactory(); @@ -274,7 +271,7 @@ public class SnailRetryEndPoint { * @param generateRetryIdempotentIdDTO 生成idempotentId模型 * @return idempotentId */ - @Mapping(path = "/retry/generate/idempotent-id/v1", method = RequestMethod.POST) + @Mapping(path = RETRY_GENERATE_IDEM_ID, method = RequestMethod.POST) public Result idempotentIdGenerate( GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) { diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/intercepter/SnailRetryInterceptor.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/intercepter/SnailRetryInterceptor.java index a8589c28..6ac909c9 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/intercepter/SnailRetryInterceptor.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/intercepter/SnailRetryInterceptor.java @@ -10,7 +10,6 @@ import com.aizuda.snailjob.client.core.cache.RetryerInfoCache; import com.aizuda.snailjob.client.core.retryer.RetryerInfo; import com.aizuda.snailjob.client.core.retryer.RetryerResultContext; import com.aizuda.snailjob.client.core.strategy.RetryStrategy; -import com.aizuda.snailjob.common.core.alarm.Alarm; import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; import com.aizuda.snailjob.common.core.context.SpringContext; @@ -52,18 +51,18 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static String retryErrorMoreThresholdTextMessageFormatter = - "{}环境 重试组件异常 \n" + - "> IP:{} \n" + - "> 空间ID:{} \n" + - "> 名称:{} \n" + - "> 时间:{} \n" + - "> 异常:{} \n"; + "{}环境 重试组件异常 \n" + + "> IP:{} \n" + + "> 空间ID:{} \n" + + "> 名称:{} \n" + + "> 时间:{} \n" + + "> 异常:{} \n"; private final StandardEnvironment standardEnvironment; private final RetryStrategy retryStrategy; public SnailRetryInterceptor(StandardEnvironment standardEnvironment, - RetryStrategy localRetryStrategies) { + RetryStrategy localRetryStrategies) { this.standardEnvironment = standardEnvironment; this.retryStrategy = localRetryStrategies; } @@ -165,7 +164,8 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se } else if (!validate(throwable, RetryerInfoCache.get(retryable.scene(), executorClassName))) { SnailJobLog.LOCAL.debug("Exception mismatch. traceId:[{}]", traceId); } else { - SnailJobLog.LOCAL.debug("Unknown situations do not enable local retry scenarios. traceId:[{}]", traceId); + SnailJobLog.LOCAL.debug("Unknown situations do not enable local retry scenarios. traceId:[{}]", + traceId); } return null; } @@ -184,9 +184,11 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se RetryerResultContext context = retryStrategy.openRetry(retryable.scene(), executorClassName, point.getArguments()); if (RetryResultStatusEnum.SUCCESS.getStatus().equals(context.getRetryResultStatusEnum().getStatus())) { - SnailJobLog.LOCAL.debug("local retry successful. traceId:[{}] result:[{}]", traceId, context.getResult()); + SnailJobLog.LOCAL.debug("local retry successful. traceId:[{}] result:[{}]", traceId, + context.getResult()); } else { - SnailJobLog.LOCAL.debug("local retry result. traceId:[{}] throwable:[{}]", traceId, context.getThrowable()); + SnailJobLog.LOCAL.debug("local retry result. traceId:[{}] throwable:[{}]", traceId, + context.getThrowable()); } return context; @@ -215,24 +217,28 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se private void sendMessage(Exception e) { try { - ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute( - RetryNotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene()); + ConfigDTO.Notify notify = GroupVersionCache.getRetryNotifyAttribute( + RetryNotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene()); if (Objects.nonNull(notify)) { + SnailJobProperties snailJobProperties = SpringContext.getBean(SnailJobProperties.class); + if (Objects.isNull(snailJobProperties)) { + return; + } List recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList()); for (final Recipient recipient : recipients) { AlarmContext context = AlarmContext.build() .text(retryErrorMoreThresholdTextMessageFormatter, EnvironmentUtils.getActiveProfile(), NetUtil.getLocalIpStr(), - standardEnvironment.getProperty("snail-job.namespace", StrUtil.EMPTY), - SnailJobProperties.getGroup(), + snailJobProperties.getNamespace(), + snailJobProperties.getGroup(), LocalDateTime.now().format(formatter), e.getMessage()) - .title("retry component handling exception:[{}]", SnailJobProperties.getGroup()) + .title("retry component handling exception:[{}]", snailJobProperties.getGroup()) .notifyAttribute(recipient.getNotifyAttribute()); - Alarm alarmType = SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType()); - alarmType.asyncSendMessage(context); + Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType())) + .ifPresent(alarm -> alarm.asyncSendMessage(context)); } } diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/AbstractReport.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/AbstractReport.java index 2725e6ec..a7cd6771 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/AbstractReport.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/AbstractReport.java @@ -14,7 +14,9 @@ import com.aizuda.snailjob.client.core.retryer.RetryerInfo; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.core.model.IdempotentIdContext; import com.aizuda.snailjob.server.model.dto.RetryTaskDTO; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.ReflectionUtils; import java.lang.reflect.Method; @@ -28,6 +30,8 @@ import java.lang.reflect.Method; */ @Slf4j public abstract class AbstractReport implements Report { + @Autowired + protected SnailJobProperties snailJobProperties; @Override public boolean report(String scene, final String targetClassName, final Object[] params) { @@ -76,7 +80,7 @@ public abstract class AbstractReport implements Report { retryTaskDTO.setIdempotentId(idempotentId); retryTaskDTO.setExecutorName(targetClassName); retryTaskDTO.setArgsStr(serialize); - retryTaskDTO.setGroupName(SnailJobProperties.getGroup()); + retryTaskDTO.setGroupName(snailJobProperties.getGroup()); retryTaskDTO.setSceneName(scene); String expression = retryerInfo.getBizNo(); diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/AsyncReport.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/AsyncReport.java index 9d8c3a61..b97d23b3 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/AsyncReport.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/AsyncReport.java @@ -20,9 +20,7 @@ import java.util.concurrent.TimeUnit; * @since 1.3.0 */ @Component -@RequiredArgsConstructor public class AsyncReport extends AbstractReport implements Lifecycle { - private final SnailJobProperties snailJobProperties; private SlidingWindow slidingWindow; @Override public boolean supports(boolean async) { diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/ReportListener.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/ReportListener.java index 5ebe9677..c3da0edc 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/ReportListener.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/ReportListener.java @@ -7,7 +7,6 @@ import com.aizuda.snailjob.client.core.RetryExecutorParameter; import com.aizuda.snailjob.client.common.cache.GroupVersionCache; import com.aizuda.snailjob.client.common.client.NettyClient; import com.aizuda.snailjob.client.core.executor.GuavaRetryExecutor; -import com.aizuda.snailjob.common.core.alarm.Alarm; import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; import com.aizuda.snailjob.common.core.context.SpringContext; @@ -110,12 +109,15 @@ public class ReportListener implements Listener { private void sendMessage(Throwable e) { try { - ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene()); + ConfigDTO.Notify notify = GroupVersionCache.getRetryNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene()); if (Objects.isNull(notify)) { return; } SnailJobProperties properties = SpringContext.getBean(SnailJobProperties.class); + if (Objects.isNull(properties)) { + return; + } List recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList()); for (final Recipient recipient : recipients) { AlarmContext context = AlarmContext.build() @@ -123,14 +125,12 @@ public class ReportListener implements Listener { EnvironmentUtils.getActiveProfile(), NetUtil.getLocalIpStr(), properties.getNamespace(), - SnailJobProperties.getGroup(), + properties.getGroup(), LocalDateTime.now().format(formatter), e.getMessage()) - .title("上报异常:[{}]", SnailJobProperties.getGroup()) + .title("上报异常:[{}]", properties.getGroup()) .notifyAttribute(recipient.getNotifyAttribute()); - - Alarm alarmType = SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType()); - alarmType.asyncSendMessage(context); + Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType())).ifPresent(alarm -> alarm.asyncSendMessage(context)); } } catch (Exception e1) { diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/SyncReport.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/SyncReport.java index 88fcf717..698130cc 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/SyncReport.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/report/SyncReport.java @@ -5,10 +5,8 @@ import com.aizuda.snailjob.client.common.config.SnailJobProperties; import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder; import com.aizuda.snailjob.client.common.client.NettyClient; import com.aizuda.snailjob.client.core.retryer.RetryerInfo; -import com.aizuda.snailjob.common.core.alarm.Alarm; import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; -import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.core.model.NettyResult; @@ -95,7 +93,7 @@ public class SyncReport extends AbstractReport { private void sendMessage(Throwable e) { try { - ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene()); + ConfigDTO.Notify notify = GroupVersionCache.getRetryNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene()); if (Objects.isNull(notify)) { return; } @@ -107,14 +105,13 @@ public class SyncReport extends AbstractReport { EnvironmentUtils.getActiveProfile(), NetUtil.getLocalIpStr(), snailJobProperties.getNamespace(), - SnailJobProperties.getGroup(), + snailJobProperties.getGroup(), LocalDateTime.now().format(formatter), e.getMessage()) - .title("同步上报异常:[{}]", SnailJobProperties.getGroup()) + .title("同步上报异常:[{}]", snailJobProperties.getGroup()) .notifyAttribute(recipient.getNotifyAttribute()); - Alarm alarmType = SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType()); - alarmType.asyncSendMessage(context); + Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType())).ifPresent(alarm -> alarm.asyncSendMessage(context)); } } catch (Exception e1) { diff --git a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/strategy/AbstractRetryStrategies.java b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/strategy/AbstractRetryStrategies.java index 3c661ff3..df666efe 100644 --- a/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/strategy/AbstractRetryStrategies.java +++ b/snail-job-client/snail-job-client-retry-core/src/main/java/com/aizuda/snailjob/client/core/strategy/AbstractRetryStrategies.java @@ -11,7 +11,6 @@ import com.aizuda.snailjob.client.core.intercepter.RetrySiteSnapshot; import com.aizuda.snailjob.client.core.loader.SnailRetrySpiLoader; import com.aizuda.snailjob.client.core.retryer.RetryerInfo; import com.aizuda.snailjob.client.core.retryer.RetryerResultContext; -import com.aizuda.snailjob.common.core.alarm.Alarm; import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; @@ -42,7 +41,7 @@ import java.util.function.Consumer; @Slf4j public abstract class AbstractRetryStrategies implements RetryStrategy { private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private static String retryErrorMoreThresholdTextMessageFormatter = + private static String TEXT_MESSAGE_FORMATTER = "{}环境 重试组件异常 \n" + "> IP:{} \n" + "> 空间ID:{} \n" + @@ -176,23 +175,22 @@ public abstract class AbstractRetryStrategies implements RetryStrategy { private void sendMessage(Exception e) { try { - ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene()); + ConfigDTO.Notify notify = GroupVersionCache.getRetryNotifyAttribute(RetryNotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene()); if (Objects.nonNull(notify)) { List recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList()); for (final Recipient recipient : recipients) { AlarmContext context = AlarmContext.build() - .text(retryErrorMoreThresholdTextMessageFormatter, + .text(TEXT_MESSAGE_FORMATTER, EnvironmentUtils.getActiveProfile(), NetUtil.getLocalIpStr(), snailJobProperties.getNamespace(), - SnailJobProperties.getGroup(), + snailJobProperties.getGroup(), LocalDateTime.now().format(formatter), e.getMessage()) - .title("retry component handling exception:[{}]", SnailJobProperties.getGroup()) + .title("retry component handling exception:[{}]", snailJobProperties.getGroup()) .notifyAttribute(recipient.getNotifyAttribute()); - Alarm alarmType = SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType()); - alarmType.asyncSendMessage(context); + Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType())).ifPresent(alarm -> alarm.asyncSendMessage(context)); } } } catch (Exception e1) { diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java index dc2117ac..55562d2a 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java @@ -55,11 +55,6 @@ public interface SystemConstants { */ String BEAT = "/beat"; - /** - * 同步配置 - */ - String CONFIG = "/config"; - /** * 批量上报 */ @@ -74,6 +69,37 @@ public interface SystemConstants { * 上报job的运行结果 */ String REPORT_JOB_DISPATCH_RESULT = "/report/dispatch/result"; + + /** + * 执行任务 + */ + String JOB_DISPATCH = "/job/dispatch/v1"; + + /** + * 停止任务 + */ + String JOB_STOP = "/job/stop/v1"; + + /** + * 同步配置 + */ + String SYNC_CONFIG = "/sync/version"; + + /** + * 重试分发 + */ + String RETRY_DISPATCH = "/retry/dispatch/v1"; + + /** + * 重试回调 + */ + String RETRY_CALLBACK = "/retry/callback/v1"; + + /** + * 获取重试幂等id + */ + String RETRY_GENERATE_IDEM_ID = "/retry/generate/idempotent-id/v1"; + } String LOGO = """ diff --git a/snail-job-common/snail-job-common-server-api/src/main/java/com/aizuda/snailjob/server/model/dto/ConfigDTO.java b/snail-job-common/snail-job-common-server-api/src/main/java/com/aizuda/snailjob/server/model/dto/ConfigDTO.java index cdfe5436..ed2e6752 100644 --- a/snail-job-common/snail-job-common-server-api/src/main/java/com/aizuda/snailjob/server/model/dto/ConfigDTO.java +++ b/snail-job-common/snail-job-common-server-api/src/main/java/com/aizuda/snailjob/server/model/dto/ConfigDTO.java @@ -2,10 +2,10 @@ package com.aizuda.snailjob.server.model.dto; import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.enums.AlarmTypeEnum; +import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum; import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; import lombok.Data; import java.util.List; -import java.util.Set; /** * 同步的配置数据结构 @@ -59,9 +59,14 @@ public class ConfigDTO { private Integer notifyThreshold; /** - * 场景场景 {@link RetryNotifySceneEnum} + * 重试通知场景 {@link RetryNotifySceneEnum} */ - private Integer notifyScene; + private Integer retryNotifyScene; + + /** + * 定时任务&工作流通知场景 {@link JobNotifySceneEnum} + */ + private Integer jobNotifyScene; @Data public static class Recipient { diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/AbstractConfigAccess.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/AbstractConfigAccess.java index 9cc31707..193673af 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/AbstractConfigAccess.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/access/config/AbstractConfigAccess.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.template.datasource.access.config; +import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum; import com.aizuda.snailjob.common.core.enums.NodeTypeEnum; import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; @@ -169,16 +170,18 @@ public abstract class AbstractConfigAccess implements ConfigAccess { for (NotifyConfig notifyConfig : notifyList) { // 只选择客户端的通知配置即可 - RetryNotifySceneEnum notifyScene = RetryNotifySceneEnum.getNotifyScene(notifyConfig.getNotifyScene(), + RetryNotifySceneEnum retryNotifyScene = RetryNotifySceneEnum.getNotifyScene(notifyConfig.getNotifyScene(), NodeTypeEnum.CLIENT); - if (Objects.isNull(notifyScene)) { + JobNotifySceneEnum jobNotifyScene = JobNotifySceneEnum.getJobNotifyScene(notifyConfig.getNotifyScene(), + NodeTypeEnum.CLIENT); + if (Objects.isNull(retryNotifyScene) && Objects.isNull(jobNotifyScene)) { continue; } String recipientIds = notifyConfig.getRecipientIds(); List notifyRecipients = notifyRecipientMapper.selectBatchIds( JsonUtil.parseList(recipientIds, Long.class)); - notifies.add(getNotify(notifyConfig, notifyRecipients)); + notifies.add(getNotify(notifyConfig, notifyRecipients, retryNotifyScene, jobNotifyScene)); } configDTO.setNotifyList(notifies); @@ -197,7 +200,8 @@ public abstract class AbstractConfigAccess implements ConfigAccess { return configDTO; } - private static Notify getNotify(final NotifyConfig notifyConfig, final List notifyRecipients) { + private static Notify getNotify(final NotifyConfig notifyConfig, final List notifyRecipients, + final RetryNotifySceneEnum retryNotifyScene, final JobNotifySceneEnum jobNotifyScene) { List recipients = new ArrayList<>(); for (final NotifyRecipient notifyRecipient : notifyRecipients) { Recipient recipient = new Recipient(); @@ -207,7 +211,14 @@ public abstract class AbstractConfigAccess implements ConfigAccess { } Notify notify = new Notify(); - notify.setNotifyScene(notifyConfig.getNotifyScene()); + if (Objects.nonNull(retryNotifyScene)) { + notify.setRetryNotifyScene(retryNotifyScene.getNotifyScene()); + } + + if (Objects.nonNull(jobNotifyScene)) { + notify.setJobNotifyScene(jobNotifyScene.getNotifyScene()); + } + notify.setNotifyThreshold(notifyConfig.getNotifyThreshold()); notify.setRecipients(recipients); return notify; diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java new file mode 100644 index 00000000..164e4996 --- /dev/null +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java @@ -0,0 +1,23 @@ +package com.aizuda.snailjob.server.common.client; + +import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.server.common.rpc.client.RequestMethod; +import com.aizuda.snailjob.server.common.rpc.client.annotation.Body; +import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; +import com.aizuda.snailjob.server.model.dto.ConfigDTO; + +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.SYNC_CONFIG; + +/** + * 调用客户端接口 + * + * @author: opensnail + * @date : 2023-06-19 15:40 + * @since sj_1.0.0 + */ +public interface CommonRpcClient { + + @Mapping(path = SYNC_CONFIG, method = RequestMethod.POST) + Result syncConfig(@Body ConfigDTO configDTO); + +} diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java index 07a72f30..a515f538 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java @@ -78,14 +78,6 @@ public class SystemProperties { */ private Callback callback = new Callback(); - /** - * 系统模式: - * RETRY: 分布式重试重 - * JOB: 分布式定时任务 - * ALL: 分布式重试重 && 分布式定时任务 - */ - private SystemModeEnum mode = SystemModeEnum.ALL; - /** * 回调配置 */ diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/ConfigSyncTask.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/ConfigSyncTask.java similarity index 82% rename from snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/ConfigSyncTask.java rename to snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/ConfigSyncTask.java index 9de46b7f..5ea1ded5 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/ConfigSyncTask.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/ConfigSyncTask.java @@ -1,4 +1,4 @@ -package com.aizuda.snailjob.server.retry.task.dto; +package com.aizuda.snailjob.server.common.dto; import lombok.Data; diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/ConfigVersionSyncHandler.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ConfigVersionSyncHandler.java similarity index 91% rename from snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/ConfigVersionSyncHandler.java rename to snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ConfigVersionSyncHandler.java index a697af17..dfc8a459 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/ConfigVersionSyncHandler.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/handler/ConfigVersionSyncHandler.java @@ -1,14 +1,14 @@ -package com.aizuda.snailjob.server.retry.task.support.handler; +package com.aizuda.snailjob.server.common.handler; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.server.common.Lifecycle; +import com.aizuda.snailjob.server.common.client.CommonRpcClient; +import com.aizuda.snailjob.server.common.dto.ConfigSyncTask; import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; import com.aizuda.snailjob.server.model.dto.ConfigDTO; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; -import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient; -import com.aizuda.snailjob.server.retry.task.dto.ConfigSyncTask; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @@ -61,9 +61,9 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable { // 同步版本到每个客户端节点 for (final RegisterNodeInfo registerNodeInfo : serverNodeSet) { ConfigDTO configDTO = accessTemplate.getGroupConfigAccess().getConfigInfo(groupName, namespaceId); - RetryRpcClient rpcClient = RequestBuilder.newBuilder() + CommonRpcClient rpcClient = RequestBuilder.newBuilder() .nodeInfo(registerNodeInfo) - .client(RetryRpcClient.class) + .client(CommonRpcClient.class) .build(); SnailJobLog.LOCAL.info("同步结果 [{}]", rpcClient.syncConfig(configDTO)); } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/handler/ConfigHttpRequestHandler.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/handler/ConfigHttpRequestHandler.java index 4a5cce92..5f0da2da 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/handler/ConfigHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/handler/ConfigHttpRequestHandler.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.common.rpc.server.handler; import cn.hutool.core.net.url.UrlQuery; +import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.handler.GetHttpRequestHandler; import com.aizuda.snailjob.server.model.dto.ConfigDTO; @@ -14,8 +15,6 @@ import io.netty.handler.codec.http.HttpMethod; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; -import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.CONFIG; - /** * @author: opensnail * @date : 2022-03-07 16:29 @@ -28,7 +27,7 @@ public class ConfigHttpRequestHandler extends GetHttpRequestHandler { @Override public boolean supports(String path) { - return CONFIG.equals(path); + return HTTP_PATH.SYNC_CONFIG.equals(path); } @Override diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/SnailJobServerJobTaskStarter.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/SnailJobServerJobTaskStarter.java index dc31364c..2ea7f27d 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/SnailJobServerJobTaskStarter.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/SnailJobServerJobTaskStarter.java @@ -1,6 +1,5 @@ package com.aizuda.snailjob.server.job.task; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @@ -11,7 +10,6 @@ import org.springframework.context.annotation.Configuration; */ @Configuration @ComponentScan("com.aizuda.snailjob.server.job.task.*") -@ConditionalOnExpression("'${snail-job.mode}'.equals('job') or '${snail-job.mode}'.equals('all')") public class SnailJobServerJobTaskStarter { } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java index bfce6d00..62627086 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java @@ -9,6 +9,9 @@ import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; import com.aizuda.snailjob.server.common.rpc.client.annotation.Body; import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_STOP; + /** * 调用客户端接口 * @@ -18,10 +21,10 @@ import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; */ public interface JobRpcClient { - @Mapping(path = "/job/stop/v1", method = RequestMethod.POST) + @Mapping(path = JOB_STOP, method = RequestMethod.POST) Result stop(@Body StopJobDTO stopJobDTO); - @Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST) + @Mapping(path = JOB_DISPATCH, method = RequestMethod.POST) Result dispatch(@Body DispatchJobRequest dispatchJobRequest); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/SnailJobServerRetryTaskStarter.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/SnailJobServerRetryTaskStarter.java index 8b3d388d..eca55f31 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/SnailJobServerRetryTaskStarter.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/SnailJobServerRetryTaskStarter.java @@ -1,6 +1,5 @@ package com.aizuda.snailjob.server.retry.task; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @@ -11,7 +10,6 @@ import org.springframework.context.annotation.Configuration; */ @Configuration @ComponentScan("com.aizuda.snailjob.server.retry.task.*") -@ConditionalOnExpression("'${snail-job.mode}'.equals('retry') or '${snail-job.mode}'.equals('all')") public class SnailJobServerRetryTaskStarter { } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java index 9a26417f..5e2faebf 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/client/RetryRpcClient.java @@ -15,6 +15,10 @@ import com.aizuda.snailjob.server.common.rpc.client.annotation.Body; import com.aizuda.snailjob.server.common.rpc.client.annotation.Header; import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_CALLBACK; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_DISPATCH; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_GENERATE_IDEM_ID; + /** * 调用客户端接口 * @@ -24,16 +28,13 @@ import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; */ public interface RetryRpcClient { - @Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST) + @Mapping(path = RETRY_DISPATCH, method = RequestMethod.POST) Result dispatch(@Body DispatchRetryDTO dispatchRetryDTO, @Header SnailJobHeaders headers); - @Mapping(path = "/retry/callback/v1", method = RequestMethod.POST) + @Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST) Result callback(@Body RetryCallbackDTO retryCallbackDTO); - @Mapping(path = "/retry/generate/idempotent-id/v1", method = RequestMethod.POST) + @Mapping(path = RETRY_GENERATE_IDEM_ID, method = RequestMethod.POST) Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO); - @Mapping(path = "/retry/sync/version/v1", method = RequestMethod.POST) - Result syncConfig(@Body ConfigDTO configDTO); - } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/ReportRetryInfoHttpRequestHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/ReportRetryInfoHttpRequestHandler.java index 04f53881..08d898ba 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/ReportRetryInfoHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/handler/ReportRetryInfoHttpRequestHandler.java @@ -11,6 +11,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.enums.TaskGeneratorSceneEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; +import com.aizuda.snailjob.server.common.handler.ConfigVersionSyncHandler; import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; import com.aizuda.snailjob.server.model.dto.RetryTaskDTO; import com.aizuda.snailjob.server.retry.task.generator.task.TaskContext; diff --git a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java index a221b1d0..67cc2f24 100644 --- a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java +++ b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java @@ -65,47 +65,51 @@ public class ConsumerBucketActor extends AbstractActor { return; } - if (SystemModeEnum.isJob(systemProperties.getMode())) { + // 扫描job && workflow + doScanJobAndWorkflow(consumerBucket); - ScanTask scanTask = new ScanTask(); - scanTask.setBuckets(consumerBucket.getBuckets()); + // 扫描重试 + doScanRetry(consumerBucket); + } - // 扫描定时任务数据 - ActorRef scanJobActorRef = cacheActorRef(DEFAULT_JOB_KEY, SyetemTaskTypeEnum.JOB); - scanJobActorRef.tell(scanTask, scanJobActorRef); - - // 扫描DAG工作流任务数据 - ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, SyetemTaskTypeEnum.WORKFLOW); - scanWorkflowActorRef.tell(scanTask, scanWorkflowActorRef); + private void doScanRetry(final ConsumerBucket consumerBucket) { + List groupConfigs = null; + try { + // 查询桶对应组信息 + groupConfigs = accessTemplate.getGroupConfigAccess().list( + new LambdaQueryWrapper() + .select(GroupConfig::getGroupName, GroupConfig::getGroupPartition, GroupConfig::getNamespaceId) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + .in(GroupConfig::getBucketIndex, consumerBucket.getBuckets()) + ); + } catch (Exception e) { + SnailJobLog.LOCAL.error("生成重试任务异常.", e); } - if (SystemModeEnum.isRetry(systemProperties.getMode())) { - List groupConfigs = null; - try { - // 查询桶对应组信息 - groupConfigs = accessTemplate.getGroupConfigAccess().list( - new LambdaQueryWrapper() - .select(GroupConfig::getGroupName, GroupConfig::getGroupPartition, GroupConfig::getNamespaceId) - .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) - .in(GroupConfig::getBucketIndex, consumerBucket.getBuckets()) - ); - } catch (Exception e) { - SnailJobLog.LOCAL.error("生成重试任务异常.", e); - } - - if (!CollectionUtils.isEmpty(groupConfigs)) { - for (final GroupConfig groupConfig : groupConfigs) { - CacheConsumerGroup.addOrUpdate(groupConfig.getGroupName(), groupConfig.getNamespaceId()); - ScanTask scanTask = new ScanTask(); - scanTask.setNamespaceId(groupConfig.getNamespaceId()); - scanTask.setGroupName(groupConfig.getGroupName()); - scanTask.setBuckets(consumerBucket.getBuckets()); - scanTask.setGroupPartition(groupConfig.getGroupPartition()); - produceScanActorTask(scanTask); - } + if (!CollectionUtils.isEmpty(groupConfigs)) { + for (final GroupConfig groupConfig : groupConfigs) { + CacheConsumerGroup.addOrUpdate(groupConfig.getGroupName(), groupConfig.getNamespaceId()); + ScanTask scanTask = new ScanTask(); + scanTask.setNamespaceId(groupConfig.getNamespaceId()); + scanTask.setGroupName(groupConfig.getGroupName()); + scanTask.setBuckets(consumerBucket.getBuckets()); + scanTask.setGroupPartition(groupConfig.getGroupPartition()); + produceScanActorTask(scanTask); } } + } + private void doScanJobAndWorkflow(final ConsumerBucket consumerBucket) { + ScanTask scanTask = new ScanTask(); + scanTask.setBuckets(consumerBucket.getBuckets()); + + // 扫描定时任务数据 + ActorRef scanJobActorRef = cacheActorRef(DEFAULT_JOB_KEY, SyetemTaskTypeEnum.JOB); + scanJobActorRef.tell(scanTask, scanJobActorRef); + + // 扫描DAG工作流任务数据 + ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, SyetemTaskTypeEnum.WORKFLOW); + scanWorkflowActorRef.tell(scanTask, scanWorkflowActorRef); } /** diff --git a/snail-job-server/snail-job-server-starter/src/main/resources/application.yml b/snail-job-server/snail-job-server-starter/src/main/resources/application.yml index fab22bc8..9b0e787a 100644 --- a/snail-job-server/snail-job-server-starter/src/main/resources/application.yml +++ b/snail-job-server/snail-job-server-starter/src/main/resources/application.yml @@ -74,5 +74,4 @@ snail-job: callback: # 回调配置 max-count: 288 #回调最大执行次数 trigger-interval: 900 #间隔时间 - mode: all retry-max-pull-count: 10 diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/GroupConfigServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/GroupConfigServiceImpl.java index 9375fee1..d73662a5 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/GroupConfigServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/GroupConfigServiceImpl.java @@ -7,9 +7,8 @@ import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.server.common.config.SystemProperties; import com.aizuda.snailjob.server.common.enums.IdGeneratorModeEnum; -import com.aizuda.snailjob.server.common.enums.SystemModeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.retry.task.support.handler.ConfigVersionSyncHandler; +import com.aizuda.snailjob.server.common.handler.ConfigVersionSyncHandler; import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.request.GroupConfigQueryVO; import com.aizuda.snailjob.server.web.model.request.GroupConfigRequestVO; @@ -33,15 +32,11 @@ import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.aizuda.snailjob.template.datasource.persistence.po.SequenceAlloc; import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; import com.aizuda.snailjob.template.datasource.utils.DbUtils; -import com.aizuda.snailjob.server.retry.task.support.handler.ConfigVersionSyncHandler; -import com.aizuda.snailjob.server.web.service.convert.GroupConfigConverter; -import com.baomidou.mybatisplus.autoconfigure.MybatisPlusProperties; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.jdbc.BadSqlGrammarException; import org.springframework.jdbc.core.JdbcTemplate; @@ -159,15 +154,12 @@ public class GroupConfigServiceImpl implements GroupConfigService { () -> new SnailJobServerException("exception occurred while adding group. groupConfigVO[{}]", groupConfigRequestVO)); - if (SystemModeEnum.isRetry(systemProperties.getMode())) { - // 同步版本, 版本为0代表需要同步到客户端 - boolean add = configVersionSyncHandler.addSyncTask(groupName, namespaceId, 0); - // 若添加失败则强制发起同步 - if (!add) { - configVersionSyncHandler.syncVersion(groupName, namespaceId); - } + // 同步版本, 版本为0代表需要同步到客户端 + boolean add = configVersionSyncHandler.addSyncTask(groupName, namespaceId, 0); + // 若添加失败则强制发起同步 + if (!add) { + configVersionSyncHandler.syncVersion(groupName, namespaceId); } - return Boolean.TRUE; } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/SystemUserServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/SystemUserServiceImpl.java index 32805196..71beff18 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/SystemUserServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/SystemUserServiceImpl.java @@ -68,7 +68,6 @@ public class SystemUserServiceImpl implements SystemUserService { SystemUserResponseVO systemUserResponseVO = SystemUserResponseVOConverter.INSTANCE.convert(systemUser); systemUserResponseVO.setToken(token); - systemUserResponseVO.setMode(systemProperties.getMode().name()); getPermission(systemUser.getRole(), systemUser.getId(), systemUserResponseVO); @@ -107,7 +106,6 @@ public class SystemUserServiceImpl implements SystemUserService { @Override public SystemUserResponseVO getUserInfo(UserSessionVO systemUser) { SystemUserResponseVO systemUserResponseVO = SystemUserResponseVOConverter.INSTANCE.convert(systemUser); - systemUserResponseVO.setMode(systemProperties.getMode().name()); getPermission(systemUser.getRole(), systemUser.getId(), systemUserResponseVO);