feat: 2.6.0

1. 优化滑动窗口
This commit is contained in:
byteblogs168 2024-01-24 16:00:14 +08:00
parent dde8459a96
commit 2437f4660d
3 changed files with 59 additions and 25 deletions

View File

@ -155,6 +155,9 @@ public class SlidingWindow<T> {
private void oldWindowAdd(T data) {
LocalDateTime windowPeriod = getNewWindowPeriod();
if (Objects.isNull(windowPeriod)) {
return;
}
ConcurrentLinkedQueue<T> list = saveData.get(windowPeriod);
list.add(data);
@ -222,7 +225,13 @@ public class SlidingWindow<T> {
* @return 窗口期时间
*/
private LocalDateTime getOldWindowPeriod() {
return saveData.firstKey();
try {
return saveData.firstKey();
} catch (NoSuchElementException e) {
EasyRetryLog.LOCAL.error("第一个窗口异常. saveData:[{}]", JsonUtil.toJsonString(saveData));
return null;
}
}
/**
@ -231,7 +240,12 @@ public class SlidingWindow<T> {
* @return 窗口期时间
*/
private LocalDateTime getNewWindowPeriod() {
return saveData.lastKey();
try {
return saveData.lastKey();
} catch (NoSuchElementException e) {
EasyRetryLog.LOCAL.error("第后一个窗口异常. saveData:[{}]", JsonUtil.toJsonString(saveData));
return null;
}
}
/**
@ -241,11 +255,15 @@ public class SlidingWindow<T> {
*/
private boolean isOpenNewWindow(LocalDateTime now) {
if (saveData.size() == 0) {
if (saveData.isEmpty()) {
return true;
}
LocalDateTime windowPeriod = getNewWindowPeriod();
if (Objects.isNull(windowPeriod)) {
return true;
}
return windowPeriod.isBefore(now);
}
@ -256,11 +274,14 @@ public class SlidingWindow<T> {
*/
private void extract(LocalDateTime condition) {
if (saveData.size() == 0) {
if (saveData.isEmpty()) {
return;
}
LocalDateTime windowPeriod = getOldWindowPeriod();
if (Objects.isNull(windowPeriod)) {
return;
}
// 删除过期窗口期数据
removeInvalidWindow(windowPeriod);

View File

@ -49,10 +49,14 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
jobArgs = buildJobArgs(jobContext);
}
// 初始化调度信息日志上报LogUtil
ThreadLocalLogUtil.setContext(jobContext);
try {
// 初始化调度信息日志上报LogUtil
ThreadLocalLogUtil.setContext(jobContext);
return doJobExecute(jobArgs);
} finally {
ThreadLocalLogUtil.removeContext();
}
return doJobExecute(jobArgs);
});
FutureCache.addFuture(jobContext.getTaskBatchId(), submit);

View File

@ -40,35 +40,44 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
@Override
public void onSuccess(ExecuteResult result) {
// 上报执行成功
EasyRetryLog.REMOTE.info("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(), JsonUtil.toJsonString(result));
if (Objects.isNull(result)) {
result = ExecuteResult.success();
}
int taskStatus;
if (result.getStatus() == StatusEnum.NO.getStatus()) {
taskStatus = JobTaskStatusEnum.FAIL.getStatus();
} else {
taskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
}
try {
// 初始化调度信息日志上报LogUtil
ThreadLocalLogUtil.setContext(jobContext);
// 上报执行成功
EasyRetryLog.REMOTE.info("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(),
JsonUtil.toJsonString(result));
if (Objects.isNull(result)) {
result = ExecuteResult.success();
}
int taskStatus;
if (result.getStatus() == StatusEnum.NO.getStatus()) {
taskStatus = JobTaskStatusEnum.FAIL.getStatus();
} else {
taskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
}
CLIENT.dispatchResult(buildDispatchJobResultRequest(result, taskStatus));
} catch (Exception e) {
EasyRetryLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e);
} finally {
stopThreadPool();
ThreadLocalLogUtil.removeContext();
stopThreadPool();
}
}
@Override
public void onFailure(final Throwable t) {
// 上报执行失败
EasyRetryLog.REMOTE.error("任务执行失败 taskBatchId:[{}]", jobContext.getTaskBatchId(), t);
try {
// 初始化调度信息日志上报LogUtil
ThreadLocalLogUtil.setContext(jobContext);
// 上报执行失败
EasyRetryLog.REMOTE.error("任务执行失败 taskBatchId:[{}]", jobContext.getTaskBatchId(), t);
ExecuteResult failure = ExecuteResult.failure();
if (t instanceof CancellationException) {
@ -78,13 +87,13 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
}
CLIENT.dispatchResult(
buildDispatchJobResultRequest(failure, JobTaskStatusEnum.FAIL.getStatus())
buildDispatchJobResultRequest(failure, JobTaskStatusEnum.FAIL.getStatus())
);
} catch (Exception e) {
EasyRetryLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e);
} finally {
stopThreadPool();
ThreadLocalLogUtil.removeContext();
stopThreadPool();
}
}