From c821f7057f91b86b228ba3234e5d2b611dd47f74 Mon Sep 17 00:00:00 2001 From: CYQ <123@163.com> Date: Wed, 5 Nov 2025 16:00:35 +0800 Subject: [PATCH] =?UTF-8?q?ic=E5=8D=A1=E6=8D=A2=E5=8D=A1=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=96=B0=E7=9A=84=E8=B4=A6=E5=8F=B7=E6=9D=A1=E4=BB=B6=20?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E8=BF=9E=E6=8E=A5=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- MpsFormalBatch/conf/application.properties | 17 +- .../gtsoft/mps/batch/MpsFormalBatchMain.java | 13 +- .../mps/batch/service/BatchStatusService.java | 11 +- .../test/ConnectionPoolRobustnessTest.java | 204 -------------- .../mps/batch/util/DatabaseConnection.java | 260 +++++++++--------- .../mps/batch/util/SimpleConnectionPool.java | 202 -------------- 6 files changed, 147 insertions(+), 560 deletions(-) delete mode 100644 MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/test/ConnectionPoolRobustnessTest.java delete mode 100644 MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/util/SimpleConnectionPool.java diff --git a/MpsFormalBatch/conf/application.properties b/MpsFormalBatch/conf/application.properties index 3bef499..106ed68 100644 --- a/MpsFormalBatch/conf/application.properties +++ b/MpsFormalBatch/conf/application.properties @@ -6,25 +6,14 @@ db.mysql.password=ENC(K2gWg4+uYaXiUFDXQWXDBA==) db.mysql.driver=com.mysql.cj.jdbc.Driver # Database Configuration - DM -db.dm.url=jdbc:dm://101.42.2.210:5236/CDSDB?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8 -db.dm.username=cdsdb -db.dm.password=GTsoft+123. +db.dm.url=jdbc:dm://xx.xx.x.x:x/xxDB?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8 +db.dm.username= +db.dm.password= db.dm.driver=dm.jdbc.driver.DmDriver # Database Type (mysql/dm) db.type=mysql -# Connection Pool Settings -db.pool.maxActive=10 -db.pool.initialSize=3 -db.pool.maxWait=60000 -db.pool.timeBetweenEvictionRunsMillis=60000 -db.pool.minEvictableIdleTimeMillis=300000 -db.pool.validationQuery=SELECT 1 -db.pool.testWhileIdle=true -db.pool.testOnBorrow=false -db.pool.testOnReturn=false - # Batch Settings batch.retry.maxTimes=3 batch.timeout=1800000 diff --git a/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/MpsFormalBatchMain.java b/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/MpsFormalBatchMain.java index 0adf6c7..9184df5 100644 --- a/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/MpsFormalBatchMain.java +++ b/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/MpsFormalBatchMain.java @@ -43,6 +43,8 @@ public class MpsFormalBatchMain { // 最大等待时间(小时) private static final int MAX_WAIT_HOURS = 240; + // 状态检查间隔时间 + private static final int WAIT_TIME = 10; public MpsFormalBatchMain() { this.batchStatusService = new BatchStatusService(); @@ -197,8 +199,8 @@ public class MpsFormalBatchMain { } // 等待60分钟后重新检查 - batchLogger.info("等待10分钟后重新检查状态"); - TimeUnit.MINUTES.sleep(10); + batchLogger.info("等待{}分钟后重新检查状态",WAIT_TIME); + TimeUnit.MINUTES.sleep(WAIT_TIME); return executeBatch(context); // 递归调用 } @@ -298,8 +300,11 @@ public class MpsFormalBatchMain { */ private boolean executeNormalBatch(String batchMonth) { try { + + batchLogger.info("--------->>>>准备执行批量"); + // 检查是否可以执行预处理批量 - // importStatus=0 and (preBatchStatus=0 or preBatchStatus=3) + // importStatus=1 and (preBatchStatus=0 or preBatchStatus=3) if (batchStatusService.canExecutePreBatch(batchMonth)) { // 更新状态为进行中 batchStatusService.updateBatchStatus( @@ -344,6 +349,8 @@ public class MpsFormalBatchMain { return executeNormalBatch(batchMonth); // 递归调用 } + batchLogger.info("---------<<<<批量执行完成"); + // 如果都不满足,可能是未知状态 batchLogger.error("未知状态,无法处理"); return false; diff --git a/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/service/BatchStatusService.java b/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/service/BatchStatusService.java index 875a41e..4d77836 100644 --- a/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/service/BatchStatusService.java +++ b/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/service/BatchStatusService.java @@ -39,6 +39,9 @@ public class BatchStatusService { public static final String MARKETING_PERFORMANCE_MD = "marketing_performance_md"; // 正式批量第二次正式处理汇总结果表 public static final String MARKETING_PERFORMANCE_M = "marketing_performance_m"; + // 状态检查间隔时间 + private static final int WAIT_TIME = 10; + private static final Logger logger = LoggerFactory.getLogger(BatchStatusService.class); private static final Logger batchLogger = LoggerFactory.getLogger("BATCH_EXECUTION"); // 更新批量状态等栏位 @@ -275,8 +278,8 @@ public class BatchStatusService { long maxWaitMillis = TimeUnit.HOURS.toMillis(maxWaitHours); while (System.currentTimeMillis() - startTime < maxWaitMillis) { - batchLogger.info("等待导入状态完成,10分钟后重新检查..."); - TimeUnit.MINUTES.sleep(10); + batchLogger.info("等待导入状态完成,{}分钟后重新检查...",WAIT_TIME); + TimeUnit.MINUTES.sleep(WAIT_TIME); // 检查导入状态是否完成 if (isImportCompleted(batchMonth)) { @@ -302,8 +305,8 @@ public class BatchStatusService { long maxWaitMillis = TimeUnit.HOURS.toMillis(maxWaitHours); while (System.currentTimeMillis() - startTime < maxWaitMillis) { - batchLogger.info("等待预处理和核对状态完成,10分钟后重新检查..."); - TimeUnit.MINUTES.sleep(10); + batchLogger.info("等待预处理和核对状态完成,{}分钟后重新检查...",WAIT_TIME); + TimeUnit.MINUTES.sleep(WAIT_TIME); // 检查预处理和核对状态是否完成 if (isPreBatchCompletedAndChecked(batchMonth)) { diff --git a/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/test/ConnectionPoolRobustnessTest.java b/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/test/ConnectionPoolRobustnessTest.java deleted file mode 100644 index 226fd3e..0000000 --- a/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/test/ConnectionPoolRobustnessTest.java +++ /dev/null @@ -1,204 +0,0 @@ -package com.gtsoft.mps.batch.test; - -import com.gtsoft.mps.batch.config.LogConfig; -import com.gtsoft.mps.batch.util.DatabaseConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -/** - * 数据库连接池健壮性测试 - * 用于测试连接池的性能和健壮性 - * - * @author mps-batch - * @version 1.0 - */ -public class ConnectionPoolRobustnessTest { - - private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolRobustnessTest.class); - private static final Logger batchLogger = LoggerFactory.getLogger("BATCH_EXECUTION"); - - public static void main(String[] args) { - // 初始化日志配置 - LogConfig.initLogConfig(); - - System.out.println("========== 数据库连接池健壮性测试开始 =========="); - batchLogger.info("========== 数据库连接池健壮性测试开始 =========="); - - ConnectionPoolRobustnessTest test = new ConnectionPoolRobustnessTest(); - - try { - // 测试1:基本连接获取和释放 - test.testBasicConnectionOps(); - - // 测试2:连接池容量测试 - test.testConnectionPoolCapacity(); - - // 测试3:并发连接测试 - test.testConcurrentConnections(); - - // 测试4:连接泄漏检测 - test.testConnectionLeak(); - - } catch (Exception e) { - logger.error("测试过程中出现异常", e); - } finally { - // 关闭连接池 - DatabaseConnection.getInstance().shutdown(); - } - - System.out.println("========== 数据库连接池健壮性测试结束 =========="); - batchLogger.info("========== 数据库连接池健壮性测试结束 =========="); - } - - /** - * 测试基本连接操作 - */ - private void testBasicConnectionOps() { - batchLogger.info("==============================="); - batchLogger.info("测试1:基本连接获取和释放测试"); - - DatabaseConnection dbConnection = DatabaseConnection.getInstance(); - - try { - // 连接池状态检查 - batchLogger.info("初始状态:{}", dbConnection.getConnectionPoolStatus()); - - // 获取连接 - Connection conn1 = dbConnection.getConnection(); - batchLogger.info("获取连接1后:{}", dbConnection.getConnectionPoolStatus()); - - Connection conn2 = dbConnection.getConnection(); - batchLogger.info("获取连接2后:{}", dbConnection.getConnectionPoolStatus()); - - // 释放连接 - dbConnection.closeConnection(conn1); - batchLogger.info("释放连接1后:{}", dbConnection.getConnectionPoolStatus()); - - dbConnection.closeConnection(conn2); - batchLogger.info("释放连接2后:{}", dbConnection.getConnectionPoolStatus()); - - batchLogger.info("基本连接操作测试通过"); - - } catch (Exception e) { - batchLogger.error("基本连接操作测试失败", e); - } - } - - /** - * 测试连接池容量 - */ - private void testConnectionPoolCapacity() { - batchLogger.info("==============================="); - batchLogger.info("测试2:连接池容量测试"); - - DatabaseConnection dbConnection = DatabaseConnection.getInstance(); - List connections = new ArrayList<>(); - - try { - // 尝试获取最大数量的连接 - for (int i = 0; i < 12; i++) { // 超过最大连接数10 - try { - Connection conn = dbConnection.getConnection(); - connections.add(conn); - batchLogger.info("获取连接{}:{}", i + 1, dbConnection.getConnectionPoolStatus()); - } catch (SQLException e) { - batchLogger.warn("无法获取连接{}:{}", i + 1, e.getMessage()); - break; - } - } - - batchLogger.info("连接池容量测试完成,成功获取{}个连接", connections.size()); - - } finally { - // 释放所有连接 - for (Connection conn : connections) { - dbConnection.closeConnection(conn); - } - batchLogger.info("释放所有连接后:{}", dbConnection.getConnectionPoolStatus()); - } - } - - /** - * 测试并发连接 - */ - private void testConcurrentConnections() { - batchLogger.info("==============================="); - batchLogger.info("测试3:并发连接测试"); - - DatabaseConnection dbConnection = DatabaseConnection.getInstance(); - ExecutorService executor = Executors.newFixedThreadPool(5); - List> futures = new ArrayList<>(); - - // 创建5个并发任务 - for (int i = 0; i < 5; i++) { - final int taskId = i + 1; - Future future = executor.submit(() -> { - try { - Connection conn = dbConnection.getConnection(); - batchLogger.info("任务{}获取连接成功:{}", taskId, dbConnection.getConnectionPoolStatus()); - - // 模拟使用连接 - Thread.sleep(1000); - - dbConnection.closeConnection(conn); - batchLogger.info("任务{}释放连接:{}", taskId, dbConnection.getConnectionPoolStatus()); - - } catch (Exception e) { - batchLogger.error("任务{}执行失败", taskId, e); - } - }); - futures.add(future); - } - - // 等待所有任务完成 - for (Future future : futures) { - try { - future.get(); - } catch (Exception e) { - batchLogger.error("等待任务完成时出现异常", e); - } - } - - executor.shutdown(); - batchLogger.info("并发连接测试完成:{}", dbConnection.getConnectionPoolStatus()); - } - - /** - * 测试连接泄漏检测 - */ - private void testConnectionLeak() { - batchLogger.info("==============================="); - batchLogger.info("测试4:连接泄漏检测测试"); - - DatabaseConnection dbConnection = DatabaseConnection.getInstance(); - - try { - batchLogger.info("测试前:{}", dbConnection.getConnectionPoolStatus()); - - // 模拟连接泄漏(获取连接但不释放) - for (int i = 0; i < 3; i++) { - @SuppressWarnings("unused") - Connection conn = dbConnection.getConnection(); - batchLogger.info("获取连接{}(不释放):{}", i + 1, dbConnection.getConnectionPoolStatus()); - } - - // 强制垃圾回收 - System.gc(); - Thread.sleep(1000); - - batchLogger.info("连接泄漏测试完成:{}", dbConnection.getConnectionPoolStatus()); - batchLogger.warn("注意:这个测试模拟了连接泄漏情况,连接池中可能有未释放的连接"); - - } catch (Exception e) { - batchLogger.error("连接泄漏检测测试失败", e); - } - } -} \ No newline at end of file diff --git a/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/util/DatabaseConnection.java b/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/util/DatabaseConnection.java index e8cd38e..a3ecf39 100644 --- a/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/util/DatabaseConnection.java +++ b/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/util/DatabaseConnection.java @@ -4,43 +4,57 @@ import com.gtsoft.mps.batch.config.ConfigManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; /** - * 数据库连接管理类 - * 支持MySQL和达梦数据库,使用连接池管理连接 - * + * 简化的数据库连接管理 - 适合顺序执行的批量程序 + * * @author mps-batch - * @version 1.0 */ public class DatabaseConnection { - + private static final Logger logger = LoggerFactory.getLogger(DatabaseConnection.class); - + private static volatile DatabaseConnection instance; - private final String dbType; - private final SimpleConnectionPool connectionPool; - - /** - * 私有构造方法,实现单例模式 - */ + private Connection connection; + private boolean connectionInUse = false; + private final String url; + private final String username; + private final String password; + private final String driver; + private DatabaseConnection() { - System.out.println("[DatabaseConnection] 开始初始化数据库连接管理器..."); - - this.dbType = ConfigManager.getString("db.type", "mysql"); - System.out.println("[DatabaseConnection] dbType=" + dbType); - - // 初始化连接池 - this.connectionPool = SimpleConnectionPool.getInstance(); - - System.out.println("[DatabaseConnection] 初始化完成,使用连接池管理连接"); + System.out.println("[DatabaseConnection] 初始化简单数据库连接管理..."); + + String dbType = ConfigManager.getString("db.type", "mysql"); + System.out.println("[DatabaseConnection] 数据库类型: " + dbType); + + if ("mysql".equalsIgnoreCase(dbType)) { + this.url = ConfigManager.getString("db.mysql.url"); + this.username = ConfigManager.getString("db.mysql.username"); + this.password = ConfigManager.getString("db.mysql.password"); + this.driver = ConfigManager.getString("db.mysql.driver", "com.mysql.cj.jdbc.Driver"); + } else if ("dm".equalsIgnoreCase(dbType)) { + this.url = ConfigManager.getString("db.dm.url"); + this.username = ConfigManager.getString("db.dm.username"); + this.password = ConfigManager.getString("db.dm.password"); + this.driver = ConfigManager.getString("db.dm.driver", "dm.jdbc.driver.DmDriver"); + } else { + throw new RuntimeException("不支持的数据库类型: " + dbType); + } + + // 加载驱动 + try { + Class.forName(driver); + logger.info("数据库驱动加载成功: {}", driver); + } catch (ClassNotFoundException e) { + logger.error("数据库驱动加载失败: {}", driver, e); + throw new RuntimeException("数据库驱动加载失败: " + driver, e); + } } - - /** - * 获取单例实例 - * - * @return DatabaseConnection实例 - */ + public static DatabaseConnection getInstance() { if (instance == null) { synchronized (DatabaseConnection.class) { @@ -51,54 +65,88 @@ public class DatabaseConnection { } return instance; } - + /** * 获取数据库连接 - * - * @return 数据库连接 - * @throws SQLException 数据库连接异常 */ - public Connection getConnection() throws SQLException { - try { - Connection connection = connectionPool.getConnection(); - logger.debug("从连接池获取数据库连接成功"); - return connection; - } catch (SQLException e) { - logger.error("从连接池获取数据库连接失败", e); - throw e; + public synchronized Connection getConnection() throws SQLException { + if (connectionInUse) { + throw new SQLException("数据库连接正在使用中,程序可能存在并发问题"); + } + + if (connection == null || connection.isClosed() || !isConnectionValid(connection)) { + closeConnection(); // 关闭旧连接 + connection = createNewConnection(); + logger.info("创建新的数据库连接"); + } else { + logger.debug("复用现有数据库连接"); + } + + connectionInUse = true; + return connection; + } + + /** + * 归还连接(标记为可用状态) + */ + public synchronized void returnConnection(Connection conn) { + if (conn == this.connection) { + connectionInUse = false; + logger.debug("数据库连接已归还"); } } - + /** - * 关闭数据库连接(返回连接池) - * - * @param connection 数据库连接 + * 关闭连接 */ - public void closeConnection(Connection connection) { - if (connection != null) { + public synchronized void closeConnection(Connection conn) { + if (conn != null) { try { - // 将连接返回连接池而不是直接关闭 - connectionPool.returnConnection(connection); - logger.debug("数据库连接已返回连接池"); - } catch (Exception e) { - logger.error("返回数据库连接到连接池失败", e); - // 如果返回连接池失败,直接关闭连接 - try { - if (!connection.isClosed()) { - connection.close(); - } - } catch (SQLException closeException) { - logger.error("关闭数据库连接失败", closeException); + if (!conn.isClosed()) { + conn.close(); + logger.debug("数据库连接已关闭"); + } + } catch (SQLException e) { + logger.error("关闭数据库连接失败", e); + } finally { + if (conn == this.connection) { + this.connection = null; + connectionInUse = false; } } } } - + /** - * 提交事务 - * - * @param connection 数据库连接 + * 关闭当前连接 */ + public synchronized void closeConnection() { + closeConnection(this.connection); + } + + /** + * 创建新连接 + */ + private Connection createNewConnection() throws SQLException { + Connection conn = DriverManager.getConnection(url, username, password); + conn.setAutoCommit(false); // 批量处理需要手动控制事务 + return conn; + } + + /** + * 检查连接有效性 + */ + private boolean isConnectionValid(Connection conn) { + if (conn == null) return false; + + try { + return !conn.isClosed() && conn.isValid(5); // 5秒超时检查 + } catch (SQLException e) { + logger.debug("检查连接有效性失败", e); + return false; + } + } + public void commit(Connection connection) { if (connection != null) { try { @@ -110,12 +158,7 @@ public class DatabaseConnection { } } } - - /** - * 回滚事务 - * - * @param connection 数据库连接 - */ + public void rollback(Connection connection) { if (connection != null) { try { @@ -126,43 +169,23 @@ public class DatabaseConnection { } } } - - /** - * 测试数据库连接 - * - * @return 连接是否成功 - */ + public boolean testConnection() { - Connection connection = null; - try { - connection = getConnection(); - logger.info("数据库连接测试成功: 数据库类型={}, 连接池状态: 当前连接数={}, 可用连接数={}", - dbType, connectionPool.getCurrentPoolSize(), connectionPool.getAvailableConnections()); + try (Connection testConn = createNewConnection()) { + logger.info("数据库连接测试成功"); return true; } catch (SQLException e) { logger.error("数据库连接测试失败", e); return false; - } finally { - closeConnection(connection); } } - - /** - * 获取当前数据库类型 - * - * @return 数据库类型 - */ - public String getDbType() { - return dbType; + + public void shutdown() { + closeConnection(); + logger.info("数据库连接已关闭"); } - - /** - * 执行查询并返回整数结果 - * - * @param sql SQL语句 - * @param params 参数 - * @return 查询结果 - */ + + // 原有的查询方法保持兼容 public int queryForInt(String sql, Object... params) { Connection connection = null; try { @@ -172,27 +195,19 @@ public class DatabaseConnection { logger.error("执行查询SQL失败: {}", sql, e); throw new RuntimeException("查询执行失败", e); } finally { - closeConnection(connection); + if (connection != null) { + returnConnection(connection); + } } } - - /** - * 使用指定连接执行查询并返回整数结果 - * - * @param connection 数据库连接 - * @param sql SQL语句 - * @param params 参数 - * @return 查询结果 - * @throws SQLException SQL异常 - */ + private int queryForInt(Connection connection, String sql, Object... params) throws SQLException { - try (PreparedStatement ps = connection.prepareStatement(sql)) { - // 设置参数 + try (java.sql.PreparedStatement ps = connection.prepareStatement(sql)) { for (int i = 0; i < params.length; i++) { ps.setObject(i + 1, params[i]); } - - try (ResultSet rs = ps.executeQuery()) { + + try (java.sql.ResultSet rs = ps.executeQuery()) { if (rs.next()) { return rs.getInt(1); } else { @@ -201,25 +216,4 @@ public class DatabaseConnection { } } } - - /** - * 获取连接池状态信息 - * - * @return 连接池状态字符串 - */ - public String getConnectionPoolStatus() { - return String.format("连接池状态: 当前连接数=%d, 可用连接数=%d", - connectionPool.getCurrentPoolSize(), connectionPool.getAvailableConnections()); - } - - /** - * 关闭连接池中的所有连接 - * 在程序停止时调用 - */ - public void shutdown() { - if (connectionPool != null) { - connectionPool.closeAllConnections(); - logger.info("数据库连接池已关闭"); - } - } } \ No newline at end of file diff --git a/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/util/SimpleConnectionPool.java b/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/util/SimpleConnectionPool.java deleted file mode 100644 index 1aa0a6e..0000000 --- a/MpsFormalBatch/src/main/java/com/gtsoft/mps/batch/util/SimpleConnectionPool.java +++ /dev/null @@ -1,202 +0,0 @@ -package com.gtsoft.mps.batch.util; - -import com.gtsoft.mps.batch.config.ConfigManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.sql.DataSource; -import java.sql.Connection; -import java.sql.SQLException; -import java.util.Properties; - -/** - * 简单连接池实现 - * 为了避免引入第三方依赖,实现简单的连接池 - * - * @author mps-batch - * @version 1.0 - */ -public class SimpleConnectionPool { - - private static final Logger logger = LoggerFactory.getLogger(SimpleConnectionPool.class); - - private static volatile SimpleConnectionPool instance; - private final java.util.Queue connectionPool; - private final int maxPoolSize; - private final int initialPoolSize; - private final String url; - private final String username; - private final String password; - private final String driver; - private int currentPoolSize = 0; - - private SimpleConnectionPool() { - this.maxPoolSize = ConfigManager.getInt("db.pool.maxActive", 10); - this.initialPoolSize = ConfigManager.getInt("db.pool.initialSize", 3); - - String dbType = ConfigManager.getString("db.type", "mysql"); - if ("mysql".equalsIgnoreCase(dbType)) { - this.url = ConfigManager.getString("db.mysql.url"); - this.username = ConfigManager.getString("db.mysql.username"); - this.password = ConfigManager.getString("db.mysql.password"); - this.driver = ConfigManager.getString("db.mysql.driver"); - } else if ("dm".equalsIgnoreCase(dbType)) { - this.url = ConfigManager.getString("db.dm.url"); - this.username = ConfigManager.getString("db.dm.username"); - this.password = ConfigManager.getString("db.dm.password"); - this.driver = ConfigManager.getString("db.dm.driver"); - } else { - throw new RuntimeException("不支持的数据库类型: " + dbType); - } - - // 加载驱动 - try { - Class.forName(driver); - logger.info("数据库驱动加载成功: {}", driver); - } catch (ClassNotFoundException e) { - logger.error("数据库驱动加载失败: {}", driver, e); - throw new RuntimeException("数据库驱动加载失败: " + driver, e); - } - - this.connectionPool = new java.util.concurrent.ConcurrentLinkedQueue<>(); - - // 初始化连接池 - initializePool(); - - logger.info("连接池初始化完成,初始连接数: {}, 最大连接数: {}", initialPoolSize, maxPoolSize); - } - - public static SimpleConnectionPool getInstance() { - if (instance == null) { - synchronized (SimpleConnectionPool.class) { - if (instance == null) { - instance = new SimpleConnectionPool(); - } - } - } - return instance; - } - - private void initializePool() { - for (int i = 0; i < initialPoolSize; i++) { - try { - Connection connection = createNewConnection(); - connectionPool.offer(connection); - currentPoolSize++; - } catch (SQLException e) { - logger.error("初始化连接池时创建连接失败", e); - } - } - } - - private Connection createNewConnection() throws SQLException { - Connection connection = java.sql.DriverManager.getConnection(url, username, password); - connection.setAutoCommit(false); - logger.debug("创建新的数据库连接"); - return connection; - } - - public synchronized Connection getConnection() throws SQLException { - Connection connection = connectionPool.poll(); - - if (connection == null || connection.isClosed()) { - if (currentPoolSize < maxPoolSize) { - connection = createNewConnection(); - currentPoolSize++; - logger.debug("创建新连接,当前连接池大小: {}", currentPoolSize); - } else { - // 连接池已满,等待一段时间后再次尝试 - logger.warn("连接池已满,等待连接释放..."); - - // 简单的等待机制,等待最多5秒 - for (int i = 0; i < 50; i++) { - try { - Thread.sleep(100); // 等待100ms - connection = connectionPool.poll(); - if (connection != null && !connection.isClosed() && isConnectionValid(connection)) { - logger.debug("从连接池获取到释放的连接"); - return connection; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SQLException("等待连接被中断", e); - } - } - - // 如果等待后仍无可用连接,抛出异常 - throw new SQLException("无法获取数据库连接,连接池已耗尽,请检查连接池配置或是否存在连接泄漏"); - } - } else { - // 从连接池获取到连接 - logger.debug("从连接池获取连接成功"); - } - - // 测试连接有效性 - if (!isConnectionValid(connection)) { - logger.warn("连接无效,重新创建"); - try { - connection.close(); - } catch (SQLException e) { - logger.debug("关闭无效连接时发生异常", e); - } - - // 重新创建连接 - if (currentPoolSize < maxPoolSize) { - connection = createNewConnection(); - currentPoolSize++; - } else { - throw new SQLException("无法创建新连接,连接池已满"); - } - } - - return connection; - } - - public synchronized void returnConnection(Connection connection) { - if (connection != null) { - try { - if (!connection.isClosed() && isConnectionValid(connection)) { - connectionPool.offer(connection); - logger.debug("连接已返回连接池"); - } else { - logger.debug("连接已关闭或无效,不返回连接池"); - currentPoolSize--; - } - } catch (SQLException e) { - logger.error("检查连接状态失败", e); - currentPoolSize--; - } - } - } - - private boolean isConnectionValid(Connection connection) { - try { - return connection != null && !connection.isClosed() && connection.isValid(5); - } catch (SQLException e) { - return false; - } - } - - public void closeAllConnections() { - Connection connection; - while ((connection = connectionPool.poll()) != null) { - try { - if (!connection.isClosed()) { - connection.close(); - } - } catch (SQLException e) { - logger.error("关闭连接失败", e); - } - } - currentPoolSize = 0; - logger.info("连接池已关闭"); - } - - public int getCurrentPoolSize() { - return currentPoolSize; - } - - public int getAvailableConnections() { - return connectionPool.size(); - } -} \ No newline at end of file