diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java index cbc3300a8..2f5f3790f 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java @@ -86,4 +86,14 @@ public interface SystemConstants { String JOB_SHARDING_VALUE_SEPARATOR = "#=@"; String JOB_SHARDING_ARGS_SEPARATOR = "#;@"; + + /** + * 调度时长 + */ + Long SCHEDULE_PERIOD = 10L; + + /** + * 延迟30s为了尽可能保障集群节点都启动完成在进行rebalance + */ + Long SCHEDULE_INITIAL_DELAY = 30L; } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java index 621e27825..fe18521b8 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java @@ -78,7 +78,7 @@ public class JdbcLockProvider extends AbstractLockProvider { distributedLock.setUpdateDt(now); return distributedLockMapper.insert(distributedLock) > 0; } catch (DuplicateKeyException | ConcurrencyFailureException | TransactionSystemException e) { - LogUtils.warn(log,"Duplicate key. lockName:[{}]", lockConfig.getLockName()); +// LogUtils.warn(log,"Duplicate key. lockName:[{}]", lockConfig.getLockName()); return false; } catch (DataIntegrityViolationException | BadSqlGrammarException | UncategorizedSQLException e) { LogUtils.error(log,"Unexpected exception. lockName:[{}]", lockConfig.getLockName(), e); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java index faf787820..68ea22ffb 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java @@ -2,7 +2,10 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch; import akka.actor.AbstractActor; import akka.actor.ActorRef; +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; @@ -125,7 +128,14 @@ public class ScanJobTaskActor extends AbstractActor { WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); waitStrategyContext.setTriggerType(partitionTask.getTriggerType()); waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval()); - waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt()); + + LocalDateTime now = LocalDateTime.now(); + LocalDateTime nextTriggerAt = partitionTask.getNextTriggerAt(); + if (nextTriggerAt.plusSeconds(SystemConstants.SCHEDULE_PERIOD).isBefore(now)) { + nextTriggerAt = now; + } + + waitStrategyContext.setNextTriggerAt(nextTriggerAt); return waitStrategy.computeRetryTime(waitStrategyContext); } @@ -136,7 +146,7 @@ public class ScanJobTaskActor extends AbstractActor { new LambdaQueryWrapper() .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) .in(Job::getBucketIndex, scanTask.getBuckets()) - .le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)) + .le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD)) .eq(Job::getDeleted, StatusEnum.NO.getStatus()) .ge(Job::getId, startId) ).getRecords(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java index 41a8d7ec6..b592d5390 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java @@ -122,12 +122,8 @@ public class WaitStrategies { public LocalDateTime computeRetryTime(WaitStrategyContext context) { long triggerInterval = Long.parseLong(context.triggerInterval); - LocalDateTime nextTriggerAt = context.getNextTriggerAt(); - if (nextTriggerAt.isBefore(LocalDateTime.now())) { - nextTriggerAt = LocalDateTime.now(); - } - return nextTriggerAt.plusSeconds(triggerInterval); + return context.nextTriggerAt.plusSeconds(triggerInterval); } } @@ -139,14 +135,9 @@ public class WaitStrategies { @Override public LocalDateTime computeRetryTime(WaitStrategyContext context) { - LocalDateTime nextTriggerAt = context.getNextTriggerAt(); - if (nextTriggerAt.isBefore(LocalDateTime.now())) { - nextTriggerAt = LocalDateTime.now(); - } - Date nextValidTime; try { - ZonedDateTime zdt = nextTriggerAt.atZone(ZoneOffset.ofHours(8)); + ZonedDateTime zdt = context.nextTriggerAt.atZone(ZoneOffset.ofHours(8)); nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant())); } catch (ParseException e) { throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index fb644dcd7..819950cb2 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan; import akka.actor.AbstractActor; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.IdempotentStrategy; @@ -89,7 +90,7 @@ public abstract class AbstractScanGroup extends AbstractActor { // 计算循环拉取的次数 if (preCostTime.get() > 0) { - long loopCount = Math.max((10 * 1000) / preCostTime.get(), 1); + long loopCount = Math.max((SystemConstants.SCHEDULE_PERIOD * 1000) / preCostTime.get(), 1); // TODO 最大拉取次数支持可配置 loopCount = Math.min(loopCount, 10); pullCount.set(loopCount); @@ -99,24 +100,24 @@ public abstract class AbstractScanGroup extends AbstractActor { Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L); log.info("retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]", - groupName, lastId, pullCount.get(), preCostTime.get()); + groupName, lastId, pullCount.get(), preCostTime.get()); AtomicInteger count = new AtomicInteger(0); PartitionTaskUtils.process(startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()), - partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> { - if (CollectionUtils.isEmpty(partitionTasks)) { - putLastId(scanTask.getGroupName(), 0L); - return Boolean.TRUE; - } + partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> { + if (CollectionUtils.isEmpty(partitionTasks)) { + putLastId(scanTask.getGroupName(), 0L); + return Boolean.TRUE; + } - // 超过最大的拉取次数则中断 - if (count.getAndIncrement() > pullCount.get()) { - putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); - return Boolean.TRUE; - } + // 超过最大的拉取次数则中断 + if (count.getAndIncrement() > pullCount.get()) { + putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); + return Boolean.TRUE; + } - return false; - }, lastId); + return false; + }, lastId); } @@ -136,10 +137,10 @@ public abstract class AbstractScanGroup extends AbstractActor { accessTemplate.getRetryTaskAccess().updateById(partitionTask.getGroupName(), retryTask); long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - - System.currentTimeMillis(); + - System.currentTimeMillis(); RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask), - delay, - TimeUnit.MILLISECONDS); + delay, + TimeUnit.MILLISECONDS); } protected abstract TaskExecutorSceneEnum taskActuatorScene(); @@ -154,14 +155,14 @@ public abstract class AbstractScanGroup extends AbstractActor { public List listAvailableTasks(String groupName, Long lastId, Integer taskType) { List retryTasks = accessTemplate.getRetryTaskAccess() - .listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()), - new LambdaQueryWrapper() - .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) - .eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType) - .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)) - .gt(RetryTask::getId, lastId) - .orderByAsc(RetryTask::getId)) - .getRecords(); + .listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()), + new LambdaQueryWrapper() + .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) + .eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType) + .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD)) + .gt(RetryTask::getId, lastId) + .orderByAsc(RetryTask::getId)) + .getRecords(); return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java index c0aff9c98..f8da21d68 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java @@ -1,5 +1,6 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask; import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; @@ -57,7 +58,14 @@ public class ScanRetryTaskActor extends AbstractScanGroup { .getSceneConfigByGroupNameAndSceneName(partitionTask.getGroupName(), partitionTask.getSceneName()); WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); - waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt()); + + LocalDateTime now = LocalDateTime.now(); + LocalDateTime nextTriggerAt = partitionTask.getNextTriggerAt(); + if (nextTriggerAt.plusSeconds(SystemConstants.SCHEDULE_PERIOD).isBefore(now)) { + nextTriggerAt = now; + } + + waitStrategyContext.setNextTriggerAt(nextTriggerAt); waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval()); waitStrategyContext.setTriggerCount(partitionTask.getRetryCount() + 1); // 更新触发时间, 任务进入时间轮 diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/WaitStrategies.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/WaitStrategies.java index 12e963070..3a2058dae 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/WaitStrategies.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/WaitStrategies.java @@ -172,17 +172,6 @@ public class WaitStrategies { @Override public LocalDateTime computeRetryTime(WaitStrategyContext retryContext) { -// if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) { -// // 回调失败的默认15分钟执行一次重试 -// SystemProperties systemProperties = SpringContext.CONTEXT.getBean(SystemProperties.class); -// triggerInterval = systemProperties.getCallback().getTriggerInterval(); -// } else { -// AccessTemplate accessTemplate = SpringContext.CONTEXT.getBean(AccessTemplate.class); -// SceneConfig sceneConfig = -// accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); -// triggerInterval = Integer.parseInt(sceneConfig.getTriggerInterval()); -// } - return retryContext.getNextTriggerAt().plusSeconds(Integer.parseInt(retryContext.getTriggerInterval())); } } diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java index 3ca8cbcab..cf8239c17 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.server.starter.dispatch; import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; @@ -35,12 +36,12 @@ public class DispatchService implements Lifecycle { /** * 调度时长 */ - public static final Long PERIOD = 10L; + public static final Long PERIOD = SystemConstants.SCHEDULE_PERIOD; /** * 延迟30s为了尽可能保障集群节点都启动完成在进行rebalance */ - public static final Long INITIAL_DELAY = 30L; + public static final Long INITIAL_DELAY = SystemConstants.SCHEDULE_INITIAL_DELAY; @Override public void start() { diff --git a/frontend/src/config/router.config.js b/frontend/src/config/router.config.js index 27e83532f..945a7272b 100644 --- a/frontend/src/config/router.config.js +++ b/frontend/src/config/router.config.js @@ -56,7 +56,7 @@ export const asyncRouterMap = [ name: 'RetryTask', component: RouteView, redirect: '/retry/list', - meta: { title: '重试任务管理', icon: 'schedule', hideChildrenInMenu: true, keepAlive: true, permission: ['retryTask'] }, + meta: { title: '重试任务管理', icon: 'schedule', permission: ['retryTask'] }, children: [ { path: '/retry/scene/list', diff --git a/frontend/src/views/job/JobTaskList.vue b/frontend/src/views/job/JobTaskList.vue index e8544448e..662a8784b 100644 --- a/frontend/src/views/job/JobTaskList.vue +++ b/frontend/src/views/job/JobTaskList.vue @@ -223,8 +223,6 @@ export default { methods: { loadData (record) { const foundItem = this.logData.filter(item => item.taskId === record.id) - console.log(record) - console.log(foundItem) return foundItem }, handleChange (value) { diff --git a/frontend/src/views/task/RetryDeadLetterList.vue b/frontend/src/views/task/RetryDeadLetterList.vue index 59888c7dd..daa15e067 100644 --- a/frontend/src/views/task/RetryDeadLetterList.vue +++ b/frontend/src/views/task/RetryDeadLetterList.vue @@ -263,7 +263,7 @@ export default { this.advanced = !this.advanced }, handleInfo (record) { - this.$router.push({ path: '/retry-dead-letter/info', query: { id: record.id, groupName: record.groupName } }) + this.$router.push({ path: '/retry/dead-letter/info', query: { id: record.id, groupName: record.groupName } }) }, onClick ({ key }) { if (key === '1') { diff --git a/frontend/src/views/task/RetryLogList.vue b/frontend/src/views/task/RetryLogList.vue index 25173d09a..fcc43e0b3 100644 --- a/frontend/src/views/task/RetryLogList.vue +++ b/frontend/src/views/task/RetryLogList.vue @@ -253,7 +253,8 @@ export default { this.advanced = !this.advanced }, handleInfo (record) { - this.$router.push({ path: '/retry-log/info', query: { id: record.id } }) + console.log(record) + this.$router.push({ path: '/retry/log/info', query: { id: record.id } }) } } } diff --git a/frontend/src/views/task/RetryTaskInfo.vue b/frontend/src/views/task/RetryTaskInfo.vue index fa430f386..101b7a33a 100644 --- a/frontend/src/views/task/RetryTaskInfo.vue +++ b/frontend/src/views/task/RetryTaskInfo.vue @@ -85,6 +85,7 @@ export default { } }, created () { + console.log('111') const id = this.$route.query.id const groupName = this.$route.query.groupName if (id && groupName) { @@ -97,7 +98,7 @@ export default { this.$refs.retryTaskLogMessageListRef.refreshTable(this.queryParam) }) } else { - this.$router.push({ path: '/404' }) + // this.$router.push({ path: '/404' }) } }, methods: { diff --git a/frontend/src/views/task/RetryTaskList.vue b/frontend/src/views/task/RetryTaskList.vue index 5631ccf11..c5c8477c9 100644 --- a/frontend/src/views/task/RetryTaskList.vue +++ b/frontend/src/views/task/RetryTaskList.vue @@ -339,7 +339,7 @@ export default { this.advanced = !this.advanced }, handleInfo (record) { - this.$router.push({ path: '/retry-task/info', query: { id: record.id, groupName: record.groupName } }) + this.$router.push({ path: '/retry/info', query: { id: record.id, groupName: record.groupName } }) }, handleOk (record) {}, handleSuspend (record) {