feat: 2.4.0

1. job触发时间改为long
2. 修复分片任务的提交报错
This commit is contained in:
byteblogs168 2023-11-03 15:32:22 +08:00
parent 4abccf25f8
commit d40a06d790
83 changed files with 325 additions and 313 deletions

View File

@ -282,7 +282,7 @@ CREATE TABLE `job_task_batch` (
`job_id` bigint(20) NOT NULL COMMENT '任务id',
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
`execution_at` datetime DEFAULT NULL COMMENT '任务执行时间',
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',

View File

@ -79,14 +79,6 @@ public interface SystemConstants {
" |__/ |__/ \n" +
" :: Easy Retry :: (v{}) \n";
interface DATE_FORMAT {
DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
}
String JOB_SHARDING_VALUE_SEPARATOR = "#=@";
String JOB_SHARDING_ARGS_SEPARATOR = "#;@";
/**
* 调度时长
*/

View File

@ -42,7 +42,7 @@ public class JobBatchResponseDO {
/**
* 任务执行时间
*/
private LocalDateTime executionAt;
private Long executionAt;
/**
* 操作原因

View File

@ -55,7 +55,7 @@ public class JobTaskBatch implements Serializable {
/**
* 任务执行时间
*/
private LocalDateTime executionAt;
private Long executionAt;
/**
* 操作原因

View File

@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.common.generator.id;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.enums.IdGeneratorMode;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.SequenceAllocMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.SequenceAlloc;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -63,8 +64,6 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
*/
private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
private static final String TIME_FORMAT = "yyyyMMddHHmmssSSS";
private ThreadPoolExecutor service = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(5000), new UpdateThreadFactory());
@ -309,7 +308,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
@Override
public String idGenerator(String group) {
String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern(TIME_FORMAT));
String time = DateUtils.format(DateUtils.toNowLocalDateTime(), DateUtils.PURE_DATETIME_MS_PATTERN);
return time.concat(get(group));
}

View File

@ -5,13 +5,14 @@ import com.aizuda.easy.retry.common.core.util.CronExpression;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtil;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.Getter;
import java.text.ParseException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Objects;
import java.util.Random;
@ -47,6 +48,13 @@ public class WaitStrategies {
*/
private Integer delayLevel;
public void setNextTriggerAt(final long nextTriggerAt) {
this.nextTriggerAt = nextTriggerAt;
}
public void setNextTriggerAt(final LocalDateTime nextTriggerAt) {
this.nextTriggerAt = DateUtils.toEpochMilli(nextTriggerAt);
}
}
@Getter
@ -160,7 +168,7 @@ public class WaitStrategies {
@Override
public Long computeTriggerTime(WaitStrategyContext retryContext) {
return retryContext.getNextTriggerAt() + Integer.parseInt(retryContext.getTriggerInterval());
return retryContext.getNextTriggerAt() + DateUtils.toEpochMilli(Integer.parseInt(retryContext.getTriggerInterval()));
}
}
@ -174,7 +182,7 @@ public class WaitStrategies {
try {
Date nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(new Date(context.getNextTriggerAt()));
return DateUtil.toEpochMilli(nextValidTime);
return DateUtils.toEpochMilli(nextValidTime);
} catch (ParseException e) {
throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e);
}
@ -215,7 +223,7 @@ public class WaitStrategies {
Preconditions.checkArgument(maximum > minimum, "maximum must be > minimum but maximum is %d and minimum is", maximum, minimum);
long t = Math.abs(RANDOM.nextLong()) % (maximum - minimum);
return (t + minimum + System.currentTimeMillis()) / 1000;
return (t + minimum + DateUtils.toNowMilli());
}
}
}

View File

