feat(sj_1.0.0): 优化客户端group配置和enabled加载顺序

1. 优化类名handler -> 优化handle
2. 客户EnableSnailJob#group配置可以不强制指定,支持在配置文件中配置,具体见注解
3. 是否开启enabled配置,调整为用户的配置> 默认配置
This commit is contained in:
opensnail 2024-05-25 09:29:10 +08:00
parent 01127611fa
commit 08cf9e9909
14 changed files with 93 additions and 66 deletions

View File

@ -33,6 +33,20 @@ import java.lang.annotation.*;
@Import(SnailJobClientsRegistrar.class) @Import(SnailJobClientsRegistrar.class)
public @interface EnableSnailJob { public @interface EnableSnailJob {
/**
* 请在服务端提前配置好组,并设置在这里
* group的配置支持注解和配置文件两种形式
* 配置顺序为注解 > yml
* : 如果注解内不配置默认取环境变量中的group配置
* 比如:
* <p>
* snail-job.group = snail_job_demo_group
* </p>
*
* @return group
*/
String group() default "";
/** /**
* 控制多个Aop的执行顺序, * 控制多个Aop的执行顺序,
* 需要注意的是这里顺序要比事务的Aop要提前 * 需要注意的是这里顺序要比事务的Aop要提前

View File

@ -10,6 +10,7 @@ import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.*; import org.springframework.context.annotation.*;
import org.springframework.core.Ordered;
import org.springframework.core.env.StandardEnvironment; import org.springframework.core.env.StandardEnvironment;
@Configuration @Configuration
@ -29,7 +30,10 @@ public class SnailJobClientRetryCoreAutoConfiguration {
@Role(BeanDefinition.ROLE_INFRASTRUCTURE) @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public MethodInterceptor snailJobInterceptor(StandardEnvironment standardEnvironment, public MethodInterceptor snailJobInterceptor(StandardEnvironment standardEnvironment,
@Lazy RetryStrategy localRetryStrategies) { @Lazy RetryStrategy localRetryStrategies) {
return new SnailRetryInterceptor(standardEnvironment, localRetryStrategies); Integer order = standardEnvironment
.getProperty(SnailJobClientsRegistrar.AOP_ORDER_CONFIG, Integer.class, Ordered.HIGHEST_PRECEDENCE);
return new SnailRetryInterceptor(order, localRetryStrategies);
} }
} }

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.client.starter; package com.aizuda.snailjob.client.starter;
import cn.hutool.core.util.StrUtil;
import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.EnvironmentAware; import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar; import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
@ -8,6 +9,7 @@ import org.springframework.core.env.StandardEnvironment;
import org.springframework.core.type.AnnotationMetadata; import org.springframework.core.type.AnnotationMetadata;
import java.util.Map; import java.util.Map;
import java.util.Objects;
/** /**
* Snail Job 客户端注册器 * Snail Job 客户端注册器
@ -17,21 +19,37 @@ import java.util.Map;
*/ */
public class SnailJobClientsRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware { public class SnailJobClientsRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {
private static final String ENABLED_CONFIG = "snail-job.enabled";
private static final String GROUP_CONFIG = "snail-job.group";
protected static final String AOP_ORDER_CONFIG = "snail-job.aop.order";
protected static final String GROUP_ATTR = "group";
protected static final String ORDER_ATTR = "order";
private StandardEnvironment standardEnvironment; private StandardEnvironment standardEnvironment;
@Override @Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
Map<String, Object> attrs = importingClassMetadata.getAnnotationAttributes(EnableSnailJob.class.getName()); Map<String, Object> attrs = importingClassMetadata.getAnnotationAttributes(EnableSnailJob.class.getName());
Map<String, Object> systemEnvironment = standardEnvironment.getSystemProperties(); Map<String, Object> systemEnvironment = standardEnvironment.getSystemProperties();
systemEnvironment.put("snail-job.aop.order", attrs.get("order")); systemEnvironment.put(AOP_ORDER_CONFIG, attrs.get(ORDER_ATTR));
Object group = attrs.get(GROUP_ATTR);
// 如果注解内不配置默认取环境变量中的group配置
if (Objects.nonNull(group) && StrUtil.isNotBlank((CharSequence) group)) {
systemEnvironment.put(GROUP_CONFIG, group);
}
} }
@Override @Override
public void setEnvironment(Environment environment) { public void setEnvironment(Environment env) {
StandardEnvironment standardEnvironment = (StandardEnvironment) environment; this.standardEnvironment = (StandardEnvironment) env;
this.standardEnvironment = standardEnvironment;
Map<String, Object> systemEnvironment = standardEnvironment.getSystemProperties(); Map<String, Object> systemEnvironment = standardEnvironment.getSystemProperties();
// 若是用户需要自定义enabled的值那么以用户的为主
if (Objects.nonNull(standardEnvironment.getProperty(ENABLED_CONFIG))) {
return;
}
// 添加了 EnableSnailJob 默认就是开启无需手动配置 // 添加了 EnableSnailJob 默认就是开启无需手动配置
systemEnvironment.put("snail-job.enabled", true); systemEnvironment.put(ENABLED_CONFIG, Boolean.TRUE);
} }
} }

