diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/window/SlidingWindow.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/window/SlidingWindow.java index ab5dcb4e..4ba51e44 100644 --- a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/window/SlidingWindow.java +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/window/SlidingWindow.java @@ -155,6 +155,9 @@ public class SlidingWindow { private void oldWindowAdd(T data) { LocalDateTime windowPeriod = getNewWindowPeriod(); + if (Objects.isNull(windowPeriod)) { + return; + } ConcurrentLinkedQueue list = saveData.get(windowPeriod); list.add(data); @@ -222,7 +225,13 @@ public class SlidingWindow { * @return 窗口期时间 */ private LocalDateTime getOldWindowPeriod() { - return saveData.firstKey(); + try { + return saveData.firstKey(); + } catch (NoSuchElementException e) { + EasyRetryLog.LOCAL.error("第一个窗口异常. saveData:[{}]", JsonUtil.toJsonString(saveData)); + return null; + } + } /** @@ -231,7 +240,12 @@ public class SlidingWindow { * @return 窗口期时间 */ private LocalDateTime getNewWindowPeriod() { - return saveData.lastKey(); + try { + return saveData.lastKey(); + } catch (NoSuchElementException e) { + EasyRetryLog.LOCAL.error("第后一个窗口异常. saveData:[{}]", JsonUtil.toJsonString(saveData)); + return null; + } } /** @@ -241,11 +255,15 @@ public class SlidingWindow { */ private boolean isOpenNewWindow(LocalDateTime now) { - if (saveData.size() == 0) { + if (saveData.isEmpty()) { return true; } LocalDateTime windowPeriod = getNewWindowPeriod(); + if (Objects.isNull(windowPeriod)) { + return true; + } + return windowPeriod.isBefore(now); } @@ -256,11 +274,14 @@ public class SlidingWindow { */ private void extract(LocalDateTime condition) { - if (saveData.size() == 0) { + if (saveData.isEmpty()) { return; } LocalDateTime windowPeriod = getOldWindowPeriod(); + if (Objects.isNull(windowPeriod)) { + return; + } // 删除过期窗口期数据 removeInvalidWindow(windowPeriod); diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java index 1237c3d7..9fba5f8a 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java @@ -49,10 +49,14 @@ public abstract class AbstractJobExecutor implements IJobExecutor { jobArgs = buildJobArgs(jobContext); } - // 初始化调度信息(日志上报LogUtil) - ThreadLocalLogUtil.setContext(jobContext); + try { + // 初始化调度信息(日志上报LogUtil) + ThreadLocalLogUtil.setContext(jobContext); + return doJobExecute(jobArgs); + } finally { + ThreadLocalLogUtil.removeContext(); + } - return doJobExecute(jobArgs); }); FutureCache.addFuture(jobContext.getTaskBatchId(), submit); diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java index de1be81e..54d1da33 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java @@ -40,35 +40,44 @@ public class JobExecutorFutureCallback implements FutureCallback @Override public void onSuccess(ExecuteResult result) { - // 上报执行成功 - EasyRetryLog.REMOTE.info("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(), JsonUtil.toJsonString(result)); - - if (Objects.isNull(result)) { - result = ExecuteResult.success(); - } - - int taskStatus; - if (result.getStatus() == StatusEnum.NO.getStatus()) { - taskStatus = JobTaskStatusEnum.FAIL.getStatus(); - } else { - taskStatus = JobTaskStatusEnum.SUCCESS.getStatus(); - } try { + // 初始化调度信息(日志上报LogUtil) + ThreadLocalLogUtil.setContext(jobContext); + + // 上报执行成功 + EasyRetryLog.REMOTE.info("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(), + JsonUtil.toJsonString(result)); + + if (Objects.isNull(result)) { + result = ExecuteResult.success(); + } + + int taskStatus; + if (result.getStatus() == StatusEnum.NO.getStatus()) { + taskStatus = JobTaskStatusEnum.FAIL.getStatus(); + } else { + taskStatus = JobTaskStatusEnum.SUCCESS.getStatus(); + } + CLIENT.dispatchResult(buildDispatchJobResultRequest(result, taskStatus)); } catch (Exception e) { EasyRetryLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e); } finally { - stopThreadPool(); ThreadLocalLogUtil.removeContext(); + stopThreadPool(); } } @Override public void onFailure(final Throwable t) { - // 上报执行失败 - EasyRetryLog.REMOTE.error("任务执行失败 taskBatchId:[{}]", jobContext.getTaskBatchId(), t); + try { + // 初始化调度信息(日志上报LogUtil) + ThreadLocalLogUtil.setContext(jobContext); + + // 上报执行失败 + EasyRetryLog.REMOTE.error("任务执行失败 taskBatchId:[{}]", jobContext.getTaskBatchId(), t); ExecuteResult failure = ExecuteResult.failure(); if (t instanceof CancellationException) { @@ -78,13 +87,13 @@ public class JobExecutorFutureCallback implements FutureCallback } CLIENT.dispatchResult( - buildDispatchJobResultRequest(failure, JobTaskStatusEnum.FAIL.getStatus()) + buildDispatchJobResultRequest(failure, JobTaskStatusEnum.FAIL.getStatus()) ); } catch (Exception e) { EasyRetryLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e); } finally { - stopThreadPool(); ThreadLocalLogUtil.removeContext(); + stopThreadPool(); } }