feat: 2.4.0
1. Support scheduling of tasks with trigger intervals of 10 seconds or less
This commit is contained in: parent 6b1380b8ca · commit 60d52e8524
@@ -235,6 +235,7 @@ CREATE TABLE `job` (
   `parallel_num` int(11) NOT NULL DEFAULT '1' COMMENT '并行数',
   `retry_interval` int(11) NOT NULL DEFAULT '0' COMMENT '重试间隔(s)',
   `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket',
+  `resident` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否是常驻任务',
   `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
   `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
   `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
@@ -1,14 +1,14 @@
 package com.aizuda.easy.retry.client.job.core.cache;
 
-import com.aizuda.easy.retry.client.job.core.dto.JobExecutorInfo;
 import com.aizuda.easy.retry.client.model.ExecuteResult;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
-import com.google.common.collect.Tables;
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -18,28 +18,18 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class FutureCache {
 
-    private static final Table<Long, Long, ListenableFuture<ExecuteResult>> futureCache = HashBasedTable.create();
+    private static final ConcurrentHashMap<Long, ListenableFuture<ExecuteResult>> futureCache = new ConcurrentHashMap<>();
 
-    public static void addFuture(Long taskBatchId, Long taskId, ListenableFuture<ExecuteResult> future) {
-        futureCache.put(taskBatchId, taskId, future);
-    }
-
-    public static void remove(Long taskBatchId, Long taskId) {
-        ListenableFuture<ExecuteResult> future = futureCache.get(taskBatchId, taskId);
-        future.cancel(true);
-        futureCache.remove(taskBatchId, taskId);
+    public static void addFuture(Long taskBatchId, ListenableFuture<ExecuteResult> future) {
+        futureCache.put(taskBatchId, future);
     }
 
     public static void remove(Long taskBatchId) {
-        Map<Long, ListenableFuture<ExecuteResult>> futureMap = futureCache.row(taskBatchId);
-        if (Objects.isNull(futureMap)) {
-            return;
-        }
-        futureMap.forEach((taskId, future) -> {
+        Optional.ofNullable(futureCache.get(taskBatchId)).ifPresent(future -> {
             future.cancel(true);
-            futureCache.remove(taskBatchId, taskId);
+            futureCache.remove(taskBatchId);
         });
 
     }
 
 }
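The rework above keys one future per task batch (taskBatchId only) and cancels it through Optional.ofNullable(...).ifPresent(...). Below is a self-contained sketch of that same pattern, not the project's class itself: the project's ExecuteResult is swapped for String purely so the example compiles on its own, and the executor and batch id are made up for illustration.

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

public class BatchFutureCacheSketch {
    // One future per task batch, mirroring the new FutureCache layout.
    private static final ConcurrentHashMap<Long, ListenableFuture<String>> FUTURES = new ConcurrentHashMap<>();

    static void addFuture(Long taskBatchId, ListenableFuture<String> future) {
        FUTURES.put(taskBatchId, future);
    }

    static void remove(Long taskBatchId) {
        // Cancel and evict in one pass; absent keys are simply ignored.
        Optional.ofNullable(FUTURES.get(taskBatchId)).ifPresent(f -> {
            f.cancel(true);
            FUTURES.remove(taskBatchId);
        });
    }

    public static void main(String[] args) {
        ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        addFuture(1L, pool.submit(() -> { Thread.sleep(60_000); return "done"; }));
        remove(1L); // interrupts the running task for batch 1
        pool.shutdown();
    }
}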
@@ -10,7 +10,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 /**
- * TODO 任务执行完成了,该如何优雅的终止线程池?????
  *
  * @author: www.byteblogs.com
 * @date : 2023-09-27 17:12
@@ -46,6 +45,7 @@ public class ThreadPoolCache {
         }
 
         threadPoolExecutor.shutdownNow();
+        CACHE_THREAD_POOL.remove(taskBatchId);
 
     }
 }
@@ -72,12 +72,12 @@ public class JobEndPoint {
 
     @PostMapping("/stop/v1")
     public Result<Boolean> stopJob(@RequestBody @Validated StopJobDTO interruptJob) {
-        ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskId());
+        ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskBatchId());
         if (Objects.isNull(threadPool) || threadPool.isShutdown() || threadPool.isTerminated()) {
             return new Result<>(Boolean.TRUE);
         }
 
-        ThreadPoolCache.stopThreadPool(interruptJob.getTaskId());
+        ThreadPoolCache.stopThreadPool(interruptJob.getTaskBatchId());
         return new Result<>(threadPool.isShutdown() || threadPool.isTerminated());
     }
 }
@@ -50,7 +50,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
             return doJobExecute(jobArgs);
         });
 
-        FutureCache.addFuture(jobContext.getTaskBatchId(), jobContext.getTaskId(), submit);
+        FutureCache.addFuture(jobContext.getTaskBatchId(), submit);
         Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext), decorator);
     }
 
@@ -1,12 +1,14 @@
 package com.aizuda.easy.retry.client.job.core.executor;
 
 import com.aizuda.easy.retry.client.common.proxy.RequestBuilder;
+import com.aizuda.easy.retry.client.job.core.cache.ThreadPoolCache;
 import com.aizuda.easy.retry.client.job.core.client.JobNettyClient;
 import com.aizuda.easy.retry.client.job.core.dto.JobContext;
 import com.aizuda.easy.retry.client.model.ExecuteResult;
 import com.aizuda.easy.retry.client.model.request.DispatchJobResultRequest;
 import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
 import com.aizuda.easy.retry.common.core.enums.StatusEnum;
+import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
 import com.aizuda.easy.retry.common.core.log.LogUtils;
 import com.aizuda.easy.retry.common.core.model.NettyResult;
 import com.aizuda.easy.retry.common.core.util.JsonUtil;
@@ -47,6 +49,14 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
             CLIENT.dispatchResult(buildDispatchJobResultRequest(result, taskStatus));
         } catch (Exception e) {
             log.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e);
+        } finally {
+            stopThreadPool();
+        }
+    }
+
+    private void stopThreadPool() {
+        if (jobContext.getTaskType() == TaskTypeEnum.CLUSTER.getType()) {
+            ThreadPoolCache.stopThreadPool(jobContext.getTaskBatchId());
         }
     }
 
@@ -60,6 +70,8 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
             );
         } catch (Exception e) {
             log.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e);
+        } finally {
+            stopThreadPool();
         }
 
     }
@@ -15,8 +15,8 @@ public class StopJobDTO {
     @NotNull(message = "jobId 不能为空")
     private Long jobId;
 
-    @NotNull(message = "taskId 不能为空")
-    private Long taskId;
+    @NotNull(message = "taskBatchId 不能为空")
+    private Long taskBatchId;
 
     @NotBlank(message = "group 不能为空")
     private String groupName;
@@ -124,6 +124,11 @@ public class Job implements Serializable {
      */
     private Integer bucketIndex;
 
+    /**
+     * 是否是常驻任务
+     */
+    private Integer resident;
+
     /**
      * 描述
      */
@@ -54,4 +54,8 @@ public class JobPartitionTask extends PartitionTask {
      */
     private Integer taskType;
 
+    /**
+     * 是否是常驻任务
+     */
+    private Integer resident;
 }
@@ -49,6 +49,7 @@ public interface JobTaskConverter {
     BlockStrategies.BlockStrategyContext toBlockStrategyContext(JobTaskPrepareDTO prepareDTO);
 
     TaskStopJobContext toStopJobContext(BlockStrategies.BlockStrategyContext context);
+    TaskStopJobContext toStopJobContext(JobExecutorResultDTO context);
 
     JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO);
 
@@ -0,0 +1,42 @@
+package com.aizuda.easy.retry.server.job.task.support.cache;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import java.time.LocalDateTime;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author www.byteblogs.com
+ * @date 2023-10-21 23:35:42
+ * @since 2.4.0
+ */
+public class ResidentTaskCache {
+
+    private static final Cache<Long, LocalDateTime> cache;
+
+    static {
+        cache = CacheBuilder.newBuilder()
+            .concurrencyLevel(8) // 并发级别
+            .expireAfterWrite(10, TimeUnit.SECONDS) // 写入后的过期时间
+            .build();
+    }
+
+    public static void refresh(Long jobId, LocalDateTime nextTriggerTime) {
+        cache.put(jobId, nextTriggerTime);
+    }
+
+    public static LocalDateTime getOrDefault(Long jobId, LocalDateTime nextTriggerTime) {
+        return Optional.ofNullable(cache.getIfPresent(jobId)).orElse(nextTriggerTime);
+    }
+
+    public static LocalDateTime get(Long jobId) {
+        return getOrDefault(jobId, null);
+    }
+
+    public static boolean isResident(Long jobId) {
+        return cache.asMap().containsKey(jobId);
+    }
+
+}
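ResidentTaskCache is a Guava cache whose entries expire 10 seconds after they are written, so a job only counts as "resident" (and only reads back a cached trigger time) while the timer chain keeps refreshing it. A minimal standalone sketch of the same construction and the getOrDefault fallback; the jobId and times below are made up for illustration.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.time.LocalDateTime;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

public class ResidentTaskCacheSketch {
    private static final Cache<Long, LocalDateTime> CACHE = CacheBuilder.newBuilder()
            .concurrencyLevel(8)
            .expireAfterWrite(10, TimeUnit.SECONDS) // entries vanish 10s after the last refresh
            .build();

    public static void main(String[] args) {
        Long jobId = 42L;
        LocalDateTime fromDb = LocalDateTime.now().plusSeconds(30);

        // Nothing cached yet: fall back to the value loaded from the job table.
        LocalDateTime next = Optional.ofNullable(CACHE.getIfPresent(jobId)).orElse(fromDb);
        System.out.println("before refresh: " + next);

        // After a refresh, the cached value wins until it expires 10 seconds later.
        CACHE.put(jobId, LocalDateTime.now().plusSeconds(5));
        next = Optional.ofNullable(CACHE.getIfPresent(jobId)).orElse(fromDb);
        System.out.println("after refresh: " + next);
    }
}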
@@ -3,13 +3,17 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
 import akka.actor.AbstractActor;
 import akka.actor.ActorRef;
 import cn.hutool.core.lang.Assert;
+import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
 import com.aizuda.easy.retry.common.core.util.JsonUtil;
 import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
 import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
 import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
 import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
 import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
+import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
 import com.aizuda.easy.retry.server.job.task.support.handler.JobTaskBatchHandler;
+import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
+import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -60,7 +64,18 @@ public class JobExecutorResultActor extends AbstractActor {
                     ()-> new EasyRetryServerException("更新任务实例失败"));
 
             // 更新批次上的状态
-            jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReasonEnum());
+            boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReasonEnum());
+            if (complete) {
+                // 尝试停止任务
+                // 若是集群任务则客户端会主动关闭
+                if (result.getTaskType() != TaskTypeEnum.CLUSTER.getType()) {
+                    JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType());
+                    TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result);
+                    stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE);
+                    stopJobContext.setForceStop(Boolean.TRUE);
+                    instanceInterrupt.stop(stopJobContext);
+                }
+            }
             }
         });
 
@@ -5,9 +5,9 @@ import akka.actor.ActorRef;
 import cn.hutool.core.lang.Assert;
 import com.aizuda.easy.retry.common.core.enums.StatusEnum;
 import com.aizuda.easy.retry.common.core.log.LogUtils;
-import com.aizuda.easy.retry.common.core.util.JsonUtil;
 import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
 import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
+import com.aizuda.easy.retry.server.common.dto.PartitionTask;
 import com.aizuda.easy.retry.server.common.dto.ScanTask;
 import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
 import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
@@ -15,6 +15,9 @@ import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
 import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask;
 import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
 import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
+import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
+import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
+import com.aizuda.easy.retry.server.job.task.support.timer.ResidentJobTimerTask;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -27,6 +30,7 @@ import org.springframework.stereotype.Component;
 
 import java.time.LocalDateTime;
 import java.util.List;
+import java.util.Objects;
 
 import static com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.*;
 
@@ -62,46 +66,77 @@ public class ScanJobTaskActor extends AbstractActor {
     private void doScan(final ScanTask scanTask) {
         log.info("job scan start");
 
-        long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), partitionTasks -> {
-            for (final JobPartitionTask partitionTask : (List<JobPartitionTask>) partitionTasks) {
-                CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName());
-                JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask);
-
-                // 更新下次触发时间
-                WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType());
-                WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
-                waitStrategyContext.setTriggerType(partitionTask.getTriggerType());
-                waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
-                waitStrategyContext.setNextTriggerAt(jobTaskPrepare.getNextTriggerAt());
-
-                Job job = new Job();
-                job.setId(partitionTask.getId());
-                job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext));
-                Assert.isTrue(1 == jobMapper.updateById(job),
-                    () -> new EasyRetryServerException("更新job下次触发时间失败.jobId:[{}]", job.getId()));
-
-                // 执行预处理阶段
-                ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
-                actorRef.tell(jobTaskPrepare, actorRef);
-            }
-        }, scanTask.getStartId());
+        long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), this::processJobPartitionTasks, scanTask.getStartId());
 
         log.info("job scan end. total:[{}]", total);
     }
 
+    private void processJobPartitionTasks(List<? extends PartitionTask> partitionTasks) {
+        for (PartitionTask partitionTask : partitionTasks) {
+            processJob((JobPartitionTask) partitionTask);
+        }
+    }
+
+    private void processJob(JobPartitionTask partitionTask) {
+        CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName());
+        JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask);
+
+        Job job = new Job();
+        job.setId(partitionTask.getId());
+
+        boolean triggerTask = true;
+        LocalDateTime nextTriggerAt = ResidentTaskCache.get(partitionTask.getId());
+        if (needCalculateNextTriggerTime(partitionTask, nextTriggerAt)) {
+            // 更新下次触发时间
+            nextTriggerAt = calculateNextTriggerTime(partitionTask);
+        } else {
+            triggerTask = false;
+        }
+
+        job.setNextTriggerAt(nextTriggerAt);
+        Assert.isTrue(1 == jobMapper.updateById(job),
+            () -> new EasyRetryServerException("更新job下次触发时间失败.jobId:[{}]", job.getId()));
+
+        if (triggerTask) {
+            // 执行预处理阶段
+            ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
+            actorRef.tell(jobTaskPrepare, actorRef);
+        }
+    }
+
+    /**
+     * 需要重新计算触发时间的条件
+     * 1、不是常驻任务
+     * 2、常驻任务缓存的触发任务为空
+     * 3、常驻任务中的触发时间不是最新的
+     */
+    private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask, LocalDateTime nextTriggerAt) {
+        return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident())
+            || Objects.isNull(nextTriggerAt) || partitionTask.getNextTriggerAt().isAfter(nextTriggerAt);
+    }
+
+    private LocalDateTime calculateNextTriggerTime(JobPartitionTask partitionTask) {
+        // 更新下次触发时间
+        WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType());
+        WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
+        waitStrategyContext.setTriggerType(partitionTask.getTriggerType());
+        waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
+        waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt());
+
+        return waitStrategy.computeRetryTime(waitStrategyContext);
+    }
+
     private List<JobPartitionTask> listAvailableJobs(Long startId, ScanTask scanTask) {
 
         List<Job> jobs = jobMapper.selectPage(new PageDTO<Job>(0, scanTask.getSize()),
             new LambdaQueryWrapper<Job>()
                 .eq(Job::getJobStatus, StatusEnum.YES.getStatus())
                 .in(Job::getBucketIndex, scanTask.getBuckets())
                 .le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(10))
                 .eq(Job::getDeleted, StatusEnum.NO.getStatus())
                 .ge(Job::getId, startId)
             ).getRecords();
 
         return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs);
     }
 
 
 }
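The needCalculateNextTriggerTime check above means the scan thread recomputes and persists nextTriggerAt unless the job is resident and the cache already holds a trigger time at least as fresh as the database row, in which case the timer wheel owns the next firing and no new batch is prepared. A standalone sketch of just that predicate; StatusEnum.YES.getStatus() is replaced by the literal 1 here, which is an assumption made only to keep the example self-contained.

import java.time.LocalDateTime;
import java.util.Objects;

public class NextTriggerDecisionSketch {
    private static final Integer YES = 1; // stand-in for StatusEnum.YES.getStatus()

    static boolean needCalculateNextTriggerTime(Integer resident, LocalDateTime rowNextTriggerAt,
                                                LocalDateTime cachedNextTriggerAt) {
        // Recompute when the job is not resident, nothing is cached,
        // or the row read from the job table is newer than the cached value.
        return !Objects.equals(YES, resident)
                || Objects.isNull(cachedNextTriggerAt)
                || rowNextTriggerAt.isAfter(cachedNextTriggerAt);
    }

    public static void main(String[] args) {
        LocalDateTime row = LocalDateTime.now();
        System.out.println(needCalculateNextTriggerTime(0, row, null));               // true: not resident
        System.out.println(needCalculateNextTriggerTime(1, row, null));               // true: resident but nothing cached
        System.out.println(needCalculateNextTriggerTime(1, row, row.plusSeconds(5))); // false: cache is ahead, skip
    }
}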
@@ -3,6 +3,10 @@ package com.aizuda.easy.retry.server.job.task.support.handler;
 import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
 import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
 import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
+import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
+import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
+import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
+import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
@@ -28,11 +28,14 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
     @Override
     public void stop(TaskStopJobContext context) {
 
-        List<JobTask> jobTasks = jobTaskMapper.selectList(
-            new LambdaQueryWrapper<JobTask>()
-                .eq(JobTask::getTaskBatchId, context.getTaskBatchId())
-                .in(JobTask::getTaskStatus, JobTaskStatusEnum.NOT_COMPLETE)
-        );
+        LambdaQueryWrapper<JobTask> queryWrapper = new LambdaQueryWrapper<JobTask>()
+            .eq(JobTask::getTaskBatchId, context.getTaskBatchId());
+
+        if (!context.isForceStop()) {
+            queryWrapper.in(JobTask::getTaskStatus, JobTaskStatusEnum.NOT_COMPLETE);
+        }
+
+        List<JobTask> jobTasks = jobTaskMapper.selectList(queryWrapper);
 
         if (CollectionUtils.isEmpty(jobTasks)) {
             return;
@@ -28,7 +28,7 @@ public class ClusterTaskStopHandler extends AbstractJobTaskStopHandler {
     }
 
     @Override
     public void doStop(TaskStopJobContext context) {
         List<JobTask> jobTasks = context.getJobTasks();
 
         RealStopTaskInstanceDTO taskInstanceDTO = JobTaskConverter.INSTANCE.toRealStopTaskInstanceDTO(context);
@@ -40,5 +40,4 @@ public class ClusterTaskStopHandler extends AbstractJobTaskStopHandler {
     }
 
 
-
 }
|
@ -57,7 +57,7 @@ public class RealStopTaskActor extends AbstractActor {
|
|||||||
.build();
|
.build();
|
||||||
|
|
||||||
StopJobDTO stopJobDTO = new StopJobDTO();
|
StopJobDTO stopJobDTO = new StopJobDTO();
|
||||||
stopJobDTO.setTaskId(realStopTaskInstanceDTO.getTaskBatchId());
|
stopJobDTO.setTaskBatchId(realStopTaskInstanceDTO.getTaskBatchId());
|
||||||
stopJobDTO.setJobId(realStopTaskInstanceDTO.getJobId());
|
stopJobDTO.setJobId(realStopTaskInstanceDTO.getJobId());
|
||||||
stopJobDTO.setGroupName(realStopTaskInstanceDTO.getGroupName());
|
stopJobDTO.setGroupName(realStopTaskInstanceDTO.getGroupName());
|
||||||
return rpcClient.stop(stopJobDTO);
|
return rpcClient.stop(stopJobDTO);
|
||||||
|
@@ -35,11 +35,6 @@ public class TaskStopJobContext {
      */
     private Integer taskType;
 
-    /**
-     * 下次触发时间
-     */
-    private LocalDateTime nextTriggerAt;
-
     /**
      * 是否需要变更任务状态
      */
@@ -49,4 +44,13 @@ public class TaskStopJobContext {
 
     private JobOperationReasonEnum jobOperationReasonEnum;
 
+    private boolean forceStop;
+
+    protected List<JobTask> getJobTasks() {
+        return jobTasks;
+    }
+
+    protected void setJobTasks(List<JobTask> jobTasks) {
+        this.jobTasks = jobTasks;
+    }
 }
@@ -8,8 +8,13 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
 import com.aizuda.easy.retry.common.core.enums.StatusEnum;
 import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
 import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
+import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
 import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
 import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
+import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
+import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
+import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
+import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
@@ -20,6 +25,7 @@ import io.netty.util.TimerTask;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
+import java.time.Duration;
 import java.time.LocalDateTime;
 import java.util.Objects;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -37,8 +43,8 @@ public class JobTimerTask implements TimerTask {
 
     private JobTimerTaskDTO jobTimerTaskDTO;
 
-    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 10, TimeUnit.SECONDS,
+    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS,
         new LinkedBlockingQueue<>());
 
     @Override
     public void run(final Timeout timeout) throws Exception {
@@ -46,15 +52,15 @@ public class JobTimerTask implements TimerTask {
         log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId());
 
         executor.execute(() -> {
+            Job job = null;
             try {
                 // 清除时间轮的缓存
                 JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId());
 
                 JobMapper jobMapper = SpringContext.getBeanByType(JobMapper.class);
-                Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
+                job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
                     .eq(Job::getJobStatus, StatusEnum.YES.getStatus())
                     .eq(Job::getId, jobTimerTaskDTO.getJobId())
                 );
 
                 int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
@@ -72,7 +78,7 @@ public class JobTimerTask implements TimerTask {
                 jobTaskBatch.setTaskBatchStatus(taskStatus);
                 jobTaskBatch.setOperationReason(operationReason);
                 Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch),
                     () -> new EasyRetryServerException("更新任务失败"));
 
                 // 如果任务已经关闭则不需要执行
                 if (Objects.isNull(job)) {
@@ -88,9 +94,42 @@ public class JobTimerTask implements TimerTask {
 
             } catch (Exception e) {
                 log.error("任务调度执行失败", e);
+            } finally {
+                // 处理常驻任务
+                doHandlerResidentTask(job);
             }
 
         });
+    }
+
+    private void doHandlerResidentTask(Job job) {
+        if (Objects.nonNull(job)) {
+            // 是否是常驻任务
+            if (Objects.equals(StatusEnum.YES.getStatus(), job.getResident())) {
+                ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job);
+                WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());
+
+                LocalDateTime preTriggerAt = ResidentTaskCache.get(jobTimerTaskDTO.getJobId());
+                if (Objects.isNull(preTriggerAt) || preTriggerAt.isBefore(job.getNextTriggerAt())) {
+                    preTriggerAt = job.getNextTriggerAt();
+                }
+
+                WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
+                waitStrategyContext.setTriggerType(job.getTriggerType());
+                waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
+                waitStrategyContext.setNextTriggerAt(preTriggerAt);
+                LocalDateTime nextTriggerAt = waitStrategy.computeRetryTime(waitStrategyContext);
+
+                // 获取时间差的毫秒数
+                Duration duration = Duration.between(preTriggerAt, nextTriggerAt);
+                long milliseconds = duration.toMillis();
+
+                log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, System.currentTimeMillis() % 1000);
+                job.setNextTriggerAt(nextTriggerAt);
+
+                JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - System.currentTimeMillis() % 1000, TimeUnit.MILLISECONDS);
+                ResidentTaskCache.refresh(jobTimerTaskDTO.getJobId(), nextTriggerAt);
+            }
+        }
     }
 }
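For a resident job, doHandlerResidentTask re-arms the next firing directly on the timer wheel instead of waiting for the next database scan: the delay is the span between the previous and the newly computed trigger time, minus the current sub-second remainder so firings stay close to second boundaries. The sketch below shows only that arithmetic; the trigger times are synthetic, whereas in the diff they come from ResidentTaskCache and WaitStrategy.computeRetryTime.

import java.time.Duration;
import java.time.LocalDateTime;

public class ResidentDelaySketch {
    public static void main(String[] args) {
        LocalDateTime preTriggerAt = LocalDateTime.now();
        LocalDateTime nextTriggerAt = preTriggerAt.plusSeconds(5); // e.g. a 5-second trigger interval

        // Interval between the two trigger points, in milliseconds.
        long milliseconds = Duration.between(preTriggerAt, nextTriggerAt).toMillis();
        // Trim the current sub-second offset, matching
        // JobTimerWheel.register(taskBatchId, timerTask, milliseconds - System.currentTimeMillis() % 1000, MILLISECONDS).
        long delay = milliseconds - System.currentTimeMillis() % 1000;

        System.out.println("re-arm resident job in " + delay + " ms");
    }
}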
@@ -0,0 +1,59 @@
+package com.aizuda.easy.retry.server.job.task.support.timer;
+
+import akka.actor.ActorRef;
+import cn.hutool.core.lang.Assert;
+import com.aizuda.easy.retry.common.core.context.SpringContext;
+import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
+import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
+import com.aizuda.easy.retry.common.core.enums.StatusEnum;
+import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
+import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
+import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
+import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
+import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
+import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
+import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
+import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.LocalDateTime;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author www.byteblogs.com
+ * @date 2023-10-20 23:09:13
+ * @since 2.4.0
+ */
+@Slf4j
+@AllArgsConstructor
+public class ResidentJobTimerTask implements TimerTask {
+
+    private JobTimerTaskDTO jobTimerTaskDTO;
+    private Job job;
+
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        try {
+            // 清除时间轮的缓存
+            JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId());
+            JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
+            // 执行预处理阶段
+            ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
+            actorRef.tell(jobTaskPrepare, actorRef);
+        } catch (Exception e) {
+            log.error("任务调度执行失败", e);
+        }
+    }
+}
@@ -52,13 +52,12 @@ public class DispatchService implements Lifecycle {
 
         try {
             // 当正在rebalance时延迟10s,尽量等待所有节点都完成rebalance
-            if ( DistributeInstance.RE_BALANCE_ING.get()) {
+            if (DistributeInstance.RE_BALANCE_ING.get()) {
                 LogUtils.info(log, "正在rebalance中....");
                 TimeUnit.SECONDS.sleep(INITIAL_DELAY);
             }
 
             Set<Integer> currentConsumerBuckets = getConsumerBucket();
-            // LogUtils.info(log, "当前节点分配的桶:[{}]", currentConsumerBuckets);
             if (!CollectionUtils.isEmpty(currentConsumerBuckets)) {
                 ConsumerBucket scanTaskDTO = new ConsumerBucket();
                 scanTaskDTO.setBuckets(currentConsumerBuckets);
@@ -14,15 +14,11 @@
         </a-select>
       </a-form-item>
     </a-col>
-    <!-- <a-col :md="8" :sm="24">-->
-    <!-- <a-form-item label="场景名称">-->
-    <!-- <a-select v-model="queryParam.sceneName" placeholder="请选择场景名称" allowClear>-->
-    <!-- <a-select-option v-for="item in sceneList" :value="item.sceneName" :key="item.sceneName">-->
-    <!-- {{ item.sceneName }}</a-select-option-->
-    <!-- >-->
-    <!-- </a-select>-->
-    <!-- </a-form-item>-->
-    <!-- </a-col>-->
+    <a-col :md="8" :sm="24">
+      <a-form-item label="任务名称">
+        <a-input v-model="queryParam.jobName" placeholder="请输入任务名称" allowClear />
+      </a-form-item>
+    </a-col>
     <a-col :md="8" :sm="24">
       <a-form-item label="状态">
         <a-select v-model="queryParam.jobStatus" placeholder="请选择状态" allowClear>
|
@ -311,7 +311,6 @@ import CronModal from '@/views/job/from/CronModal'
|
|||||||
|
|
||||||
const enums = require('@/utils/enum')
|
const enums = require('@/utils/enum')
|
||||||
|
|
||||||
let id = 0
|
|
||||||
export default {
|
export default {
|
||||||
name: 'JobFrom',
|
name: 'JobFrom',
|
||||||
components: { CronModal },
|
components: { CronModal },
|
||||||
@@ -341,7 +340,8 @@ export default {
       executorType: enums.executorType,
       routeKey: enums.routeKey,
       loading: false,
-      visible: false
+      visible: false,
+      count: 0
     }
   },
   beforeCreate () {
@@ -395,7 +395,7 @@ export default {
       // can use data-binding to get
       const keys = dynamicForm.getFieldValue('keys')
       console.log(keys)
-      const nextKeys = keys.concat(id++)
+      const nextKeys = keys.concat(this.count++)
       // can use data-binding to set
       // important! notify form to detect changes
       dynamicForm.setFieldsValue({
@@ -419,7 +419,7 @@ export default {
       const restoredArray = keyValuePairs.map(pair => {
         const [index, value] = pair.split('=')
         console.log(value)
-        id++
+        this.count++
         return Number.parseInt(index)
       })
 