From 943532412cc49bb21868d83359b5715ee26ba446 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Tue, 10 Oct 2023 23:00:20 +0800 Subject: [PATCH] =?UTF-8?q?feat:2.4.0=201.=20=E4=BF=AE=E5=A4=8D=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E7=AB=AF=E6=8F=90=E4=BA=A4=E6=97=B6taskBatchId?= =?UTF-8?q?=E5=80=BC=E9=94=99=E8=AF=AF=E5=AF=BC=E8=87=B4=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E5=BC=82=E5=B8=B8=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executor/JobExecutorFutureCallback.java | 33 ++++++++++--------- .../task/dispatch/JobExecutorResultActor.java | 1 + .../job/task/dispatch/ScanJobTaskActor.java | 2 +- .../server/job/task/dto/JobPartitionTask.java | 5 +++ .../batch/JobTaskBatchGenerator.java | 2 +- .../prepare/WaitJobPrepareHandler.java | 4 +-- .../handler/timer/JobTimerWheelHandler.java | 12 +++---- 7 files changed, 33 insertions(+), 26 deletions(-) diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java index 089830b6..2fd83e85 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java @@ -22,8 +22,8 @@ import lombok.extern.slf4j.Slf4j; public class JobExecutorFutureCallback implements FutureCallback { private static final JobNettyClient CLIENT = RequestBuilder.newBuilder() - .client(JobNettyClient.class) - .callback(nettyResult -> LogUtils.info(log, "Data report successfully requestId:[{}]", nettyResult.getRequestId())).build(); + .client(JobNettyClient.class) + .callback(nettyResult -> LogUtils.info(log, "Data report successfully requestId:[{}]", nettyResult.getRequestId())).build(); private JobContext jobContext; @@ -35,34 +35,35 @@ public class JobExecutorFutureCallback implements FutureCallback public void onSuccess(final ExecuteResult result) { // 上报执行成功 log.info("任务执行成功 [{}]", JsonUtil.toJsonString(result)); - DispatchJobResultRequest dispatchJobRequest = new DispatchJobResultRequest(); + + int taskStatus; if (result.getStatus() == StatusEnum.NO.getStatus()) { - dispatchJobRequest.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); + taskStatus = JobTaskStatusEnum.FAIL.getStatus(); } else { - dispatchJobRequest.setTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus()); + taskStatus = JobTaskStatusEnum.SUCCESS.getStatus(); } - dispatchJobRequest.setTaskBatchId(jobContext.getTaskId()); - dispatchJobRequest.setGroupName(jobContext.getGroupName()); - dispatchJobRequest.setJobId(jobContext.getJobId()); - dispatchJobRequest.setTaskId(jobContext.getTaskId()); - dispatchJobRequest.setTaskType(jobContext.getTaskType()); - dispatchJobRequest.setExecuteResult(result); - CLIENT.dispatchResult(dispatchJobRequest); + CLIENT.dispatchResult(buildDispatchJobResultRequest(result, taskStatus)); } @Override public void onFailure(final Throwable t) { // 上报执行失败 log.error("任务执行失败 jobTask:[{}]", jobContext.getTaskId(), t); + CLIENT.dispatchResult( + buildDispatchJobResultRequest(ExecuteResult.failure(t.getMessage()), JobTaskStatusEnum.FAIL.getStatus()) + ); + } + + private DispatchJobResultRequest buildDispatchJobResultRequest(ExecuteResult executeResult, int status) { DispatchJobResultRequest dispatchJobRequest = new DispatchJobResultRequest(); - dispatchJobRequest.setTaskBatchId(jobContext.getTaskId()); + dispatchJobRequest.setTaskBatchId(jobContext.getTaskBatchId()); dispatchJobRequest.setGroupName(jobContext.getGroupName()); dispatchJobRequest.setJobId(jobContext.getJobId()); dispatchJobRequest.setTaskId(jobContext.getTaskId()); - dispatchJobRequest.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); dispatchJobRequest.setTaskType(jobContext.getTaskType()); - dispatchJobRequest.setExecuteResult(ExecuteResult.failure(t.getMessage())); - CLIENT.dispatchResult(dispatchJobRequest); + dispatchJobRequest.setExecuteResult(executeResult); + dispatchJobRequest.setTaskStatus(status); + return dispatchJobRequest; } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/JobExecutorResultActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/JobExecutorResultActor.java index 6c4f0749..807bf983 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/JobExecutorResultActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/JobExecutorResultActor.java @@ -45,6 +45,7 @@ public class JobExecutorResultActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder().match(JobExecutorResultDTO.class, result -> { + log.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/ScanJobTaskActor.java index 13bd2680..a2ee0cd2 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dispatch/ScanJobTaskActor.java @@ -80,7 +80,7 @@ public class ScanJobTaskActor extends AbstractActor { WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); waitStrategyContext.setTriggerType(partitionTask.getTriggerType()); waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval()); - waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt()); + waitStrategyContext.setNextTriggerAt(LocalDateTime.now()); Job job = new Job(); job.setId(partitionTask.getId()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java index 1012de9d..a4796b06 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java @@ -49,4 +49,9 @@ public class JobPartitionTask extends PartitionTask { */ private Integer executorTimeout; + /** + * 任务类型 + */ + private Integer taskType; + } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/generator/batch/JobTaskBatchGenerator.java index 5275addc..0427e5df 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/generator/batch/JobTaskBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/generator/batch/JobTaskBatchGenerator.java @@ -57,7 +57,7 @@ public class JobTaskBatchGenerator { JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId()); - JobTimerWheelHandler.register(context.getGroupName(), context.getJobId(), + JobTimerWheelHandler.register(context.getGroupName(), jobTaskBatch.getId(), new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/WaitJobPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/WaitJobPrepareHandler.java index 4e3d8ff7..27b4fd93 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/WaitJobPrepareHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/prepare/WaitJobPrepareHandler.java @@ -32,7 +32,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler { log.info("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId()); // 若时间轮中数据不存在则重新加入 - if (!JobTimerWheelHandler.isExisted(jobPrepareDTO.getGroupName(), jobPrepareDTO.getJobId())) { + if (!JobTimerWheelHandler.isExisted(jobPrepareDTO.getGroupName(), jobPrepareDTO.getTaskBatchId())) { // 进入时间轮 long delay = jobPrepareDTO.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() @@ -40,7 +40,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler { JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); jobTimerTaskDTO.setTaskBatchId(jobPrepareDTO.getTaskBatchId()); - JobTimerWheelHandler.register(jobPrepareDTO.getGroupName(), jobPrepareDTO.getJobId(), + JobTimerWheelHandler.register(jobPrepareDTO.getGroupName(), jobPrepareDTO.getTaskBatchId(), new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerWheelHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerWheelHandler.java index 7ebbce90..ca8e1112 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerWheelHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/handler/timer/JobTimerWheelHandler.java @@ -45,7 +45,7 @@ public class JobTimerWheelHandler implements Lifecycle { .build(); } - public static void register(String groupName, Long taskId, TimerTask task, long delay, TimeUnit unit) { + public static void register(String groupName, Long taskBatchId, TimerTask task, long delay, TimeUnit unit) { if (delay < 0) { delay = 0; @@ -54,19 +54,19 @@ public class JobTimerWheelHandler implements Lifecycle { // TODO 支持可配置 if (delay > 60 * 1000) { LogUtils.warn(log, "距离下次执行时间过久, 不满足进入时间轮的条件. groupName:[{}] uniqueId:[{}] delay:[{}ms]", - groupName, taskId, delay); + groupName, taskBatchId, delay); return; } - Timeout timeout = getTimeout(groupName, taskId); + Timeout timeout = getTimeout(groupName, taskBatchId); if (Objects.isNull(timeout)) { try { - log.info("加入时间轮. delay:[{}ms] taskId:[{}]", delay, taskId); + log.info("加入时间轮. delay:[{}ms] taskId:[{}]", delay, taskBatchId); timeout = timer.newTimeout(task, delay, unit); - cache.put(getKey(groupName, taskId), timeout); + cache.put(getKey(groupName, taskBatchId), timeout); } catch (Exception e) { LogUtils.error(log, "加入时间轮失败. groupName:[{}] uniqueId:[{}]", - groupName, taskId, e); + groupName, taskBatchId, e); } }