ic卡换卡添加新的账号条件

删除连接池
This commit is contained in:
CYQ 2025-11-05 16:00:35 +08:00
parent 3617709df5
commit c821f7057f
6 changed files with 147 additions and 560 deletions

View File

@ -6,25 +6,14 @@ db.mysql.password=ENC(K2gWg4+uYaXiUFDXQWXDBA==)
db.mysql.driver=com.mysql.cj.jdbc.Driver
# Database Configuration - DM
db.dm.url=jdbc:dm://101.42.2.210:5236/CDSDB?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8
db.dm.username=cdsdb
db.dm.password=GTsoft+123.
db.dm.url=jdbc:dm://xx.xx.x.x:x/xxDB?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8
db.dm.username=
db.dm.password=
db.dm.driver=dm.jdbc.driver.DmDriver
# Database Type (mysql/dm)
db.type=mysql
# Connection Pool Settings
db.pool.maxActive=10
db.pool.initialSize=3
db.pool.maxWait=60000
db.pool.timeBetweenEvictionRunsMillis=60000
db.pool.minEvictableIdleTimeMillis=300000
db.pool.validationQuery=SELECT 1
db.pool.testWhileIdle=true
db.pool.testOnBorrow=false
db.pool.testOnReturn=false
# Batch Settings
batch.retry.maxTimes=3
batch.timeout=1800000

View File