@ -1,8 +1,6 @@
package com.aizuda.easy.retry.server.common.util;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.util.CronExpression;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import java.text.ParseException;
import java.time.Duration;
@ -30,7 +28,7 @@ public class CronUtils {
ZonedDateTime zdt = now.atZone(ZoneOffset.ofHours(8));
nextValidTime = new CronExpression(cron).getNextValidTimeAfter(Date.from(zdt.toInstant()));
now = LocalDateTime.ofEpochSecond(nextValidTime.getTime() / 1000, 0, ZoneOffset.ofHours(8));
list.add(SystemConstants.DATE_FORMAT.YYYYMMDDHHMMSS.format(now));
list.add(DateUtils.format(now, DateUtils.NORM_DATETIME_PATTERN));
} catch (ParseException ignored) {
}
}
@ -40,8 +38,8 @@ public class CronUtils {
public static long getExecuteInterval(String cron) {
List<String> executeTimeByCron = getExecuteTimeByCron(cron, 2);
LocalDateTime first = LocalDateTime.parse(executeTimeByCron.get(0), SystemConstants.DATE_FORMAT.YYYYMMDDHHMMSS);
LocalDateTime second = LocalDateTime.parse(executeTimeByCron.get(1), SystemConstants.DATE_FORMAT.YYYYMMDDHHMMSS);
LocalDateTime first = LocalDateTime.parse(executeTimeByCron.get(0), DateUtils.NORM_DATETIME_PATTERN);
LocalDateTime second = LocalDateTime.parse(executeTimeByCron.get(1), DateUtils.NORM_DATETIME_PATTERN);
Duration duration = Duration.between(first, second);
return duration.toMillis();
}

View File

@ -1,25 +0,0 @@
package com.aizuda.easy.retry.server.common.util;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;
/**
* @author www.byteblogs.com
* @date 2023-11-02 23:42:53
* @since 2.4.0
*/
public class DateUtil extends cn.hutool.core.date.DateUtil {
public static long toEpochMilli(Date date) {
return DateUtil.toLocalDateTime(date).toInstant(ZoneOffset.of("+8")).toEpochMilli();
}
public static long toEpochMilli(LocalDateTime date) {
return date.toInstant(ZoneOffset.of("+8")).toEpochMilli();
}
public static LocalDateTime toEpochMilli(long milli) {
return LocalDateTime.ofEpochSecond(milli / 1000, 0, ZoneOffset.ofHours(8));
}
}

View File

@ -0,0 +1,57 @@
package com.aizuda.easy.retry.server.common.util;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;
/**
* @author www.byteblogs.com
* @date 2023-11-02 23:42:53
* @since 2.4.0
*/
public class DateUtils {
public static final DateTimeFormatter NORM_DATETIME_PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static final DateTimeFormatter PURE_DATETIME_MS_PATTERN = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
private static final ZoneOffset zoneOffset = ZoneOffset.of("+8");
private DateUtils() {
}
public static long toEpochMilli(Date date) {
return toLocalDateTime(date.getTime()).toInstant(zoneOffset).toEpochMilli();
}
public static long toEpochMilli(LocalDateTime date) {
return date.toInstant(zoneOffset).toEpochMilli();
}
public static LocalDateTime toLocalDateTime(long milli) {
return LocalDateTime.ofInstant(Instant.ofEpochMilli(milli), zoneOffset);
}
public static long toNowMilli() {
return System.currentTimeMillis();
}
public static LocalDateTime toNowLocalDateTime() {
return LocalDateTime.now();
}
public static String format(LocalDateTime time, DateTimeFormatter dateFormatter) {
return time.format(dateFormatter);
}
public static String toNowFormat( DateTimeFormatter dateFormatter) {
return format(toNowLocalDateTime(), dateFormatter);
}
public static long toEpochMilli(long second) {
return second * 1000L;
}
}

View File

@ -51,7 +51,7 @@ public class JobTaskPrepareDTO {
/**
* 任务执行时间
*/
private LocalDateTime executionAt;
private Long executionAt;
private boolean onlyTimeoutCheck;

View File

@ -11,6 +11,7 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobExecutor;
@ -37,7 +38,6 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@ -129,7 +129,7 @@ public class JobExecutorActor extends AbstractActor {
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(taskExecute.getTaskBatchId());
jobTaskBatch.setExecutionAt(LocalDateTime.now());
jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
jobTaskBatch.setTaskBatchStatus(taskStatus);
jobTaskBatch.setOperationReason(operationReason);
Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch),
@ -165,10 +165,10 @@ public class JobExecutorActor extends AbstractActor {
// 获取时间差的毫秒数
long milliseconds = nextTriggerAt - preTriggerAt;
log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, System.currentTimeMillis() % 1000);
log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - System.currentTimeMillis() % 1000, TimeUnit.MILLISECONDS);
JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
}
}

View File

@ -24,7 +24,7 @@ import static com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum.NOT
*
* @author www.byteblogs.com
* @date 2023-09-25 22:20:53
* @since
* @since 2.4.0
*/
@Component(ActorGenerator.JOB_TASK_PREPARE_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)

View File

