feat:2.4.0

1. 修复并行重复产生新任务问题
2. 修复客户端继承的模式时报错问题
This commit is contained in:
byteblogs168 2023-10-29 00:00:24 +08:00
parent 6a5c318369
commit c20ccd618c
9 changed files with 27 additions and 13 deletions

View File

@ -45,7 +45,7 @@ public class JobEndPoint {
// 选择执行器 // 选择执行器
Object executor = jobExecutorInfo.getExecutor(); Object executor = jobExecutorInfo.getExecutor();
IJobExecutor jobExecutor; IJobExecutor jobExecutor;
if (executor.getClass().isAssignableFrom(IJobExecutor.class)) { if (IJobExecutor.class.isAssignableFrom(executor.getClass())) {
jobExecutor = (AbstractJobExecutor) executor; jobExecutor = (AbstractJobExecutor) executor;
} else { } else {
jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class); jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class);

View File

@ -58,7 +58,7 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
String executorClassName = bean.getClass().getName(); String executorClassName = bean.getClass().getName();
// 通过实现接口进行注册 // 通过实现接口进行注册
if (bean.getClass().isAssignableFrom(IJobExecutor.class)) { if (IJobExecutor.class.isAssignableFrom(bean.getClass())) {
if (!JobExecutorInfoCache.isExisted(executorClassName)) { if (!JobExecutorInfoCache.isExisted(executorClassName)) {
retryerInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean)); retryerInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean));
} }

View File

@ -33,6 +33,8 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -97,7 +99,15 @@ public class JobExecutorActor extends AbstractActor {
context.setJobId(job.getId()); context.setJobId(job.getId());
jobExecutor.execute(context); jobExecutor.execute(context);
} finally { } finally {
doHandlerResidentTask(job, taskExecute); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
// 清除时间轮的缓存
JobTimerWheel.clearCache(taskExecute.getTaskBatchId());
//方法内容
doHandlerResidentTask(job, taskExecute);
}
});
} }
} }

View File

@ -16,6 +16,7 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -39,7 +40,7 @@ public class JobTaskBatchGenerator {
JobTaskBatch jobTaskBatch = new JobTaskBatch(); JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setJobId(context.getJobId()); jobTaskBatch.setJobId(context.getJobId());
jobTaskBatch.setGroupName(context.getGroupName()); jobTaskBatch.setGroupName(context.getGroupName());
jobTaskBatch.setCreateDt(LocalDateTime.now());
// 无执行的节点 // 无执行的节点
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName()))) { if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName()))) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());

View File

@ -1,10 +1,7 @@
package com.aizuda.easy.retry.server.job.task.support.idempotent; package com.aizuda.easy.retry.server.job.task.support.idempotent;
import com.aizuda.easy.retry.server.common.IdempotentStrategy; import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import scala.collection.immutable.Stream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
/** /**

View File

@ -28,9 +28,6 @@ public class JobTimerTask implements TimerTask {
log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId());
try { try {
// 清除时间轮的缓存
JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId());
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId()); taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId());
taskExecuteDTO.setGroupName(jobTimerTaskDTO.getGroupName()); taskExecuteDTO.setGroupName(jobTimerTaskDTO.getGroupName());

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.support.idempotent;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.server.common.IdempotentStrategy; import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import lombok.extern.slf4j.Slf4j;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
@ -12,6 +13,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
* @date 2023-10-19 21:54:57 * @date 2023-10-19 21:54:57
* @since 2.4.0 * @since 2.4.0
*/ */
@Slf4j
public class TimerIdempotent implements IdempotentStrategy<String, String> { public class TimerIdempotent implements IdempotentStrategy<String, String> {
private static final CopyOnWriteArraySet<String> cache = new CopyOnWriteArraySet<>(); private static final CopyOnWriteArraySet<String> cache = new CopyOnWriteArraySet<>();
@ -28,6 +30,9 @@ public class TimerIdempotent implements IdempotentStrategy<String, String> {
@Override @Override
public boolean isExist(String key, String value) { public boolean isExist(String key, String value) {
if (key == null || value == null) {
log.error("异常监控. key:[{}] value:[{}]", key, value);
}
return cache.contains(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value))); return cache.contains(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value)));
} }

View File

@ -2,7 +2,7 @@ FROM openjdk:8-jdk-alpine
MAINTAINER www.byteblogs.com MAINTAINER www.byteblogs.com
LABEL server-name=easy-retry-server LABEL server-name=easy-retry-server
ADD ../target/easy-retry-server.jar easy-retry-server.jar ADD ./target/easy-retry-server.jar easy-retry-server.jar
#对外暴漏的端口号 #对外暴漏的端口号
EXPOSE 8080 EXPOSE 8080

View File

@ -1,9 +1,11 @@
package com.aizuda.easy.retry.server.web.service.impl; package com.aizuda.easy.retry.server.web.service.impl;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.util.CronExpression; import com.aizuda.easy.retry.common.core.util.CronExpression;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyContext; import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyContext;
@ -43,7 +45,8 @@ import java.util.Objects;
public class JobServiceImpl implements JobService { public class JobServiceImpl implements JobService {
private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Autowired
private SystemProperties systemProperties;
@Autowired @Autowired
private JobMapper jobMapper; private JobMapper jobMapper;
@ -107,7 +110,7 @@ public class JobServiceImpl implements JobService {
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<Job>() LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<Job>()
.select(Job::getId, Job::getJobName); .select(Job::getId, Job::getJobName);
if (StrUtil.isNotBlank(keywords)) { if (StrUtil.isNotBlank(keywords)) {
queryWrapper.like(Job::getJobName, keywords.trim() + "%"); queryWrapper.like(Job::getJobName, keywords.trim() + "%");
} }
if (Objects.nonNull(jobId)) { if (Objects.nonNull(jobId)) {
@ -130,6 +133,7 @@ public class JobServiceImpl implements JobService {
// 判断常驻任务 // 判断常驻任务
Job job = updateJobResident(jobRequestVO); Job job = updateJobResident(jobRequestVO);
job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal());
job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext));
return 1 == jobMapper.insert(job); return 1 == jobMapper.insert(job);
} }