@ -43,6 +43,8 @@ public class MpsFormalBatchMain {
// 最大等待时间小时
private static final int MAX_WAIT_HOURS = 240;
// 状态检查间隔时间
private static final int WAIT_TIME = 10;
public MpsFormalBatchMain() {
this.batchStatusService = new BatchStatusService();
@ -197,8 +199,8 @@ public class MpsFormalBatchMain {
}
// 等待60分钟后重新检查
batchLogger.info("等待10分钟后重新检查状态");
TimeUnit.MINUTES.sleep(10);
batchLogger.info("等待{}分钟后重新检查状态",WAIT_TIME);
TimeUnit.MINUTES.sleep(WAIT_TIME);
return executeBatch(context); // 递归调用
}
@ -298,8 +300,11 @@ public class MpsFormalBatchMain {
*/
private boolean executeNormalBatch(String batchMonth) {
try {
batchLogger.info("--------->>>>准备执行批量");
// 检查是否可以执行预处理批量
// importStatus=0 and (preBatchStatus=0 or preBatchStatus=3)
// importStatus=1 and (preBatchStatus=0 or preBatchStatus=3)
if (batchStatusService.canExecutePreBatch(batchMonth)) {
// 更新状态为进行中
batchStatusService.updateBatchStatus(
@ -344,6 +349,8 @@ public class MpsFormalBatchMain {
return executeNormalBatch(batchMonth); // 递归调用
}
batchLogger.info("---------<<<<批量执行完成");
// 如果都不满足可能是未知状态
batchLogger.error("未知状态,无法处理");
return false;

View File

@ -39,6 +39,9 @@ public class BatchStatusService {
public static final String MARKETING_PERFORMANCE_MD = "marketing_performance_md";
// 正式批量第二次正式处理汇总结果表
public static final String MARKETING_PERFORMANCE_M = "marketing_performance_m";
// 状态检查间隔时间
private static final int WAIT_TIME = 10;
private static final Logger logger = LoggerFactory.getLogger(BatchStatusService.class);
private static final Logger batchLogger = LoggerFactory.getLogger("BATCH_EXECUTION");
// 更新批量状态等栏位
@ -275,8 +278,8 @@ public class BatchStatusService {
long maxWaitMillis = TimeUnit.HOURS.toMillis(maxWaitHours);
while (System.currentTimeMillis() - startTime < maxWaitMillis) {
batchLogger.info("等待导入状态完成,10分钟后重新检查...");
TimeUnit.MINUTES.sleep(10);
batchLogger.info("等待导入状态完成,{}分钟后重新检查...",WAIT_TIME);
TimeUnit.MINUTES.sleep(WAIT_TIME);
// 检查导入状态是否完成
if (isImportCompleted(batchMonth)) {
@ -302,8 +305,8 @@ public class BatchStatusService {
long maxWaitMillis = TimeUnit.HOURS.toMillis(maxWaitHours);
while (System.currentTimeMillis() - startTime < maxWaitMillis) {
batchLogger.info("等待预处理和核对状态完成,10分钟后重新检查...");
TimeUnit.MINUTES.sleep(10);
batchLogger.info("等待预处理和核对状态完成,{}分钟后重新检查...",WAIT_TIME);
TimeUnit.MINUTES.sleep(WAIT_TIME);
// 检查预处理和核对状态是否完成
if (isPreBatchCompletedAndChecked(batchMonth)) {

View File

@ -1,204 +0,0 @@
package com.gtsoft.mps.batch.test;
import com.gtsoft.mps.batch.config.LogConfig;
import com.gtsoft.mps.batch.util.DatabaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 数据库连接池健壮性测试
* 用于测试连接池的性能和健壮性
*
* @author mps-batch
* @version 1.0
*/
public class ConnectionPoolRobustnessTest {
private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolRobustnessTest.class);
private static final Logger batchLogger = LoggerFactory.getLogger("BATCH_EXECUTION");
public static void main(String[] args) {
// 初始化日志配置
LogConfig.initLogConfig();
System.out.println("========== 数据库连接池健壮性测试开始 ==========");
batchLogger.info("========== 数据库连接池健壮性测试开始 ==========");
ConnectionPoolRobustnessTest test = new ConnectionPoolRobustnessTest();
try {
// 测试1基本连接获取和释放
test.testBasicConnectionOps();
// 测试2连接池容量测试
test.testConnectionPoolCapacity();
// 测试3并发连接测试
test.testConcurrentConnections();
// 测试4连接泄漏检测
test.testConnectionLeak();
} catch (Exception e) {
logger.error("测试过程中出现异常", e);
} finally {
// 关闭连接池
DatabaseConnection.getInstance().shutdown();
}
System.out.println("========== 数据库连接池健壮性测试结束 ==========");
batchLogger.info("========== 数据库连接池健壮性测试结束 ==========");
}
/**
* 测试基本连接操作
*/
private void testBasicConnectionOps() {
batchLogger.info("===============================");
batchLogger.info("测试1基本连接获取和释放测试");
DatabaseConnection dbConnection = DatabaseConnection.getInstance();
try {
// 连接池状态检查
batchLogger.info("初始状态:{}", dbConnection.getConnectionPoolStatus());
// 获取连接
Connection conn1 = dbConnection.getConnection();
batchLogger.info("获取连接1后{}", dbConnection.getConnectionPoolStatus());
Connection conn2 = dbConnection.getConnection();
batchLogger.info("获取连接2后{}", dbConnection.getConnectionPoolStatus());
// 释放连接
dbConnection.closeConnection(conn1);
batchLogger.info("释放连接1后{}", dbConnection.getConnectionPoolStatus());
dbConnection.closeConnection(conn2);
batchLogger.info("释放连接2后{}", dbConnection.getConnectionPoolStatus());
batchLogger.info("基本连接操作测试通过");
} catch (Exception e) {
batchLogger.error("基本连接操作测试失败", e);
}
}
/**
* 测试连接池容量
*/
private void testConnectionPoolCapacity() {
batchLogger.info("===============================");
batchLogger.info("测试2连接池容量测试");
DatabaseConnection dbConnection = DatabaseConnection.getInstance();
List<Connection> connections = new ArrayList<>();
try {
// 尝试获取最大数量的连接
for (int i = 0; i < 12; i++) { // 超过最大连接数10
try {
Connection conn = dbConnection.getConnection();
connections.add(conn);
batchLogger.info("获取连接{}{}", i + 1, dbConnection.getConnectionPoolStatus());
} catch (SQLException e) {
batchLogger.warn("无法获取连接{}{}", i + 1, e.getMessage());
break;
}
}
batchLogger.info("连接池容量测试完成,成功获取{}个连接", connections.size());
} finally {
// 释放所有连接
for (Connection conn : connections) {
dbConnection.closeConnection(conn);
}
batchLogger.info("释放所有连接后:{}", dbConnection.getConnectionPoolStatus());
}
}
/**
* 测试并发连接
*/
private void testConcurrentConnections() {
batchLogger.info("===============================");
batchLogger.info("测试3并发连接测试");
DatabaseConnection dbConnection = DatabaseConnection.getInstance();
ExecutorService executor = Executors.newFixedThreadPool(5);
List<Future<?>> futures = new ArrayList<>();
// 创建5个并发任务
for (int i = 0; i < 5; i++) {
final int taskId = i + 1;
Future<?> future = executor.submit(() -> {
try {
Connection conn = dbConnection.getConnection();
batchLogger.info("任务{}获取连接成功:{}", taskId, dbConnection.getConnectionPoolStatus());
// 模拟使用连接
Thread.sleep(1000);
dbConnection.closeConnection(conn);
batchLogger.info("任务{}释放连接:{}", taskId, dbConnection.getConnectionPoolStatus());
} catch (Exception e) {
batchLogger.error("任务{}执行失败", taskId, e);
}
});
futures.add(future);
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
batchLogger.error("等待任务完成时出现异常", e);
}
}
executor.shutdown();
batchLogger.info("并发连接测试完成:{}", dbConnection.getConnectionPoolStatus());
}
/**
* 测试连接泄漏检测
*/
private void testConnectionLeak() {
batchLogger.info("===============================");
batchLogger.info("测试4连接泄漏检测测试");
DatabaseConnection dbConnection = DatabaseConnection.getInstance();
try {
batchLogger.info("测试前:{}", dbConnection.getConnectionPoolStatus());
// 模拟连接泄漏获取连接但不释放
for (int i = 0; i < 3; i++) {
@SuppressWarnings("unused")
Connection conn = dbConnection.getConnection();
batchLogger.info("获取连接{}(不释放):{}", i + 1, dbConnection.getConnectionPoolStatus());
}
// 强制垃圾回收
System.gc();
Thread.sleep(1000);
batchLogger.info("连接泄漏测试完成:{}", dbConnection.getConnectionPoolStatus());
batchLogger.warn("注意:这个测试模拟了连接泄漏情况,连接池中可能有未释放的连接");
} catch (Exception e) {
batchLogger.error("连接泄漏检测测试失败", e);
}
}
}

View File

@ -4,43 +4,57 @@ import com.gtsoft.mps.batch.config.ConfigManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
* 数据库连接管理类
* 支持MySQL和达梦数据库使用连接池管理连接
*
* 简化的数据库连接管理 - 适合顺序执行的批量程序
*
* @author mps-batch
* @version 1.0
*/
public class DatabaseConnection {
private static final Logger logger = LoggerFactory.getLogger(DatabaseConnection.class);
private static volatile DatabaseConnection instance;
private final String dbType;
private final SimpleConnectionPool connectionPool;
/**
* 私有构造方法实现单例模式
*/
private Connection connection;
private boolean connectionInUse = false;
private final String url;
private final String username;
private final String password;
private final String driver;
private DatabaseConnection() {
System.out.println("[DatabaseConnection] 开始初始化数据库连接管理器...");
this.dbType = ConfigManager.getString("db.type", "mysql");
System.out.println("[DatabaseConnection] dbType=" + dbType);
// 初始化连接池
this.connectionPool = SimpleConnectionPool.getInstance();
System.out.println("[DatabaseConnection] 初始化完成,使用连接池管理连接");
System.out.println("[DatabaseConnection] 初始化简单数据库连接管理...");
String dbType = ConfigManager.getString("db.type", "mysql");
System.out.println("[DatabaseConnection] 数据库类型: " + dbType);
if ("mysql".equalsIgnoreCase(dbType)) {
this.url = ConfigManager.getString("db.mysql.url");
this.username = ConfigManager.getString("db.mysql.username");
this.password = ConfigManager.getString("db.mysql.password");
this.driver = ConfigManager.getString("db.mysql.driver", "com.mysql.cj.jdbc.Driver");
} else if ("dm".equalsIgnoreCase(dbType)) {
this.url = ConfigManager.getString("db.dm.url");
this.username = ConfigManager.getString("db.dm.username");
this.password = ConfigManager.getString("db.dm.password");
this.driver = ConfigManager.getString("db.dm.driver", "dm.jdbc.driver.DmDriver");
} else {
throw new RuntimeException("不支持的数据库类型: " + dbType);
}
// 加载驱动
try {
Class.forName(driver);
logger.info("数据库驱动加载成功: {}", driver);
} catch (ClassNotFoundException e) {
logger.error("数据库驱动加载失败: {}", driver, e);
throw new RuntimeException("数据库驱动加载失败: " + driver, e);
}
}
/**
* 获取单例实例
*
* @return DatabaseConnection实例
*/
public static DatabaseConnection getInstance() {
if (instance == null) {
synchronized (DatabaseConnection.class) {
@ -51,54 +65,88 @@ public class DatabaseConnection {
}
return instance;
}
/**
* 获取数据库连接
*
* @return 数据库连接
* @throws SQLException 数据库连接异常
*/
public Connection getConnection() throws SQLException {
try {
Connection connection = connectionPool.getConnection();
logger.debug("从连接池获取数据库连接成功");
return connection;
} catch (SQLException e) {
logger.error("从连接池获取数据库连接失败", e);
throw e;
public synchronized Connection getConnection() throws SQLException {
if (connectionInUse) {
throw new SQLException("数据库连接正在使用中,程序可能存在并发问题");
}
if (connection == null || connection.isClosed() || !isConnectionValid(connection)) {
closeConnection(); // 关闭旧连接
connection = createNewConnection();
logger.info("创建新的数据库连接");
} else {
logger.debug("复用现有数据库连接");
}
connectionInUse = true;
return connection;
}
/**
* 归还连接标记为可用状态
*/
public synchronized void returnConnection(Connection conn) {
if (conn == this.connection) {
connectionInUse = false;
logger.debug("数据库连接已归还");
}
}
/**
* 关闭数据库连接返回连接池
*
* @param connection 数据库连接
* 关闭连接
*/
public void closeConnection(Connection connection) {
if (connection != null) {
public synchronized void closeConnection(Connection conn) {
if (conn != null) {
try {
// 将连接返回连接池而不是直接关闭
connectionPool.returnConnection(connection);
logger.debug("数据库连接已返回连接池");
} catch (Exception e) {
logger.error("返回数据库连接到连接池失败", e);
// 如果返回连接池失败直接关闭连接
try {
if (!connection.isClosed()) {
connection.close();
}
} catch (SQLException closeException) {
logger.error("关闭数据库连接失败", closeException);
if (!conn.isClosed()) {
conn.close();
logger.debug("数据库连接已关闭");
}
} catch (SQLException e) {
logger.error("关闭数据库连接失败", e);
} finally {
if (conn == this.connection) {
this.connection = null;
connectionInUse = false;
}
}
}
}
/**
* 提交事务
*
* @param connection 数据库连接
* 关闭当前连接
*/
public synchronized void closeConnection() {
closeConnection(this.connection);
}
/**
* 创建新连接
*/
private Connection createNewConnection() throws SQLException {
Connection conn = DriverManager.getConnection(url, username, password);
conn.setAutoCommit(false); // 批量处理需要手动控制事务
return conn;
}
/**
* 检查连接有效性
*/
private boolean isConnectionValid(Connection conn) {
if (conn == null) return false;
try {
return !conn.isClosed() && conn.isValid(5); // 5秒超时检查
} catch (SQLException e) {
logger.debug("检查连接有效性失败", e);
return false;
}
}
public void commit(Connection connection) {
if (connection != null) {
try {
@ -110,12 +158,7 @@ public class DatabaseConnection {
}
}
}
/**
* 回滚事务
*
* @param connection 数据库连接
*/
public void rollback(Connection connection) {
if (connection != null) {
try {
@ -126,43 +169,23 @@ public class DatabaseConnection {
}
}
}
/**
* 测试数据库连接
*
* @return 连接是否成功
*/
public boolean testConnection() {
Connection connection = null;
try {
connection = getConnection();
logger.info("数据库连接测试成功: 数据库类型={}, 连接池状态: 当前连接数={}, 可用连接数={}",
dbType, connectionPool.getCurrentPoolSize(), connectionPool.getAvailableConnections());
try (Connection testConn = createNewConnection()) {
logger.info("数据库连接测试成功");
return true;
} catch (SQLException e) {
logger.error("数据库连接测试失败", e);
return false;
} finally {
closeConnection(connection);
}
}
/**
* 获取当前数据库类型
*
* @return 数据库类型
*/
public String getDbType() {
return dbType;
public void shutdown() {
closeConnection();
logger.info("数据库连接已关闭");
}
/**
* 执行查询并返回整数结果
*
* @param sql SQL语句
* @param params 参数
* @return 查询结果
*/
// 原有的查询方法保持兼容
public int queryForInt(String sql, Object... params) {
Connection connection = null;
try {
@ -172,27 +195,19 @@ public class DatabaseConnection {
logger.error("执行查询SQL失败: {}", sql, e);
throw new RuntimeException("查询执行失败", e);
} finally {
closeConnection(connection);
if (connection != null) {
returnConnection(connection);
}
}
}
/**
* 使用指定连接执行查询并返回整数结果
*
* @param connection 数据库连接
* @param sql SQL语句
* @param params 参数
* @return 查询结果
* @throws SQLException SQL异常
*/
private int queryForInt(Connection connection, String sql, Object... params) throws SQLException {
try (PreparedStatement ps = connection.prepareStatement(sql)) {
// 设置参数
try (java.sql.PreparedStatement ps = connection.prepareStatement(sql)) {
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
}
try (ResultSet rs = ps.executeQuery()) {
try (java.sql.ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
return rs.getInt(1);
} else {
@ -201,25 +216,4 @@ public class DatabaseConnection {
}
}
}
/**
* 获取连接池状态信息
*
* @return 连接池状态字符串
*/
public String getConnectionPoolStatus() {
return String.format("连接池状态: 当前连接数=%d, 可用连接数=%d",
connectionPool.getCurrentPoolSize(), connectionPool.getAvailableConnections());
}
/**
* 关闭连接池中的所有连接
* 在程序停止时调用
*/
public void shutdown() {
if (connectionPool != null) {
connectionPool.closeAllConnections();
logger.info("数据库连接池已关闭");
}
}
}

View File

@ -1,202 +0,0 @@
package com.gtsoft.mps.batch.util;
import com.gtsoft.mps.batch.config.ConfigManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
/**
* 简单连接池实现
* 为了避免引入第三方依赖实现简单的连接池
*
* @author mps-batch
* @version 1.0
*/
public class SimpleConnectionPool {
private static final Logger logger = LoggerFactory.getLogger(SimpleConnectionPool.class);
private static volatile SimpleConnectionPool instance;
private final java.util.Queue<Connection> connectionPool;
private final int maxPoolSize;
private final int initialPoolSize;
private final String url;
private final String username;
private final String password;
private final String driver;
private int currentPoolSize = 0;
private SimpleConnectionPool() {
this.maxPoolSize = ConfigManager.getInt("db.pool.maxActive", 10);
this.initialPoolSize = ConfigManager.getInt("db.pool.initialSize", 3);
String dbType = ConfigManager.getString("db.type", "mysql");
if ("mysql".equalsIgnoreCase(dbType)) {
this.url = ConfigManager.getString("db.mysql.url");
this.username = ConfigManager.getString("db.mysql.username");
this.password = ConfigManager.getString("db.mysql.password");
this.driver = ConfigManager.getString("db.mysql.driver");
} else if ("dm".equalsIgnoreCase(dbType)) {
this.url = ConfigManager.getString("db.dm.url");
this.username = ConfigManager.getString("db.dm.username");
this.password = ConfigManager.getString("db.dm.password");
this.driver = ConfigManager.getString("db.dm.driver");
} else {
throw new RuntimeException("不支持的数据库类型: " + dbType);
}
// 加载驱动
try {
Class.forName(driver);
logger.info("数据库驱动加载成功: {}", driver);
} catch (ClassNotFoundException e) {
logger.error("数据库驱动加载失败: {}", driver, e);
throw new RuntimeException("数据库驱动加载失败: " + driver, e);
}
this.connectionPool = new java.util.concurrent.ConcurrentLinkedQueue<>();
// 初始化连接池
initializePool();
logger.info("连接池初始化完成,初始连接数: {}, 最大连接数: {}", initialPoolSize, maxPoolSize);
}
public static SimpleConnectionPool getInstance() {
if (instance == null) {
synchronized (SimpleConnectionPool.class) {
if (instance == null) {
instance = new SimpleConnectionPool();
}
}
}
return instance;
}
private void initializePool() {
for (int i = 0; i < initialPoolSize; i++) {
try {
Connection connection = createNewConnection();
connectionPool.offer(connection);
currentPoolSize++;
} catch (SQLException e) {
logger.error("初始化连接池时创建连接失败", e);
}
}
}
private Connection createNewConnection() throws SQLException {
Connection connection = java.sql.DriverManager.getConnection(url, username, password);
connection.setAutoCommit(false);
logger.debug("创建新的数据库连接");
return connection;
}
public synchronized Connection getConnection() throws SQLException {
Connection connection = connectionPool.poll();
if (connection == null || connection.isClosed()) {
if (currentPoolSize < maxPoolSize) {
connection = createNewConnection();
currentPoolSize++;
logger.debug("创建新连接,当前连接池大小: {}", currentPoolSize);
} else {
// 连接池已满等待一段时间后再次尝试
logger.warn("连接池已满,等待连接释放...");
// 简单的等待机制等待最多5秒
for (int i = 0; i < 50; i++) {
try {
Thread.sleep(100); // 等待100ms
connection = connectionPool.poll();
if (connection != null && !connection.isClosed() && isConnectionValid(connection)) {
logger.debug("从连接池获取到释放的连接");
return connection;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException("等待连接被中断", e);
}
}
// 如果等待后仍无可用连接抛出异常
throw new SQLException("无法获取数据库连接,连接池已耗尽,请检查连接池配置或是否存在连接泄漏");
}
} else {
// 从连接池获取到连接
logger.debug("从连接池获取连接成功");
}
// 测试连接有效性
if (!isConnectionValid(connection)) {
logger.warn("连接无效,重新创建");
try {
connection.close();
} catch (SQLException e) {
logger.debug("关闭无效连接时发生异常", e);
}
// 重新创建连接
if (currentPoolSize < maxPoolSize) {
connection = createNewConnection();
currentPoolSize++;
} else {
throw new SQLException("无法创建新连接,连接池已满");
}
}
return connection;
}
public synchronized void returnConnection(Connection connection) {
if (connection != null) {
try {
if (!connection.isClosed() && isConnectionValid(connection)) {
connectionPool.offer(connection);
logger.debug("连接已返回连接池");
} else {
logger.debug("连接已关闭或无效,不返回连接池");
currentPoolSize--;
}
} catch (SQLException e) {
logger.error("检查连接状态失败", e);
currentPoolSize--;
}
}
}
private boolean isConnectionValid(Connection connection) {
try {
return connection != null && !connection.isClosed() && connection.isValid(5);
} catch (SQLException e) {
return false;
}
}
public void closeAllConnections() {
Connection connection;
while ((connection = connectionPool.poll()) != null) {
try {
if (!connection.isClosed()) {
connection.close();
}
} catch (SQLException e) {
logger.error("关闭连接失败", e);
}
}
currentPoolSize = 0;
logger.info("连接池已关闭");
}
public int getCurrentPoolSize() {
return currentPoolSize;
}
public int getAvailableConnections() {
return connectionPool.size();
}
}