@ -13,6 +13,7 @@ import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask;
@ -97,7 +98,7 @@ public class ScanJobTaskActor extends AbstractActor {
triggerTask = Objects.isNull(nextTriggerAt);
// 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now
long now = System.currentTimeMillis();
if (Objects.isNull(nextTriggerAt) || (nextTriggerAt + SystemConstants.SCHEDULE_PERIOD * 1000) < now) {
if (Objects.isNull(nextTriggerAt) || (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
nextTriggerAt = now;
}
}
@ -127,7 +128,7 @@ public class ScanJobTaskActor extends AbstractActor {
long now = System.currentTimeMillis();
long nextTriggerAt = partitionTask.getNextTriggerAt();
if ((nextTriggerAt + SystemConstants.SCHEDULE_PERIOD * 1000) < now) {
if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
nextTriggerAt = now;
}

View File

@ -4,6 +4,7 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
@ -59,8 +60,7 @@ public class JobTaskBatchGenerator {
}
// 进入时间轮
long delay = context.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
- System.currentTimeMillis();
long delay = context.getNextTriggerAt() - DateUtils.toNowMilli();
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId());
jobTimerTaskDTO.setGroupName(context.getGroupName());

View File

@ -25,7 +25,7 @@ public class JobTaskBatchGeneratorContext {
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
private Long nextTriggerAt;
/**
* 操作原因

View File

@ -4,6 +4,7 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
@ -54,22 +55,33 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
}
String argsStr = context.getArgsStr();
Map<String, String> split = Splitter.on(SystemConstants.JOB_SHARDING_ARGS_SEPARATOR).omitEmptyStrings().withKeyValueSeparator(SystemConstants.JOB_SHARDING_VALUE_SEPARATOR).split(argsStr);
if (StrUtil.isBlank(argsStr)) {
log.error("切片参数为空. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
List<String> argsStrs;
try {
argsStrs = JsonUtil.parseList(argsStr, String.class);
} catch (Exception e) {
log.error("切片参数解析失败. jobId:[{}]", context.getJobId(), e);
return Lists.newArrayList();
}
List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
List<JobTask> jobTasks = new ArrayList<>(split.size());
split.forEach((key, value) -> {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(Integer.parseInt(key) % serverNodes.size());
List<JobTask> jobTasks = new ArrayList<>(argsStrs.size());
for (int index = 0; index < argsStrs.size(); index++) {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(value);
jobTask.setArgsStr(argsStrs.get(index));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
jobTasks.add(jobTask);
});
}
return jobTasks;
}

View File

@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.job.task.support.prepare;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
@ -16,8 +17,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.ZoneId;
/**
* 处理处于{@link JobTaskBatchStatusEnum::RUNNING}状态的任务
*
@ -48,11 +47,11 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
} else {
// 计算超时时间
long delay = System.currentTimeMillis() - prepare.getExecutionAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
long delay = DateUtils.toNowMilli() - prepare.getExecutionAt();
// 计算超时时间到达超时时间中断任务
if (delay > prepare.getExecutorTimeout() * 1000) {
log.info("任务执行超时.taskBatchId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getTaskBatchId(), delay, prepare.getExecutorTimeout() * 1000);
if (delay > DateUtils.toEpochMilli(prepare.getExecutorTimeout())) {
log.info("任务执行超时.taskBatchId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getTaskBatchId(), delay, DateUtils.toEpochMilli(prepare.getExecutorTimeout()));
// 超时停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(prepare.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(prepare);

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.job.task.support.prepare;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
@ -8,7 +9,6 @@ import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;
/**
@ -36,8 +36,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
log.info("存在待处理任务且时间轮中不存在 taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
// 进入时间轮
long delay = jobPrepareDTO.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
- System.currentTimeMillis();
long delay = jobPrepareDTO.getNextTriggerAt() - DateUtils.toNowMilli();
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobPrepareDTO.getTaskBatchId());
jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId());

View File

@ -65,7 +65,7 @@ public class BlockStrategies {
/**
* 下次触发时间
*/
private LocalDateTime nextTriggerAt;
private Long nextTriggerAt;
private Integer operationReason;

View File

@ -11,6 +11,7 @@ import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.generator.id.IdGenerator;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext.TaskInfo;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
@ -136,7 +137,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(1);
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
retryTask.setNextTriggerAt(waitStrategy.computeTriggerTime(waitStrategyContext));
retryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext)));
waitInsertTasks.add(retryTask);
// 初始化日志

View File