View File

@ -54,12 +54,10 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se
"> 时间:{} \n" + "> 时间:{} \n" +
"> 异常:{} \n"; "> 异常:{} \n";
private final StandardEnvironment standardEnvironment;
private final RetryStrategy retryStrategy; private final RetryStrategy retryStrategy;
private final int order;
public SnailRetryInterceptor(StandardEnvironment standardEnvironment, public SnailRetryInterceptor(int order, RetryStrategy localRetryStrategies) {
RetryStrategy localRetryStrategies) { this.order = order;
this.standardEnvironment = standardEnvironment;
this.retryStrategy = localRetryStrategies; this.retryStrategy = localRetryStrategies;
} }
@ -271,9 +269,7 @@ public class SnailRetryInterceptor implements MethodInterceptor, AfterAdvice, Se
@Override @Override
public int getOrder() { public int getOrder() {
String order = standardEnvironment return order;
.getProperty("snail-job.aop.order", String.valueOf(Ordered.HIGHEST_PRECEDENCE));
return Integer.parseInt(order);
} }

View File

@ -15,6 +15,7 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapp
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -38,23 +39,17 @@ import java.util.stream.Collectors;
* @since 1.6.0 * @since 1.6.0
*/ */
@Component @Component
@Slf4j @RequiredArgsConstructor
public class ServerNodeBalance implements Lifecycle, Runnable { public class ServerNodeBalance implements Lifecycle, Runnable {
/** /**
* 延迟10s为了尽可能保障集群节点都启动完成在进行rebalance * 延迟10s为了尽可能保障集群节点都启动完成在进行rebalance
*/ */
public static final Long INITIAL_DELAY = 10L; public static final Long INITIAL_DELAY = 10L;
private final ServerNodeMapper serverNodeMapper;
private final SystemProperties systemProperties;
@Autowired
protected AccessTemplate accessTemplate;
private Thread thread = null; private Thread thread = null;
@Autowired
protected ServerNodeMapper serverNodeMapper;
@Autowired
protected SystemProperties systemProperties;
private List<Integer> bucketList; private List<Integer> bucketList;
public void doBalance() { public void doBalance() {
@ -79,7 +74,7 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
List<Integer> allocate = new AllocateMessageQueueAveragely() List<Integer> allocate = new AllocateMessageQueueAveragely()
.allocate(ServerRegister.CURRENT_CID, bucketList, new ArrayList<>(podIpSet)); .allocate(ServerRegister.CURRENT_CID, bucketList, new ArrayList<>(podIpSet));
// 重新覆盖本地分配的组信息 // 重新覆盖本地分配的bucket
DistributeInstance.INSTANCE.setConsumerBucket(allocate); DistributeInstance.INSTANCE.setConsumerBucket(allocate);
SnailJobLog.LOCAL.info("rebalance complete. allocate:[{}]", allocate); SnailJobLog.LOCAL.info("rebalance complete. allocate:[{}]", allocate);

View File

@ -7,9 +7,9 @@ import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
* @date 2023-10-02 09:34:00 * @date 2023-10-02 09:34:00
* @since 2.4.0 * @since 2.4.0
*/ */
public interface JobPrePareHandler { public interface JobPrepareHandler {
boolean matches(Integer status); boolean matches(Integer status);
void handler(JobTaskPrepareDTO jobPrepareDTO); void handle(JobTaskPrepareDTO jobPrepareDTO);
} }

View File

@ -89,7 +89,7 @@ public class JobExecutorActor extends AbstractActor {
} catch (Exception e) { } catch (Exception e) {
SnailJobLog.LOCAL.error("job executor exception. [{}]", taskExecute, e); SnailJobLog.LOCAL.error("job executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason()); handleTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
SpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskExecute.getTaskBatchId())); SpringContext.getContext().publishEvent(new JobTaskFailAlarmEvent(taskExecute.getTaskBatchId()));
} finally { } finally {
getContext().stop(getSelf()); getContext().stop(getSelf());
@ -126,7 +126,7 @@ public class JobExecutorActor extends AbstractActor {
} }
// 更新状态 // 更新状态
handlerTaskBatch(taskExecute, taskStatus, operationReason); handleTaskBatch(taskExecute, taskStatus, operationReason);
// 不是运行中的不需要生产任务 // 不是运行中的不需要生产任务
if (taskStatus != JobTaskBatchStatusEnum.RUNNING.getStatus()) { if (taskStatus != JobTaskBatchStatusEnum.RUNNING.getStatus()) {
@ -167,7 +167,7 @@ public class JobExecutorActor extends AbstractActor {
} }
//方法内容 //方法内容
doHandlerResidentTask(job, taskExecute); doHandleResidentTask(job, taskExecute);
} }
}); });
} }
@ -185,7 +185,7 @@ public class JobExecutorActor extends AbstractActor {
return context; return context;
} }
private void handlerTaskBatch(TaskExecuteDTO taskExecute, int taskStatus, int operationReason) { private void handleTaskBatch(TaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
JobTaskBatch jobTaskBatch = new JobTaskBatch(); JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(taskExecute.getTaskBatchId()); jobTaskBatch.setId(taskExecute.getTaskBatchId());
@ -201,7 +201,7 @@ public class JobExecutorActor extends AbstractActor {
} }
private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { private void doHandleResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job) if (Objects.isNull(job)
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene()) || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())

