feat(sj_1.0.0): 1、优化定时任务时间轮幂等逻辑 2、优化重试幂等逻辑

This commit is contained in:
opensnail 2024-05-23 12:11:23 +08:00
parent 945c77bdc0
commit 03ee9cce17
28 changed files with 228 additions and 264 deletions

View File

@ -6,14 +6,12 @@ package com.aizuda.snailjob.server.common;
* @author: opensnail
* @date : 2021-11-23 09:20
*/
public interface IdempotentStrategy<T, V> {
public interface IdempotentStrategy<T> {
boolean set(T key, V value);
boolean set(T key);
V get(T t);
boolean isExist(T key);
boolean isExist(T key, V value);
boolean clear(T key, V value);
boolean clear(T key);
}

View File

@ -1,4 +1,4 @@
package com.aizuda.snailjob.server.job.task.support.timer;
package com.aizuda.snailjob.server.common;
/**
* @author: opensnail
@ -7,5 +7,5 @@ package com.aizuda.snailjob.server.job.task.support.timer;
*/
public interface TimerTask<T> extends io.netty.util.TimerTask {
T getUniqueId();
T idempotentKey();
}

View File

@ -32,6 +32,7 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimeoutCheckTask;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
@ -54,6 +55,7 @@ import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
@ -152,7 +154,7 @@ public class JobExecutorActor extends AbstractActor {
@Override
public void afterCompletion(int status) {
// 清除时间轮的缓存
JobTimerWheel.clearCache(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId());
JobTimerWheel.clearCache(MessageFormat.format(JobTimerTask.IDEMPOTENT_KEY_PREFIX, taskExecute.getTaskBatchId()));
if (JobTaskBatchStatusEnum.RUNNING.getStatus() == finalTaskStatus) {
@ -160,10 +162,6 @@ public class JobExecutorActor extends AbstractActor {
JobTimerWheel.registerWithJob(() -> new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()),
// 加500ms是为了让尽量保证客户端自己先超时中断防止客户端上报成功但是服务端已触发超时中断
Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500));
// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId(),
// new JobTimeoutCheckTask(taskExecute.getTaskBatchId(), job.getId()),
// job.getExecutorTimeout(), TimeUnit.SECONDS);
}
//方法内容
@ -241,9 +239,11 @@ public class JobExecutorActor extends AbstractActor {
// 获取时间差的毫秒数
long milliseconds = nextTriggerAt - preTriggerAt;
log.debug("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000);
Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000);
log.info("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds, DateUtils.toNowMilli() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000));
JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
}

View File

@ -25,6 +25,7 @@ import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExe
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimeoutCheckTask;
import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.*;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -36,6 +37,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
@ -89,13 +91,10 @@ public class WorkflowExecutorActor extends AbstractActor {
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason());
Workflow workflow = workflowMapper.selectById(workflowTaskBatch.getWorkflowId());
JobTimerWheel.clearCache(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId());
JobTimerWheel.clearCache(MessageFormat.format(WorkflowTimerTask.IDEMPOTENT_KEY_PREFIX, taskExecute.getWorkflowTaskBatchId()));
JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()),
Duration.ofSeconds(workflow.getExecutorTimeout()));
// 超时检查
// JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), taskExecute.getWorkflowTaskBatchId(),
// new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), workflow.getExecutorTimeout(), TimeUnit.SECONDS);
}
// 获取DAG图

View File