@ -12,21 +12,6 @@ import java.util.List;
*/
public interface RetryService {
/**
* 单个上报接口
*
* @param retryTaskDTO {@link RetryTaskDTO} 重试上报DTO
* @return true- 处理成功 false- 处理失败
*/
Boolean reportRetry(RetryTaskDTO retryTaskDTO);
/**
* 批量上报
*
* @param retryTaskDTOList {@link RetryTaskDTO} 重试上报DTO 列表
* @return true- 全部处理成功 false- 全部处理失败
*/
Boolean batchReportRetry(List<RetryTaskDTO> retryTaskDTOList);
/**
* 迁移到达最大重试次数到死信队列

View File

@ -10,6 +10,7 @@ import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.generator.id.IdGenerator;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.retry.task.service.RetryDeadLetterConverter;
import com.aizuda.easy.retry.server.retry.task.service.RetryService;
@ -47,103 +48,6 @@ public class RetryServiceImpl implements RetryService {
@Autowired
private AccessTemplate accessTemplate;
@Autowired
private List<IdGenerator> idGeneratorList;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Transactional
@Override
public Boolean reportRetry(RetryTaskDTO retryTaskDTO) {
LogUtils.info(log, "received report data. <|>{}<|>", JsonUtil.toJsonString(retryTaskDTO));
ConfigAccess<SceneConfig> sceneConfigAccess = accessTemplate.getSceneConfigAccess();
SceneConfig sceneConfig = sceneConfigAccess.getSceneConfigByGroupNameAndSceneName(retryTaskDTO.getGroupName(),
retryTaskDTO.getSceneName());
if (Objects.isNull(sceneConfig)) {
GroupConfig groupConfig = sceneConfigAccess.getGroupConfigByGroupName(retryTaskDTO.getGroupName());
if (Objects.isNull(groupConfig)) {
throw new EasyRetryServerException(
"failed to report data, no group configuration found. groupName:[{}]", retryTaskDTO.getGroupName());
}
if (groupConfig.getInitScene().equals(StatusEnum.NO.getStatus())) {
throw new EasyRetryServerException(
"failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]",
retryTaskDTO.getGroupName(), retryTaskDTO.getSceneName());
} else {
// 若配置了默认初始化场景配置则发现上报数据的时候未配置场景默认生成一个场景
initScene(retryTaskDTO);
}
}
// 此处做幂等处理避免客户端重复多次上报
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
long count =retryTaskAccess.count(retryTaskDTO.getGroupName(), new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getIdempotentId, retryTaskDTO.getIdempotentId())
.eq(RetryTask::getGroupName, retryTaskDTO.getGroupName())
.eq(RetryTask::getSceneName, retryTaskDTO.getSceneName())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
);
if (0 < count) {
LogUtils.warn(log, "interrupted reporting in retrying task. [{}]", JsonUtil.toJsonString(retryTaskDTO));
return Boolean.TRUE;
}
LocalDateTime now = LocalDateTime.now();
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTaskDTO);
retryTask.setUniqueId(getIdGenerator(retryTaskDTO.getGroupName()));
retryTask.setTaskType(TaskTypeEnum.RETRY.getType());
retryTask.setCreateDt(now);
retryTask.setUpdateDt(now);
if (StrUtil.isBlank(retryTask.getExtAttrs())) {
retryTask.setExtAttrs(StrUtil.EMPTY);
}
retryTask.setNextTriggerAt(
WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null));
Assert.isTrue(1 == retryTaskAccess.insert(retryTaskDTO.getGroupName(), retryTask),
() -> new EasyRetryServerException("failed to report data"));
// 初始化日志
RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask);
retryTaskLog.setTaskType(TaskTypeEnum.RETRY.getType());
retryTaskLog.setCreateDt(now);
Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
() -> new EasyRetryServerException("新增重试日志失败"));
return Boolean.TRUE;
}
/**
* 若配置了默认初始化场景配置则发现上报数据的时候未配置场景默认生成一个场景 backOff(退避策略): 等级策略 maxRetryCount(最大重试次数): 26 triggerInterval(间隔时间): see:
* {@link DelayLevelEnum}
*
* @param retryTaskDTO 重试上报DTO
*/
private void initScene(final RetryTaskDTO retryTaskDTO) {
SceneConfig sceneConfig = new SceneConfig();
sceneConfig.setGroupName(retryTaskDTO.getGroupName());
sceneConfig.setSceneName(retryTaskDTO.getSceneName());
sceneConfig.setSceneStatus(StatusEnum.YES.getStatus());
sceneConfig.setBackOff(WaitStrategies.WaitStrategyEnum.DELAY_LEVEL.getType());
sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel());
sceneConfig.setDescription("自动初始化场景");
Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().insert(sceneConfig),
() -> new EasyRetryServerException("init scene error"));
}
@Transactional
@Override
public Boolean batchReportRetry(List<RetryTaskDTO> retryTaskDTOList) {
retryTaskDTOList.forEach(this::reportRetry);
return Boolean.TRUE;
}
@Transactional
@Override
@ -250,21 +154,4 @@ public class RetryServiceImpl implements RetryService {
() -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
}
/**
* 获取分布式id
*
* @param groupName 组id
* @return 分布式id
*/
private String getIdGenerator(String groupName) {
GroupConfig groupConfig = accessTemplate.getGroupConfigAccess().getGroupConfigByGroupName(groupName);
for (final IdGenerator idGenerator : idGeneratorList) {
if (idGenerator.supports(groupConfig.getIdGeneratorMode())) {
return idGenerator.idGenerator(groupName);
}
}
throw new EasyRetryServerException("id generator mode not configured. [{}]", groupName);
}
}

View File

@ -9,6 +9,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
@ -98,7 +99,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
AtomicInteger count = new AtomicInteger(0);
long total = PartitionTaskUtils.process(
startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()),
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> {
partitionTasks -> processRetryPartitionTasks(partitionTasks), partitionTasks -> {
if (CollectionUtils.isEmpty(partitionTasks)) {
putLastId(scanTask.getGroupName(), 0L);
return Boolean.TRUE;
@ -118,7 +119,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
}
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks, ScanTask scanTask) {
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks) {
for (PartitionTask partitionTask : partitionTasks) {
processRetryTask((RetryPartitionTask) partitionTask);
@ -133,8 +134,8 @@ public abstract class AbstractScanGroup extends AbstractActor {
retryTask.setId(partitionTask.getId());
accessTemplate.getRetryTaskAccess().updateById(partitionTask.getGroupName(), retryTask);
long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
- System.currentTimeMillis() - System.currentTimeMillis() % 500;
long nowMilli = DateUtils.toNowMilli();
long delay = DateUtils.toEpochMilli(partitionTask.getNextTriggerAt()) - nowMilli - nowMilli % 100;
RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask),
delay,
TimeUnit.MILLISECONDS);

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
@ -64,7 +65,7 @@ public class ScanCallbackTaskActor extends AbstractScanGroup {
waitStrategyContext.setTriggerInterval(String.valueOf(triggerInterval));
// 更新触发时间, 任务进入时间轮
return waitStrategy.computeTriggerTime(waitStrategyContext);
return DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext));
}
@Override

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.DateUtil;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
@ -17,7 +17,6 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -71,12 +70,12 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
nextTriggerAt = now;
}
waitStrategyContext.setNextTriggerAt(DateUtil.toEpochMilli(nextTriggerAt));
waitStrategyContext.setNextTriggerAt(DateUtils.toEpochMilli(nextTriggerAt));
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(partitionTask.getRetryCount() + 1);
// 更新触发时间, 任务进入时间轮
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
return DateUtil.toEpochMilli(waitStrategy.computeTriggerTime(waitStrategyContext));
return DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext));
}
@Override