View File

@ -6,7 +6,7 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.JobPrePareHandler; import com.aizuda.snailjob.server.job.task.support.JobPrepareHandler;
import com.aizuda.snailjob.server.job.task.support.prepare.job.TerminalJobPrepareHandler; import com.aizuda.snailjob.server.job.task.support.prepare.job.TerminalJobPrepareHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
@ -37,7 +37,7 @@ public class JobTaskPrepareActor extends AbstractActor {
@Autowired @Autowired
private JobTaskBatchMapper jobTaskBatchMapper; private JobTaskBatchMapper jobTaskBatchMapper;
@Autowired @Autowired
private List<JobPrePareHandler> prePareHandlers; private List<JobPrepareHandler> prepareHandlers;
@Override @Override
public Receive createReceive() { public Receive createReceive() {
@ -73,7 +73,7 @@ public class JobTaskPrepareActor extends AbstractActor {
// 说明所以任务已经完成 // 说明所以任务已经完成
if (CollectionUtils.isEmpty(notCompleteJobTaskBatchList)) { if (CollectionUtils.isEmpty(notCompleteJobTaskBatchList)) {
TerminalJobPrepareHandler terminalJobPrepareHandler = SpringContext.getBeanByType(TerminalJobPrepareHandler.class); TerminalJobPrepareHandler terminalJobPrepareHandler = SpringContext.getBeanByType(TerminalJobPrepareHandler.class);
terminalJobPrepareHandler.handler(prepare); terminalJobPrepareHandler.handle(prepare);
} else { } else {
boolean onlyTimeoutCheck = false; boolean onlyTimeoutCheck = false;
@ -83,9 +83,9 @@ public class JobTaskPrepareActor extends AbstractActor {
prepare.setWorkflowTaskBatchId(prepare.getWorkflowTaskBatchId()); prepare.setWorkflowTaskBatchId(prepare.getWorkflowTaskBatchId());
prepare.setWorkflowNodeId(jobTaskBatch.getWorkflowNodeId()); prepare.setWorkflowNodeId(jobTaskBatch.getWorkflowNodeId());
prepare.setOnlyTimeoutCheck(onlyTimeoutCheck); prepare.setOnlyTimeoutCheck(onlyTimeoutCheck);
for (JobPrePareHandler prePareHandler : prePareHandlers) { for (JobPrepareHandler prepareHandler : prepareHandlers) {
if (prePareHandler.matches(jobTaskBatch.getTaskBatchStatus())) { if (prepareHandler.matches(jobTaskBatch.getTaskBatchStatus())) {
prePareHandler.handler(prepare); prepareHandler.handle(prepare);
break; break;
} }
} }

View File

@ -1,20 +0,0 @@
package com.aizuda.snailjob.server.job.task.support.prepare.job;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.JobPrePareHandler;
/**
* @author opensnail
* @date 2023-10-02 09:57:55
* @since 2.4.0
*/
public abstract class AbstractJobPrePareHandler implements JobPrePareHandler {
@Override
public void handler(JobTaskPrepareDTO jobPrepareDTO) {
doHandler(jobPrepareDTO);
}
protected abstract void doHandler(JobTaskPrepareDTO jobPrepareDTO);
}

View File

@ -0,0 +1,20 @@
package com.aizuda.snailjob.server.job.task.support.prepare.job;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.JobPrepareHandler;
/**
* @author opensnail
* @date 2023-10-02 09:57:55
* @since 2.4.0
*/
public abstract class AbstractJobPrepareHandler implements JobPrepareHandler {
@Override
public void handle(JobTaskPrepareDTO jobPrepareDTO) {
doHandle(jobPrepareDTO);
}
protected abstract void doHandle(JobTaskPrepareDTO jobPrepareDTO);
}

View File

@ -30,7 +30,7 @@ import org.springframework.stereotype.Component;
*/ */
@Component @Component
@Slf4j @Slf4j
public class RunningJobPrepareHandler extends AbstractJobPrePareHandler { public class RunningJobPrepareHandler extends AbstractJobPrepareHandler {
@Autowired @Autowired
private JobTaskBatchHandler jobTaskBatchHandler; private JobTaskBatchHandler jobTaskBatchHandler;
@ -41,7 +41,7 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
} }
@Override @Override
protected void doHandler(JobTaskPrepareDTO prepare) { protected void doHandle(JobTaskPrepareDTO prepare) {
log.debug("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare)); log.debug("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare));
// 若存在所有的任务都是完成但是批次上的状态为运行中则是并发导致的未把批次状态变成为终态此处做一次兜底处理 // 若存在所有的任务都是完成但是批次上的状态为运行中则是并发导致的未把批次状态变成为终态此处做一次兜底处理

View File

@ -22,7 +22,7 @@ import java.util.Objects;
@Order(Ordered.HIGHEST_PRECEDENCE) @Order(Ordered.HIGHEST_PRECEDENCE)
@Component @Component
@Slf4j @Slf4j
public class TerminalJobPrepareHandler extends AbstractJobPrePareHandler { public class TerminalJobPrepareHandler extends AbstractJobPrepareHandler {
@Autowired @Autowired
private JobTaskBatchGenerator jobTaskBatchGenerator; private JobTaskBatchGenerator jobTaskBatchGenerator;
@ -33,7 +33,7 @@ public class TerminalJobPrepareHandler extends AbstractJobPrePareHandler {
} }
@Override @Override
protected void doHandler(JobTaskPrepareDTO jobPrepareDTO) { protected void doHandle(JobTaskPrepareDTO jobPrepareDTO) {
log.debug("无处理中的数据. jobId:[{}]", jobPrepareDTO.getJobId()); log.debug("无处理中的数据. jobId:[{}]", jobPrepareDTO.getJobId());
// 生成任务批次 // 生成任务批次

View File

@ -21,7 +21,7 @@ import java.time.Duration;
*/ */
@Component @Component
@Slf4j @Slf4j
public class WaitJobPrepareHandler extends AbstractJobPrePareHandler { public class WaitJobPrepareHandler extends AbstractJobPrepareHandler {
@Override @Override
public boolean matches(Integer status) { public boolean matches(Integer status) {
@ -29,7 +29,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
} }
@Override @Override
protected void doHandler(JobTaskPrepareDTO jobPrepareDTO) { protected void doHandle(JobTaskPrepareDTO jobPrepareDTO) {
log.debug("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId()); log.debug("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
// 若时间轮中数据不存在则重新加入 // 若时间轮中数据不存在则重新加入

View File

@ -13,7 +13,7 @@ import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.CronUtils; import com.aizuda.snailjob.server.common.util.CronUtils;
import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.JobPrePareHandler; import com.aizuda.snailjob.server.job.task.support.JobPrepareHandler;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache; import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.base.PageResult;
@ -54,7 +54,7 @@ public class JobServiceImpl implements JobService {
private final SystemProperties systemProperties; private final SystemProperties systemProperties;
private final JobMapper jobMapper; private final JobMapper jobMapper;
@Lazy @Lazy
private final JobPrePareHandler terminalJobPrepareHandler; private final JobPrepareHandler terminalJobPrepareHandler;
private final AccessTemplate accessTemplate; private final AccessTemplate accessTemplate;
private static Long calculateNextTriggerAt(final JobRequestVO jobRequestVO, Long time) { private static Long calculateNextTriggerAt(final JobRequestVO jobRequestVO, Long time) {
@ -224,7 +224,7 @@ public class JobServiceImpl implements JobService {
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType()); jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType());
// 创建批次 // 创建批次
terminalJobPrepareHandler.handler(jobTaskPrepare); terminalJobPrepareHandler.handle(jobTaskPrepare);
return Boolean.TRUE; return Boolean.TRUE;
} }