@ -1,7 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.idempotent;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.aizuda.snailjob.server.common.triple.Pair;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@ -12,9 +11,9 @@ import java.util.concurrent.TimeUnit;
* @date 2023-10-19 21:54:57
* @since 2.4.0
*/
public class TimerIdempotent implements IdempotentStrategy<Integer, Long> {
public class TimerIdempotent implements IdempotentStrategy<String> {
private static final Cache<Pair<Integer/*任务类型: SyetemTaskTypeEnum*/, Long /*批次id*/>, Long> cache;
private static final Cache<String, String> cache;
static {
cache = CacheBuilder.newBuilder()
@ -25,28 +24,20 @@ public class TimerIdempotent implements IdempotentStrategy<Integer, Long> {
}
@Override
public boolean set(Integer type, Long value) {
cache.put(getKey(type, value), value);
public boolean set(String key) {
cache.put(key, key);
return Boolean.TRUE;
}
@Override
public Long get(Integer s) {
throw new UnsupportedOperationException("不支持此操作");
public boolean isExist(String key) {
return cache.asMap().containsKey(key);
}
@Override
public boolean isExist(Integer type, Long value) {
return cache.asMap().containsKey(getKey(type, value));
}
@Override
public boolean clear(Integer type, Long value) {
cache.invalidate(getKey(type, value));
public boolean clear(String key) {
cache.invalidate(key);
return Boolean.TRUE;
}
private static Pair<Integer, Long> getKey(Integer type, Long value) {
return Pair.of(type, value);
}
}

View File

@ -10,6 +10,7 @@ import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.text.MessageFormat;
import java.time.Duration;
/**
@ -33,7 +34,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrepareHandler {
log.debug("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
// 若时间轮中数据不存在则重新加入
if (!JobTimerWheel.isExisted(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId())) {
if (!JobTimerWheel.isExisted(MessageFormat.format(JobTimerTask.IDEMPOTENT_KEY_PREFIX, jobPrepareDTO.getTaskBatchId()))) {
log.info("存在待处理任务且时间轮中不存在 taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
// 进入时间轮
@ -43,8 +44,6 @@ public class WaitJobPrepareHandler extends AbstractJobPrepareHandler {
jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId());
JobTimerWheel.registerWithJob(() -> new JobTimerTask(jobTimerTaskDTO), Duration.ofMillis(delay));
// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId(),
// new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}
}

View File

@ -1,7 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.prepare.workflow;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO;
@ -10,6 +9,7 @@ import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Objects;
@ -22,7 +22,7 @@ import java.util.Objects;
*/
@Component
@Slf4j
public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
public class WaitWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
@Override
public boolean matches(Integer status) {
@ -34,7 +34,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
log.debug("存在待处理任务. workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId());
// 若时间轮中数据不存在则重新加入
if (!JobTimerWheel.isExisted(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId())) {
if (!JobTimerWheel.isExisted(MessageFormat.format(WorkflowTimerTask.IDEMPOTENT_KEY_PREFIX, workflowTaskPrepareDTO.getWorkflowTaskBatchId()))) {
log.info("存在待处理任务且时间轮中不存在 workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId());
// 进入时间轮
@ -45,8 +45,6 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
workflowTimerTaskDTO.setTaskExecutorScene(workflowTaskPrepareDTO.getTaskExecutorScene());
JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimerTask(workflowTimerTaskDTO), Duration.ofMillis(delay));
// JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId(),
// new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}
}
}

View File

@ -4,6 +4,7 @@ import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.JobTaskStopHandler;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
@ -16,6 +17,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import io.netty.util.Timeout;
import lombok.AllArgsConstructor;
import java.text.MessageFormat;
import java.util.Objects;
/**
@ -26,7 +28,9 @@ import java.util.Objects;
* @since sj_1.0.0
*/
@AllArgsConstructor
public class JobTimeoutCheckTask implements TimerTask<Long> {
public class JobTimeoutCheckTask implements TimerTask<String> {
private static final String IDEMPOTENT_KEY_PREFIX = "job_timeout_check_{0}";
private final Long taskBatchId;
private final Long jobId;
@ -65,7 +69,7 @@ public class JobTimeoutCheckTask implements TimerTask<Long> {
}
@Override
public Long getUniqueId() {
return taskBatchId;
public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, taskBatchId);
}
}

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.timer;
import akka.actor.ActorRef;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
@ -8,6 +9,7 @@ import io.netty.util.Timeout;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.time.LocalDateTime;
/**
@ -17,8 +19,8 @@ import java.time.LocalDateTime;
*/
@AllArgsConstructor
@Slf4j
public class JobTimerTask implements TimerTask<Long> {
public class JobTimerTask implements TimerTask<String> {
public static final String IDEMPOTENT_KEY_PREFIX = "job_{0}";
private JobTimerTaskDTO jobTimerTaskDTO;
@Override
@ -42,7 +44,7 @@ public class JobTimerTask implements TimerTask<Long> {
}
@Override
public Long getUniqueId() {
return jobTimerTaskDTO.getTaskBatchId();
public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, jobTimerTaskDTO.getTaskBatchId());
}
}

View File

@ -1,7 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.timer;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.job.task.support.idempotent.TimerIdempotent;
import io.netty.util.HashedWheelTimer;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
@ -36,60 +36,54 @@ public class JobTimerWheel {
timer.start();
}
// @Deprecated
// public static synchronized void register(Integer taskType, Long uniqueId, TimerTask<Long> task, long delay, TimeUnit unit) {
//
// if (!isExisted(taskType, uniqueId)) {
// SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId);
// delay = delay < 0 ? 0 : delay;
// try {
// timer.newTimeout(task, delay, unit);
// idempotent.set(taskType, uniqueId);
// } catch (Exception e) {
// SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e);
// }
// }
// }
public static synchronized void registerWithWorkflow(Supplier<TimerTask<Long>> task, Duration delay) {
TimerTask<Long> timerTask = task.get();
register(SyetemTaskTypeEnum.WORKFLOW.getType(), timerTask.getUniqueId(), timerTask, delay);
/**
* 定时任务添加时间轮
*
* @param task 任务
* @param delay 延迟时间
*/
public static synchronized void registerWithWorkflow(Supplier<TimerTask<String>> task, Duration delay) {
TimerTask<String> timerTask = task.get();
register(timerTask.idempotentKey(), timerTask, delay);
}
public static synchronized void registerWithJob(Supplier<TimerTask<Long>> task, Duration delay) {
TimerTask<Long> timerTask = task.get();
register(SyetemTaskTypeEnum.JOB.getType(), timerTask.getUniqueId(), timerTask, delay);
/**
* 工作流任务添加时间轮
* 虽然job和Workflow 添加时间轮方法逻辑一样为了后面做一些不同的逻辑这里兼容分开写
* @param task 任务
* @param delay 延迟时间
*/
public static synchronized void registerWithJob(Supplier<TimerTask<String>> task, Duration delay) {
TimerTask<String> timerTask = task.get();
register(timerTask.idempotentKey(), timerTask, delay);
}
public static synchronized void register(Integer taskType, Long uniqueId, TimerTask<Long> task, Duration delay) {
public static synchronized void register(String idempotentKey, TimerTask<String> task, Duration delay) {
register(taskType, uniqueId, new Consumer<HashedWheelTimer>() {
@Override
public void accept(final HashedWheelTimer hashedWheelTimer) {
SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId);
timer.newTimeout(task, Math.max(delay.toMillis(), 0), TimeUnit.MILLISECONDS);
}
register(idempotentKey, hashedWheelTimer -> {
SnailJobLog.LOCAL.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, idempotentKey);
timer.newTimeout(task, Math.max(delay.toMillis(), 0), TimeUnit.MILLISECONDS);
});
}
public static synchronized void register(Integer taskType, Long uniqueId, Consumer<HashedWheelTimer> consumer) {
public static synchronized void register(String idempotentKey, Consumer<HashedWheelTimer> consumer) {
if (!isExisted(taskType, uniqueId)) {
if (!isExisted(idempotentKey)) {
try {
consumer.accept(timer);
idempotent.set(taskType, uniqueId);
idempotent.set(idempotentKey);
} catch (Exception e) {
SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e);
SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", idempotentKey, e);
}
}
}
public static boolean isExisted(Integer taskType, Long uniqueId) {
return idempotent.isExist(taskType, uniqueId);
public static boolean isExisted(String idempotentKey) {
return idempotent.isExist(idempotentKey);
}
public static void clearCache(Integer taskType, Long uniqueId) {
idempotent.clear(taskType, uniqueId);
public static void clearCache(String idempotentKey) {
idempotent.clear(idempotentKey);
}
}

View File

@ -1,25 +1,27 @@
package com.aizuda.snailjob.server.job.task.support.timer;
import akka.actor.ActorRef;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import io.netty.util.Timeout;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
/**
* @author opensnail
* @date 2023-10-20 23:09:13
* @since 2.4.0
*/
@Slf4j
@AllArgsConstructor
public class ResidentJobTimerTask implements TimerTask<Long> {
public class ResidentJobTimerTask implements TimerTask<String> {
private static final String IDEMPOTENT_KEY_PREFIX = " resident_job_{0}";
private JobTimerTaskDTO jobTimerTaskDTO;
private Job job;
@ -28,19 +30,19 @@ public class ResidentJobTimerTask implements TimerTask<Long> {
public void run(Timeout timeout) throws Exception {
try {
// 清除时间轮的缓存
JobTimerWheel.clearCache(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId());
JobTimerWheel.clearCache(idempotentKey());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
actorRef.tell(jobTaskPrepare, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
SnailJobLog.LOCAL.error("任务调度执行失败", e);
}
}
@Override
public Long getUniqueId() {
return jobTimerTaskDTO.getTaskBatchId();
public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, jobTimerTaskDTO.getTaskBatchId());
}
}

View File

@ -4,6 +4,7 @@ import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
@ -11,6 +12,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import io.netty.util.Timeout;
import lombok.AllArgsConstructor;
import java.text.MessageFormat;
import java.util.Objects;
/**
@ -19,7 +21,9 @@ import java.util.Objects;
* @since sj_1.0.0
*/
@AllArgsConstructor
public class WorkflowTimeoutCheckTask implements TimerTask<Long> {
public class WorkflowTimeoutCheckTask implements TimerTask<String> {
private static final String IDEMPOTENT_KEY_PREFIX = "workflow_timeout_check_{0}";
private final Long workflowTaskBatchId;
@Override
@ -40,7 +44,7 @@ public class WorkflowTimeoutCheckTask implements TimerTask<Long> {
}
@Override
public Long getUniqueId() {
return workflowTaskBatchId;
public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, workflowTaskBatchId);
}
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.timer;
import akka.actor.ActorRef;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO;
@ -9,6 +10,7 @@ import io.netty.util.Timeout;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.time.LocalDateTime;
/**
@ -18,7 +20,8 @@ import java.time.LocalDateTime;
*/
@AllArgsConstructor
@Slf4j
public class WorkflowTimerTask implements TimerTask<Long> {
public class WorkflowTimerTask implements TimerTask<String> {
public static final String IDEMPOTENT_KEY_PREFIX = "workflow_{0}";
private WorkflowTimerTaskDTO workflowTimerTaskDTO;
@ -42,7 +45,7 @@ public class WorkflowTimerTask implements TimerTask<Long> {
}
@Override
public Long getUniqueId() {
return workflowTimerTaskDTO.getWorkflowTaskBatchId();
public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, workflowTimerTaskDTO.getWorkflowTaskBatchId());
}
}

View File

@ -2,7 +2,6 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
@ -10,17 +9,17 @@ import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailMoreThresholdAlarmEvent;
import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler;
import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Scope;
@ -40,23 +39,15 @@ import java.time.LocalDateTime;
*/
@Component(ActorGenerator.FAILURE_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
@RequiredArgsConstructor
public class FailureActor extends AbstractActor {
@Autowired
private ApplicationContext context;
@Autowired
private AccessTemplate accessTemplate;
@Autowired
private CallbackRetryTaskHandler callbackRetryTaskHandler;
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
private SystemProperties systemProperties;
@Autowired
@Qualifier("retryIdempotentStrategyHandler")
private IdempotentStrategy<Pair<String/*groupName*/, String/*namespaceId*/>, Long> idempotentStrategy;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
private final IdempotentStrategy<String> idempotentStrategy = IdempotentHolder.getRetryIdempotent();
private final ApplicationContext context;
private final AccessTemplate accessTemplate;
private final CallbackRetryTaskHandler callbackRetryTaskHandler;
private final TransactionTemplate transactionTemplate;
private final SystemProperties systemProperties;
private final RetryTaskLogMapper retryTaskLogMapper;
@Override
public Receive createReceive() {
@ -107,8 +98,8 @@ public class FailureActor extends AbstractActor {
SnailJobLog.LOCAL.error("更新重试任务失败", e);
} finally {
// 清除幂等标识位
idempotentStrategy.clear(Pair.of(retryTask.getGroupName(), retryTask.getNamespaceId()), retryTask.getId());
getContext().stop(getSelf());
idempotentStrategy.clear(
ImmutableTriple.of(retryTask.getGroupName(), retryTask.getNamespaceId(), retryTask.getId()).toString()); getContext().stop(getSelf());
}
}).build();

View File

@ -2,21 +2,20 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.retry.task.support.handler.CallbackRetryTaskHandler;
import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@ -37,21 +36,13 @@ import java.time.LocalDateTime;
*/
@Component(ActorGenerator.FINISH_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
@RequiredArgsConstructor
public class FinishActor extends AbstractActor {
@Autowired
private AccessTemplate accessTemplate;
@Autowired
private CallbackRetryTaskHandler callbackRetryTaskHandler;
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
@Qualifier("retryIdempotentStrategyHandler")
private IdempotentStrategy<Pair<String/*groupName*/, String/*namespaceId*/>, Long> idempotentStrategy;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
private final IdempotentStrategy<String> idempotentStrategy = IdempotentHolder.getRetryIdempotent();
private final AccessTemplate accessTemplate;
private final CallbackRetryTaskHandler callbackRetryTaskHandler;
private final TransactionTemplate transactionTemplate;
private final RetryTaskLogMapper retryTaskLogMapper;
@Override
public Receive createReceive() {
@ -89,8 +80,8 @@ public class FinishActor extends AbstractActor {
SnailJobLog.LOCAL.error("更新重试任务失败", e);
} finally {
// 清除幂等标识位
idempotentStrategy.clear(Pair.of(retryTask.getGroupName(), retryTask.getNamespaceId()), retryTask.getId());
idempotentStrategy.clear(
ImmutableTriple.of(retryTask.getGroupName(), retryTask.getNamespaceId(), retryTask.getId()).toString());
getContext().stop(getSelf());
}

View File

@ -2,18 +2,17 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder;
import com.aizuda.snailjob.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@ -29,14 +28,10 @@ import java.time.LocalDateTime;
*/
@Component(ActorGenerator.NO_RETRY_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
@RequiredArgsConstructor
public class NoRetryActor extends AbstractActor {
@Autowired
protected AccessTemplate accessTemplate;
@Autowired
@Qualifier("retryIdempotentStrategyHandler")
private IdempotentStrategy<Pair<String/*groupName*/, String/*namespaceId*/>, Long> idempotentStrategy;
private final IdempotentStrategy<String> idempotentStrategy = IdempotentHolder.getRetryIdempotent();
private final AccessTemplate accessTemplate;
@Override
public Receive createReceive() {
@ -57,8 +52,7 @@ public class NoRetryActor extends AbstractActor {
SnailJobLog.LOCAL.error("更新重试任务失败", e);
} finally {
// 清除幂等标识位
idempotentStrategy.clear(Pair.of(retryTask.getGroupName(), retryTask.getNamespaceId()), retryTask.getId());
idempotentStrategy.clear(ImmutableTriple.of(retryTask.getGroupName(), retryTask.getNamespaceId(), retryTask.getId()).toString());
// 更新DB状态
getContext().stop(getSelf());
}

View File

@ -2,7 +2,6 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.scan;
import akka.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.util.StreamUtils;
@ -24,13 +23,13 @@ import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
import com.aizuda.snailjob.server.common.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -137,12 +136,8 @@ public abstract class AbstractScanGroup extends AbstractActor {
for (PartitionTask partitionTask : partitionTasks) {
RetryPartitionTask retryPartitionTask = (RetryPartitionTask) partitionTask;
long delay = DateUtils.toEpochMilli(retryPartitionTask.getNextTriggerAt()) - nowMilli - nowMilli % 100;
RetryTimerWheel.register(
Pair.of(retryPartitionTask.getGroupName(), retryPartitionTask.getNamespaceId()),
retryPartitionTask.getUniqueId(),
timerTask(retryPartitionTask),
delay,
TimeUnit.MILLISECONDS);
TimerTask<String> timerTask = timerTask(retryPartitionTask);
RetryTimerWheel.register(timerTask.idempotentKey(), timerTask, Duration.ofMillis(delay));
}
}
@ -175,7 +170,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask,
final RetrySceneConfig retrySceneConfig);
protected abstract TimerTask timerTask(RetryPartitionTask partitionTask);
protected abstract TimerTask<String> timerTask(RetryPartitionTask partitionTask);
protected abstract AtomicLong preCostTime();

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.scan;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyContext;
@ -11,7 +12,6 @@ import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutorS
import com.aizuda.snailjob.server.retry.task.support.timer.CallbackTimerTask;
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
@ -72,7 +72,7 @@ public class ScanCallbackTaskActor extends AbstractScanGroup {
}
@Override
protected TimerTask timerTask(final RetryPartitionTask partitionTask) {
protected TimerTask<String> timerTask(final RetryPartitionTask partitionTask) {
RetryTimerContext retryTimerContext = RetryTaskConverter.INSTANCE.toRetryTimerContext(partitionTask);
retryTimerContext.setScene(taskActuatorScene());
return new CallbackTimerTask(retryTimerContext);

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.scan;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies.WaitStrategyContext;
@ -12,7 +13,6 @@ import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutorS
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext;
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
@ -79,7 +79,7 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
}
@Override
protected TimerTask timerTask(final RetryPartitionTask partitionTask) {
protected TimerTask<String> timerTask(final RetryPartitionTask partitionTask) {
RetryTimerContext retryTimerContext = RetryTaskConverter.INSTANCE.toRetryTimerContext(partitionTask);
retryTimerContext.setScene(taskActuatorScene());
return new RetryTimerTask(retryTimerContext);

View File

@ -3,33 +3,30 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.task;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder;
import com.aizuda.snailjob.server.retry.task.support.idempotent.RetryIdempotentStrategyHandler;
import com.aizuda.snailjob.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
/**
* @author opensnail
* @date 2023-09-23 08:02:17
* @since 2.4.0
*/
@Slf4j
public abstract class AbstractTaskExecutor implements TaskExecutor, InitializingBean {
protected final RetryIdempotentStrategyHandler idempotentStrategy = IdempotentHolder.getRetryIdempotent();
@Autowired
@Qualifier("retryIdempotentStrategyHandler")
protected IdempotentStrategy<Pair<String/*groupName*/, String/*namespaceId*/>, Long> idempotentStrategy;
@Autowired
protected SystemProperties systemProperties;
@Autowired
@ -59,7 +56,7 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing
Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> pair = executor.filter();
if (!pair.getKey()) {
RetryTask retryTask = retryContext.getRetryTask();
log.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]",
SnailJobLog.LOCAL.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]",
retryTask.getGroupName(),
retryTask.getUniqueId(), pair.getValue().toString());
@ -83,7 +80,7 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing
String groupName = retryTask.getGroupName();
String namespaceId = retryTask.getNamespaceId();
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
idempotentStrategy.set(Pair.of(groupName, namespaceId), retryId);
idempotentStrategy.set(ImmutableTriple.of(groupName, namespaceId, retryId).toString());
ActorRef actorRef = getActorRef();
actorRef.tell(retryExecutor, actorRef);

View File

@ -0,0 +1,25 @@
package com.aizuda.snailjob.server.retry.task.support.idempotent;
/**
* @author: opensnail
* @date : 2024-05-23
* @since : sj_1.0.0
*/
public class IdempotentHolder {
private IdempotentHolder() {
}
public static RetryIdempotentStrategyHandler getRetryIdempotent() {
return SingletonHolder.RETRY_IDEMPOTENT_INSTANCE;
}
public static TimerIdempotent getTimerIdempotent() {
return SingletonHolder.TIMER_IDEMPOTENT;
}
private static class SingletonHolder {
private static final RetryIdempotentStrategyHandler RETRY_IDEMPOTENT_INSTANCE = new RetryIdempotentStrategyHandler();
private static final TimerIdempotent TIMER_IDEMPOTENT = new TimerIdempotent();
}
}

View File

@ -1,13 +1,10 @@
package com.aizuda.snailjob.server.retry.task.support.idempotent;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.springframework.stereotype.Component;
import java.text.MessageFormat;
import java.util.concurrent.TimeUnit;
/**
@ -17,10 +14,9 @@ import java.util.concurrent.TimeUnit;
* @date : 2021-11-23 09:26
*/
@Component
public class RetryIdempotentStrategyHandler implements IdempotentStrategy<Pair<String/*groupName*/, String/*namespaceId*/>, Long> {
private static final String KEY_FORMAT = "{0}_{1}_{2}";
public class RetryIdempotentStrategyHandler implements IdempotentStrategy<String> {
private static final Cache<String, Long> cache;
private static final Cache<String, String> cache;
static {
cache = CacheBuilder.newBuilder()
@ -30,28 +26,21 @@ public class RetryIdempotentStrategyHandler implements IdempotentStrategy<Pair<S
}
@Override
public boolean set(Pair<String/*groupName*/, String/*namespaceId*/> pair, Long value) {
cache.put(getKey(pair, value), value);
public boolean set(String key) {
cache.put(key, key);
return Boolean.TRUE;
}
@Override
public Long get(Pair<String/*groupName*/, String/*namespaceId*/> pair) {
throw new SnailJobServerException("不支持的操作");
public boolean isExist(String key) {
return cache.asMap().containsKey(key);
}
@Override
public boolean isExist(Pair<String/*groupName*/, String/*namespaceId*/> pair, Long value) {
return cache.asMap().containsKey(getKey(pair, value));
}
@Override
public boolean clear(Pair<String/*groupName*/, String/*namespaceId*/> pair, Long value) {
cache.invalidate(getKey(pair, value));
public boolean clear(String key) {
cache.invalidate(key);
return Boolean.TRUE;
}
private static String getKey(Pair<String/*groupName*/, String/*namespaceId*/> pair, final Long value) {
return MessageFormat.format(KEY_FORMAT, pair.getKey(), pair.getValue(), value);
}
}

View File

@ -1,12 +1,10 @@
package com.aizuda.snailjob.server.retry.task.support.idempotent;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.util.concurrent.TimeUnit;
/**
@ -14,10 +12,8 @@ import java.util.concurrent.TimeUnit;
* @date 2023-10-19 21:54:57
* @since 2.4.0
*/
@Slf4j
public class TimerIdempotent implements IdempotentStrategy<Pair<String/*groupName*/, String/*namespaceId*/>, String> {
public class TimerIdempotent implements IdempotentStrategy<String> {
private static final String KEY_FORMAT = "{0}_{1}_{2}";
private static final Cache<String, String> cache;
static {
@ -28,28 +24,19 @@ public class TimerIdempotent implements IdempotentStrategy<Pair<String/*groupNam
}
@Override
public boolean set(Pair<String/*groupName*/, String/*namespaceId*/> pair, String value) {
cache.put(getKey(pair, value), value);
public boolean set(String key) {
cache.put(key, key);
return Boolean.TRUE;
}
private static String getKey(Pair<String/*groupName*/, String/*namespaceId*/> pair, final String value) {
return MessageFormat.format(KEY_FORMAT, pair.getKey(), pair.getValue(), value);
@Override
public boolean isExist(String key) {
return cache.asMap().containsKey(key);
}
@Override
public String get(Pair<String/*groupName*/, String/*namespaceId*/> pair) {
throw new UnsupportedOperationException("不支持此操作");
}
@Override
public boolean isExist(Pair<String/*groupName*/, String/*namespaceId*/> pair, String value) {
return cache.asMap().containsKey(getKey(pair, value));
}
@Override
public boolean clear(Pair<String/*groupName*/, String/*namespaceId*/> pair, String value) {
cache.invalidate(getKey(pair, value));
public boolean clear(String key) {
cache.invalidate(key);
return Boolean.TRUE;
}
}

View File

@ -7,6 +7,7 @@ import com.aizuda.snailjob.server.common.IdempotentStrategy;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.retry.task.support.FilterStrategy;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.cache.CacheGroupRateLimiter;
@ -49,7 +50,7 @@ public class FilterStrategies {
*
* @return {@link BitSetIdempotentFilterStrategies} BitSet幂等的过滤策略
*/
public static FilterStrategy bitSetIdempotentFilter(IdempotentStrategy<Pair<String/*groupName*/, String/*namespaceId*/>, Long> idempotentStrategy) {
public static FilterStrategy bitSetIdempotentFilter(IdempotentStrategy<String> idempotentStrategy) {
return new BitSetIdempotentFilterStrategies(idempotentStrategy);
}
@ -123,17 +124,16 @@ public class FilterStrategies {
*/
private static final class BitSetIdempotentFilterStrategies implements FilterStrategy {
private IdempotentStrategy<Pair<String/*groupName*/, String/*namespaceId*/>, Long> idempotentStrategy;
private final IdempotentStrategy<String> idempotentStrategy;
public BitSetIdempotentFilterStrategies(IdempotentStrategy<Pair<String/*groupName*/, String/*namespaceId*/>, Long> idempotentStrategy) {
public BitSetIdempotentFilterStrategies(IdempotentStrategy<String> idempotentStrategy) {
this.idempotentStrategy = idempotentStrategy;
}
@Override
public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
RetryTask retryTask = retryContext.getRetryTask();
boolean result = !idempotentStrategy.isExist(Pair.of(retryTask.getGroupName(), retryTask.getNamespaceId()), retryTask.getId());
boolean result = !idempotentStrategy.isExist(ImmutableTriple.of(retryTask.getGroupName(), retryTask.getNamespaceId(), retryTask.getId()).toString());
StringBuilder description = new StringBuilder();
if (!result) {
description.append(MessageFormat.format("存在执行中的任务.uniqueId:[{0}]", retryTask.getUniqueId()));

View File

@ -1,8 +1,8 @@
package com.aizuda.snailjob.server.retry.task.support.timer;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.server.common.TimerTask;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
@ -13,7 +13,7 @@ import java.time.LocalDateTime;
* @since 2.4.0
*/
@Slf4j
public abstract class AbstractTimerTask implements TimerTask {
public abstract class AbstractTimerTask implements TimerTask<String> {
protected String groupName;
protected String uniqueId;
@ -30,7 +30,7 @@ public abstract class AbstractTimerTask implements TimerTask {
log.error("重试任务执行失败 groupName:[{}] uniqueId:[{}] namespaceId:[{}]", groupName, uniqueId, namespaceId, e);
} finally {
// 先清除时间轮的缓存
RetryTimerWheel.clearCache(Pair.of(groupName, namespaceId), uniqueId);
RetryTimerWheel.clearCache(idempotentKey());
}
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support.timer;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskActuatorFactory;
import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutor;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
@ -9,8 +10,8 @@ import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.util.Timeout;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.Objects;
@ -18,10 +19,10 @@ import java.util.Objects;
* @author: opensnail
* @date : 2023-09-22 17:09
*/
@Slf4j
public class CallbackTimerTask extends AbstractTimerTask {
public static final String IDEMPOTENT_KEY_PREFIX = "callback_{0}_{1}_{2}";
private RetryTimerContext context;
private final RetryTimerContext context;
public CallbackTimerTask(RetryTimerContext context) {
this.context = context;
@ -32,7 +33,7 @@ public class CallbackTimerTask extends AbstractTimerTask {
@Override
protected void doRun(final Timeout timeout) {
log.debug("回调任务执行 {}", LocalDateTime.now());
SnailJobLog.LOCAL.debug("回调任务执行 {}", LocalDateTime.now());
AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class);
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), context.getNamespaceId(),
@ -48,4 +49,8 @@ public class CallbackTimerTask extends AbstractTimerTask {
taskExecutor.actuator(retryTask);
}
@Override
public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, context.getGroupName(), context.getNamespaceId(), context.getUniqueId());
}
}

View File

@ -9,20 +9,18 @@ import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.util.Timeout;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.util.Objects;
/**
* @author: opensnail
* @date : 2023-09-22 17:09
*/
@Data
@Slf4j
public class RetryTimerTask extends AbstractTimerTask {
public static final String IDEMPOTENT_KEY_PREFIX = "retry_{0}_{1}_{2}";
private RetryTimerContext context;
private final RetryTimerContext context;
public RetryTimerTask(RetryTimerContext context) {
this.context = context;
@ -47,4 +45,9 @@ public class RetryTimerTask extends AbstractTimerTask {
TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene());
taskExecutor.actuator(retryTask);
}
@Override
public String idempotentKey() {
return MessageFormat.format(IDEMPOTENT_KEY_PREFIX, context.getGroupName(), context.getNamespaceId(), context.getUniqueId());
}
}

View File

@ -1,15 +1,15 @@
package com.aizuda.snailjob.server.retry.task.support.timer;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder;
import com.aizuda.snailjob.server.retry.task.support.idempotent.TimerIdempotent;
import io.netty.util.HashedWheelTimer;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -18,9 +18,8 @@ import java.util.concurrent.TimeUnit;
* @author: opensnail
* @date : 2023-09-22 17:03
*/
@Component
@Slf4j
public class RetryTimerWheel implements Lifecycle {
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class RetryTimerWheel {
private static final int TICK_DURATION = 500;
private static final String THREAD_NAME_PREFIX = "retry-task-timer-wheel-";
@ -29,39 +28,33 @@ public class RetryTimerWheel implements Lifecycle {
new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), new CustomizableThreadFactory(THREAD_NAME_PREFIX));
private static final TimerIdempotent idempotent = new TimerIdempotent();
private static final TimerIdempotent idempotent = IdempotentHolder.getTimerIdempotent();
@Override
public void start() {
static {
timer = new HashedWheelTimer(
new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, TimeUnit.MILLISECONDS, 512,
true, -1, executor);
new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, TimeUnit.MILLISECONDS, 512,
true, -1, executor);
timer.start();
}
public static void register(Pair<String/*groupName*/, String/*namespaceId*/> pair, String uniqueId, TimerTask task, long delay, TimeUnit unit) {
public static void register(String idempotentKey, TimerTask task, Duration delay) {
if (!isExisted(pair, uniqueId)) {
delay = delay < 0 ? 0 : delay;
if (!isExisted(idempotentKey)) {
try {
timer.newTimeout(task, delay, unit);
idempotent.set(pair, uniqueId);
timer.newTimeout(task, Math.max(delay.toMillis(), 0), TimeUnit.MILLISECONDS);
idempotent.set(idempotentKey);
} catch (Exception e) {
SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", uniqueId, e);
SnailJobLog.LOCAL.error("加入时间轮失败. uniqueId:[{}]", idempotentKey, e);
}
}
}
public static boolean isExisted(Pair<String/*groupName*/, String/*namespaceId*/> pair, String uniqueId) {
return idempotent.isExist(pair, uniqueId);
public static boolean isExisted(String idempotentKey) {
return idempotent.isExist(idempotentKey);
}
public static void clearCache(Pair<String/*groupName*/, String/*namespaceId*/> pair, String uniqueId) {
idempotent.clear(pair, uniqueId);
public static void clearCache(String idempotentKey) {
idempotent.clear(idempotentKey);
}
@Override
public void close() {
timer.stop();
}
}