feat:2.4.0

1. 修复重试详情404问题
2. 优化获取下次触发时间的值
This commit is contained in:
byteblogs168 2023-10-30 23:23:51 +08:00
parent 171951f09a
commit efe7a5d13b
14 changed files with 70 additions and 60 deletions

View File

@ -86,4 +86,14 @@ public interface SystemConstants {
String JOB_SHARDING_VALUE_SEPARATOR = "#=@"; String JOB_SHARDING_VALUE_SEPARATOR = "#=@";
String JOB_SHARDING_ARGS_SEPARATOR = "#;@"; String JOB_SHARDING_ARGS_SEPARATOR = "#;@";
/**
* 调度时长
*/
Long SCHEDULE_PERIOD = 10L;
/**
* 延迟30s为了尽可能保障集群节点都启动完成在进行rebalance
*/
Long SCHEDULE_INITIAL_DELAY = 30L;
} }

View File

@ -78,7 +78,7 @@ public class JdbcLockProvider extends AbstractLockProvider {
distributedLock.setUpdateDt(now); distributedLock.setUpdateDt(now);
return distributedLockMapper.insert(distributedLock) > 0; return distributedLockMapper.insert(distributedLock) > 0;
} catch (DuplicateKeyException | ConcurrencyFailureException | TransactionSystemException e) { } catch (DuplicateKeyException | ConcurrencyFailureException | TransactionSystemException e) {
LogUtils.warn(log,"Duplicate key. lockName:[{}]", lockConfig.getLockName()); // LogUtils.warn(log,"Duplicate key. lockName:[{}]", lockConfig.getLockName());
return false; return false;
} catch (DataIntegrityViolationException | BadSqlGrammarException | UncategorizedSQLException e) { } catch (DataIntegrityViolationException | BadSqlGrammarException | UncategorizedSQLException e) {
LogUtils.error(log,"Unexpected exception. lockName:[{}]", lockConfig.getLockName(), e); LogUtils.error(log,"Unexpected exception. lockName:[{}]", lockConfig.getLockName(), e);

View File

@ -2,7 +2,10 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
@ -125,7 +128,14 @@ public class ScanJobTaskActor extends AbstractActor {
WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(partitionTask.getTriggerType()); waitStrategyContext.setTriggerType(partitionTask.getTriggerType());
waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval()); waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt());
LocalDateTime now = LocalDateTime.now();
LocalDateTime nextTriggerAt = partitionTask.getNextTriggerAt();
if (nextTriggerAt.plusSeconds(SystemConstants.SCHEDULE_PERIOD).isBefore(now)) {
nextTriggerAt = now;
}
waitStrategyContext.setNextTriggerAt(nextTriggerAt);
return waitStrategy.computeRetryTime(waitStrategyContext); return waitStrategy.computeRetryTime(waitStrategyContext);
} }
@ -136,7 +146,7 @@ public class ScanJobTaskActor extends AbstractActor {
new LambdaQueryWrapper<Job>() new LambdaQueryWrapper<Job>()
.eq(Job::getJobStatus, StatusEnum.YES.getStatus()) .eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.in(Job::getBucketIndex, scanTask.getBuckets()) .in(Job::getBucketIndex, scanTask.getBuckets())
.le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)) .le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD))
.eq(Job::getDeleted, StatusEnum.NO.getStatus()) .eq(Job::getDeleted, StatusEnum.NO.getStatus())
.ge(Job::getId, startId) .ge(Job::getId, startId)
).getRecords(); ).getRecords();

View File

