Connection Pooling Source Code Explained (Part 1): Details of Creating a Connection


I. JDBC database driver list

jdbc:derby:=org.apache.derby.jdbc.EmbeddedDriver	 
jdbc:mysql:=com.mysql.jdbc.Driver
jdbc:log4jdbc:=net.sf.log4jdbc.DriverSpy
jdbc:oracle:=oracle.jdbc.driver.OracleDriver
jdbc:microsoft:=com.microsoft.jdbc.sqlserver.SQLServerDriver	 
jdbc:jtds:=net.sourceforge.jtds.jdbc.Driver	 
jdbc:postgresql:=org.postgresql.Driver	 
jdbc:fake:=com.alibaba.druid.mock.MockDriver	 
jdbc:hsqldb:=org.hsqldb.jdbcDriver	 
jdbc:db2:=COM.ibm.db2.jdbc.app.DB2Driver
jdbc:sqlite:=org.sqlite.JDBC	 
jdbc:ingres:=com.ingres.jdbc.IngresDriver	 
jdbc:h2:=org.h2.Driver	 
jdbc:mckoi:=com.mckoi.JDBCDriver
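
A table like the one above is typically used to infer a driver class name from the prefix of a JDBC URL. Here is a minimal sketch of that lookup, assuming a hypothetical `DriverNameResolver` class (not part of any library) and only a handful of the entries listed above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DriverNameResolver {
    // A few "URL prefix -> driver class" entries copied from the list above
    private static final Map<String, String> DRIVERS = new LinkedHashMap<>();

    static {
        DRIVERS.put("jdbc:mysql:", "com.mysql.jdbc.Driver");
        DRIVERS.put("jdbc:postgresql:", "org.postgresql.Driver");
        DRIVERS.put("jdbc:h2:", "org.h2.Driver");
        DRIVERS.put("jdbc:sqlite:", "org.sqlite.JDBC");
    }

    /** Returns the driver class name for the URL, or null if no prefix matches. */
    static String resolve(String url) {
        if (url == null) {
            return null;
        }
        for (Map.Entry<String, String> entry : DRIVERS.entrySet()) {
            if (url.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(resolve("jdbc:mysql://localhost:3306/test"));
    }
}
```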

II. A brief look at the JDBC connection-creation source

Let's start with the basic code for working with a database through JDBC:

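import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.junit.Test; // assumption: JUnit 4; the original does not say which test framework is used

public class JdbcDriverManagerTest {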

    private static final String URL = "jdbc:mysql://localhost:3306/test?characterEncoding=utf8";
    private static final String USER = "root";
    private static final String PASSWORD = "123456";

    @Test
    public void testJdbcRaw() throws Exception {

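        // 1. Load the driver class (its static initializer registers it with DriverManager)
        Class.forName("com.mysql.jdbc.Driver");
        // 2. Obtain a database connection; try-with-resources closes everything in reverse order
        try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
             // 3. Work with the database (CRUD). Two statement styles exist: createStatement / prepareStatement
             Statement stmt = conn.createStatement();
             // PreparedStatement ptmt = conn.prepareStatement(sql); // precompiled SQL, reduces repeated parsing
             ResultSet rs = stmt.executeQuery("SELECT username, age FROM user")) {

            // rs.next() returns true while there is another row
            while (rs.next()) {
                System.out.println(rs.getString("username") + " age: " + rs.getInt("age"));
            }
        }
        // 4. The result set, statement, and connection have all been closed at this point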

    }
}

JDBC stands for Java Database Connectivity. It is the Java API that standardizes how client programs access a database.

The JDBC API therefore ships with the JDK itself, in the java.sql package. The main structures involved in JDBC are shown below: [figure: JDBC design structure]

First, step 1: loading and registering the driver:

Class.forName("com.mysql.jdbc.Driver");
public class Driver extends NonRegisteringDriver implements java.sql.Driver {
    //
    // Register ourselves with the DriverManager
    // Class.forName loads and initializes the class, which runs this static block; this is where the Driver is actually registered with DriverManager
    static {
        try {
            java.sql.DriverManager.registerDriver(new Driver());
        } catch (SQLException E) {
            throw new RuntimeException("Can't register driver!");
        }
    }

    /**
     * Construct a new driver and register it with DriverManager
     * 
     * @throws SQLException
     *             if a database error occurs.
     */
    public Driver() throws SQLException {
        // Required for Class.forName().newInstance()
    }
}
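
The self-registration trick above relies on `Class.forName(String)` initializing the class it loads. Here is a minimal sketch of that behavior, using hypothetical stand-ins (`StaticInitDemo`, `FakeDriver`, and `REGISTRY` are illustrative names, not JDK or MySQL classes):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class StaticInitDemo {
    // Stand-in for DriverManager's list of registered drivers (hypothetical)
    static final List<String> REGISTRY = new CopyOnWriteArrayList<>();

    // Stand-in for a JDBC driver that registers itself when it is initialized
    static class FakeDriver {
        static {
            REGISTRY.add("FakeDriver");
        }
    }

    /** Loads FakeDriver by name and returns how many entries the registry holds afterwards. */
    static int loadAndCount() {
        // A class literal alone does not initialize the class
        String name = FakeDriver.class.getName();
        try {
            // Class.forName(String) loads AND initializes the class; the static block runs once
            Class.forName(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
        return REGISTRY.size();
    }

    public static void main(String[] args) {
        System.out.println(loadAndCount());
        System.out.println(loadAndCount()); // the static block does not run a second time
    }
}
```

Calling `loadAndCount()` repeatedly keeps returning the same size, because a class is initialized only once per class loader.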

public class DriverManager {


    // List of registered JDBC drivers
    // A copy-on-write list suits this read-mostly, rarely-written workload
    private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();

    public static synchronized void registerDriver(java.sql.Driver driver)
        throws SQLException {

        registerDriver(driver, null);
    }

    /**
     * Registers the given driver with the {@code DriverManager}.
     * A newly-loaded driver class should call
     * the method {@code registerDriver} to make itself
     * known to the {@code DriverManager}. If the driver is currently
     * registered, no action is taken.
     *
     * @param driver the new JDBC Driver that is to be registered with the
     *               {@code DriverManager}
     * @param da     the {@code DriverAction} implementation to be used when
     *               {@code DriverManager#deregisterDriver} is called
     * @exception SQLException if a database access error occurs
     * @exception NullPointerException if {@code driver} is null
     * @since 1.8
     */
    public static synchronized void registerDriver(java.sql.Driver driver,
            DriverAction da)
        throws SQLException {

        /* Register the driver if it has not already been added to our list */
        if(driver != null) {
            // addIfAbsent prevents the same driver from being registered more than once
            registeredDrivers.addIfAbsent(new DriverInfo(driver, da));
        } else {
            // This is for compatibility with the original DriverManager
            throw new NullPointerException();
        }

        println("registerDriver: " + driver);

    }
}
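
The duplicate-registration guard comes from `CopyOnWriteArrayList.addIfAbsent`, which consults `equals`. Here is a minimal sketch, assuming a hypothetical `Info` wrapper whose equality is based on the identity of the wrapped object (the names here are illustrative only):

```java
import java.util.concurrent.CopyOnWriteArrayList;

class AddIfAbsentDemo {
    // Hypothetical wrapper: two Info objects are equal when they wrap the very same instance
    static final class Info {
        final Object driver;

        Info(Object driver) {
            this.driver = driver;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof Info && ((Info) other).driver == this.driver;
        }

        @Override
        public int hashCode() {
            return System.identityHashCode(driver);
        }
    }

    /** Registers the same driver instance twice and returns the resulting list size. */
    static int registerTwice() {
        CopyOnWriteArrayList<Info> registered = new CopyOnWriteArrayList<>();
        Object driver = new Object();
        boolean first = registered.addIfAbsent(new Info(driver));   // added
        boolean second = registered.addIfAbsent(new Info(driver));  // rejected as a duplicate
        if (!first || second) {
            throw new IllegalStateException("unexpected addIfAbsent result");
        }
        return registered.size();
    }

    public static void main(String[] args) {
        System.out.println(registerTwice());
    }
}
```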

Below is the method in DriverManager that actually obtains the database connection:

private static Connection getConnection(
        String url, java.util.Properties info, Class<?> caller) throws SQLException {
        /*
         * When callerCl is null, we should check the application's
         * (which is invoking this class indirectly)
         * classloader, so that the JDBC driver class outside rt.jar
         * can be loaded from here.
         */
        ClassLoader callerCL = caller != null ? caller.getClassLoader() : null;
        synchronized(DriverManager.class) {
            // synchronize loading of the correct classloader.
            if (callerCL == null) {
                callerCL = Thread.currentThread().getContextClassLoader();
            }
        }

        if(url == null) {
            throw new SQLException("The url cannot be null", "08001");
        }

        println("DriverManager.getConnection(\"" + url + "\")");

        // Walk through the loaded registeredDrivers attempting to make a connection.
        // Remember the first exception that gets raised so we can reraise it.
        SQLException reason = null;
        
        // Iterate over all registered drivers and try each in turn until one can handle this URL
        for(DriverInfo aDriver : registeredDrivers) {
            // If the caller does not have permission to load the driver then
            // skip it.
            if(isDriverAllowed(aDriver.driver, callerCL)) {
                try {
                    println("    trying " + aDriver.driver.getClass().getName());
                    // The driver itself creates the connection here
                    Connection con = aDriver.driver.connect(url, info);
                    if (con != null) {
                        // Success!
                        println("getConnection returning " + aDriver.driver.getClass().getName());
                        return (con);
                    }
                } catch (SQLException ex) {
                    if (reason == null) {
                        reason = ex;
                    }
                }

            } else {
                println("    skipping: " + aDriver.getClass().getName());
            }

        }

        // if we got here nobody could connect.
        if (reason != null)    {
            println("getConnection failed: " + reason);
            throw reason;
        }

        println("getConnection: no suitable driver found for "+ url);
        throw new SQLException("No suitable driver found for "+ url, "08001");
    }
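
The loop above can be observed from the outside by registering a small driver of our own. The sketch below assumes a hypothetical `DemoDriver` that only accepts URLs starting with `jdbc:demo:` (a made-up prefix) and returns a do-nothing `Connection` stub; it is an illustration, not how any real driver is written:

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;

class DriverLookupDemo {
    /** Hypothetical driver that only understands URLs starting with "jdbc:demo:". */
    static class DemoDriver implements Driver {
        @Override
        public Connection connect(String url, Properties info) {
            if (!acceptsURL(url)) {
                return null; // not ours: DriverManager moves on to the next driver
            }
            // A do-nothing Connection stub, enough to show that this driver answered
            return (Connection) Proxy.newProxyInstance(
                    DriverLookupDemo.class.getClassLoader(),
                    new Class<?>[] { Connection.class },
                    (proxy, method, methodArgs) -> null);
        }

        @Override
        public boolean acceptsURL(String url) {
            return url != null && url.startsWith("jdbc:demo:");
        }

        @Override
        public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
            return new DriverPropertyInfo[0];
        }

        @Override public int getMajorVersion() { return 1; }
        @Override public int getMinorVersion() { return 0; }
        @Override public boolean jdbcCompliant() { return false; }

        @Override
        public Logger getParentLogger() throws SQLFeatureNotSupportedException {
            throw new SQLFeatureNotSupportedException();
        }
    }

    static {
        try {
            // Same self-registration pattern as the MySQL driver's static block
            DriverManager.registerDriver(new DemoDriver());
        } catch (SQLException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Returns "connected" on success, otherwise the SQLState of the failure. */
    static String probe(String url) {
        try {
            Connection conn = DriverManager.getConnection(url);
            return conn != null ? "connected" : "no connection";
        } catch (SQLException e) {
            return e.getSQLState();
        }
    }

    public static void main(String[] args) {
        System.out.println(probe("jdbc:demo:anything"));
        System.out.println(probe("jdbc:nosuchdriver:anything"));
    }
}
```

When no registered driver returns a connection, the second call ends in the "No suitable driver found" branch shown above, whose SQLState is `08001`.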

public java.sql.Connection connect(String url, Properties info) throws SQLException {
        if (url == null) {
            throw SQLError.createSQLException(Messages.getString("NonRegisteringDriver.1"), SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE, null);
        }

        if (StringUtils.startsWithIgnoreCase(url, LOADBALANCE_URL_PREFIX)) {
            return connectLoadBalanced(url, info);
        } else if (StringUtils.startsWithIgnoreCase(url, REPLICATION_URL_PREFIX)) {
            return connectReplicationConnection(url, info);
        }

        Properties props = null;
        
        // Parse the configured connection URL into the data needed to reach MySQL over its protocol (host, port, database name, etc.)
        if ((props = parseURL(url, info)) == null) {
            return null;
        }

        if (!"1".equals(props.getProperty(NUM_HOSTS_PROPERTY_KEY))) {
            return connectFailover(url, info);
        }

        try {
            // The code that actually connects to MySQL
            Connection newConn = com.mysql.jdbc.ConnectionImpl.getInstance(host(props), port(props), props, database(props), url);

            return newConn;
        } catch (SQLException sqlEx) {
            // Don't wrap SQLExceptions, throw
            // them un-changed.
            throw sqlEx;
        } catch (Exception ex) {
            SQLException sqlEx = SQLError.createSQLException(
                    Messages.getString("NonRegisteringDriver.17") + ex.toString() + Messages.getString("NonRegisteringDriver.18"),
                    SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE, null);

            sqlEx.initCause(ex);

            throw sqlEx;
        }
    }
    protected static Connection getInstance(String hostToConnectTo, int portToConnectTo, Properties info, String databaseToConnectTo, String url)
            throws SQLException {
        if (!Util.isJdbc4()) {
            return new ConnectionImpl(hostToConnectTo, portToConnectTo, info, databaseToConnectTo, url);
        }

        return (Connection) Util.handleNewInstance(JDBC_4_CONNECTION_CTOR,
                new Object[] { hostToConnectTo, Integer.valueOf(portToConnectTo), info, databaseToConnectTo, url }, null);
    }
JDBC_4_CONNECTION_CTOR = Class.forName("com.mysql.jdbc.JDBC4Connection")
                        .getConstructor(new Class[] { String.class, Integer.TYPE, Properties.class, String.class, String.class })
public static final Object handleNewInstance(Constructor<?> ctor, Object[] args, ExceptionInterceptor exceptionInterceptor) throws SQLException
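
The fragments above look up a constructor once and invoke it reflectively. The same pattern can be sketched with a JDK class. `java.math.BigInteger(String, int)` is used here purely as a convenient stand-in for the `JDBC4Connection` constructor, and `ReflectiveCtorDemo` is a hypothetical name:

```java
import java.lang.reflect.Constructor;

class ReflectiveCtorDemo {
    /** Builds a BigInteger through its (String, int) constructor using reflection. */
    static Object create(String digits, int radix) {
        try {
            // Integer.TYPE is the Class object for the primitive int, the same as int.class
            Constructor<?> ctor = Class.forName("java.math.BigInteger")
                    .getConstructor(new Class<?>[] { String.class, Integer.TYPE });
            // Arguments travel as Object[], so the int is passed boxed
            return ctor.newInstance(new Object[] { digits, Integer.valueOf(radix) });
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create("ff", 16)); // hexadecimal ff
    }
}
```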

As the code shows, on a JDBC 4 runtime the connection to MySQL is ultimately made through JDBC4Connection:

public class JDBC4Connection extends ConnectionImpl implements JDBC4MySQLConnection {

    private static final long serialVersionUID = 2877471301981509475L;

    private JDBC4ClientInfoProvider infoProvider;

    public JDBC4Connection(String hostToConnectTo, int portToConnectTo, Properties info, String databaseToConnectTo, String url) throws SQLException {
        super(hostToConnectTo, portToConnectTo, info, databaseToConnectTo, url);
    }

The ConnectionImpl constructor:

        // Initialize the required connection data
        this.port = portToConnectTo;

        this.database = databaseToConnectTo;
        this.myURL = url;
        this.user = info.getProperty(NonRegisteringDriver.USER_PROPERTY_KEY);
        this.password = info.getProperty(NonRegisteringDriver.PASSWORD_PROPERTY_KEY);

        if ((this.user == null) || this.user.equals("")) {
            this.user = "";
        }

        if (this.password == null) {
            this.password = "";
        }
        
        //.......

        try {
            this.dbmd = getMetaData(false, false);
            initializeSafeStatementInterceptors();
            // Create the connection to the database
            createNewIO(false);
            unSafeStatementInterceptors();
        } catch (SQLException ex) {
            cleanup(ex);
            throw ex;
        } catch (Exception ex) {
            cleanup(ex);
        }

        NonRegisteringDriver.trackConnection(this);
public void createNewIO(boolean isForReconnect) throws SQLException {
        synchronized (getConnectionMutex()) {
            // Synchronization Not needed for *new* connections, but defintely for connections going through fail-over, since we might get the new connection up
            // and running *enough* to start sending cached or still-open server-side prepared statements over to the backend before we get a chance to
            // re-prepare them...

            Properties mergedProps = exposeAsProperties(this.props);
            // Not in high-availability mode: connect exactly once, with no retry on failure
            if (!getHighAvailability()) {
                connectOneTryOnly(isForReconnect, mergedProps);

                return;
            }

            connectWithRetries(isForReconnect, mergedProps);
        }
    }
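
The difference between the two branches is essentially "try once" versus "try up to N times". Here is a minimal sketch of a bounded retry loop, using hypothetical names (`RetryDemo`, `withRetries`) and leaving out the wait between attempts that a real driver would normally add:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

class RetryDemo {
    /** Calls the action up to maxAttempts times and returns the first successful result. */
    static <T> T withRetries(Callable<T> action, int maxAttempts) {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException("all " + maxAttempts + " attempts failed", last);
    }

    /** Simulates a connect call that fails a given number of times before succeeding. */
    static int attemptsNeeded(int failuresBeforeSuccess, int maxAttempts) {
        int[] calls = { 0 };
        withRetries(() -> {
            calls[0]++;
            if (calls[0] <= failuresBeforeSuccess) {
                throw new IOException("connection refused (simulated)");
            }
            return "ok";
        }, maxAttempts);
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println(attemptsNeeded(2, 3)); // succeeds on the third attempt
    }
}
```

With `maxAttempts` set to 1 this degenerates into the one-try behavior: the first failure is reported immediately.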

Connecting only once:

    private void connectOneTryOnly(boolean isForReconnect, Properties mergedProps) throws SQLException {
        Exception connectionNotEstablishedBecause = null;

        try {

            coreConnect(mergedProps);
            this.connectionId = this.io.getThreadId();
            this.isClosed = false;

Retrying on failure:

private void connectWithRetries(boolean isForReconnect, Properties mergedProps) throws SQLException {
        double timeout = getInitialTimeout();
        boolean connectionGood = false;

        Exception connectionException = null;

        for (int attemptCount = 0; (attemptCount < getMaxReconnects()) && !connectionGood; attemptCount++) {
            try {
                if (this.io != null) {
                    this.io.forceClose();
                }

                coreConnect(mergedProps);
 private void coreConnect(Properties mergedProps) throws SQLException, IOException {
        int newPort = 3306;
        String newHost = "localhost";

        String protocol = mergedProps.getProperty(NonRegisteringDriver.PROTOCOL_PROPERTY_KEY);
        // protocol may be null here (exactly when it is "tcp" or some other value still needs further investigation)
        if (protocol != null) {
            // "new" style URL

            if ("tcp".equalsIgnoreCase(protocol)) {
                newHost = normalizeHost(mergedProps.getProperty(NonRegisteringDriver.HOST_PROPERTY_KEY));
                newPort = parsePortNumber(mergedProps.getProperty(NonRegisteringDriver.PORT_PROPERTY_KEY, "3306"));
            } else if ("pipe".equalsIgnoreCase(protocol)) {
                setSocketFactoryClassName(NamedPipeSocketFactory.class.getName());

                String path = mergedProps.getProperty(NonRegisteringDriver.PATH_PROPERTY_KEY);

                if (path != null) {
                    mergedProps.setProperty(NamedPipeSocketFactory.NAMED_PIPE_PROP_NAME, path);
                }
            } else {
                // normalize for all unknown protocols
                newHost = normalizeHost(mergedProps.getProperty(NonRegisteringDriver.HOST_PROPERTY_KEY));
                newPort = parsePortNumber(mergedProps.getProperty(NonRegisteringDriver.PORT_PROPERTY_KEY, "3306"));
            }
        } else {

            String[] parsedHostPortPair = NonRegisteringDriver.parseHostPortPair(this.hostPortPair);
            newHost = parsedHostPortPair[NonRegisteringDriver.HOST_NAME_INDEX];

            newHost = normalizeHost(newHost);

            if (parsedHostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX] != null) {
                newPort = parsePortNumber(parsedHostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX]);
            }
        }

        this.port = newPort;
        this.host = newHost;

        // reset max-rows to default value
        this.sessionMaxRows = -1;

        // preconfigure some server variables which are consulted before their initialization from server
        this.serverVariables = new HashMap<String, String>();
        this.serverVariables.put("character_set_server", "utf8");

        // Create the real socket to the MySQL server (MysqlIO is a wrapper around that socket)
        this.io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), getProxy(), getSocketTimeout(),
                this.largeRowSizeThreshold.getValueAsInt());
        // Perform the MySQL handshake and login over that socket
        this.io.doHandshake(this.user, this.password, this.database);
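
The old-style branch above splits a `host:port` pair and falls back to `localhost` and `3306`. Here is a simplified sketch of that parsing, with a hypothetical `HostPortDemo` class; it is not the driver's `parseHostPortPair`, and it deliberately ignores IPv6 literals:

```java
class HostPortDemo {
    static final String DEFAULT_HOST = "localhost";
    static final int DEFAULT_PORT = 3306;

    /** Splits "host:port" into {host, port}; missing parts fall back to the defaults. */
    static String[] parse(String hostPortPair) {
        String host = DEFAULT_HOST;
        int port = DEFAULT_PORT;
        if (hostPortPair != null && !hostPortPair.isEmpty()) {
            int colon = hostPortPair.lastIndexOf(':');
            if (colon < 0) {
                host = hostPortPair; // no port given
            } else {
                if (colon > 0) {
                    host = hostPortPair.substring(0, colon);
                }
                String portText = hostPortPair.substring(colon + 1);
                if (!portText.isEmpty()) {
                    port = Integer.parseInt(portText);
                }
            }
        }
        return new String[] { host, String.valueOf(port) };
    }

    public static void main(String[] args) {
        String[] parsed = parse("db1:3307");
        System.out.println(parsed[0] + " " + parsed[1]);
    }
}
```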

    /** The connection to the server */
    // The socket used to talk to the MySQL server
    public Socket mysqlConnection = null;
    protected SocketFactory socketFactory = null;

 public MysqlIO(String host, int port, Properties props, String socketFactoryClassName, MySQLConnection conn, int socketTimeout,
            int useBufferRowSizeThreshold) throws IOException, SQLException {
        this.connection = conn;

        if (this.connection.getEnablePacketDebug()) {
            this.packetDebugRingBuffer = new LinkedList<StringBuilder>();
        }
        this.traceProtocol = this.connection.getTraceProtocol();

        this.useAutoSlowLog = this.connection.getAutoSlowLog();

        this.useBufferRowSizeThreshold = useBufferRowSizeThreshold;
        this.useDirectRowUnpack = this.connection.getUseDirectRowUnpack();

        this.logSlowQueries = this.connection.getLogSlowQueries();

        this.reusablePacket = new Buffer(INITIAL_PACKET_SIZE);
        this.sendPacket = new Buffer(INITIAL_PACKET_SIZE);

        this.port = port;
        this.host = host;

        this.socketFactoryClassName = socketFactoryClassName;
        this.socketFactory = createSocketFactory();
        this.exceptionInterceptor = this.connection.getExceptionInterceptor();

        try {
            // The socket connection is actually created here
            this.mysqlConnection = this.socketFactory.connect(this.host, this.port, props);

The method below shows the familiar operations of socket network programming:

 public Socket connect(String hostname, int portNumber, Properties props) throws SocketException, IOException {

        if (props != null) {
            this.host = hostname;

            this.port = portNumber;

            String localSocketHostname = props.getProperty("localSocketAddress");
            InetSocketAddress localSockAddr = null;
            if (localSocketHostname != null && localSocketHostname.length() > 0) {
                localSockAddr = new InetSocketAddress(InetAddress.getByName(localSocketHostname), 0);
            }

            String connectTimeoutStr = props.getProperty("connectTimeout");

            int connectTimeout = 0;

            if (connectTimeoutStr != null) {
                try {
                    connectTimeout = Integer.parseInt(connectTimeoutStr);
                } catch (NumberFormatException nfe) {
                    throw new SocketException("Illegal value '" + connectTimeoutStr + "' for connectTimeout");
                }
            }

            if (this.host != null) {
                InetAddress[] possibleAddresses = InetAddress.getAllByName(this.host);

                if (possibleAddresses.length == 0) {
                    throw new SocketException("No addresses for host");
                }

                // save last exception to propagate to caller if connection fails
                SocketException lastException = null;

                // Need to loop through all possible addresses. Name lookup may return multiple addresses including IPv4 and IPv6 addresses. Some versions of
                // MySQL don't listen on the IPv6 address so we try all addresses.
                for (int i = 0; i < possibleAddresses.length; i++) {
                    try {
                        this.rawSocket = createSocket(props);

                        configureSocket(this.rawSocket, props);

                        InetSocketAddress sockAddr = new InetSocketAddress(possibleAddresses[i], this.port);
                        // bind to the local port if not using the ephemeral port
                        if (localSockAddr != null) {
                            this.rawSocket.bind(localSockAddr);
                        }

                        this.rawSocket.connect(sockAddr, getRealTimeout(connectTimeout));

                        break;
                    } catch (SocketException ex) {
                        lastException = ex;
                        resetLoginTimeCountdown();
                        this.rawSocket = null;
                    }
                }

                if (this.rawSocket == null && lastException != null) {
                    throw lastException;
                }

                resetLoginTimeCountdown();

                return this.rawSocket;
            }
        }

        throw new SocketException("Unable to create socket");
    }
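
The core of the method above is "try each resolved address until one connects". Here is a minimal sketch of that idea with plain `java.net` classes. `SocketConnectDemo` and its methods are hypothetical names, and the self-test talks to a throwaway local `ServerSocket` rather than to MySQL. Unlike the driver code, the sketch catches `IOException`, so a connect timeout also moves on to the next address:

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

class SocketConnectDemo {
    /** Tries each address in turn and returns the first socket that connects. */
    static Socket connectFirstReachable(InetAddress[] candidates, int port, int timeoutMillis)
            throws IOException {
        IOException last = null;
        for (InetAddress address : candidates) {
            Socket socket = new Socket();
            try {
                socket.connect(new InetSocketAddress(address, port), timeoutMillis);
                return socket;
            } catch (IOException e) {
                last = e; // keep the last failure to report if nothing works
                socket.close();
            }
        }
        throw last != null ? last : new IOException("no addresses to try");
    }

    /** Opens a local listener on an ephemeral port and connects to it. */
    static boolean selfTest() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            InetAddress[] candidates = { loopback };
            try (Socket socket = connectFirstReachable(candidates, server.getLocalPort(), 2000)) {
                return socket.isConnected();
            }
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(selfTest());
    }
}
```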

III. Connection creation in Druid + Spring Boot: source analysis

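In the setup analyzed here, no MySQL connection exists right after the project starts; a database connection is only really created once a request that touches the database arrives.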

public abstract class AbstractRoutingDataSource extends AbstractDataSource {

    public Connection getConnection() throws SQLException {
        String xid = TransactionContext.getXID();
        if (StringUtils.isEmpty(xid)) {
            return this.determineDataSource().getConnection();
        } else {
            String ds = DynamicDataSourceContextHolder.peek();
            ConnectionProxy connection = ConnectionFactory.getConnection(ds);
            return (Connection)(connection == null ? this.getConnectionProxy(ds, this.determineDataSource().getConnection()) : connection);
        }
    }

// Druid provides its own DataSource implementation
public class DruidDataSource extends DruidAbstractDataSource implements DruidDataSourceMBean, ManagedDataSource, Referenceable, Closeable, Cloneable, ConnectionPoolDataSource, MBeanRegistration
    public DruidPooledConnection getConnection() throws SQLException {
        return getConnection(maxWait);
    }
    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
        init();

        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(this);
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {
            return getConnectionDirect(maxWaitMillis);
        }
    }
dataSourceStat.setResetStatEnable(this.resetStatEnable);
// Initialize the arrays backing the data source's connection pool
connections = new DruidConnectionHolder[maxActive];
evictConnections = new DruidConnectionHolder[maxActive];
keepAliveConnections = new DruidConnectionHolder[maxActive];

SQLException connectError = null;

if (createScheduler != null && asyncInit) {
    for (int i = 0; i < initialSize; ++i) {
        createTaskCount++;
        CreateConnectionTask task = new CreateConnectionTask(true);
        this.createSchedulerFuture = createScheduler.submit(task);
    }
} else if (!asyncInit) {
    // init connections
    // When the first database request arrives, initialize the pool by creating as many connections as the configured initial size
    while (poolingCount < initialSize) {
        try {
            // This is where Druid actually creates a connection
            PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
            DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
            connections[poolingCount++] = holder;
        } catch (SQLException ex) {
            LOG.error("init datasource error, url: " + this.getUrl(), ex);
            if (initExceptionThrow) {
                connectError = ex;
                break;
            } else {
                Thread.sleep(3000);
            }
        }
    }
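
The lazy initialization described above (nothing is created until the first `getConnection()` call, then `initialSize` connections are created at once) can be sketched as follows. `LazyPoolDemo` and `LazyPool` are hypothetical and far simpler than Druid's real pool, with no max size, no eviction, and no return of connections:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

class LazyPoolDemo {
    /** Hypothetical pool; T stands in for a physical connection. */
    static class LazyPool<T> {
        private final Supplier<T> factory;
        private final int initialSize;
        private final Deque<T> idle = new ArrayDeque<>();
        private boolean inited;
        private int created;

        LazyPool(Supplier<T> factory, int initialSize) {
            this.factory = factory;
            this.initialSize = initialSize;
        }

        // Runs its body only once; later calls return immediately
        private synchronized void init() {
            if (inited) {
                return;
            }
            while (idle.size() < initialSize) {
                idle.push(factory.get());
                created++;
            }
            inited = true;
        }

        synchronized T getConnection() {
            init(); // the first caller pays for the pool warm-up
            if (idle.isEmpty()) {
                created++;
                return factory.get();
            }
            return idle.pop();
        }

        synchronized int createdCount() {
            return created;
        }
    }

    /** Returns {connections created before the first request, connections created after it}. */
    static int[] demo(int initialSize) {
        LazyPool<String> pool = new LazyPool<>(() -> "conn", initialSize);
        int before = pool.createdCount();
        pool.getConnection();
        int after = pool.createdCount();
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] counts = demo(3);
        System.out.println(counts[0] + " -> " + counts[1]);
    }
}
```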
public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {
        String url = this.getUrl();
        Properties connectProperties = getConnectProperties();

        String user;
        if (getUserCallback() != null) {
            user = getUserCallback().getName();
        } else {
            user = getUsername();
        }

        String password = getPassword();
        PasswordCallback passwordCallback = getPasswordCallback();

        if (passwordCallback != null) {
            if (passwordCallback instanceof DruidPasswordCallback) {
                DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;

                druidPasswordCallback.setUrl(url);
                druidPasswordCallback.setProperties(connectProperties);
            }

            char[] chars = passwordCallback.getPassword();
            if (chars != null) {
                password = new String(chars);
            }
        }

        Properties physicalConnectProperties = new Properties();
        if (connectProperties != null) {
            physicalConnectProperties.putAll(connectProperties);
        }

        if (user != null && user.length() != 0) {
            physicalConnectProperties.put("user", user);
        }

        if (password != null && password.length() != 0) {
            physicalConnectProperties.put("password", password);
        }

        Connection conn = null;

        long connectStartNanos = System.nanoTime();
        long connectedNanos, initedNanos, validatedNanos;

        Map<String, Object> variables = initVariants
                ? new HashMap<String, Object>()
                : null;
        Map<String, Object> globalVariables = initGlobalVariants
                ? new HashMap<String, Object>()
                : null;

        createStartNanosUpdater.set(this, connectStartNanos);
        creatingCountUpdater.incrementAndGet(this);
        try {
            conn = createPhysicalConnection(url, physicalConnectProperties);
            connectedNanos = System.nanoTime();

            if (conn == null) {
                throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
            }

            initPhysicalConnection(conn, variables, globalVariables);
            initedNanos = System.nanoTime();

            validateConnection(conn);
            validatedNanos = System.nanoTime();

            setFailContinuous(false);
            setCreateError(null);
        } catch (SQLException ex) {
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } catch (RuntimeException ex) {
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } catch (Error ex) {
            createErrorCountUpdater.incrementAndGet(this);
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } finally {
            long nano = System.nanoTime() - connectStartNanos;
            createTimespan += nano;
            creatingCountUpdater.decrementAndGet(this);
        }

        return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
    }
    // The connection-creation method that is finally reached
    public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
        Connection conn;
        if (getProxyFilters().size() == 0) {
            conn = getDriver().connect(url, info);
        } else {
            conn = new FilterChainImpl(this).connection_connect(info);
        }

        createCountUpdater.incrementAndGet(this);

        return conn;
    }
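
The two branches above either call the driver directly or route the call through a filter chain first. Assuming the chain behaves like a conventional chain of responsibility, the idea can be sketched as below. `FilterChainDemo`, `ConnectFilter`, and `Chain` are hypothetical names, and strings stand in for connections; this is not Druid's `FilterChainImpl`:

```java
import java.util.ArrayList;
import java.util.List;

class FilterChainDemo {
    /** Hypothetical filter: may act before or after delegating to the rest of the chain. */
    interface ConnectFilter {
        String connect(Chain chain, String url);
    }

    /** Walks the filters in order; after the last one, calls the "driver". */
    static class Chain {
        private final List<ConnectFilter> filters;
        private int pos = 0;

        Chain(List<ConnectFilter> filters) {
            this.filters = filters;
        }

        String connect(String url) {
            if (pos < filters.size()) {
                return filters.get(pos++).connect(this, url);
            }
            return "conn(" + url + ")"; // stand-in for driver.connect(url, info)
        }
    }

    /** Connects directly when there are no filters, otherwise through the chain. */
    static String connect(String targetUrl, boolean withFilters) {
        List<ConnectFilter> filters = new ArrayList<>();
        if (withFilters) {
            filters.add((c, u) -> "logged[" + c.connect(u) + "]");
            filters.add((c, u) -> "timed[" + c.connect(u) + "]");
        }
        return new Chain(filters).connect(targetUrl);
    }

    public static void main(String[] args) {
        System.out.println(connect("jdbc:demo:x", false));
        System.out.println(connect("jdbc:demo:x", true));
    }
}
```

Each filter wraps whatever the rest of the chain returns, so the first filter in the list ends up outermost.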

At this point it should be clear: we have arrived at the driver's own connect method, the same JDBC connection-creation path analyzed above.

To be continued...
