【深入理解MyBatis】- 02Mybatis连接池pool讲解和简要实现

    技术2022-07-10  166

    连接池概念

    空闲连接队列与活跃队列

    获取connection,空闲连接队列与活跃队列均为空时

    获取connection,空闲连接队列不为空

    获取connection,活跃连接队列已经满了

    关闭connection,空闲连接队列还有空间

    关闭connection,空闲连接队列没有空间,直接关闭

    未显示关闭connection处理,超时概念

    未显示关闭connection处理,当Client获取连接

    连接池PooledDataSource伪代码

    Mybatis连接池代码参考org.apache.ibatis.datasource.pooled.PooledDataSource, 主要方法包括获取连接popConnection方法, 释放连接popConnection方法,这里给出伪代码思路,先明白连接池的工作原理,后面我再给出一个简要版的连接池。

    // 获取连接 popConnection() { Connection connection = null; while ( connection == null) { 1. 池中空闲连接队列是否为空,否(Pool has available connection) 1.1 直接从空闲连接队列中获取连接connection 2. 池中空闲连接队列是否为空,是(Pool does not have available connection) 2.1 判断正在使用队列长度是否大于允许使用的最大连接数,否 2.1.1 创建新的Connection对象 2.2 判断正在使用队列长度是否大于允许使用的最大连接数,是 // 无法再创建新的连接,尝试回收历史(旧)使用的连接对象 // 为什么可以回收正在使用的连接数对象呢? // 因为Client在使用connection以后,未显示调用close方法,导致connection状态显示为正在使用,实际上已经是空闲状态 // 如何使用界限来区分哪一些应该回收呢?定义了poolMaximumCheckoutTime字段,意义为超时时间 // 从获取连接开始计算时间,到回收时,计算这段时间是否大于超时时间,如果大于,将这个连接对象回收 2.2.1 判断旧连接对象的使用时间是否大于超时时间,是 2.2.1.1 回收旧连接对象,从可用连接队列remove 2.2.2 判断旧连接对象的使用时间是否大于超时时间,否 // Must wait 2.2.2.1 执行wait逻辑,睡眠xxx时间,除非提前唤醒(有新的connection归还) 3. 若获取到的connection不为空,判断connection是否可用(ping to server and check the connection is valid or not) 3.1 connection是否, 是 3.1.1 初始化连接,并重新放置可用连接队列 3.2 connection是否, 否 3.2.1 connection == null,重新获取 } 4. 若connection为空,抛出SQLException异常,获取连接对象connection失败 5. 返回connection } // 释放连接 pushConnection(Connection connection) { 1. 从可用连接队列remove归还的连接connection 2. 判断归还的连接connection是否可用,是 2.1 判断空闲队列是否已经满了,否 2.1.1 将归还的连接connection添加到空闲队列中 2.1.2 并执行线程notifyAll,可能有的线程正在等着用连接呢 2.2 判断空闲队列是否已经满了,是 2.2.1 将归还的连接connection关闭 3. 判断归还的连接connection是否可用,否 3.1 忽略,可以不用看 }

    非连接池实现

    为了同Mybatis连接池代码保持一致,这里先实现非连接池实现,这里使用的JDK8

    先对接口DataSource实现一些不需要的方法,保留抽象方法public Connection getConnection() throws SQLException与public Connection getConnection(String username, String password) throws SQLException即可

    public abstract class SourceImplWrapper implements DataSource { @Override public PrintWriter getLogWriter() throws SQLException { return DriverManager.getLogWriter(); } @Override public int getLoginTimeout() throws SQLException { return DriverManager.getLoginTimeout(); } @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException { return Logger.getGlobal(); } @Override public void setLogWriter(PrintWriter printWriter) throws SQLException { DriverManager.setLogWriter(printWriter); } @Override public void setLoginTimeout(int loginTimeout) throws SQLException { DriverManager.setLoginTimeout(loginTimeout); } @Override public boolean isWrapperFor(Class<?> arg0) throws SQLException { return false; } @Override public <T> T unwrap(Class<T> arg0) throws SQLException { return null; } }

    非连接池实现代码,也就是我们平时使用到的JDBC方式,这个是重写后的版本,简写的很多细节,不过可运行

    public class UnpooledSourceImpl extends SourceImplWrapper { private static Map<String, Boolean> registeredDrivers = new ConcurrentHashMap<>(); static { Enumeration<Driver> drivers = DriverManager.getDrivers(); while (drivers.hasMoreElements()) { Driver driver = drivers.nextElement(); registeredDrivers.put(driver.getClass().getName(), true); } } private String driver; private String url; private String username; private String password; public UnpooledSourceImpl(String driver, String url, String username, String password) { this.driver = driver; this.url = url; this.username = username; this.password = password; } @Override public Connection getConnection() throws SQLException { return getConnection(username, password); } @Override public Connection getConnection(String username, String password) throws SQLException { return doGetConnection(username, password); } private Connection doGetConnection(String username, String password) throws SQLException { initializeDriver(); Connection connection = DriverManager.getConnection(url, username, password); return connection; } private synchronized void initializeDriver() throws SQLException { if (!registeredDrivers.containsKey(driver)) { try { Class.forName(driver); registeredDrivers.put(driver, true); } catch (Exception e) { throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e); } } } }

    测试类

    public class App { public static void main( String[] args ) throws Exception { String driver = "com.mysql.jdbc.Driver"; String url = "jdbc:mysql://localhost:3306/test?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true"; String username = "root"; String password = "root"; // 1. 使用数据源 DataSource dataSource = new UnpooledSourceImpl(driver, url, username, password); Connection connection = dataSource.getConnection(); // 3. Statement用以运行SQL语句。下面是一个插入(INSERT)的例子 Statement stmt = connection.createStatement(); stmt.executeUpdate("INSERT INTO MyTable( name ) VALUES ( 'my name' ) "); // 4. 查询(SELECT)的结果存放于结果集(ResultSet)中,可以按照顺序依次访问 ResultSet rs = stmt.executeQuery("SELECT * FROM MyTable"); while (rs.next()) { int numColumns = rs.getMetaData().getColumnCount(); for (int i = 1; i <= numColumns; i++) { // 与大部分Java API中下标的使用方法不同,字段的下标从1开始 // 当然,还有其他很多的方式(ResultSet.getXXX())获取数据 System.out.println("COLUMN " + i + " = " + rs.getObject(i)); } } rs.close(); stmt.close(); } }

    连接池实现

    Mybatis连接池代码复用了非连接池实现,下面将展示简要的连接池实现,保持了连接池的核心代码,删除了一些细节处理,这里主要是学习连接池思想

    PooledSourceImpl连接池

    public class PooledSourceImpl extends SourceImplWrapper { private UnpooledDataSource dataSource; protected final List<ConnectionProxy> idleConnections = new ArrayList<>(); protected final List<ConnectionProxy> activeConnections = new ArrayList<>(); protected int timeoutMillis = 20000; protected int poolMaximumIdleSize = 5; protected int poolMaximumActiveSize = 10; public PooledSourceImpl(String driver, String url, String username, String password) { dataSource = new UnpooledDataSource(driver, url, username, password); } @Override public Connection getConnection() throws SQLException { return popConnection().getProxyConnection(); } @Override public Connection getConnection(String username, String password) throws SQLException { return dataSource.getConnection(username, password); } private ConnectionProxy popConnection() throws SQLException { ConnectionProxy connectionProxy = null; while (connectionProxy == null) { synchronized (dataSource) { if (!idleConnections.isEmpty()) { connectionProxy = idleConnections.remove(0); } else { if (activeConnections.size() < poolMaximumActiveSize) { connectionProxy = new ConnectionProxy(dataSource.getConnection(), this); activeConnections.add(connectionProxy); } else { // Cannot create new connection,尝试从超时connection,尝试回收使用 ConnectionProxy connection = activeConnections.get(0); long useTimestamp = connection.getUseTimestamp(); // 超时 if (useTimestamp > timeoutMillis) { connectionProxy = connection; activeConnections.remove(connection); activeConnections.add(connectionProxy); } else { // Must wait, 等待时间到期,或者主动close释放connection //long wt = System.currentTimeMillis(); try { dataSource.wait(5000); } catch (InterruptedException e) { e.printStackTrace(); } // System.out.println("Waiting as long as " + (System.currentTimeMillis() - wt) + " milliseconds for connection."); } } } } } connectionProxy.setUseTimestamp(System.currentTimeMillis()); return connectionProxy; } protected void pushConnection(ConnectionProxy connection) throws SQLException { synchronized (dataSource) { if (idleConnections.size() < poolMaximumIdleSize) { idleConnections.add(connection); activeConnections.remove(connection); dataSource.notifyAll(); } else { connection.getRealConnection().close(); } } } public void printStatus() { synchronized (dataSource) { StringBuilder builder = new StringBuilder(); builder.append("\n ---STATUS-----------------------------------------------------"); builder.append("\n activeConnections ").append(activeConnections.size()); builder.append("\n idleConnections ").append(idleConnections.size()); builder.append("\n==============================================================="); System.out.println(builder); } } }

    返回的代理对象,需要特殊处理关闭连接connection,调用close

    public class ConnectionProxy implements InvocationHandler { private final PooledSourceImpl dataSource; private final Connection realConnection; private final Connection proxyConnection; private long useTimestamp; public ConnectionProxy(Connection connection, PooledSourceImpl dataSource) { this.dataSource = dataSource; this.realConnection = connection; this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class<?>[] { Connection.class }, this); } public Connection getRealConnection() { return realConnection; } public Connection getProxyConnection() { return proxyConnection; } public long getUseTimestamp() { return System.currentTimeMillis() - useTimestamp; } public void setUseTimestamp(long useTimestamp) { this.useTimestamp = useTimestamp; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if ("close".equals(method.getName())) { dataSource.pushConnection(this); return null; } else { return method.invoke(realConnection, args); } } }

    测试类

    public class App3 { public static void main(String[] args) throws Exception { String driver = "com.mysql.jdbc.Driver"; String url = "jdbc:mysql://localhost:3306/test?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true"; String username = "root"; String password = "root"; // 1. 使用数据源 PooledSourceImpl dataSource = new PooledSourceImpl(driver, url, username, password); new Thread(() -> { for (int j = 0; j < 100; j++) { try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } dataSource.printStatus(); } }).start(); for (int i = 0; i < 20; i++) { new Thread(() -> { for (int j = 0; j < 10; j++) { try { Connection connection = dataSource.getConnection(); System.out.println(connection.hashCode()); Thread.sleep(new Random().nextInt(1000)); connection.close(); } catch (Exception e) { e.printStackTrace(); } } }).start(); } } }

    连接池总结

    Mybatis连接池pool设计,十分简单清晰 完整代码例子

    执行命令从github上面拉取代码:git clone git@github.com:dengjili/mybatis-3-mybatis-3.4.6.git相关代码目录src/test/java priv.mybatis.example02.datasource
    Processed: 0.018, SQL: 9