feat: 2.6.0
1. 优化滑动窗口
This commit is contained in:
		
							parent
							
								
									2d18ff42a8
								
							
						
					
					
						commit
						6784274c0b
					
				@ -155,6 +155,9 @@ public class SlidingWindow<T> {
 | 
			
		||||
    private void oldWindowAdd(T data) {
 | 
			
		||||
 | 
			
		||||
        LocalDateTime windowPeriod = getNewWindowPeriod();
 | 
			
		||||
        if (Objects.isNull(windowPeriod)) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        ConcurrentLinkedQueue<T> list = saveData.get(windowPeriod);
 | 
			
		||||
        list.add(data);
 | 
			
		||||
@ -222,7 +225,13 @@ public class SlidingWindow<T> {
 | 
			
		||||
     * @return 窗口期时间
 | 
			
		||||
     */
 | 
			
		||||
    private LocalDateTime getOldWindowPeriod() {
 | 
			
		||||
        return saveData.firstKey();
 | 
			
		||||
        try {
 | 
			
		||||
            return saveData.firstKey();
 | 
			
		||||
        } catch (NoSuchElementException e) {
 | 
			
		||||
            EasyRetryLog.LOCAL.error("第一个窗口异常. saveData:[{}]", JsonUtil.toJsonString(saveData));
 | 
			
		||||
            return null;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
@ -231,7 +240,12 @@ public class SlidingWindow<T> {
 | 
			
		||||
     * @return 窗口期时间
 | 
			
		||||
     */
 | 
			
		||||
    private LocalDateTime getNewWindowPeriod() {
 | 
			
		||||
        return saveData.lastKey();
 | 
			
		||||
        try {
 | 
			
		||||
            return saveData.lastKey();
 | 
			
		||||
        } catch (NoSuchElementException e) {
 | 
			
		||||
            EasyRetryLog.LOCAL.error("第后一个窗口异常. saveData:[{}]", JsonUtil.toJsonString(saveData));
 | 
			
		||||
            return null;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
@ -241,11 +255,15 @@ public class SlidingWindow<T> {
 | 
			
		||||
     */
 | 
			
		||||
    private boolean isOpenNewWindow(LocalDateTime now) {
 | 
			
		||||
 | 
			
		||||
        if (saveData.size() == 0) {
 | 
			
		||||
        if (saveData.isEmpty()) {
 | 
			
		||||
            return true;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        LocalDateTime windowPeriod = getNewWindowPeriod();
 | 
			
		||||
        if (Objects.isNull(windowPeriod)) {
 | 
			
		||||
            return true;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return windowPeriod.isBefore(now);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -256,11 +274,14 @@ public class SlidingWindow<T> {
 | 
			
		||||
     */
 | 
			
		||||
    private void extract(LocalDateTime condition) {
 | 
			
		||||
 | 
			
		||||
        if (saveData.size() == 0) {
 | 
			
		||||
        if (saveData.isEmpty()) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        LocalDateTime windowPeriod = getOldWindowPeriod();
 | 
			
		||||
        if (Objects.isNull(windowPeriod)) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // 删除过期窗口期数据
 | 
			
		||||
        removeInvalidWindow(windowPeriod);
 | 
			
		||||
 | 
			
		||||
@ -49,10 +49,14 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
 | 
			
		||||
                jobArgs = buildJobArgs(jobContext);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // 初始化调度信息(日志上报LogUtil)
 | 
			
		||||
            ThreadLocalLogUtil.setContext(jobContext);
 | 
			
		||||
            try {
 | 
			
		||||
                // 初始化调度信息(日志上报LogUtil)
 | 
			
		||||
                ThreadLocalLogUtil.setContext(jobContext);
 | 
			
		||||
                return doJobExecute(jobArgs);
 | 
			
		||||
            } finally {
 | 
			
		||||
                ThreadLocalLogUtil.removeContext();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            return doJobExecute(jobArgs);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        FutureCache.addFuture(jobContext.getTaskBatchId(), submit);
 | 
			
		||||
 | 
			
		||||
@ -40,35 +40,44 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onSuccess(ExecuteResult result) {
 | 
			
		||||
        // 上报执行成功
 | 
			
		||||
        EasyRetryLog.REMOTE.info("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(), JsonUtil.toJsonString(result));
 | 
			
		||||
 | 
			
		||||
        if (Objects.isNull(result)) {
 | 
			
		||||
            result = ExecuteResult.success();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        int taskStatus;
 | 
			
		||||
        if (result.getStatus() == StatusEnum.NO.getStatus()) {
 | 
			
		||||
            taskStatus = JobTaskStatusEnum.FAIL.getStatus();
 | 
			
		||||
        } else {
 | 
			
		||||
            taskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            // 初始化调度信息(日志上报LogUtil)
 | 
			
		||||
            ThreadLocalLogUtil.setContext(jobContext);
 | 
			
		||||
 | 
			
		||||
            // 上报执行成功
 | 
			
		||||
            EasyRetryLog.REMOTE.info("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(),
 | 
			
		||||
                JsonUtil.toJsonString(result));
 | 
			
		||||
 | 
			
		||||
            if (Objects.isNull(result)) {
 | 
			
		||||
                result = ExecuteResult.success();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            int taskStatus;
 | 
			
		||||
            if (result.getStatus() == StatusEnum.NO.getStatus()) {
 | 
			
		||||
                taskStatus = JobTaskStatusEnum.FAIL.getStatus();
 | 
			
		||||
            } else {
 | 
			
		||||
                taskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            CLIENT.dispatchResult(buildDispatchJobResultRequest(result, taskStatus));
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            EasyRetryLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e);
 | 
			
		||||
        } finally {
 | 
			
		||||
            stopThreadPool();
 | 
			
		||||
            ThreadLocalLogUtil.removeContext();
 | 
			
		||||
            stopThreadPool();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onFailure(final Throwable t) {
 | 
			
		||||
        // 上报执行失败
 | 
			
		||||
        EasyRetryLog.REMOTE.error("任务执行失败 taskBatchId:[{}]", jobContext.getTaskBatchId(), t);
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            // 初始化调度信息(日志上报LogUtil)
 | 
			
		||||
            ThreadLocalLogUtil.setContext(jobContext);
 | 
			
		||||
 | 
			
		||||
            // 上报执行失败
 | 
			
		||||
            EasyRetryLog.REMOTE.error("任务执行失败 taskBatchId:[{}]", jobContext.getTaskBatchId(), t);
 | 
			
		||||
 | 
			
		||||
            ExecuteResult failure = ExecuteResult.failure();
 | 
			
		||||
            if (t instanceof CancellationException) {
 | 
			
		||||
@ -78,13 +87,13 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            CLIENT.dispatchResult(
 | 
			
		||||
                    buildDispatchJobResultRequest(failure, JobTaskStatusEnum.FAIL.getStatus())
 | 
			
		||||
                buildDispatchJobResultRequest(failure, JobTaskStatusEnum.FAIL.getStatus())
 | 
			
		||||
            );
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            EasyRetryLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e);
 | 
			
		||||
        } finally {
 | 
			
		||||
            stopThreadPool();
 | 
			
		||||
            ThreadLocalLogUtil.removeContext();
 | 
			
		||||
            stopThreadPool();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user