View File

@ -3,10 +3,13 @@ package com.aizuda.easy.retry.server.retry.task.support.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtil;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
@ -64,8 +67,13 @@ public class CallbackRetryTaskHandler {
callbackRetryTask.setCreateDt(LocalDateTime.now());
callbackRetryTask.setUpdateDt(LocalDateTime.now());
Long triggerTime = WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null);
callbackRetryTask.setNextTriggerAt(DateUtil.toLocalDateTime(Instant.ofEpochMilli(triggerTime)));
long triggerInterval = systemProperties.getCallback().getTriggerInterval();
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(DateUtils.toNowMilli());
waitStrategyContext.setTriggerInterval(String.valueOf(triggerInterval));
callbackRetryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext)));
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
.insert(callbackRetryTask.getGroupName(), callbackRetryTask),

View File

@ -3,13 +3,13 @@ package com.aizuda.easy.retry.server.retry.task.support.schedule;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.constant.SystemConstants.DATE_FORMAT;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.access.TaskAccess;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
@ -80,8 +80,8 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple
.text(retryErrorMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
groupConfig.getGroupName(),
now.minusMinutes(30).format(DATE_FORMAT.YYYYMMDDHHMMSS),
now.format(DATE_FORMAT.YYYYMMDDHHMMSS),
DateUtils.format(now.minusMinutes(30), DateUtils.NORM_DATETIME_PATTERN),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN),
count)
.title("组:[{}] 环境重试失败数据监控", groupConfig.getGroupName())
.notifyAttribute(notifyConfig.getNotifyAttribute());

View File

