feat(sj_1.0.0): 定时任务客户端接入告警
This commit is contained in:
parent
e98946580e
commit
d5b5c1cef2
@ -17,7 +17,7 @@ import com.aizuda.snailjob.client.common.rpc.client.RequestMethod;
|
||||
*/
|
||||
public interface NettyClient {
|
||||
|
||||
@Mapping(method = RequestMethod.GET, path = HTTP_PATH.CONFIG)
|
||||
@Mapping(method = RequestMethod.GET, path = HTTP_PATH.SYNC_CONFIG)
|
||||
Result getConfig(Integer version);
|
||||
|
||||
@Mapping(method = RequestMethod.GET, path = HTTP_PATH.BEAT)
|
||||
|
@ -8,16 +8,12 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.common.core.model.NettyResult;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
|
||||
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @author: opensnail
|
||||
@ -25,7 +21,6 @@ import java.util.Set;
|
||||
*/
|
||||
@Component
|
||||
@Order
|
||||
@Slf4j
|
||||
public class GroupVersionCache implements Lifecycle {
|
||||
|
||||
private static ConfigDTO CONFIG;
|
||||
@ -60,10 +55,22 @@ public class GroupVersionCache implements Lifecycle {
|
||||
return SystemConstants.DEFAULT_DDL;
|
||||
}
|
||||
|
||||
public static ConfigDTO.Notify getNotifyAttribute(Integer notifyScene) {
|
||||
public static ConfigDTO.Notify getRetryNotifyAttribute(Integer notifyScene) {
|
||||
List<ConfigDTO.Notify> notifyList = CONFIG.getNotifyList();
|
||||
for (ConfigDTO.Notify notify : notifyList) {
|
||||
if (notify.getNotifyScene().equals(notifyScene)) {
|
||||
if (notify.getRetryNotifyScene().equals(notifyScene)) {
|
||||
return notify;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
public static ConfigDTO.Notify getJobNotifyAttribute(Integer notifyScene) {
|
||||
List<ConfigDTO.Notify> notifyList = CONFIG.getNotifyList();
|
||||
for (ConfigDTO.Notify notify : notifyList) {
|
||||
if (notify.getJobNotifyScene().equals(notifyScene)) {
|
||||
return notify;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,30 @@
|
||||
package com.aizuda.snailjob.client.common.client;
|
||||
|
||||
import com.aizuda.snailjob.client.common.annotation.Mapping;
|
||||
import com.aizuda.snailjob.client.common.annotation.SnailEndPoint;
|
||||
import com.aizuda.snailjob.client.common.cache.GroupVersionCache;
|
||||
import com.aizuda.snailjob.client.common.rpc.client.RequestMethod;
|
||||
import com.aizuda.snailjob.common.core.model.Result;
|
||||
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.SYNC_CONFIG;
|
||||
|
||||
/**
|
||||
* SnailJob 通用EndPoint
|
||||
*
|
||||
* @author: opensnail
|
||||
* @date : 2022-03-09 16:33
|
||||
*/
|
||||
@SnailEndPoint
|
||||
public class SnailJobCommonEndPoint {
|
||||
|
||||
/**
|
||||
* 同步版本
|
||||
*/
|
||||
@Mapping(path = SYNC_CONFIG, method = RequestMethod.POST)
|
||||
public Result syncVersion(ConfigDTO configDTO) {
|
||||
GroupVersionCache.setConfig(configDTO);
|
||||
return new Result();
|
||||
}
|
||||
|
||||
}
|
@ -172,11 +172,6 @@ public class SnailJobProperties {
|
||||
private SlidingWindowConfig reportSlidingWindow = new SlidingWindowConfig();
|
||||
}
|
||||
|
||||
public static String getGroup() {
|
||||
SnailJobProperties properties = SpringContext.getBean(SnailJobProperties.class);
|
||||
return Objects.requireNonNull(properties).group;
|
||||
}
|
||||
|
||||
/**
|
||||
* 邮件配置
|
||||
*/
|
||||
|
@ -174,7 +174,7 @@ public class NettyChannel {
|
||||
.set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes())
|
||||
.set(HeadersEnum.HOST_ID.getKey(), HOST_ID)
|
||||
.set(HeadersEnum.HOST_IP.getKey(), getClientHost())
|
||||
.set(HeadersEnum.GROUP_NAME.getKey(), SnailJobProperties.getGroup())
|
||||
.set(HeadersEnum.GROUP_NAME.getKey(), snailJobProperties.getGroup())
|
||||
.set(HeadersEnum.HOST_PORT.getKey(), getClientPort())
|
||||
.set(HeadersEnum.VERSION.getKey(), GroupVersionCache.getVersion())
|
||||
.set(HeadersEnum.HOST.getKey(), serverConfig.getHost())
|
||||
|
@ -29,6 +29,9 @@ import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH;
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_STOP;
|
||||
|
||||
/**
|
||||
* @author: opensnail
|
||||
* @date : 2023-09-27 16:33
|
||||
@ -36,7 +39,7 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||
@SnailEndPoint
|
||||
public class JobEndPoint {
|
||||
|
||||
@Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST)
|
||||
@Mapping(path = JOB_DISPATCH, method = RequestMethod.POST)
|
||||
public Result<Boolean> dispatchJob(DispatchJobRequest dispatchJob) {
|
||||
|
||||
ValidatorFactory vf = Validation.buildDefaultValidatorFactory();
|
||||
@ -116,7 +119,7 @@ public class JobEndPoint {
|
||||
return jobContext;
|
||||
}
|
||||
|
||||
@Mapping(path = "/job/stop/v1", method = RequestMethod.POST)
|
||||
@Mapping(path = JOB_STOP, method = RequestMethod.POST)
|
||||
public Result<Boolean> stopJob(StopJobDTO interruptJob) {
|
||||
|
||||
ValidatorFactory vf = Validation.buildDefaultValidatorFactory();
|
||||
|
@ -1,23 +0,0 @@
|
||||
package com.aizuda.snailjob.client.job.core.executor;
|
||||
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* @author opensnail
|
||||
* @date 2023-10-08 22:48:44
|
||||
* @since 2.4.0
|
||||
*/
|
||||
public class JobExecutorCallable implements Callable<ExecuteResult> {
|
||||
|
||||
public JobExecutorCallable(ExecuteResult executeResult) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecuteResult call() throws Exception {
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
@ -1,5 +1,8 @@
|
||||
package com.aizuda.snailjob.client.job.core.executor;
|
||||
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import com.aizuda.snailjob.client.common.cache.GroupVersionCache;
|
||||
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
|
||||
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
|
||||
import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
|
||||
import com.aizuda.snailjob.client.job.core.cache.ThreadPoolCache;
|
||||
@ -7,19 +10,30 @@ import com.aizuda.snailjob.client.job.core.client.JobNettyClient;
|
||||
import com.aizuda.snailjob.client.job.core.log.JobLogMeta;
|
||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||
import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest;
|
||||
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
|
||||
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
|
||||
import com.aizuda.snailjob.common.core.context.SpringContext;
|
||||
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
||||
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
|
||||
import com.aizuda.snailjob.common.core.util.NetUtil;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.common.core.model.JobContext;
|
||||
import com.aizuda.snailjob.common.core.model.NettyResult;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
|
||||
import com.aizuda.snailjob.client.job.core.client.JobNettyClient;
|
||||
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
|
||||
import com.aizuda.snailjob.server.model.dto.ConfigDTO.Notify.Recipient;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CancellationException;
|
||||
|
||||
/**
|
||||
@ -30,9 +44,20 @@ import java.util.concurrent.CancellationException;
|
||||
@Slf4j
|
||||
public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult> {
|
||||
|
||||
private static final String TEXT_MESSAGE_FORMATTER = """
|
||||
<font face="微软雅黑" color=#ff0000 size=4>{}环境 执行结果上报异常</font> \s
|
||||
> IP:{} \s
|
||||
> 空间ID:{} \s
|
||||
> 名称:{} \s
|
||||
> 时间:{} \s
|
||||
> 结果:{} \s
|
||||
> 异常:{} \s
|
||||
\s""";
|
||||
|
||||
private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, NettyResult>newBuilder()
|
||||
.client(JobNettyClient.class)
|
||||
.callback(nettyResult -> SnailJobLog.LOCAL.info("Job execute result report successfully requestId:[{}]", nettyResult.getRequestId())).build();
|
||||
.client(JobNettyClient.class)
|
||||
.callback(nettyResult -> SnailJobLog.LOCAL.info("Job execute result report successfully requestId:[{}]",
|
||||
nettyResult.getRequestId())).build();
|
||||
|
||||
private final JobContext jobContext;
|
||||
|
||||
@ -65,6 +90,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
|
||||
CLIENT.dispatchResult(buildDispatchJobResultRequest(result, taskStatus));
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e);
|
||||
sendMessage(result, e);
|
||||
} finally {
|
||||
SnailJobLogManager.removeLogMeta();
|
||||
stopThreadPool();
|
||||
@ -73,7 +99,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
|
||||
|
||||
@Override
|
||||
public void onFailure(final Throwable t) {
|
||||
|
||||
ExecuteResult failure = ExecuteResult.failure();
|
||||
try {
|
||||
// 初始化调度信息(日志上报LogUtil)
|
||||
initLogContext();
|
||||
@ -81,7 +107,6 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
|
||||
// 上报执行失败
|
||||
SnailJobLog.REMOTE.error("任务执行失败 taskBatchId:[{}]", jobContext.getTaskBatchId(), t);
|
||||
|
||||
ExecuteResult failure = ExecuteResult.failure();
|
||||
if (t instanceof CancellationException) {
|
||||
failure.setMessage("任务被取消");
|
||||
} else {
|
||||
@ -93,6 +118,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
|
||||
);
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e);
|
||||
sendMessage(failure, e);
|
||||
} finally {
|
||||
SnailJobLogManager.removeLogMeta();
|
||||
stopThreadPool();
|
||||
@ -132,4 +158,38 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
|
||||
dispatchJobRequest.setRetryScene(jobContext.getRetryScene());
|
||||
return dispatchJobRequest;
|
||||
}
|
||||
|
||||
private void sendMessage(final ExecuteResult result, Exception e) {
|
||||
|
||||
try {
|
||||
SnailJobProperties snailJobProperties = SpringContext.getBean(SnailJobProperties.class);
|
||||
if (Objects.isNull(snailJobProperties)) {
|
||||
return;
|
||||
}
|
||||
ConfigDTO.Notify notify = GroupVersionCache.getJobNotifyAttribute(
|
||||
JobNotifySceneEnum.JOB_CLIENT_ERROR.getNotifyScene());
|
||||
if (Objects.nonNull(notify)) {
|
||||
List<Recipient> recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList());
|
||||
for (final Recipient recipient : recipients) {
|
||||
AlarmContext context = AlarmContext.build()
|
||||
.text(TEXT_MESSAGE_FORMATTER,
|
||||
EnvironmentUtils.getActiveProfile(),
|
||||
NetUtil.getLocalIpStr(),
|
||||
snailJobProperties.getNamespace(),
|
||||
snailJobProperties.getGroup(),
|
||||
LocalDateTime.now().format(DatePattern.NORM_DATETIME_FORMATTER),
|
||||
JsonUtil.toJsonString(result),
|
||||
e.getMessage())
|
||||
.title("定时任务执行结果上报异常:[{}]", snailJobProperties.getGroup())
|
||||
.notifyAttribute(recipient.getNotifyAttribute());
|
||||
|
||||
Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType())).ifPresent(alarm -> alarm.asyncSendMessage(context));
|
||||
}
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
SnailJobLog.LOCAL.error("Client failed to send component exception alert.", e1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import com.aizuda.snailjob.client.model.DispatchRetryDTO;
|
||||
import com.aizuda.snailjob.client.model.DispatchRetryResultDTO;
|
||||
import com.aizuda.snailjob.client.model.GenerateRetryIdempotentIdDTO;
|
||||
import com.aizuda.snailjob.client.model.RetryCallbackDTO;
|
||||
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
|
||||
import com.aizuda.snailjob.common.core.context.SpringContext;
|
||||
import com.aizuda.snailjob.common.core.enums.RetryResultStatusEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
|
||||
@ -46,6 +47,10 @@ import java.lang.reflect.Method;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_CALLBACK;
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_DISPATCH;
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_GENERATE_IDEM_ID;
|
||||
|
||||
/**
|
||||
* 服务端调调用客户端进行重试流量下发、配置变更通知等操作
|
||||
*
|
||||
@ -62,7 +67,7 @@ public class SnailRetryEndPoint {
|
||||
/**
|
||||
* 服务端调度重试入口
|
||||
*/
|
||||
@Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST)
|
||||
@Mapping(path = RETRY_DISPATCH, method = RequestMethod.POST)
|
||||
public Result<DispatchRetryResultDTO> dispatch(DispatchRetryDTO executeReqDto) {
|
||||
|
||||
ValidatorFactory vf = Validation.buildDefaultValidatorFactory();
|
||||
@ -147,16 +152,8 @@ public class SnailRetryEndPoint {
|
||||
return new Result<>(executeRespDto);
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步版本
|
||||
*/
|
||||
@Mapping(path = "/retry/sync/version/v1", method = RequestMethod.POST)
|
||||
public Result syncVersion(ConfigDTO configDTO) {
|
||||
GroupVersionCache.setConfig(configDTO);
|
||||
return new Result();
|
||||
}
|
||||
|
||||
@Mapping(path = "/retry/callback/v1", method = RequestMethod.POST)
|
||||
@Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST)
|
||||
public Result callback(RetryCallbackDTO callbackDTO) {
|
||||
|
||||
ValidatorFactory vf = Validation.buildDefaultValidatorFactory();
|
||||
@ -274,7 +271,7 @@ public class SnailRetryEndPoint {
|
||||
* @param generateRetryIdempotentIdDTO 生成idempotentId模型
|
||||
* @return idempotentId
|
||||
*/
|
||||
@Mapping(path = "/retry/generate/idempotent-id/v1", method = RequestMethod.POST)
|
||||
@Mapping(path = RETRY_GENERATE_IDEM_ID, method = RequestMethod.POST)
|
||||
public Result<String> idempotentIdGenerate(
|
||||
GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) {
|
||||
|
||||
|
@ -10,7 +10,6 @@ import com.aizuda.snailjob.client.core.cache.RetryerInfoCache;
|
||||
import com.aizuda.snailjob.client.core.retryer.RetryerInfo;
|
||||
import com.aizuda.snailjob.client.core.retryer.RetryerResultContext;
|
||||
import com.aizuda.snailjob.client.core.strategy.RetryStrategy;
|
||||
import com.aizuda.snailjob.common.core.alarm.Alarm;
|
||||
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
|
||||
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
|
||||
import com.aizuda.snailjob.common.core.context.SpringContext;
|
||||
@ -52,18 +51,18 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se
|
||||
|
||||
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
private static String retryErrorMoreThresholdTextMessageFormatter =
|
||||
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试组件异常</font> \n" +
|
||||
"> IP:{} \n" +
|
||||
"> 空间ID:{} \n" +
|
||||
"> 名称:{} \n" +
|
||||
"> 时间:{} \n" +
|
||||
"> 异常:{} \n";
|
||||
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试组件异常</font> \n" +
|
||||
"> IP:{} \n" +
|
||||
"> 空间ID:{} \n" +
|
||||
"> 名称:{} \n" +
|
||||
"> 时间:{} \n" +
|
||||
"> 异常:{} \n";
|
||||
|
||||
private final StandardEnvironment standardEnvironment;
|
||||
private final RetryStrategy retryStrategy;
|
||||
|
||||
public SnailRetryInterceptor(StandardEnvironment standardEnvironment,
|
||||
RetryStrategy localRetryStrategies) {
|
||||
RetryStrategy localRetryStrategies) {
|
||||
this.standardEnvironment = standardEnvironment;
|
||||
this.retryStrategy = localRetryStrategies;
|
||||
}
|
||||
@ -165,7 +164,8 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se
|
||||
} else if (!validate(throwable, RetryerInfoCache.get(retryable.scene(), executorClassName))) {
|
||||
SnailJobLog.LOCAL.debug("Exception mismatch. traceId:[{}]", traceId);
|
||||
} else {
|
||||
SnailJobLog.LOCAL.debug("Unknown situations do not enable local retry scenarios. traceId:[{}]", traceId);
|
||||
SnailJobLog.LOCAL.debug("Unknown situations do not enable local retry scenarios. traceId:[{}]",
|
||||
traceId);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -184,9 +184,11 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se
|
||||
RetryerResultContext context = retryStrategy.openRetry(retryable.scene(), executorClassName,
|
||||
point.getArguments());
|
||||
if (RetryResultStatusEnum.SUCCESS.getStatus().equals(context.getRetryResultStatusEnum().getStatus())) {
|
||||
SnailJobLog.LOCAL.debug("local retry successful. traceId:[{}] result:[{}]", traceId, context.getResult());
|
||||
SnailJobLog.LOCAL.debug("local retry successful. traceId:[{}] result:[{}]", traceId,
|
||||
context.getResult());
|
||||
} else {
|
||||
SnailJobLog.LOCAL.debug("local retry result. traceId:[{}] throwable:[{}]", traceId, context.getThrowable());
|
||||
SnailJobLog.LOCAL.debug("local retry result. traceId:[{}] throwable:[{}]", traceId,
|
||||
context.getThrowable());
|
||||
}
|
||||
|
||||
return context;
|
||||
@ -215,24 +217,28 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se
|
||||
private void sendMessage(Exception e) {
|
||||
|
||||
try {
|
||||
ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute(
|
||||
RetryNotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene());
|
||||
ConfigDTO.Notify notify = GroupVersionCache.getRetryNotifyAttribute(
|
||||
RetryNotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene());
|
||||
if (Objects.nonNull(notify)) {
|
||||
SnailJobProperties snailJobProperties = SpringContext.getBean(SnailJobProperties.class);
|
||||
if (Objects.isNull(snailJobProperties)) {
|
||||
return;
|
||||
}
|
||||
List<Recipient> recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList());
|
||||
for (final Recipient recipient : recipients) {
|
||||
AlarmContext context = AlarmContext.build()
|
||||
.text(retryErrorMoreThresholdTextMessageFormatter,
|
||||
EnvironmentUtils.getActiveProfile(),
|
||||
NetUtil.getLocalIpStr(),
|
||||
standardEnvironment.getProperty("snail-job.namespace", StrUtil.EMPTY),
|
||||
SnailJobProperties.getGroup(),
|
||||
snailJobProperties.getNamespace(),
|
||||
snailJobProperties.getGroup(),
|
||||
LocalDateTime.now().format(formatter),
|
||||
e.getMessage())
|
||||
.title("retry component handling exception:[{}]", SnailJobProperties.getGroup())
|
||||
.title("retry component handling exception:[{}]", snailJobProperties.getGroup())
|
||||
.notifyAttribute(recipient.getNotifyAttribute());
|
||||
|
||||
Alarm<AlarmContext> alarmType = SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType());
|
||||
alarmType.asyncSendMessage(context);
|
||||
Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType()))
|
||||
.ifPresent(alarm -> alarm.asyncSendMessage(context));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -14,7 +14,9 @@ import com.aizuda.snailjob.client.core.retryer.RetryerInfo;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.common.core.model.IdempotentIdContext;
|
||||
import com.aizuda.snailjob.server.model.dto.RetryTaskDTO;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
@ -28,6 +30,8 @@ import java.lang.reflect.Method;
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractReport implements Report {
|
||||
@Autowired
|
||||
protected SnailJobProperties snailJobProperties;
|
||||
|
||||
@Override
|
||||
public boolean report(String scene, final String targetClassName, final Object[] params) {
|
||||
@ -76,7 +80,7 @@ public abstract class AbstractReport implements Report {
|
||||
retryTaskDTO.setIdempotentId(idempotentId);
|
||||
retryTaskDTO.setExecutorName(targetClassName);
|
||||
retryTaskDTO.setArgsStr(serialize);
|
||||
retryTaskDTO.setGroupName(SnailJobProperties.getGroup());
|
||||
retryTaskDTO.setGroupName(snailJobProperties.getGroup());
|
||||
retryTaskDTO.setSceneName(scene);
|
||||
|
||||
String expression = retryerInfo.getBizNo();
|
||||
|
@ -20,9 +20,7 @@ import java.util.concurrent.TimeUnit;
|
||||
* @since 1.3.0
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class AsyncReport extends AbstractReport implements Lifecycle {
|
||||
private final SnailJobProperties snailJobProperties;
|
||||
private SlidingWindow<RetryTaskDTO> slidingWindow;
|
||||
@Override
|
||||
public boolean supports(boolean async) {
|
||||
|
@ -7,7 +7,6 @@ import com.aizuda.snailjob.client.core.RetryExecutorParameter;
|
||||
import com.aizuda.snailjob.client.common.cache.GroupVersionCache;
|
||||
import com.aizuda.snailjob.client.common.client.NettyClient;
|
||||
import com.aizuda.snailjob.client.core.executor.GuavaRetryExecutor;
|
||||
import com.aizuda.snailjob.common.core.alarm.Alarm;
|
||||
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
|
||||
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
|
||||
import com.aizuda.snailjob.common.core.context.SpringContext;
|
||||
@ -110,12 +109,15 @@ public class ReportListener implements Listener<RetryTaskDTO> {
|
||||
private void sendMessage(Throwable e) {
|
||||
|
||||
try {
|
||||
ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene());
|
||||
ConfigDTO.Notify notify = GroupVersionCache.getRetryNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene());
|
||||
if (Objects.isNull(notify)) {
|
||||
return;
|
||||
}
|
||||
|
||||
SnailJobProperties properties = SpringContext.getBean(SnailJobProperties.class);
|
||||
if (Objects.isNull(properties)) {
|
||||
return;
|
||||
}
|
||||
List<Recipient> recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList());
|
||||
for (final Recipient recipient : recipients) {
|
||||
AlarmContext context = AlarmContext.build()
|
||||
@ -123,14 +125,12 @@ public class ReportListener implements Listener<RetryTaskDTO> {
|
||||
EnvironmentUtils.getActiveProfile(),
|
||||
NetUtil.getLocalIpStr(),
|
||||
properties.getNamespace(),
|
||||
SnailJobProperties.getGroup(),
|
||||
properties.getGroup(),
|
||||
LocalDateTime.now().format(formatter),
|
||||
e.getMessage())
|
||||
.title("上报异常:[{}]", SnailJobProperties.getGroup())
|
||||
.title("上报异常:[{}]", properties.getGroup())
|
||||
.notifyAttribute(recipient.getNotifyAttribute());
|
||||
|
||||
Alarm<AlarmContext> alarmType = SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType());
|
||||
alarmType.asyncSendMessage(context);
|
||||
Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType())).ifPresent(alarm -> alarm.asyncSendMessage(context));
|
||||
}
|
||||
|
||||
} catch (Exception e1) {
|
||||
|
@ -5,10 +5,8 @@ import com.aizuda.snailjob.client.common.config.SnailJobProperties;
|
||||
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
|
||||
import com.aizuda.snailjob.client.common.client.NettyClient;
|
||||
import com.aizuda.snailjob.client.core.retryer.RetryerInfo;
|
||||
import com.aizuda.snailjob.common.core.alarm.Alarm;
|
||||
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
|
||||
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
|
||||
import com.aizuda.snailjob.common.core.context.SpringContext;
|
||||
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.common.core.model.NettyResult;
|
||||
@ -95,7 +93,7 @@ public class SyncReport extends AbstractReport {
|
||||
private void sendMessage(Throwable e) {
|
||||
|
||||
try {
|
||||
ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene());
|
||||
ConfigDTO.Notify notify = GroupVersionCache.getRetryNotifyAttribute(RetryNotifySceneEnum.CLIENT_REPORT_ERROR.getNotifyScene());
|
||||
if (Objects.isNull(notify)) {
|
||||
return;
|
||||
}
|
||||
@ -107,14 +105,13 @@ public class SyncReport extends AbstractReport {
|
||||
EnvironmentUtils.getActiveProfile(),
|
||||
NetUtil.getLocalIpStr(),
|
||||
snailJobProperties.getNamespace(),
|
||||
SnailJobProperties.getGroup(),
|
||||
snailJobProperties.getGroup(),
|
||||
LocalDateTime.now().format(formatter),
|
||||
e.getMessage())
|
||||
.title("同步上报异常:[{}]", SnailJobProperties.getGroup())
|
||||
.title("同步上报异常:[{}]", snailJobProperties.getGroup())
|
||||
.notifyAttribute(recipient.getNotifyAttribute());
|
||||
|
||||
Alarm<AlarmContext> alarmType = SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType());
|
||||
alarmType.asyncSendMessage(context);
|
||||
Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType())).ifPresent(alarm -> alarm.asyncSendMessage(context));
|
||||
}
|
||||
|
||||
} catch (Exception e1) {
|
||||
|
@ -11,7 +11,6 @@ import com.aizuda.snailjob.client.core.intercepter.RetrySiteSnapshot;
|
||||
import com.aizuda.snailjob.client.core.loader.SnailRetrySpiLoader;
|
||||
import com.aizuda.snailjob.client.core.retryer.RetryerInfo;
|
||||
import com.aizuda.snailjob.client.core.retryer.RetryerResultContext;
|
||||
import com.aizuda.snailjob.common.core.alarm.Alarm;
|
||||
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
|
||||
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
|
||||
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
|
||||
@ -42,7 +41,7 @@ import java.util.function.Consumer;
|
||||
@Slf4j
|
||||
public abstract class AbstractRetryStrategies implements RetryStrategy {
|
||||
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
private static String retryErrorMoreThresholdTextMessageFormatter =
|
||||
private static String TEXT_MESSAGE_FORMATTER =
|
||||
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试组件异常</font> \n" +
|
||||
"> IP:{} \n" +
|
||||
"> 空间ID:{} \n" +
|
||||
@ -176,23 +175,22 @@ public abstract class AbstractRetryStrategies implements RetryStrategy {
|
||||
private void sendMessage(Exception e) {
|
||||
|
||||
try {
|
||||
ConfigDTO.Notify notify = GroupVersionCache.getNotifyAttribute(RetryNotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene());
|
||||
ConfigDTO.Notify notify = GroupVersionCache.getRetryNotifyAttribute(RetryNotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene());
|
||||
if (Objects.nonNull(notify)) {
|
||||
List<Recipient> recipients = Optional.ofNullable(notify.getRecipients()).orElse(Lists.newArrayList());
|
||||
|
||||
for (final Recipient recipient : recipients) {
|
||||
AlarmContext context = AlarmContext.build()
|
||||
.text(retryErrorMoreThresholdTextMessageFormatter,
|
||||
.text(TEXT_MESSAGE_FORMATTER,
|
||||
EnvironmentUtils.getActiveProfile(),
|
||||
NetUtil.getLocalIpStr(),
|
||||
snailJobProperties.getNamespace(),
|
||||
SnailJobProperties.getGroup(),
|
||||
snailJobProperties.getGroup(),
|
||||
LocalDateTime.now().format(formatter),
|
||||
e.getMessage())
|
||||
.title("retry component handling exception:[{}]", SnailJobProperties.getGroup())
|
||||
.title("retry component handling exception:[{}]", snailJobProperties.getGroup())
|
||||
.notifyAttribute(recipient.getNotifyAttribute());
|
||||
Alarm<AlarmContext> alarmType = SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType());
|
||||
alarmType.asyncSendMessage(context);
|
||||
Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipient.getNotifyType())).ifPresent(alarm -> alarm.asyncSendMessage(context));
|
||||
}
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
|
@ -55,11 +55,6 @@ public interface SystemConstants {
|
||||
*/
|
||||
String BEAT = "/beat";
|
||||
|
||||
/**
|
||||
* 同步配置
|
||||
*/
|
||||
String CONFIG = "/config";
|
||||
|
||||
/**
|
||||
* 批量上报
|
||||
*/
|
||||
@ -74,6 +69,37 @@ public interface SystemConstants {
|
||||
* 上报job的运行结果
|
||||
*/
|
||||
String REPORT_JOB_DISPATCH_RESULT = "/report/dispatch/result";
|
||||
|
||||
/**
|
||||
* 执行任务
|
||||
*/
|
||||
String JOB_DISPATCH = "/job/dispatch/v1";
|
||||
|
||||
/**
|
||||
* 停止任务
|
||||
*/
|
||||
String JOB_STOP = "/job/stop/v1";
|
||||
|
||||
/**
|
||||
* 同步配置
|
||||
*/
|
||||
String SYNC_CONFIG = "/sync/version";
|
||||
|
||||
/**
|
||||
* 重试分发
|
||||
*/
|
||||
String RETRY_DISPATCH = "/retry/dispatch/v1";
|
||||
|
||||
/**
|
||||
* 重试回调
|
||||
*/
|
||||
String RETRY_CALLBACK = "/retry/callback/v1";
|
||||
|
||||
/**
|
||||
* 获取重试幂等id
|
||||
*/
|
||||
String RETRY_GENERATE_IDEM_ID = "/retry/generate/idempotent-id/v1";
|
||||
|
||||
}
|
||||
|
||||
String LOGO = """
|
||||
|
@ -2,10 +2,10 @@ package com.aizuda.snailjob.server.model.dto;
|
||||
|
||||
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||
import com.aizuda.snailjob.common.core.enums.AlarmTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
|
||||
import lombok.Data;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* 同步的配置数据结构
|
||||
@ -59,9 +59,14 @@ public class ConfigDTO {
|
||||
private Integer notifyThreshold;
|
||||
|
||||
/**
|
||||
* 场景场景 {@link RetryNotifySceneEnum}
|
||||
* 重试通知场景 {@link RetryNotifySceneEnum}
|
||||
*/
|
||||
private Integer notifyScene;
|
||||
private Integer retryNotifyScene;
|
||||
|
||||
/**
|
||||
* 定时任务&工作流通知场景 {@link JobNotifySceneEnum}
|
||||
*/
|
||||
private Integer jobNotifyScene;
|
||||
|
||||
@Data
|
||||
public static class Recipient {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package com.aizuda.snailjob.template.datasource.access.config;
|
||||
|
||||
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
@ -169,16 +170,18 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
|
||||
for (NotifyConfig notifyConfig : notifyList) {
|
||||
|
||||
// 只选择客户端的通知配置即可
|
||||
RetryNotifySceneEnum notifyScene = RetryNotifySceneEnum.getNotifyScene(notifyConfig.getNotifyScene(),
|
||||
RetryNotifySceneEnum retryNotifyScene = RetryNotifySceneEnum.getNotifyScene(notifyConfig.getNotifyScene(),
|
||||
NodeTypeEnum.CLIENT);
|
||||
if (Objects.isNull(notifyScene)) {
|
||||
JobNotifySceneEnum jobNotifyScene = JobNotifySceneEnum.getJobNotifyScene(notifyConfig.getNotifyScene(),
|
||||
NodeTypeEnum.CLIENT);
|
||||
if (Objects.isNull(retryNotifyScene) && Objects.isNull(jobNotifyScene)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String recipientIds = notifyConfig.getRecipientIds();
|
||||
List<NotifyRecipient> notifyRecipients = notifyRecipientMapper.selectBatchIds(
|
||||
JsonUtil.parseList(recipientIds, Long.class));
|
||||
notifies.add(getNotify(notifyConfig, notifyRecipients));
|
||||
notifies.add(getNotify(notifyConfig, notifyRecipients, retryNotifyScene, jobNotifyScene));
|
||||
}
|
||||
|
||||
configDTO.setNotifyList(notifies);
|
||||
@ -197,7 +200,8 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
|
||||
return configDTO;
|
||||
}
|
||||
|
||||
private static Notify getNotify(final NotifyConfig notifyConfig, final List<NotifyRecipient> notifyRecipients) {
|
||||
private static Notify getNotify(final NotifyConfig notifyConfig, final List<NotifyRecipient> notifyRecipients,
|
||||
final RetryNotifySceneEnum retryNotifyScene, final JobNotifySceneEnum jobNotifyScene) {
|
||||
List<Recipient> recipients = new ArrayList<>();
|
||||
for (final NotifyRecipient notifyRecipient : notifyRecipients) {
|
||||
Recipient recipient = new Recipient();
|
||||
@ -207,7 +211,14 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
|
||||
}
|
||||
|
||||
Notify notify = new Notify();
|
||||
notify.setNotifyScene(notifyConfig.getNotifyScene());
|
||||
if (Objects.nonNull(retryNotifyScene)) {
|
||||
notify.setRetryNotifyScene(retryNotifyScene.getNotifyScene());
|
||||
}
|
||||
|
||||
if (Objects.nonNull(jobNotifyScene)) {
|
||||
notify.setJobNotifyScene(jobNotifyScene.getNotifyScene());
|
||||
}
|
||||
|
||||
notify.setNotifyThreshold(notifyConfig.getNotifyThreshold());
|
||||
notify.setRecipients(recipients);
|
||||
return notify;
|
||||
|
@ -0,0 +1,23 @@
|
||||
package com.aizuda.snailjob.server.common.client;
|
||||
|
||||
import com.aizuda.snailjob.common.core.model.Result;
|
||||
import com.aizuda.snailjob.server.common.rpc.client.RequestMethod;
|
||||
import com.aizuda.snailjob.server.common.rpc.client.annotation.Body;
|
||||
import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
|
||||
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.SYNC_CONFIG;
|
||||
|
||||
/**
|
||||
* 调用客户端接口
|
||||
*
|
||||
* @author: opensnail
|
||||
* @date : 2023-06-19 15:40
|
||||
* @since sj_1.0.0
|
||||
*/
|
||||
public interface CommonRpcClient {
|
||||
|
||||
@Mapping(path = SYNC_CONFIG, method = RequestMethod.POST)
|
||||
Result syncConfig(@Body ConfigDTO configDTO);
|
||||
|
||||
}
|
@ -78,14 +78,6 @@ public class SystemProperties {
|
||||
*/
|
||||
private Callback callback = new Callback();
|
||||
|
||||
/**
|
||||
* 系统模式:
|
||||
* RETRY: 分布式重试重
|
||||
* JOB: 分布式定时任务
|
||||
* ALL: 分布式重试重 && 分布式定时任务
|
||||
*/
|
||||
private SystemModeEnum mode = SystemModeEnum.ALL;
|
||||
|
||||
/**
|
||||
* 回调配置
|
||||
*/
|
||||
|
@ -1,4 +1,4 @@
|
||||
package com.aizuda.snailjob.server.retry.task.dto;
|
||||
package com.aizuda.snailjob.server.common.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
@ -1,14 +1,14 @@
|
||||
package com.aizuda.snailjob.server.retry.task.support.handler;
|
||||
package com.aizuda.snailjob.server.common.handler;
|
||||
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.common.core.model.Result;
|
||||
import com.aizuda.snailjob.server.common.Lifecycle;
|
||||
import com.aizuda.snailjob.server.common.client.CommonRpcClient;
|
||||
import com.aizuda.snailjob.server.common.dto.ConfigSyncTask;
|
||||
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
|
||||
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
|
||||
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
|
||||
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
|
||||
import com.aizuda.snailjob.server.retry.task.client.RetryRpcClient;
|
||||
import com.aizuda.snailjob.server.retry.task.dto.ConfigSyncTask;
|
||||
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -61,9 +61,9 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable {
|
||||
// 同步版本到每个客户端节点
|
||||
for (final RegisterNodeInfo registerNodeInfo : serverNodeSet) {
|
||||
ConfigDTO configDTO = accessTemplate.getGroupConfigAccess().getConfigInfo(groupName, namespaceId);
|
||||
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
|
||||
CommonRpcClient rpcClient = RequestBuilder.<CommonRpcClient, Result>newBuilder()
|
||||
.nodeInfo(registerNodeInfo)
|
||||
.client(RetryRpcClient.class)
|
||||
.client(CommonRpcClient.class)
|
||||
.build();
|
||||
SnailJobLog.LOCAL.info("同步结果 [{}]", rpcClient.syncConfig(configDTO));
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package com.aizuda.snailjob.server.common.rpc.server.handler;
|
||||
|
||||
import cn.hutool.core.net.url.UrlQuery;
|
||||
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.server.common.handler.GetHttpRequestHandler;
|
||||
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
|
||||
@ -14,8 +15,6 @@ import io.netty.handler.codec.http.HttpMethod;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.CONFIG;
|
||||
|
||||
/**
|
||||
* @author: opensnail
|
||||
* @date : 2022-03-07 16:29
|
||||
@ -28,7 +27,7 @@ public class ConfigHttpRequestHandler extends GetHttpRequestHandler {
|
||||
|
||||
@Override
|
||||
public boolean supports(String path) {
|
||||
return CONFIG.equals(path);
|
||||
return HTTP_PATH.SYNC_CONFIG.equals(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1,6 +1,5 @@
|
||||
package com.aizuda.snailjob.server.job.task;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@ -11,7 +10,6 @@ import org.springframework.context.annotation.Configuration;
|
||||
*/
|
||||
@Configuration
|
||||
@ComponentScan("com.aizuda.snailjob.server.job.task.*")
|
||||
@ConditionalOnExpression("'${snail-job.mode}'.equals('job') or '${snail-job.mode}'.equals('all')")
|
||||
public class SnailJobServerJobTaskStarter {
|
||||
|
||||
}
|
||||
|
@ -9,6 +9,9 @@ import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
|
||||
import com.aizuda.snailjob.server.common.rpc.client.annotation.Body;
|
||||
import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH;
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_STOP;
|
||||
|
||||
/**
|
||||
* 调用客户端接口
|
||||
*
|
||||
@ -18,10 +21,10 @@ import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
|
||||
*/
|
||||
public interface JobRpcClient {
|
||||
|
||||
@Mapping(path = "/job/stop/v1", method = RequestMethod.POST)
|
||||
@Mapping(path = JOB_STOP, method = RequestMethod.POST)
|
||||
Result<Boolean> stop(@Body StopJobDTO stopJobDTO);
|
||||
|
||||
@Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST)
|
||||
@Mapping(path = JOB_DISPATCH, method = RequestMethod.POST)
|
||||
Result<Boolean> dispatch(@Body DispatchJobRequest dispatchJobRequest);
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,5 @@
|
||||
package com.aizuda.snailjob.server.retry.task;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@ -11,7 +10,6 @@ import org.springframework.context.annotation.Configuration;
|
||||
*/
|
||||
@Configuration
|
||||
@ComponentScan("com.aizuda.snailjob.server.retry.task.*")
|
||||
@ConditionalOnExpression("'${snail-job.mode}'.equals('retry') or '${snail-job.mode}'.equals('all')")
|
||||
public class SnailJobServerRetryTaskStarter {
|
||||
|
||||
}
|
||||
|
@ -15,6 +15,10 @@ import com.aizuda.snailjob.server.common.rpc.client.annotation.Body;
|
||||
import com.aizuda.snailjob.server.common.rpc.client.annotation.Header;
|
||||
import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
|
||||
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_CALLBACK;
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_DISPATCH;
|
||||
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.RETRY_GENERATE_IDEM_ID;
|
||||
|
||||
/**
|
||||
* 调用客户端接口
|
||||
*
|
||||
@ -24,16 +28,13 @@ import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
|
||||
*/
|
||||
public interface RetryRpcClient {
|
||||
|
||||
@Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST)
|
||||
@Mapping(path = RETRY_DISPATCH, method = RequestMethod.POST)
|
||||
Result<DispatchRetryResultDTO> dispatch(@Body DispatchRetryDTO dispatchRetryDTO, @Header SnailJobHeaders headers);
|
||||
|
||||
@Mapping(path = "/retry/callback/v1", method = RequestMethod.POST)
|
||||
@Mapping(path = RETRY_CALLBACK, method = RequestMethod.POST)
|
||||
Result callback(@Body RetryCallbackDTO retryCallbackDTO);
|
||||
|
||||
@Mapping(path = "/retry/generate/idempotent-id/v1", method = RequestMethod.POST)
|
||||
@Mapping(path = RETRY_GENERATE_IDEM_ID, method = RequestMethod.POST)
|
||||
Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO);
|
||||
|
||||
@Mapping(path = "/retry/sync/version/v1", method = RequestMethod.POST)
|
||||
Result syncConfig(@Body ConfigDTO configDTO);
|
||||
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.server.common.enums.TaskGeneratorSceneEnum;
|
||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||
import com.aizuda.snailjob.server.common.handler.ConfigVersionSyncHandler;
|
||||
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
|
||||
import com.aizuda.snailjob.server.model.dto.RetryTaskDTO;
|
||||
import com.aizuda.snailjob.server.retry.task.generator.task.TaskContext;
|
||||
|
@ -65,47 +65,51 @@ public class ConsumerBucketActor extends AbstractActor {
|
||||
return;
|
||||
}
|
||||
|
||||
if (SystemModeEnum.isJob(systemProperties.getMode())) {
|
||||
// 扫描job && workflow
|
||||
doScanJobAndWorkflow(consumerBucket);
|
||||
|
||||
ScanTask scanTask = new ScanTask();
|
||||
scanTask.setBuckets(consumerBucket.getBuckets());
|
||||
// 扫描重试
|
||||
doScanRetry(consumerBucket);
|
||||
}
|
||||
|
||||
// 扫描定时任务数据
|
||||
ActorRef scanJobActorRef = cacheActorRef(DEFAULT_JOB_KEY, SyetemTaskTypeEnum.JOB);
|
||||
scanJobActorRef.tell(scanTask, scanJobActorRef);
|
||||
|
||||
// 扫描DAG工作流任务数据
|
||||
ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, SyetemTaskTypeEnum.WORKFLOW);
|
||||
scanWorkflowActorRef.tell(scanTask, scanWorkflowActorRef);
|
||||
private void doScanRetry(final ConsumerBucket consumerBucket) {
|
||||
List<GroupConfig> groupConfigs = null;
|
||||
try {
|
||||
// 查询桶对应组信息
|
||||
groupConfigs = accessTemplate.getGroupConfigAccess().list(
|
||||
new LambdaQueryWrapper<GroupConfig>()
|
||||
.select(GroupConfig::getGroupName, GroupConfig::getGroupPartition, GroupConfig::getNamespaceId)
|
||||
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
|
||||
.in(GroupConfig::getBucketIndex, consumerBucket.getBuckets())
|
||||
);
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.LOCAL.error("生成重试任务异常.", e);
|
||||
}
|
||||
|
||||
if (SystemModeEnum.isRetry(systemProperties.getMode())) {
|
||||
List<GroupConfig> groupConfigs = null;
|
||||
try {
|
||||
// 查询桶对应组信息
|
||||
groupConfigs = accessTemplate.getGroupConfigAccess().list(
|
||||
new LambdaQueryWrapper<GroupConfig>()
|
||||
.select(GroupConfig::getGroupName, GroupConfig::getGroupPartition, GroupConfig::getNamespaceId)
|
||||
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
|
||||
.in(GroupConfig::getBucketIndex, consumerBucket.getBuckets())
|
||||
);
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.LOCAL.error("生成重试任务异常.", e);
|
||||
}
|
||||
|
||||
if (!CollectionUtils.isEmpty(groupConfigs)) {
|
||||
for (final GroupConfig groupConfig : groupConfigs) {
|
||||
CacheConsumerGroup.addOrUpdate(groupConfig.getGroupName(), groupConfig.getNamespaceId());
|
||||
ScanTask scanTask = new ScanTask();
|
||||
scanTask.setNamespaceId(groupConfig.getNamespaceId());
|
||||
scanTask.setGroupName(groupConfig.getGroupName());
|
||||
scanTask.setBuckets(consumerBucket.getBuckets());
|
||||
scanTask.setGroupPartition(groupConfig.getGroupPartition());
|
||||
produceScanActorTask(scanTask);
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(groupConfigs)) {
|
||||
for (final GroupConfig groupConfig : groupConfigs) {
|
||||
CacheConsumerGroup.addOrUpdate(groupConfig.getGroupName(), groupConfig.getNamespaceId());
|
||||
ScanTask scanTask = new ScanTask();
|
||||
scanTask.setNamespaceId(groupConfig.getNamespaceId());
|
||||
scanTask.setGroupName(groupConfig.getGroupName());
|
||||
scanTask.setBuckets(consumerBucket.getBuckets());
|
||||
scanTask.setGroupPartition(groupConfig.getGroupPartition());
|
||||
produceScanActorTask(scanTask);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doScanJobAndWorkflow(final ConsumerBucket consumerBucket) {
|
||||
ScanTask scanTask = new ScanTask();
|
||||
scanTask.setBuckets(consumerBucket.getBuckets());
|
||||
|
||||
// 扫描定时任务数据
|
||||
ActorRef scanJobActorRef = cacheActorRef(DEFAULT_JOB_KEY, SyetemTaskTypeEnum.JOB);
|
||||
scanJobActorRef.tell(scanTask, scanJobActorRef);
|
||||
|
||||
// 扫描DAG工作流任务数据
|
||||
ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, SyetemTaskTypeEnum.WORKFLOW);
|
||||
scanWorkflowActorRef.tell(scanTask, scanWorkflowActorRef);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -74,5 +74,4 @@ snail-job:
|
||||
callback: # 回调配置
|
||||
max-count: 288 #回调最大执行次数
|
||||
trigger-interval: 900 #间隔时间
|
||||
mode: all
|
||||
retry-max-pull-count: 10
|
||||
|
@ -7,9 +7,8 @@ import cn.hutool.core.util.ReUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.server.common.config.SystemProperties;
|
||||
import com.aizuda.snailjob.server.common.enums.IdGeneratorModeEnum;
|
||||
import com.aizuda.snailjob.server.common.enums.SystemModeEnum;
|
||||
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
|
||||
import com.aizuda.snailjob.server.retry.task.support.handler.ConfigVersionSyncHandler;
|
||||
import com.aizuda.snailjob.server.common.handler.ConfigVersionSyncHandler;
|
||||
import com.aizuda.snailjob.server.web.model.base.PageResult;
|
||||
import com.aizuda.snailjob.server.web.model.request.GroupConfigQueryVO;
|
||||
import com.aizuda.snailjob.server.web.model.request.GroupConfigRequestVO;
|
||||
@ -33,15 +32,11 @@ import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.SequenceAlloc;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
|
||||
import com.aizuda.snailjob.template.datasource.utils.DbUtils;
|
||||
import com.aizuda.snailjob.server.retry.task.support.handler.ConfigVersionSyncHandler;
|
||||
import com.aizuda.snailjob.server.web.service.convert.GroupConfigConverter;
|
||||
import com.baomidou.mybatisplus.autoconfigure.MybatisPlusProperties;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.jdbc.BadSqlGrammarException;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
@ -159,15 +154,12 @@ public class GroupConfigServiceImpl implements GroupConfigService {
|
||||
() -> new SnailJobServerException("exception occurred while adding group. groupConfigVO[{}]",
|
||||
groupConfigRequestVO));
|
||||
|
||||
if (SystemModeEnum.isRetry(systemProperties.getMode())) {
|
||||
// 同步版本, 版本为0代表需要同步到客户端
|
||||
boolean add = configVersionSyncHandler.addSyncTask(groupName, namespaceId, 0);
|
||||
// 若添加失败则强制发起同步
|
||||
if (!add) {
|
||||
configVersionSyncHandler.syncVersion(groupName, namespaceId);
|
||||
}
|
||||
// 同步版本, 版本为0代表需要同步到客户端
|
||||
boolean add = configVersionSyncHandler.addSyncTask(groupName, namespaceId, 0);
|
||||
// 若添加失败则强制发起同步
|
||||
if (!add) {
|
||||
configVersionSyncHandler.syncVersion(groupName, namespaceId);
|
||||
}
|
||||
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
|
@ -68,7 +68,6 @@ public class SystemUserServiceImpl implements SystemUserService {
|
||||
|
||||
SystemUserResponseVO systemUserResponseVO = SystemUserResponseVOConverter.INSTANCE.convert(systemUser);
|
||||
systemUserResponseVO.setToken(token);
|
||||
systemUserResponseVO.setMode(systemProperties.getMode().name());
|
||||
|
||||
getPermission(systemUser.getRole(), systemUser.getId(), systemUserResponseVO);
|
||||
|
||||
@ -107,7 +106,6 @@ public class SystemUserServiceImpl implements SystemUserService {
|
||||
@Override
|
||||
public SystemUserResponseVO getUserInfo(UserSessionVO systemUser) {
|
||||
SystemUserResponseVO systemUserResponseVO = SystemUserResponseVOConverter.INSTANCE.convert(systemUser);
|
||||
systemUserResponseVO.setMode(systemProperties.getMode().name());
|
||||
|
||||
getPermission(systemUser.getRole(), systemUser.getId(), systemUserResponseVO);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user