@ -122,12 +122,8 @@ public class WaitStrategies {
public LocalDateTime computeRetryTime(WaitStrategyContext context) { public LocalDateTime computeRetryTime(WaitStrategyContext context) {
long triggerInterval = Long.parseLong(context.triggerInterval); long triggerInterval = Long.parseLong(context.triggerInterval);
LocalDateTime nextTriggerAt = context.getNextTriggerAt();
if (nextTriggerAt.isBefore(LocalDateTime.now())) {
nextTriggerAt = LocalDateTime.now();
}
return nextTriggerAt.plusSeconds(triggerInterval); return context.nextTriggerAt.plusSeconds(triggerInterval);
} }
} }
@ -139,14 +135,9 @@ public class WaitStrategies {
@Override @Override
public LocalDateTime computeRetryTime(WaitStrategyContext context) { public LocalDateTime computeRetryTime(WaitStrategyContext context) {
LocalDateTime nextTriggerAt = context.getNextTriggerAt();
if (nextTriggerAt.isBefore(LocalDateTime.now())) {
nextTriggerAt = LocalDateTime.now();
}
Date nextValidTime; Date nextValidTime;
try { try {
ZonedDateTime zdt = nextTriggerAt.atZone(ZoneOffset.ofHours(8)); ZonedDateTime zdt = context.nextTriggerAt.atZone(ZoneOffset.ofHours(8));
nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant())); nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant()));
} catch (ParseException e) { } catch (ParseException e) {
throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e); throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e);

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan; package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.IdempotentStrategy; import com.aizuda.easy.retry.server.common.IdempotentStrategy;
@ -89,7 +90,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
// 计算循环拉取的次数 // 计算循环拉取的次数
if (preCostTime.get() > 0) { if (preCostTime.get() > 0) {
long loopCount = Math.max((10 * 1000) / preCostTime.get(), 1); long loopCount = Math.max((SystemConstants.SCHEDULE_PERIOD * 1000) / preCostTime.get(), 1);
// TODO 最大拉取次数支持可配置 // TODO 最大拉取次数支持可配置
loopCount = Math.min(loopCount, 10); loopCount = Math.min(loopCount, 10);
pullCount.set(loopCount); pullCount.set(loopCount);
@ -99,24 +100,24 @@ public abstract class AbstractScanGroup extends AbstractActor {
Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L); Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);
log.info("retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]", log.info("retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]",
groupName, lastId, pullCount.get(), preCostTime.get()); groupName, lastId, pullCount.get(), preCostTime.get());
AtomicInteger count = new AtomicInteger(0); AtomicInteger count = new AtomicInteger(0);
PartitionTaskUtils.process(startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()), PartitionTaskUtils.process(startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()),
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> { partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> {
if (CollectionUtils.isEmpty(partitionTasks)) { if (CollectionUtils.isEmpty(partitionTasks)) {
putLastId(scanTask.getGroupName(), 0L); putLastId(scanTask.getGroupName(), 0L);
return Boolean.TRUE; return Boolean.TRUE;
} }
// 超过最大的拉取次数则中断 // 超过最大的拉取次数则中断
if (count.getAndIncrement() > pullCount.get()) { if (count.getAndIncrement() > pullCount.get()) {
putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId());
return Boolean.TRUE; return Boolean.TRUE;
} }
return false; return false;
}, lastId); }, lastId);
} }
@ -136,10 +137,10 @@ public abstract class AbstractScanGroup extends AbstractActor {
accessTemplate.getRetryTaskAccess().updateById(partitionTask.getGroupName(), retryTask); accessTemplate.getRetryTaskAccess().updateById(partitionTask.getGroupName(), retryTask);
long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
- System.currentTimeMillis(); - System.currentTimeMillis();
RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask), RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask),
delay, delay,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
} }
protected abstract TaskExecutorSceneEnum taskActuatorScene(); protected abstract TaskExecutorSceneEnum taskActuatorScene();
@ -154,14 +155,14 @@ public abstract class AbstractScanGroup extends AbstractActor {
public List<RetryPartitionTask> listAvailableTasks(String groupName, Long lastId, Integer taskType) { public List<RetryPartitionTask> listAvailableTasks(String groupName, Long lastId, Integer taskType) {
List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess() List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess()
.listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()), .listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()),
new LambdaQueryWrapper<RetryTask>() new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType) .eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType)
.le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)) .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD))
.gt(RetryTask::getId, lastId) .gt(RetryTask::getId, lastId)
.orderByAsc(RetryTask::getId)) .orderByAsc(RetryTask::getId))
.getRecords(); .getRecords();
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks); return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks);
} }

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan; package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask; import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
@ -57,7 +58,14 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
.getSceneConfigByGroupNameAndSceneName(partitionTask.getGroupName(), partitionTask.getSceneName()); .getSceneConfigByGroupNameAndSceneName(partitionTask.getGroupName(), partitionTask.getSceneName());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt());
LocalDateTime now = LocalDateTime.now();
LocalDateTime nextTriggerAt = partitionTask.getNextTriggerAt();
if (nextTriggerAt.plusSeconds(SystemConstants.SCHEDULE_PERIOD).isBefore(now)) {
nextTriggerAt = now;
}
waitStrategyContext.setNextTriggerAt(nextTriggerAt);
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval()); waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
waitStrategyContext.setTriggerCount(partitionTask.getRetryCount() + 1); waitStrategyContext.setTriggerCount(partitionTask.getRetryCount() + 1);
// 更新触发时间, 任务进入时间轮 // 更新触发时间, 任务进入时间轮

View File

