From 18b952234cb0567840cca4e26b024d9a75e05255 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Wed, 11 Oct 2023 18:46:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.4.0=201.=20=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=B0=83=E5=BA=A6=E6=B5=81=E7=A8=8B=E8=AE=BE?= =?UTF-8?q?=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/enums/JobOperationReasonEnum.java | 1 + .../common/core/enums/JobTaskStatusEnum.java | 11 +-- .../server/common/client/RequestBuilder.java | 30 ++++++- .../common/client/RpcClientInvokeHandler.java | 47 +++------- .../common/client/annotation/Mapping.java | 16 +--- .../retry/server/common/dto/ScanTask.java | 4 + .../common/util/PartitionTaskUtils.java | 46 ++++++++++ .../server/job/task/JobTaskConverter.java | 8 ++ .../server/job/task/client/JobRpcClient.java | 25 ++++++ .../task/dispatch/JobExecutorResultActor.java | 29 +++++++ .../job/task/dispatch/ScanJobTaskActor.java | 31 ++----- .../server/job/task/dto/JobTimerTaskDTO.java | 2 + .../batch/JobTaskBatchGenerator.java | 3 +- .../executor/RealJobExecutorActor.java | 85 +++++++++---------- .../handler/helper/JobTaskBatchHelper.java | 6 +- .../prepare/RunningJobPrepareHandler.java | 22 +++-- .../prepare/WaitJobPrepareHandler.java | 2 + .../stop/AbstractJobTaskStopHandler.java | 2 +- .../task/handler/stop/RealStopTaskActor.java | 9 +- .../job/task/handler/timer/JobTimerTask.java | 55 ++++++++---- .../handler/timer/JobTimerWheelHandler.java | 2 +- .../job/task/strategy/BlockStrategies.java | 13 +-- .../retry/task/client/RetryRpcClient.java} | 19 ++--- .../actor/exec/ExecCallbackUnitActor.java | 6 +- .../dispatch/actor/exec/ExecUnitActor.java | 6 +- .../server/dispatch/ConsumerBucketActor.java | 2 + 26 files changed, 302 insertions(+), 180 deletions(-) create mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/client/JobRpcClient.java rename easy-retry-server/{easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClient.java => easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java} (74%) diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java index a889a2d6..e854debf 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java @@ -18,6 +18,7 @@ public enum JobOperationReasonEnum { NONE(0, StrUtil.EMPTY), EXECUTE_TIMEOUT(1, "执行超时"), NOT_CLIENT(2, "无客户端节点"), + JOB_CLOSED(3, "任务已关闭"), ; private final int reason; diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobTaskStatusEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobTaskStatusEnum.java index e81b0f2b..bc7914f2 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobTaskStatusEnum.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobTaskStatusEnum.java @@ -4,21 +4,18 @@ import lombok.AllArgsConstructor; import lombok.Getter; import java.util.Arrays; +import java.util.Collections; import java.util.List; /** * @author: www.byteblogs.com * @date : 2023-09-26 14:26 + * @since : 2.4.0 */ @AllArgsConstructor @Getter public enum JobTaskStatusEnum { - /** - * 待处理 - */ - WAITING(1), - /** * 处理中 */ @@ -35,7 +32,7 @@ public enum JobTaskStatusEnum { FAIL(4), /** - * 任务停止成功 + * 任务停止 */ STOP(5), @@ -44,7 +41,7 @@ public enum JobTaskStatusEnum { private final int status; - public static final List NOT_COMPLETE = Arrays.asList(WAITING.status, RUNNING.status); + public static final List NOT_COMPLETE = Collections.singletonList(RUNNING.status); public static final List COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status); } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java index 55f71c21..5a8b9bba 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.common.client; import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.github.rholder.retry.RetryListener; import java.lang.reflect.Proxy; import java.util.Objects; @@ -21,6 +22,11 @@ public class RequestBuilder { private String hostIp; private Integer hostPort; private String contextPath; + private boolean failRetry; + private int retryTimes = 3; + private int retryInterval = 1; + private RetryListener retryListener = new SimpleRetryListener(); + public static RequestBuilder newBuilder() { return new RequestBuilder<>(); @@ -56,6 +62,27 @@ public class RequestBuilder { return this; } + public RequestBuilder failRetry(boolean failRetry) { + this.failRetry = failRetry; + return this; + } + + public RequestBuilder retryTimes(int retryTimes) { + this.retryTimes = retryTimes; + return this; + } + + public RequestBuilder retryInterval(int retryInterval) { + this.retryInterval = retryInterval; + return this; + } + + public RequestBuilder retryListener(RetryListener retryListener) { + this.retryListener = retryListener; + return this; + } + + public T build() { if (Objects.isNull(clintInterface)) { throw new EasyRetryServerException("clintInterface cannot be null"); @@ -73,7 +100,8 @@ public class RequestBuilder { throw new EasyRetryServerException("class not found exception to: [{}]", clintInterface.getName()); } - RpcClientInvokeHandler clientInvokeHandler = new RpcClientInvokeHandler(groupName, hostId, hostIp, hostPort, contextPath); + RpcClientInvokeHandler clientInvokeHandler = new RpcClientInvokeHandler( + groupName, hostId, hostIp, hostPort, contextPath, failRetry, retryTimes, retryInterval, retryListener); return (T) Proxy.newProxyInstance(clintInterface.getClassLoader(), new Class[]{clintInterface}, clientInvokeHandler); diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java index 346d1a16..85d4383f 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java @@ -4,7 +4,6 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.URLUtil; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.context.SpringContext; -import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.HostUtils; import com.aizuda.easy.retry.common.core.util.JsonUtil; @@ -16,14 +15,13 @@ import com.aizuda.easy.retry.server.common.client.annotation.Param; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; -import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; -import com.github.rholder.retry.Attempt; import com.github.rholder.retry.RetryException; import com.github.rholder.retry.RetryListener; import com.github.rholder.retry.Retryer; import com.github.rholder.retry.RetryerBuilder; import com.github.rholder.retry.StopStrategies; import com.github.rholder.retry.WaitStrategies; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; @@ -32,7 +30,6 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.util.CollectionUtils; -import org.springframework.util.ReflectionUtils; import org.springframework.web.client.ResourceAccessException; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; @@ -45,7 +42,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; /** @@ -56,28 +52,20 @@ import java.util.concurrent.TimeUnit; * @since 2.0.0 */ @Slf4j +@AllArgsConstructor public class RpcClientInvokeHandler implements InvocationHandler { public static final String URL = "http://{0}:{1}/{2}"; - private final String groupName; + private String groupName; private String hostId; private String hostIp; private Integer hostPort; private String contextPath; - - public RpcClientInvokeHandler( - final String groupName, - final String hostId, - final String hostIp, - final Integer hostPort, - final String contextPath) { - this.groupName = groupName; - this.hostId = hostId; - this.hostIp = hostIp; - this.hostPort = hostPort; - this.contextPath = contextPath; - } + private boolean failRetry; + private int retryTimes; + private int retryInterval; + private RetryListener retryListener; @Override public Result invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { @@ -124,7 +112,7 @@ public class RpcClientInvokeHandler implements InvocationHandler { RestTemplate restTemplate = SpringContext.CONTEXT.getBean(RestTemplate.class); - Retryer retryer = buildResultRetryer(mapping); + Retryer retryer = buildResultRetryer(); Result result = retryer.call(() -> { ResponseEntity response = restTemplate.exchange( @@ -188,21 +176,12 @@ public class RpcClientInvokeHandler implements InvocationHandler { return null; } - private Retryer buildResultRetryer(Mapping mapping) throws InstantiationException, IllegalAccessException, NoSuchMethodException { - Class retryListenerClazz = mapping.retryListener(); - RetryListener retryListener = retryListenerClazz.newInstance(); - Method method = retryListenerClazz.getMethod("onRetry", Attempt.class); - + private Retryer buildResultRetryer() { Retryer retryer = RetryerBuilder.newBuilder() - .retryIfException(throwable -> mapping.failRetry()) - .withStopStrategy(StopStrategies.stopAfterAttempt(mapping.retryTimes())) - .withWaitStrategy(WaitStrategies.fixedWait(mapping.retryInterval(), TimeUnit.SECONDS)) - .withRetryListener(new RetryListener() { - @Override - public void onRetry(Attempt attempt) { - ReflectionUtils.invokeMethod(method, retryListener, attempt); - } - }) + .retryIfException(throwable -> failRetry) + .withStopStrategy(StopStrategies.stopAfterAttempt(retryTimes)) + .withWaitStrategy(WaitStrategies.fixedWait(retryInterval, TimeUnit.SECONDS)) + .withRetryListener(retryListener) .build(); return retryer; } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/annotation/Mapping.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/annotation/Mapping.java index 8fd292ec..ed18eb19 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/annotation/Mapping.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/annotation/Mapping.java @@ -1,8 +1,6 @@ package com.aizuda.easy.retry.server.common.client.annotation; import com.aizuda.easy.retry.server.common.client.RequestMethod; -import com.aizuda.easy.retry.server.common.client.SimpleRetryListener; -import com.github.rholder.retry.RetryListener; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -26,16 +24,10 @@ public @interface Mapping { String path() default ""; + /** + * 是否支持失败转移 + * @return false or trur + */ boolean failover() default false; - boolean failRetry() default false; - - int retryTimes() default 3; - - int retryInterval() default 1; - - Class retryListener() default SimpleRetryListener.class; - - - } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java index cf5fadda..d9b9305f 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java @@ -18,4 +18,8 @@ public class ScanTask { private String groupName; private Set buckets; + + private long size; + + private long startId; } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java new file mode 100644 index 00000000..3b5d3ab1 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java @@ -0,0 +1,46 @@ +package com.aizuda.easy.retry.server.common.util; + +import com.aizuda.easy.retry.server.common.dto.PartitionTask; +import org.springframework.util.CollectionUtils; + +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * @author: www.byteblogs.com + * @date : 2023-10-11 10:58 + * @since : 2.4.0 + */ +public class PartitionTaskUtils { + + private PartitionTaskUtils() { + } + + public static long process( + Function> dataSource, Consumer> task, + long startId) { + int total = 0; + do { + List products = dataSource.apply(startId); + if (CollectionUtils.isEmpty(products)) { + // 没有查询到数据直接退出 + break; + } + + total += products.size(); + + task.accept(products); + startId = maxId(products); + } while (startId > 0); + + return total; + } + + private static long maxId(List products) { + Optional max = products.stream().map(PartitionTask::getId).max(Long::compareTo); + return max.orElse(-1L) + 1; + } + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/JobTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/JobTaskConverter.java index 30b48b55..4ffeca31 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/JobTaskConverter.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/JobTaskConverter.java @@ -33,6 +33,11 @@ public interface JobTaskConverter { ) JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTask job); + @Mappings( + @Mapping(source = "id", target = "jobId") + ) + JobTaskPrepareDTO toJobTaskPrepare(Job job); + JobTaskBatchGeneratorContext toJobTaskGeneratorContext(JobTaskPrepareDTO jobTaskPrepareDTO); JobTaskBatchGeneratorContext toJobTaskGeneratorContext(BlockStrategies.BlockStrategyContext context); @@ -70,6 +75,9 @@ public interface JobTaskConverter { JobExecutorResultDTO toJobExecutorResultDTO(ClientCallbackContext context); + @Mappings( + @Mapping(source = "id", target = "taskId") + ) JobExecutorResultDTO toJobExecutorResultDTO(JobTask jobTask); RealStopTaskInstanceDTO toRealStopTaskInstanceDTO(TaskStopJobContext context); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/client/JobRpcClient.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/client/JobRpcClient.java new file mode 100644 index 00000000..48ce08f2 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/client/JobRpcClient.java @@ -0,0 +1,25 @@ +package com.aizuda.easy.retry.server.job.task.client; + +import com.aizuda.easy.retry.client.model.StopJobDTO; +import com.aizuda.easy.retry.client.model.request.DispatchJobRequest; +import com.aizuda.easy.retry.common.core.model.Result; +import com.aizuda.easy.retry.server.common.client.RequestMethod; +import com.aizuda.easy.retry.server.common.client.annotation.Body; +import com.aizuda.easy.retry.server.common.client.annotation.Mapping; + +/** + * 调用客户端接口 + * + * @author: www.byteblogs.com + * @date : 2023-06-19 15:40 + * @since 2.0.0 + */ +public interface JobRpcClient { + + @Mapping(path = "/job/stop/v1", method = RequestMethod.POST) + Result stop(@Body StopJobDTO stopJobDTO); + + @Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST) + Result dispatch(@Body DispatchJobRequest dispatchJobRequest); + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/JobExecutorResultActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/JobExecutorResultActor.java index 807bf983..dfb07dcc 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/JobExecutorResultActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/JobExecutorResultActor.java @@ -3,17 +3,29 @@ package com.aizuda.easy.retry.server.job.task.dispatch; import akka.actor.AbstractActor; import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.cache.CacheGroupScanActor; +import com.aizuda.easy.retry.server.common.dto.ScanTask; +import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.WaitStrategy; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.handler.helper.JobTaskBatchHelper; +import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies.WaitStrategyContext; +import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -23,6 +35,7 @@ import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; +import java.time.LocalDateTime; import java.util.Objects; /** @@ -41,6 +54,8 @@ public class JobExecutorResultActor extends AbstractActor { private TransactionTemplate transactionTemplate; @Autowired private JobTaskBatchHelper jobTaskBatchHelper; + @Autowired + private JobTaskBatchMapper jobTaskBatchMapper; @Override public Receive createReceive() { @@ -65,6 +80,20 @@ public class JobExecutorResultActor extends AbstractActor { } }); + // TODO 60秒内的任务直接丢入时间轮中 + if (Integer.parseInt("30") <= 60) { + if (jobTaskBatchMapper.selectCount(new LambdaQueryWrapper() + .eq(JobTaskBatch::getId, result.getTaskBatchId()) + .in(JobTaskBatch::getTaskStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)) <= 0) { + ActorRef scanActorRef = CacheGroupScanActor.get("DEFAULT_JOB_KEY", TaskTypeEnum.JOB); + ScanTask scanTask = new ScanTask(); + scanTask.setBuckets(Sets.newHashSet(0)); + scanTask.setSize(1); + scanTask.setStartId(result.getJobId()); + scanActorRef.tell(scanTask, scanActorRef); + } + } + JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result); jobLogDTO.setMessage(result.getMessage()); jobLogDTO.setClientId(result.getClientId()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/ScanJobTaskActor.java index a2ee0cd2..0cc7374e 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/ScanJobTaskActor.java @@ -10,6 +10,7 @@ import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; import com.aizuda.easy.retry.server.common.dto.PartitionTask; import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; import com.aizuda.easy.retry.server.job.task.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.WaitStrategy; import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask; @@ -70,7 +71,7 @@ public class ScanJobTaskActor extends AbstractActor { private void doScan(final ScanTask scanTask) { log.info("job scan start"); - long total = process(startId -> listAvailableJobs(startId, scanTask), partitionTasks -> { + long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), partitionTasks -> { for (final JobPartitionTask partitionTask : (List) partitionTasks) { CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName()); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask); @@ -92,18 +93,18 @@ public class ScanJobTaskActor extends AbstractActor { ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); actorRef.tell(jobTaskPrepare, actorRef); } - }, 0); + }, scanTask.getStartId()); log.info("job scan end. total:[{}]", total); } private List listAvailableJobs(Long startId, ScanTask scanTask) { - List jobs = jobMapper.selectPage(new PageDTO(0, 1000), + List jobs = jobMapper.selectPage(new PageDTO(0, scanTask.getSize()), new LambdaQueryWrapper() .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) .in(Job::getBucketIndex, scanTask.getBuckets()) - .le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(6)) + .le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(60)) .eq(Job::getDeleted, StatusEnum.NO.getStatus()) .gt(Job::getId, startId) ).getRecords(); @@ -111,27 +112,5 @@ public class ScanJobTaskActor extends AbstractActor { return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs); } - public long process( - Function> dataSource, Consumer> task, long startId) { - int total = 0; - do { - List products = dataSource.apply(startId); - if (CollectionUtils.isEmpty(products)) { - // 没有查询到数据直接退出 - break; - } - total += products.size(); - - task.accept(products); - startId = maxId(products); - } while (startId > 0); - - return total; - } - - private static long maxId(List products) { - Optional max = products.stream().map(PartitionTask::getId).max(Long::compareTo); - return max.orElse(-1L) + 1; - } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTimerTaskDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTimerTaskDTO.java index 55f7f1f8..e447d6db 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTimerTaskDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTimerTaskDTO.java @@ -11,4 +11,6 @@ import lombok.Data; public class JobTimerTaskDTO { private Long taskBatchId; + private String groupName; + private Long jobId; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/generator/batch/JobTaskBatchGenerator.java index 0427e5df..50aed393 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/generator/batch/JobTaskBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/generator/batch/JobTaskBatchGenerator.java @@ -56,7 +56,8 @@ public class JobTaskBatchGenerator { - System.currentTimeMillis(); JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId()); - + jobTimerTaskDTO.setGroupName(context.getGroupName()); + jobTimerTaskDTO.setJobId(context.getJobId()); JobTimerWheelHandler.register(context.getGroupName(), jobTaskBatch.getId(), new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/executor/RealJobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/executor/RealJobExecutorActor.java index 86be9b81..43557c55 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/executor/RealJobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/executor/RealJobExecutorActor.java @@ -10,9 +10,9 @@ import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.client.RequestBuilder; -import com.aizuda.easy.retry.server.common.client.RpcClient; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.job.task.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.client.JobRpcClient; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; @@ -21,10 +21,6 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.github.rholder.retry.Attempt; import com.github.rholder.retry.RetryException; import com.github.rholder.retry.RetryListener; -import com.github.rholder.retry.Retryer; -import com.github.rholder.retry.RetryerBuilder; -import com.github.rholder.retry.StopStrategies; -import com.github.rholder.retry.WaitStrategies; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -32,7 +28,6 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.Objects; -import java.util.concurrent.TimeUnit; /** * @author www.byteblogs.com @@ -60,7 +55,8 @@ public class RealJobExecutorActor extends AbstractActor { private void doExecute(RealJobExecutorDTO realJobExecutorDTO) { // 检查客户端是否存在 - RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode(realJobExecutorDTO.getGroupName(), realJobExecutorDTO.getClientId()); + RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode(realJobExecutorDTO.getGroupName(), + realJobExecutorDTO.getClientId()); if (Objects.isNull(registerNodeInfo)) { taskExecuteFailure(realJobExecutorDTO, "无可执行的客户端"); return; @@ -69,14 +65,12 @@ public class RealJobExecutorActor extends AbstractActor { JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO); DispatchJobRequest dispatchJobRequest = JobTaskConverter.INSTANCE.toDispatchJobRequest(realJobExecutorDTO); - // 构建重试组件 - Retryer> retryer = buildResultRetryer(realJobExecutorDTO); - try { // 构建请求客户端对象 - RpcClient rpcClient = buildRpcClient(registerNodeInfo); - Result dispatch = retryer.call(() -> rpcClient.dispatch(dispatchJobRequest)); - if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.equals(dispatch.getData(), Boolean.TRUE)) { + JobRpcClient rpcClient = buildRpcClient(registerNodeInfo, realJobExecutorDTO); + Result dispatch = rpcClient.dispatch(dispatchJobRequest); + if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.equals(dispatch.getData(), + Boolean.TRUE)) { jobLogDTO.setMessage("任务调度成功"); } else { jobLogDTO.setMessage(dispatch.getMessage()); @@ -96,43 +90,48 @@ public class RealJobExecutorActor extends AbstractActor { } - private Retryer> buildResultRetryer(RealJobExecutorDTO realJobExecutorDTO) { - Retryer> retryer = RetryerBuilder.>newBuilder() - .retryIfException(throwable -> true) - .withStopStrategy(StopStrategies.stopAfterAttempt(realJobExecutorDTO.getMaxRetryTimes())) - .withWaitStrategy(WaitStrategies.fixedWait(realJobExecutorDTO.getRetryInterval(), TimeUnit.SECONDS)) - .withRetryListener(new RetryListener() { - @Override - public void onRetry(Attempt attempt) { - if (attempt.hasException()) { - LogUtils.error(log, "任务调度失败. taskInstanceId:[{}] count:[{}]", - realJobExecutorDTO.getTaskBatchId(), attempt.getAttemptNumber(), attempt.getExceptionCause()); - JobTask jobTask = new JobTask(); - jobTask.setRetryCount((int) attempt.getAttemptNumber()); - jobTaskMapper.updateById(jobTask); - } - } - }) - .build(); - return retryer; + public static class JobExecutorRetryListener implements RetryListener { + + private RealJobExecutorDTO realJobExecutorDTO; + private JobTaskMapper jobTaskMapper; + + public JobExecutorRetryListener(final RealJobExecutorDTO realJobExecutorDTO, + final JobTaskMapper jobTaskMapper) { + this.realJobExecutorDTO = realJobExecutorDTO; + this.jobTaskMapper = jobTaskMapper; + } + + @Override + public void onRetry(final Attempt attempt) { + if (attempt.hasException()) { + LogUtils.error(log, "任务调度失败. taskInstanceId:[{}] count:[{}]", + realJobExecutorDTO.getTaskBatchId(), attempt.getAttemptNumber(), attempt.getExceptionCause()); + JobTask jobTask = new JobTask(); + jobTask.setRetryCount((int) attempt.getAttemptNumber()); + jobTaskMapper.updateById(jobTask); + } + } } - private RpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) { - RpcClient rpcClient = RequestBuilder.newBuilder() - .hostPort(registerNodeInfo.getHostPort()) - .groupName(registerNodeInfo.getGroupName()) - .hostId(registerNodeInfo.getHostId()) - .hostIp(registerNodeInfo.getHostIp()) - .contextPath(registerNodeInfo.getContextPath()) - .client(RpcClient.class) - .build(); - return rpcClient; + private JobRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RealJobExecutorDTO realJobExecutorDTO) { + return RequestBuilder.newBuilder() + .hostPort(registerNodeInfo.getHostPort()) + .groupName(registerNodeInfo.getGroupName()) + .hostId(registerNodeInfo.getHostId()) + .hostIp(registerNodeInfo.getHostIp()) + .contextPath(registerNodeInfo.getContextPath()) + .failRetry(Boolean.TRUE) + .retryTimes(realJobExecutorDTO.getMaxRetryTimes()) + .retryInterval(realJobExecutorDTO.getRetryInterval()) + .retryListener(new JobExecutorRetryListener(realJobExecutorDTO, jobTaskMapper)) + .client(JobRpcClient.class) + .build(); } private static void taskExecuteFailure(RealJobExecutorDTO realJobExecutorDTO, String message) { ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor(); JobExecutorResultDTO jobExecutorResultDTO = new JobExecutorResultDTO(); - jobExecutorResultDTO.setTaskId(realJobExecutorDTO.getTaskBatchId()); + jobExecutorResultDTO.setTaskId(realJobExecutorDTO.getTaskId()); jobExecutorResultDTO.setJobId(realJobExecutorDTO.getJobId()); jobExecutorResultDTO.setTaskBatchId(realJobExecutorDTO.getTaskBatchId()); jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/helper/JobTaskBatchHelper.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/helper/JobTaskBatchHelper.java index 88605a0c..d297f9cc 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/helper/JobTaskBatchHelper.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/helper/JobTaskBatchHelper.java @@ -24,14 +24,14 @@ public class JobTaskBatchHelper { @Autowired private JobTaskBatchMapper jobTaskBatchMapper; - public void complete(Long taskBatchId) { + public boolean complete(Long taskBatchId) { List jobTasks = jobTaskMapper.selectList( new LambdaQueryWrapper().select(JobTask::getExecuteStatus) .eq(JobTask::getTaskBatchId, taskBatchId)); if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getExecuteStatus()))) { - return; + return false; } long failCount = jobTasks.stream().filter(jobTask -> jobTask.getExecuteStatus() == JobTaskBatchStatusEnum.FAIL.getStatus()).count(); @@ -49,5 +49,7 @@ public class JobTaskBatchHelper { jobTaskBatchMapper.updateById(jobTaskBatch); + return true; + } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/RunningJobPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/RunningJobPrepareHandler.java index e1280fd4..703a77ac 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/RunningJobPrepareHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/RunningJobPrepareHandler.java @@ -6,6 +6,7 @@ import com.aizuda.easy.retry.server.job.task.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.handler.helper.JobTaskBatchHelper; import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies; +import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyEnum; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; @@ -41,16 +42,19 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler { log.info("存在运行中的任务. taskBatchId:[{}]", prepare.getTaskBatchId()); // 若存在所有的任务都是完成,但是批次上的状态为运行中,则是并发导致的未把批次状态变成为终态,此处做一次兜底处理 - jobTaskBatchHelper.complete(prepare.getTaskBatchId()); - - // 计算超时时间 - long delay = System.currentTimeMillis() - prepare.getExecutionAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - int blockStrategy = prepare.getBlockStrategy(); - // 计算超时时间,到达超时时间覆盖任务 - if (delay > prepare.getExecutorTimeout() * 1000) { - log.info("任务执行超时.taskBatchId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getTaskBatchId(), delay, prepare.getExecutorTimeout() * 1000); - blockStrategy = BlockStrategies.BlockStrategyEnum.OVERLAY.getBlockStrategy(); + if (jobTaskBatchHelper.complete(prepare.getTaskBatchId())) { + blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy(); + } else { + // 计算超时时间 + long delay = System.currentTimeMillis() - prepare.getExecutionAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + + // 计算超时时间,到达超时时间覆盖任务 + if (delay > prepare.getExecutorTimeout() * 1000) { + log.info("任务执行超时.taskBatchId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getTaskBatchId(), delay, prepare.getExecutorTimeout() * 1000); + blockStrategy = BlockStrategies.BlockStrategyEnum.OVERLAY.getBlockStrategy(); + } + } BlockStrategies.BlockStrategyContext blockStrategyContext = JobTaskConverter.INSTANCE.toBlockStrategyContext(prepare); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/WaitJobPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/WaitJobPrepareHandler.java index 27b4fd93..48dd4e17 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/WaitJobPrepareHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/WaitJobPrepareHandler.java @@ -39,6 +39,8 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler { - System.currentTimeMillis(); JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); jobTimerTaskDTO.setTaskBatchId(jobPrepareDTO.getTaskBatchId()); + jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId()); + jobTimerTaskDTO.setGroupName(jobPrepareDTO.getGroupName()); JobTimerWheelHandler.register(jobPrepareDTO.getGroupName(), jobPrepareDTO.getTaskBatchId(), new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/stop/AbstractJobTaskStopHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/stop/AbstractJobTaskStopHandler.java index 5861d42e..f6da700a 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/stop/AbstractJobTaskStopHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/stop/AbstractJobTaskStopHandler.java @@ -30,7 +30,7 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler, List jobTasks = jobTaskMapper.selectList( new LambdaQueryWrapper() .eq(JobTask::getTaskBatchId, context.getTaskBatchId()) - .eq(JobTask::getExecuteStatus, JobTaskStatusEnum.NOT_COMPLETE) + .in(JobTask::getExecuteStatus, JobTaskStatusEnum.NOT_COMPLETE) ); if (CollectionUtils.isEmpty(jobTasks)) { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/stop/RealStopTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/stop/RealStopTaskActor.java index 8e481801..728a86ed 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/stop/RealStopTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/stop/RealStopTaskActor.java @@ -6,8 +6,8 @@ import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.client.RequestBuilder; -import com.aizuda.easy.retry.server.common.client.RpcClient; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import com.aizuda.easy.retry.server.job.task.client.JobRpcClient; import com.aizuda.easy.retry.server.job.task.dto.RealStopTaskInstanceDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -48,13 +48,16 @@ public class RealStopTaskActor extends AbstractActor { } private Result requestClient(RealStopTaskInstanceDTO realStopTaskInstanceDTO, RegisterNodeInfo registerNodeInfo) { - RpcClient rpcClient = RequestBuilder.newBuilder() + JobRpcClient rpcClient = RequestBuilder.newBuilder() .hostPort(registerNodeInfo.getHostPort()) .groupName(realStopTaskInstanceDTO.getGroupName()) .hostId(registerNodeInfo.getHostId()) .hostIp(registerNodeInfo.getHostIp()) .contextPath(registerNodeInfo.getContextPath()) - .client(RpcClient.class) + .failRetry(Boolean.TRUE) + .retryTimes(3) + .retryInterval(1) + .client(JobRpcClient.class) .build(); StopJobDTO stopJobDTO = new StopJobDTO(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerTask.java index 01027034..8581273d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerTask.java @@ -3,12 +3,21 @@ package com.aizuda.easy.retry.server.job.task.handler.timer; import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.job.task.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.WaitStrategy; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies.WaitStrategyContext; +import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies.WaitStrategyEnum; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import io.netty.util.Timeout; @@ -42,36 +51,48 @@ public class JobTimerTask implements TimerTask { log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); executor.execute(() -> { - Long jobId = 0L; - String groupName = ""; + try { - JobTaskBatchMapper jobTaskBatchMapper = SpringContext.getBeanByType(JobTaskBatchMapper.class); - JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(new LambdaQueryWrapper() - .select(JobTaskBatch::getJobId, JobTaskBatch::getGroupName, JobTaskBatch::getId) - .eq(JobTaskBatch::getId, jobTimerTaskDTO.getTaskBatchId()) - .eq(JobTaskBatch::getTaskStatus, JobTaskBatchStatusEnum.WAITING.getStatus())); - if (Objects.isNull(jobTaskBatch)) { - return; + // 清除时间轮的缓存 + JobTimerWheelHandler.clearCache(jobTimerTaskDTO.getGroupName(), jobTimerTaskDTO.getTaskBatchId()); + + JobMapper jobMapper = SpringContext.getBeanByType(JobMapper.class); + Job job = jobMapper.selectOne(new LambdaQueryWrapper() + .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) + .eq(Job::getId, jobTimerTaskDTO.getJobId()) + ); + + int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus(); + int operationReason = JobOperationReasonEnum.NONE.getReason(); + if (Objects.isNull(job)) { + log.warn("任务已经关闭不允许执行. jobId:[{}]", jobTimerTaskDTO.getJobId()); + taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); + operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason(); } - jobId = jobTaskBatch.getJobId(); - groupName = jobTaskBatch.getGroupName(); + JobTaskBatchMapper jobTaskBatchMapper = SpringContext.getBeanByType(JobTaskBatchMapper.class); + JobTaskBatch jobTaskBatch = new JobTaskBatch(); + jobTaskBatch.setId(jobTimerTaskDTO.getTaskBatchId()); jobTaskBatch.setExecutionAt(LocalDateTime.now()); - jobTaskBatch.setTaskStatus(JobTaskBatchStatusEnum.RUNNING.getStatus()); + jobTaskBatch.setTaskStatus(taskStatus); + jobTaskBatch.setOperationReason(operationReason); Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch), () -> new EasyRetryServerException("更新任务失败")); + // 如果任务已经关闭则不需要执行 + if (Objects.isNull(job)) { + return; + } + TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId()); - taskExecuteDTO.setGroupName(groupName); - taskExecuteDTO.setJobId(jobId); + taskExecuteDTO.setGroupName(jobTimerTaskDTO.getGroupName()); + taskExecuteDTO.setJobId(jobTimerTaskDTO.getJobId()); ActorRef actorRef = ActorGenerator.jobTaskExecutorActor(); actorRef.tell(taskExecuteDTO, actorRef); + } catch (Exception e) { log.error("任务调度执行失败", e); - } finally { - // 清除时间轮的缓存 - JobTimerWheelHandler.clearCache(groupName, jobId); } }); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerWheelHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerWheelHandler.java index ca8e1112..75acedbc 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerWheelHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerWheelHandler.java @@ -34,7 +34,7 @@ public class JobTimerWheelHandler implements Lifecycle { // tickDuration 和 timeUnit 一格的时间长度 // ticksPerWheel 一圈有多少格 timer = new HashedWheelTimer( - new CustomizableThreadFactory("job-task-timer-wheel-"), 100, + new CustomizableThreadFactory("job-task-timer-wheel-"), 1000, TimeUnit.MILLISECONDS, 1024); timer.start(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java index 32653176..3e647293 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/strategy/BlockStrategies.java @@ -8,6 +8,7 @@ import com.aizuda.easy.retry.server.job.task.generator.batch.JobTaskBatchGenerat import com.aizuda.easy.retry.server.job.task.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.handler.stop.JobTaskStopHandler; import com.aizuda.easy.retry.server.job.task.handler.stop.JobTaskStopFactory; +import com.aizuda.easy.retry.server.job.task.handler.stop.TaskStopJobContext; import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; @@ -49,7 +50,7 @@ public class BlockStrategies { private Long jobId; - private Long taskId; + private Long taskBatchId; private String groupName; @@ -69,7 +70,7 @@ public class BlockStrategies { @Override public void block(final BlockStrategyContext context) { - log.warn("阻塞策略为丢弃此次执行. jobId:[{}]", context.getJobId()); + log.warn("阻塞策略为丢弃此次执行. taskBatchId:[{}]", context.getTaskBatchId()); } } @@ -77,11 +78,13 @@ public class BlockStrategies { @Override public void block(final BlockStrategyContext context) { - log.warn("阻塞策略为覆盖. jobId:[{}]", context.getJobId()); + log.warn("阻塞策略为覆盖. taskBatchId:[{}]", context.getTaskBatchId()); // 停止任务 JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(context.taskType); - instanceInterrupt.stop(JobTaskConverter.INSTANCE.toStopJobContext(context)); + TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(context); + stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); + instanceInterrupt.stop(stopJobContext); // 重新生成任务 JobTaskBatchGenerator jobTaskBatchGenerator = SpringContext.getBeanByType(JobTaskBatchGenerator.class); @@ -94,7 +97,7 @@ public class BlockStrategies { @Override public void block(final BlockStrategyContext context) { - log.warn("阻塞策略为并行执行. jobId:[{}]", context.getJobId()); + log.warn("阻塞策略为并行执行. taskBatchId:[{}]", context.getTaskBatchId()); // 重新生成任务 JobTaskBatchGenerator jobTaskBatchGenerator = SpringContext.getBeanByType(JobTaskBatchGenerator.class); diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClient.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java similarity index 74% rename from easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClient.java rename to easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java index 635da1b1..74d7f7df 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClient.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java @@ -1,12 +1,13 @@ -package com.aizuda.easy.retry.server.common.client; +package com.aizuda.easy.retry.server.retry.task.client; -import com.aizuda.easy.retry.client.model.request.DispatchJobRequest; import com.aizuda.easy.retry.client.model.DispatchRetryDTO; import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; -import com.aizuda.easy.retry.client.model.StopJobDTO; import com.aizuda.easy.retry.client.model.RetryCallbackDTO; +import com.aizuda.easy.retry.client.model.StopJobDTO; +import com.aizuda.easy.retry.client.model.request.DispatchJobRequest; import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders; import com.aizuda.easy.retry.common.core.model.Result; +import com.aizuda.easy.retry.server.common.client.RequestMethod; import com.aizuda.easy.retry.server.common.client.annotation.Body; import com.aizuda.easy.retry.server.common.client.annotation.Header; import com.aizuda.easy.retry.server.common.client.annotation.Mapping; @@ -18,18 +19,12 @@ import com.aizuda.easy.retry.server.common.client.annotation.Mapping; * @date : 2023-06-19 15:40 * @since 2.0.0 */ -public interface RpcClient { +public interface RetryRpcClient { - @Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST) + @Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST, failover = true) Result dispatch(@Body DispatchRetryDTO dispatchRetryDTO, @Header EasyRetryHeaders headers); - @Mapping(path = "/retry/callback/v1", method = RequestMethod.POST) + @Mapping(path = "/retry/callback/v1", method = RequestMethod.POST, failover = true) Result callback(@Body RetryCallbackDTO retryCallbackDTO); - @Mapping(path = "/job/stop/v1", method = RequestMethod.POST) - Result stop(@Body StopJobDTO stopJobDTO); - - @Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST) - Result dispatch(@Body DispatchJobRequest dispatchJobRequest); - } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java index d3ddaf3f..3ab40304 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java @@ -11,9 +11,9 @@ import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.client.RequestBuilder; -import com.aizuda.easy.retry.server.common.client.RpcClient; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient; import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO; @@ -135,13 +135,13 @@ public class ExecCallbackUnitActor extends AbstractActor { retryCallbackDTO.setExecutorName(callbackTask.getExecutorName()); retryCallbackDTO.setUniqueId(callbackTask.getUniqueId()); - RpcClient rpcClient = RequestBuilder.newBuilder() + RetryRpcClient rpcClient = RequestBuilder.newBuilder() .hostPort(serverNode.getHostPort()) .groupName(serverNode.getGroupName()) .hostId(serverNode.getHostId()) .hostIp(serverNode.getHostIp()) .contextPath(serverNode.getContextPath()) - .client(RpcClient.class) + .client(RetryRpcClient.class) .build(); return rpcClient.callback(retryCallbackDTO); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java index 894d5241..c5c7b71a 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java @@ -14,8 +14,8 @@ import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.client.RequestBuilder; -import com.aizuda.easy.retry.server.common.client.RpcClient; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient; import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO; @@ -145,13 +145,13 @@ public class ExecUnitActor extends AbstractActor { easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId()); requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders)); - RpcClient rpcClient = RequestBuilder.newBuilder() + RetryRpcClient rpcClient = RequestBuilder.newBuilder() .hostPort(serverNode.getHostPort()) .groupName(serverNode.getGroupName()) .hostId(serverNode.getHostId()) .hostIp(serverNode.getHostIp()) .contextPath(serverNode.getContextPath()) - .client(RpcClient.class) + .client(RetryRpcClient.class) .build(); return rpcClient.dispatch(dispatchRetryDTO, easyRetryHeaders); diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java index 011d7943..7c8999fa 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java @@ -88,6 +88,8 @@ public class ConsumerBucketActor extends AbstractActor { // 扫描回调数据 ScanTask scanTask = new ScanTask(); scanTask.setBuckets(consumerBucket.getBuckets()); + scanTask.setSize(1000); + scanTask.setStartId(0); ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB); scanJobActorRef.tell(scanTask, scanJobActorRef); }