reactor:2.5.0

1. 优化时间轮的执行器使用自定义线程池
2. 修复创建重试日志失败问题
3. 优化sql文件,添加索引
This commit is contained in:
byteblogs168 2023-12-03 23:29:11 +08:00
parent a8021e6690
commit 3c808f5c56
9 changed files with 21 additions and 12 deletions

View File

@ -62,7 +62,7 @@ CREATE TABLE `notify_config`
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`) KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, `scene_name`)
) ENGINE = InnoDB ) ENGINE = InnoDB
AUTO_INCREMENT = 0 AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='通知配置' DEFAULT CHARSET = utf8mb4 COMMENT ='通知配置'
@ -393,7 +393,7 @@ CREATE TABLE `job_notify_config`
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name` (`namespace_id`,`group_name`) KEY `idx_namespace_id_group_name_job_id` (`namespace_id`,`group_name`, job_id)
) ENGINE=InnoDB ) ENGINE=InnoDB
AUTO_INCREMENT=4 AUTO_INCREMENT=4
DEFAULT CHARSET=utf8mb4 COMMENT='job通知配置'; DEFAULT CHARSET=utf8mb4 COMMENT='job通知配置';
@ -415,6 +415,7 @@ CREATE TABLE `job_summary`
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name_job_id` (`namespace_id`, `group_name`, job_id),
UNIQUE KEY `uk_job_id_trigger_at` (`job_id`, `trigger_at`) USING BTREE UNIQUE KEY `uk_job_id_trigger_at` (`job_id`, `trigger_at`) USING BTREE
) ENGINE = InnoDB ) ENGINE = InnoDB
AUTO_INCREMENT = 1 AUTO_INCREMENT = 1
@ -434,7 +435,8 @@ CREATE TABLE `retry_summary`
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `uk_scene_name_trigger_at` (`scene_name`, `trigger_at`) USING BTREE KEY `idx_namespace_id_group_name_scene_name` (`namespace_id`, `group_name`, scene_name),
UNIQUE KEY `uk_scene_name_trigger_at` (`namespace_id`, `group_name`, `scene_name`, `trigger_at`) USING BTREE
) ENGINE = InnoDB ) ENGINE = InnoDB
AUTO_INCREMENT = 1 AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='DashBoard_Retry'; DEFAULT CHARSET = utf8mb4 COMMENT ='DashBoard_Retry';

View File

@ -29,7 +29,7 @@
<foreach collection="list" item="item" separator=","> <foreach collection="list" item="item" separator=",">
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId}, (#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId},
#{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs}, #{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs},
#{item.taskType}, #{item.createDt}, #{namespaceId}) #{item.taskType}, #{item.createDt}, #{item.namespaceId})
</foreach> </foreach>
</insert> </insert>
</mapper> </mapper>

View File

@ -30,7 +30,7 @@
<foreach collection="list" item="item" separator=","> <foreach collection="list" item="item" separator=",">
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId}, (#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId},
#{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs}, #{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs},
#{item.taskType}, #{item.createDt}, #{namespaceId}) #{item.taskType}, #{item.createDt}, #{item.namespaceId})
</foreach> </foreach>
</insert> </insert>

View File

@ -29,7 +29,7 @@
<foreach collection="list" item="item" separator=","> <foreach collection="list" item="item" separator=",">
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId}, (#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId},
#{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs}, #{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs},
#{item.taskType}, #{item.createDt}, #{namespaceId}) #{item.taskType}, #{item.createDt}, #{item.namespaceId})
</foreach> </foreach>
</insert> </insert>
</mapper> </mapper>

View File

@ -109,7 +109,7 @@ public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmE
@Override @Override
public void onApplicationEvent(JobTaskFailAlarmEvent event) { public void onApplicationEvent(JobTaskFailAlarmEvent event) {
if (queue.offer(event.getJobTaskBatchId())) { if (!queue.offer(event.getJobTaskBatchId())) {
LogUtils.warn(log, "JOB任务执行失败告警队列已满"); LogUtils.warn(log, "JOB任务执行失败告警队列已满");
} }
} }

View File

@ -11,6 +11,8 @@ import org.springframework.stereotype.Component;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -25,6 +27,9 @@ public class JobTimerWheel implements Lifecycle {
private static final int TICK_DURATION = 100; private static final int TICK_DURATION = 100;
private static final String THREAD_NAME_PREFIX = "job-task-timer-wheel-"; private static final String THREAD_NAME_PREFIX = "job-task-timer-wheel-";
private static HashedWheelTimer timer = null; private static HashedWheelTimer timer = null;
private static final ThreadPoolExecutor executor =
new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), new CustomizableThreadFactory(THREAD_NAME_PREFIX));
private static final TimerIdempotent idempotent = new TimerIdempotent(); private static final TimerIdempotent idempotent = new TimerIdempotent();
@ -32,7 +37,7 @@ public class JobTimerWheel implements Lifecycle {
public void start() { public void start() {
timer = new HashedWheelTimer( timer = new HashedWheelTimer(
new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS, 512, true, -1, executor);
timer.start(); timer.start();
} }

View File

@ -74,7 +74,7 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractRetryAlarm<Ret
@Override @Override
public void onApplicationEvent(RetryTaskFailDeadLetterAlarmEvent event) { public void onApplicationEvent(RetryTaskFailDeadLetterAlarmEvent event) {
if (queue.offer(event.getRetryDeadLetters())) { if (!queue.offer(event.getRetryDeadLetters())) {
LogUtils.warn(log, "任务重试失败进入死信队列告警队列已满"); LogUtils.warn(log, "任务重试失败进入死信队列告警队列已满");
} }
} }

View File

@ -71,7 +71,7 @@ public class RetryTaskFailMoreThresholdAlarmListener extends
@Override @Override
public void onApplicationEvent(RetryTaskFailMoreThresholdAlarmEvent event) { public void onApplicationEvent(RetryTaskFailMoreThresholdAlarmEvent event) {
if (queue.offer(event.getRetryTask())) { if (!queue.offer(event.getRetryTask())) {
LogUtils.warn(log, "任务失败数量超过阈值告警队列已满"); LogUtils.warn(log, "任务失败数量超过阈值告警队列已满");
} }
} }

View File

@ -26,8 +26,10 @@ public class RetryTimerWheel implements Lifecycle {
private static final int TICK_DURATION = 500; private static final int TICK_DURATION = 500;
private static final String THREAD_NAME_PREFIX = "retry-task-timer-wheel-"; private static final String THREAD_NAME_PREFIX = "retry-task-timer-wheel-";
private static HashedWheelTimer timer = null; private static HashedWheelTimer timer = null;
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS, private static final ThreadPoolExecutor executor =
new LinkedBlockingQueue<>()); new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), new CustomizableThreadFactory(THREAD_NAME_PREFIX));
private static final TimerIdempotent idempotent = new TimerIdempotent(); private static final TimerIdempotent idempotent = new TimerIdempotent();
@Override @Override