@ -172,17 +172,6 @@ public class WaitStrategies {
@Override @Override
public LocalDateTime computeRetryTime(WaitStrategyContext retryContext) { public LocalDateTime computeRetryTime(WaitStrategyContext retryContext) {
// if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
// // 回调失败的默认15分钟执行一次重试
// SystemProperties systemProperties = SpringContext.CONTEXT.getBean(SystemProperties.class);
// triggerInterval = systemProperties.getCallback().getTriggerInterval();
// } else {
// AccessTemplate accessTemplate = SpringContext.CONTEXT.getBean(AccessTemplate.class);
// SceneConfig sceneConfig =
// accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
// triggerInterval = Integer.parseInt(sceneConfig.getTriggerInterval());
// }
return retryContext.getNextTriggerAt().plusSeconds(Integer.parseInt(retryContext.getTriggerInterval())); return retryContext.getNextTriggerAt().plusSeconds(Integer.parseInt(retryContext.getTriggerInterval()));
} }
} }

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.starter.dispatch; package com.aizuda.easy.retry.server.starter.dispatch;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
@ -35,12 +36,12 @@ public class DispatchService implements Lifecycle {
/** /**
* 调度时长 * 调度时长
*/ */
public static final Long PERIOD = 10L; public static final Long PERIOD = SystemConstants.SCHEDULE_PERIOD;
/** /**
* 延迟30s为了尽可能保障集群节点都启动完成在进行rebalance * 延迟30s为了尽可能保障集群节点都启动完成在进行rebalance
*/ */
public static final Long INITIAL_DELAY = 30L; public static final Long INITIAL_DELAY = SystemConstants.SCHEDULE_INITIAL_DELAY;
@Override @Override
public void start() { public void start() {

View File

@ -56,7 +56,7 @@ export const asyncRouterMap = [
name: 'RetryTask', name: 'RetryTask',
component: RouteView, component: RouteView,
redirect: '/retry/list', redirect: '/retry/list',
meta: { title: '重试任务管理', icon: 'schedule', hideChildrenInMenu: true, keepAlive: true, permission: ['retryTask'] }, meta: { title: '重试任务管理', icon: 'schedule', permission: ['retryTask'] },
children: [ children: [
{ {
path: '/retry/scene/list', path: '/retry/scene/list',

View File

@ -223,8 +223,6 @@ export default {
methods: { methods: {
loadData (record) { loadData (record) {
const foundItem = this.logData.filter(item => item.taskId === record.id) const foundItem = this.logData.filter(item => item.taskId === record.id)
console.log(record)
console.log(foundItem)
return foundItem return foundItem
}, },
handleChange (value) { handleChange (value) {

View File

@ -263,7 +263,7 @@ export default {
this.advanced = !this.advanced this.advanced = !this.advanced
}, },
handleInfo (record) { handleInfo (record) {
this.$router.push({ path: '/retry-dead-letter/info', query: { id: record.id, groupName: record.groupName } }) this.$router.push({ path: '/retry/dead-letter/info', query: { id: record.id, groupName: record.groupName } })
}, },
onClick ({ key }) { onClick ({ key }) {
if (key === '1') { if (key === '1') {

View File

@ -253,7 +253,8 @@ export default {
this.advanced = !this.advanced this.advanced = !this.advanced
}, },
handleInfo (record) { handleInfo (record) {
this.$router.push({ path: '/retry-log/info', query: { id: record.id } }) console.log(record)
this.$router.push({ path: '/retry/log/info', query: { id: record.id } })
} }
} }
} }

View File

@ -85,6 +85,7 @@ export default {
} }
}, },
created () { created () {
console.log('111')
const id = this.$route.query.id const id = this.$route.query.id
const groupName = this.$route.query.groupName const groupName = this.$route.query.groupName
if (id && groupName) { if (id && groupName) {
@ -97,7 +98,7 @@ export default {
this.$refs.retryTaskLogMessageListRef.refreshTable(this.queryParam) this.$refs.retryTaskLogMessageListRef.refreshTable(this.queryParam)
}) })
} else { } else {
this.$router.push({ path: '/404' }) // this.$router.push({ path: '/404' })
} }
}, },
methods: { methods: {

View File

@ -339,7 +339,7 @@ export default {
this.advanced = !this.advanced this.advanced = !this.advanced
}, },
handleInfo (record) { handleInfo (record) {
this.$router.push({ path: '/retry-task/info', query: { id: record.id, groupName: record.groupName } }) this.$router.push({ path: '/retry/info', query: { id: record.id, groupName: record.groupName } })
}, },
handleOk (record) {}, handleOk (record) {},
handleSuspend (record) { handleSuspend (record) {