@ -3,7 +3,6 @@ package com.aizuda.easy.retry.server.retry.task.support.schedule;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.constant.SystemConstants.DATE_FORMAT;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
@ -11,6 +10,7 @@ import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
@ -76,7 +76,7 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem
.text(retryTaskMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
groupConfig.getGroupName(),
LocalDateTime.now().format(DATE_FORMAT.YYYYMMDDHHMMSS),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN),
count)
.title("组:[{}])重试数据过多", groupConfig.getGroupName())
.notifyAttribute(notifyConfig.getNotifyAttribute());

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1 +0,0 @@
(window["webpackJsonp"]=window["webpackJsonp"]||[]).push([["chunk-2d0aa660"],{"119c":function(t,a,e){"use strict";e.r(a);e("b0c0");var o=function(){var t=this,a=t._self._c;return a("div",[a("page-header-wrapper",{staticStyle:{margin:"-24px -1px 0"},on:{back:function(){return t.$router.go(-1)}}},[a("div")]),null!==t.jobBatchInfo?a("a-card",{attrs:{bordered:!1}},[a("a-descriptions",{attrs:{title:"",column:3,bordered:""}},[a("a-descriptions-item",{attrs:{label:"组名称"}},[t._v(" "+t._s(t.jobBatchInfo.groupName)+" ")]),a("a-descriptions-item",{attrs:{label:"任务名称"}},[t._v(" "+t._s(t.jobBatchInfo.jobName)+" ")]),a("a-descriptions-item",{attrs:{label:"状态"}},[a("a-tag",{attrs:{color:t.taskBatchStatus[t.jobBatchInfo.taskBatchStatus].color}},[t._v(" "+t._s(t.taskBatchStatus[t.jobBatchInfo.taskBatchStatus].name)+" ")])],1),a("a-descriptions-item",{attrs:{label:"执行器类型"}},[a("a-tag",{attrs:{color:t.executorType[t.jobBatchInfo.executorType].color}},[t._v(" "+t._s(t.executorType[t.jobBatchInfo.executorType].name)+" ")])],1),a("a-descriptions-item",{attrs:{label:"操作原因"}},[a("a-tag",{attrs:{color:t.operationReason[t.jobBatchInfo.operationReason].color}},[t._v(" "+t._s(t.operationReason[t.jobBatchInfo.operationReason].name)+" ")])],1),a("a-descriptions-item",{attrs:{label:"开始执行时间"}},[t._v(" "+t._s(t.jobBatchInfo.executionAt)+" ")]),a("a-descriptions-item",{attrs:{label:"执行器名称",span:"4"}},[t._v(" "+t._s(t.jobBatchInfo.executorInfo)+" ")]),a("a-descriptions-item",{attrs:{label:"创建时间"}},[t._v(" "+t._s(t.jobBatchInfo.createDt)+" ")])],1)],1):t._e(),a("div",{staticStyle:{margin:"20px 0","border-left":"#f5222d 5px solid","font-size":"medium","font-weight":"bold"}},[t._v("    任务项列表 ")]),a("JobTaskList",{ref:"JobTaskListRef"})],1)},r=[],s=e("3b7a"),n=e("c1df"),c=e.n(n),i=e("38b7"),u=e.n(i),p=e("36e8"),b={name:"JobInfo",components:{JobTaskList:p["default"]},data:function(){return{jobBatchInfo:null,taskBatchStatus:u.a.taskBatchStatus,operationReason:u.a.operationReason,taskType:u.a.taskType,triggerType:u.a.triggerType,blockStrategy:u.a.blockStrategy,executorType:u.a.executorType}},created:function(){var t=this,a=this.$route.query.id,e=this.$route.query.groupName;a&&e?Object(s["d"])(a).then((function(e){t.jobBatchInfo=e.data,t.queryParam={groupName:t.jobBatchInfo.groupName,taskBatchId:a},t.$refs.JobTaskListRef.refreshTable(t.queryParam)})):this.$router.push({path:"/404"})},methods:{jobTaskList:s["h"],parseDate:function(t){return c()(t).format("YYYY-MM-DD HH:mm:ss")}}},l=b,f=e("2877"),h=Object(f["a"])(l,o,r,!1,null,"1a941578",null);a["default"]=h.exports}}]);

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.web.service.convert;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.web.model.response.JobBatchResponseVO;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
@ -9,7 +10,9 @@ import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
/**
* @author: shuguang.zhang
@ -24,9 +27,24 @@ public interface JobBatchResponseVOConverter {
List<JobBatchResponseVO> toJobBatchResponseVOs(List<JobBatchResponseDO> jobBatches);
@Mappings({
@Mapping(source = "jobBatch.groupName", target = "groupName"),
@Mapping(source = "jobBatch.id", target = "id"),
@Mapping(source = "jobBatch.createDt", target = "createDt")
@Mapping(target = "executionAt", expression = "java(JobBatchResponseVOConverter.toLocalDateTime(jobBatchResponseDO.getExecutionAt()))")
})
JobBatchResponseVO toJobBatchResponseVO(JobBatchResponseDO jobBatchResponseDO);
@Mappings({
@Mapping(source = "jobBatch.groupName", target = "groupName"),
@Mapping(source = "jobBatch.id", target = "id"),
@Mapping(source = "jobBatch.createDt", target = "createDt"),
@Mapping(target = "executionAt", expression = "java(JobBatchResponseVOConverter.toLocalDateTime(jobBatch.getExecutionAt()))")
})
JobBatchResponseVO toJobBatchResponseVO(JobTaskBatch jobBatch, Job job);
static LocalDateTime toLocalDateTime(Long nextTriggerAt) {
if (Objects.isNull(nextTriggerAt) || nextTriggerAt == 0) {
return null;
}
return DateUtils.toLocalDateTime(nextTriggerAt);
}
}

View File

@ -1,11 +1,16 @@
package com.aizuda.easy.retry.server.web.service.convert;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.web.model.response.JobResponseVO;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
/**
* @author www.byteblogs.com
@ -17,7 +22,21 @@ public interface JobResponseVOConverter {
JobResponseVOConverter INSTANCE = Mappers.getMapper(JobResponseVOConverter.class);
// @Mappings({
// @Mapping(source = "nextTriggerAt", target = "nextTriggerAt", expression = "java(DateUtils.toLocalDateTime())")
// })
List<JobResponseVO> toJobResponseVOs(List<Job> jobs);
@Mappings({
@Mapping(target = "nextTriggerAt", expression = "java(JobResponseVOConverter.toLocalDateTime(job.getNextTriggerAt()))")
})
JobResponseVO toJobResponseVO(Job job);
static LocalDateTime toLocalDateTime(Long nextTriggerAt) {
if (Objects.isNull(nextTriggerAt) || nextTriggerAt == 0) {
return null;
}
return DateUtils.toLocalDateTime(nextTriggerAt);
}
}

View File

@ -9,6 +9,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.CronUtils;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.JobQueryVO;
@ -112,7 +113,7 @@ public class JobServiceImpl implements JobService {
Job job = updateJobResident(jobRequestVO);
job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName())
% systemProperties.getBucketTotal());
job.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()));
job.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli()));
return 1 == jobMapper.insert(job);
}
@ -128,23 +129,23 @@ public class JobServiceImpl implements JobService {
// 非常驻任务 > 非常驻任务
if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(),
StatusEnum.NO.getStatus())) {
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()));
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli()));
} else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(
updateJob.getResident(), StatusEnum.NO.getStatus())) {
// 常驻任务的触发时间
LocalDateTime time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId()))
.orElse(LocalDateTime.now());
long time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId()))
.orElse(DateUtils.toNowMilli());
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, time));
// 老的是不是常驻任务 新的是常驻任务 需要使用当前时间计算下次触发时间
} else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(
updateJob.getResident(), StatusEnum.YES.getStatus())) {
updateJob.setNextTriggerAt(LocalDateTime.now());
updateJob.setNextTriggerAt(DateUtils.toNowMilli());
}
return 1 == jobMapper.updateById(updateJob);
}
private static LocalDateTime calculateNextTriggerAt(final JobRequestVO jobRequestVO, LocalDateTime time) {
private static Long calculateNextTriggerAt(final JobRequestVO jobRequestVO, Long time) {
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());

View File

@ -4,9 +4,13 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.web.service.RetryDeadLetterService;
import com.aizuda.easy.retry.server.web.service.convert.RetryDeadLetterResponseVOConverter;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
@ -16,11 +20,13 @@ import com.aizuda.easy.retry.server.web.model.request.BatchRollBackRetryDeadLett
import com.aizuda.easy.retry.server.web.model.request.RetryDeadLetterQueryVO;
import com.aizuda.easy.retry.server.web.model.response.RetryDeadLetterResponseVO;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.access.ConfigAccess;
import com.aizuda.easy.retry.template.datasource.access.TaskAccess;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -30,9 +36,13 @@ import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -76,17 +86,17 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
}
PageDTO<RetryDeadLetter> retryDeadLetterPageDTO = accessTemplate.getRetryDeadLetterAccess()
.listPage(queryVO.getGroupName(), pageDTO, retryDeadLetterLambdaQueryWrapper);
.listPage(queryVO.getGroupName(), pageDTO, retryDeadLetterLambdaQueryWrapper);
return new PageResult<>(retryDeadLetterPageDTO,
RetryDeadLetterResponseVOConverter.INSTANCE.batchConvert(retryDeadLetterPageDTO.getRecords()));
RetryDeadLetterResponseVOConverter.INSTANCE.batchConvert(retryDeadLetterPageDTO.getRecords()));
}
@Override
public RetryDeadLetterResponseVO getRetryDeadLetterById(String groupName, Long id) {
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
RetryDeadLetter retryDeadLetter = retryDeadLetterAccess.one(groupName,
new LambdaQueryWrapper<RetryDeadLetter>().eq(RetryDeadLetter::getId, id));
new LambdaQueryWrapper<RetryDeadLetter>().eq(RetryDeadLetter::getId, id));
return RetryDeadLetterResponseVOConverter.INSTANCE.convert(retryDeadLetter);
}
@ -98,27 +108,50 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
List<Long> ids = rollBackRetryDeadLetterVO.getIds();
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
List<RetryDeadLetter> retryDeadLetterList = retryDeadLetterAccess.list(groupName,
new LambdaQueryWrapper<RetryDeadLetter>().in(RetryDeadLetter::getId, ids));
new LambdaQueryWrapper<RetryDeadLetter>().in(RetryDeadLetter::getId, ids));
Assert.notEmpty(retryDeadLetterList, () -> new EasyRetryServerException("数据不存在"));
ConfigAccess<SceneConfig> sceneConfigAccess = accessTemplate.getSceneConfigAccess();
Set<String> sceneNameSet = retryDeadLetterList.stream().map(RetryDeadLetter::getSceneName)
.collect(Collectors.toSet());
List<SceneConfig> sceneConfigs = sceneConfigAccess.list(new LambdaQueryWrapper<SceneConfig>()
.in(SceneConfig::getSceneName, sceneNameSet));
Map<String, SceneConfig> sceneConfigMap = sceneConfigs.stream().collect(Collectors.toMap((sceneConfig) ->
sceneConfig.getGroupName() + sceneConfig.getSceneName(), Function.identity()));
List<RetryTask> waitRollbackList = new ArrayList<>();
for (RetryDeadLetter retryDeadLetter : retryDeadLetterList) {
SceneConfig sceneConfig = sceneConfigMap.get(
retryDeadLetter.getGroupName() + retryDeadLetter.getSceneName());
Assert.notNull(sceneConfig, () -> new EasyRetryServerException("未查询到场景. [{}]", retryDeadLetter.getSceneName()));
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryDeadLetter);
retryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus());
retryTask.setTaskType(TaskTypeEnum.RETRY.getType());
retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null));
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(LocalDateTime.now());
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(1);
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
retryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext)));
retryTask.setCreateDt(LocalDateTime.now());
waitRollbackList.add(retryTask);
}
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
Assert.isTrue(waitRollbackList.size() == retryTaskAccess.batchInsert(groupName, waitRollbackList), () -> new EasyRetryServerException("新增重试任务失败"));
Assert.isTrue(waitRollbackList.size() == retryTaskAccess.batchInsert(groupName, waitRollbackList),
() -> new EasyRetryServerException("新增重试任务失败"));
Set<Long> waitDelRetryDeadLetterIdSet = retryDeadLetterList.stream().map(RetryDeadLetter::getId).collect(Collectors.toSet());
Assert.isTrue(waitDelRetryDeadLetterIdSet.size() == retryDeadLetterAccess.delete(groupName, new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getGroupName, groupName)
.in(RetryDeadLetter::getId, waitDelRetryDeadLetterIdSet)), () -> new EasyRetryServerException("删除死信队列数据失败"))
Set<Long> waitDelRetryDeadLetterIdSet = retryDeadLetterList.stream().map(RetryDeadLetter::getId)
.collect(Collectors.toSet());
Assert.isTrue(waitDelRetryDeadLetterIdSet.size() == retryDeadLetterAccess.delete(groupName,
new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getGroupName, groupName)
.in(RetryDeadLetter::getId, waitDelRetryDeadLetterIdSet)),
() -> new EasyRetryServerException("删除死信队列数据失败"))
;
// 变更日志的状态
@ -127,9 +160,10 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
Set<String> uniqueIdSet = waitRollbackList.stream().map(RetryTask::getUniqueId).collect(Collectors.toSet());
int update = retryTaskLogMapper.update(retryTaskLog, new LambdaUpdateWrapper<RetryTaskLog>()
.in(RetryTaskLog::getUniqueId, uniqueIdSet)
.eq(RetryTaskLog::getGroupName, groupName));
Assert.isTrue(update == uniqueIdSet.size(), () -> new EasyRetryServerException("回滚日志状态失败, 可能原因: 日志信息缺失或存在多个相同uniqueId"));
.in(RetryTaskLog::getUniqueId, uniqueIdSet)
.eq(RetryTaskLog::getGroupName, groupName));
Assert.isTrue(update == uniqueIdSet.size(),
() -> new EasyRetryServerException("回滚日志状态失败, 可能原因: 日志信息缺失或存在多个相同uniqueId"));
return update;
}
@ -138,8 +172,8 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
public int batchDelete(BatchDeleteRetryDeadLetterVO deadLetterVO) {
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
return retryDeadLetterAccess.delete(deadLetterVO.getGroupName(),
new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName())
.in(RetryDeadLetter::getId, deadLetterVO.getIds()));
new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName())
.in(RetryDeadLetter::getId, deadLetterVO.getIds()));
}
}

View File

@ -6,12 +6,16 @@ import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorScene;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient;
import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext;
@ -139,7 +143,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
RetryStatusEnum retryStatusEnum = RetryStatusEnum.getByStatus(retryTaskUpdateStatusRequestVO.getRetryStatus());
if (Objects.isNull(retryStatusEnum)) {
throw new EasyRetryServerException("重试状态错误");
throw new EasyRetryServerException("重试状态错误. [{}]", retryTaskUpdateStatusRequestVO.getRetryStatus());
}
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
@ -154,8 +158,15 @@ public class RetryTaskServiceImpl implements RetryTaskService {
// 若恢复重试则需要重新计算下次触发时间
if (RetryStatusEnum.RUNNING.getStatus().equals(retryStatusEnum.getStatus())) {
retryTask.setNextTriggerAt(
WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeTriggerTime(null));
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(DateUtils.toNowMilli());
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
waitStrategyContext.setDelayLevel(retryTask.getRetryCount() + 1);
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
retryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext)));
}
if (RetryStatusEnum.FINISH.getStatus().equals(retryStatusEnum.getStatus())) {
@ -163,7 +174,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
RetryTaskLogMessage retryTaskLogMessage = new RetryTaskLogMessage();
retryTaskLogMessage.setUniqueId(retryTask.getUniqueId());
retryTaskLogMessage.setGroupName(retryTask.getGroupName());
retryTaskLogMessage.setMessage("页面操作完成");
retryTaskLogMessage.setMessage("手动操作完成");
retryTaskLogMessage.setCreateDt(LocalDateTime.now());
retryTaskLogMessageMapper.insert(retryTaskLogMessage);

View File

@ -483,7 +483,7 @@ export default {
this.form.validateFields((err, values) => {
if (!err) {
if (this.taskTypeValue === '3') {
values['argsStr'] = this.argsStrValue
values['argsStr'] = JSON.stringify(this.argsStrValue)
}
if (this.formType === 'create') {
@ -518,8 +518,13 @@ export default {
formData.blockStrategy = formData.blockStrategy.toString()
formData.triggerType = formData.triggerType.toString()
this.triggerTypeValue = formData.triggerType
this.argsStrValue = JSON.parse(formData.argsStr)
formData.argsStr = this.argsStrValue.map((item, index) => `分区:${index}=>${item}`).join(';')
this.taskTypeValue = formData.taskType
if (this.taskTypeValue === '3') {
this.argsStrValue = JSON.parse(formData.argsStr)
formData.argsStr = this.argsStrValue.map((item, index) => `分区:${index}=>${item}`).join(';')
}
form.setFieldsValue(formData)
})
},