网站首页 文章专栏 深入druid之druid连接池源码分析
要找一个类代表druid的话,那么非DruidDataSource这个莫属了,其核心连接的维护,连接的构建,入池,获取,收缩,销毁,以及核心监控数据都在这个类维护,所以在研究之前必须把这个类中的核心成员变量搞清楚含义,不然很难阅读源码。
先来张该类的全局信息:
看图可得知DruidDataSource继承自DruidAbstractDataSource和CommonDataSource。说明DruidDataSrouce是一个DataSource,可以直接getConnection获取连接。且拥有两个task实现runnable接口,三个线程继承thread,分别处理销毁任务,创建连接任务,记录统计信息等信息。
类名 | 描述 |
ExceptionSorter | 用于判断SQLException对象是否致命异常 |
ValidConnectionChecker | 用于校验指定连接对象是否有效 |
CreateConnectionThread | DruidDataSource的内部类,用于异步创建连接对象 |
notEmpty | 调用notEmpty.await()时,当前线程进入等待;当连接创建完成或者回收了连接,会调用notEmpty.signal()时,将等待线程唤醒 |
empty | 调用empty.await()时,CreateConnectionThread进入等待,调用empty.signal()时,CreateConnectionThread被唤醒,并进入创建连接 |
DestroyConnectionThread | DruidDataSource的内部类,用于异步检验连接对象,包括校验空闲连接的phyTimeoutMillis,minEvictableIdleTimeMillis,以及校验借出连接的removeAbandonedTimeoutMillis |
LogStatsThread | DruidDataSource的内部类,用于异步记录统计信息 |
connections | 用于存放所有连接对象 |
evictConnections | 用于存放需要丢弃的连接对象 |
keepAliveConnections | 用于存放需要keepAlive的连接对象 |
activeConnections | 用于存放需要进行removeAbandoned的连接对象 |
poolingCount | 空闲连接对象的数量 |
activeCount | 借出连接对象的数量 |
初始化的过程分为两种情况,一种是在加载druid时初始化,还有就是在获取连接时也会初始话,DruidDataSource的这个初始化时机是可选的,当我们设置init=true时,在createDataSource时就会调用DataSource.init()方法进行初始化,否则,只会在getConnection时再进行初始化,当然这个初始化肯定时只会进行一次的。
public void init() throws SQLException { if (inited) { return; } // bug fixed for dead lock, for issue #2980 DruidDriver.getInstance(); final ReentrantLock lock = this.lock; try { lock.lockInterruptibly(); } catch (InterruptedException e) { throw new SQLException("interrupt", e); } boolean init = false;
1,第一步就是判断是否已经初始化了,如果已经初始化,那么不再进行,保证只初始化一次,inited这个字段是个volatile修饰的布尔类型
2,然后是DruidDriver.getInstance();这一步是为了提前初始化DruidDriver,因为之前有个issue提到这里在多线程下初始化可能存在死锁,所以就给提前显式初始化了
3,加锁,使用一个可重入锁,并使用可处理中断异常的方式获取锁,保证下面的处理只有一个线程能处理
try { if (inited) { return; } initStackTrace = Utils.toString(Thread.currentThread().getStackTrace()); this.id = DruidDriver.createDataSourceId(); if (this.id > 1) { long delta = (this.id - 1) * 100000; this.connectionIdSeedUpdater.addAndGet(this, delta); this.statementIdSeedUpdater.addAndGet(this, delta); this.resultSetIdSeedUpdater.addAndGet(this, delta); this.transactionIdSeedUpdater.addAndGet(this, delta); } if (this.jdbcUrl != null) { this.jdbcUrl = this.jdbcUrl.trim(); initFromWrapDriverUrl(); } for (Filter filter : filters) { filter.init(this); } if (this.dbTypeName == null || this.dbTypeName.length() == 0) { this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null); } DbType dbType = DbType.of(this.dbTypeName); if (dbType == DbType.mysql || dbType == DbType.mariadb || dbType == DbType.oceanbase || dbType == DbType.ads) { boolean cacheServerConfigurationSet = false; if (this.connectProperties.containsKey("cacheServerConfiguration")) { cacheServerConfigurationSet = true; } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) { cacheServerConfigurationSet = true; } if (cacheServerConfigurationSet) { this.connectProperties.put("cacheServerConfiguration", "true"); } }
1,这段代码就已经在锁里了,再次判断是否已经初始化,同时创建一个datasourceId,这个id是原子类,保证安全
2,如果id>1,那么说明不止一个数据源,if里面的代码猜测可能是保留一个区间段id给每个数据源使用的
3,initFromWrapDriverUrl(),是针对druid自定义的一种url格式,以jdbc:wrap-jdbc:开头,进行解析
4,this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null); 根据url前缀,确定dbType
// 一段对配置参数进行检查的代码 ... initFromSPIServiceLoader(); resolveDriver(); initCheck(); initExceptionSorter(); initValidConnectionChecker(); validationQueryCheck();
1,采用SPI机制加载过滤器,这部分过滤器除了放入filters,还会放入autoFilters
2,处理驱动,根据我们配置中的连接地址的协议,得到具体的驱动类型
3,initCheck,只是针对oracle和DB2,需要校验validationQuery
4,根据dbType实例化一个具体的MySqlExceptionSorter,用来处理异常,判断异常等
5,根据dbType初始化一个具体的MySqlValidConnectionChecker,并加载配置,该类会在后面起到检测连接是否有效的作用
6,校验testOnBorrow,testOnReturn,testWhileIdle参数的合法性
connections = new DruidConnectionHolder[maxActive]; evictConnections = new DruidConnectionHolder[maxActive]; keepAliveConnections = new DruidConnectionHolder[maxActive]; SQLException connectError = null; // 创建初始连接数 // 异步创建,createScheduler为null,不进入 if (createScheduler != null && asyncInit) { for (int i = 0; i < initialSize; ++i) { submitCreateTask(true); } } else if (!asyncInit) { // init connections while (poolingCount < initialSize) { try { PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection(); DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo); connections[poolingCount++] = holder; } catch (SQLException ex) { LOG.error("init datasource error, url: " + this.getUrl(), ex); if (initExceptionThrow) { connectError = ex; break; } else { Thread.sleep(3000); } } } if (poolingCount > 0) { poolingPeak = poolingCount; poolingPeakTime = System.currentTimeMillis(); } } createAndLogThread(); createAndStartCreatorThread(); createAndStartDestroyThread(); initedLatch.await(); init = true; initedTime = new Date(); registerMbean(); if (connectError != null && poolingCount == 0) { throw connectError; } if (keepAlive) { // async fill to minIdle if (createScheduler != null) { for (int i = 0; i < minIdle; ++i) { submitCreateTask(true); } } else { this.emptySignal(); } }
1,初始化connections,用于存放所有连接对象,evictConnections,用于存放需要丢弃的连接对象,keepAliveConnections,用于存放需要keepAlive的连接对象
2,这里有两种方式创建连接,一种是异步,一种是同步。我这里使用的是同步,createScheduler为null。
3,poolingCount 为空闲连接对象数量,当其小于初始化连接池大小时,不停的调用createPhysicalConnection();创建新连接,并放进去
3.1,createPhysicalConnection(),的流程大致就是读取配置中的url,驱动以及用户密码,实例化一个ConnectionProxyImpl,再使用上面初始话的validConnectionChecker,根据配置对该连接进行校验,判断是否是可用的连接。
4,启动三个线程
// 启动监控数据记录线程 createAndLogThread(); // 启动连接创建线程 createAndStartCreatorThread(); // 启动连接检测线程 createAndStartDestroyThread(); initedLatch.await();
这里initedLatch为一个countdownlatch对象,保证当createConnectionThread和destroyConnectionThread开始run时再继续执行
5,注册MBean,会去注册DruidDataSourceStatManager和DruidDataSource,用来通过jmx监控
6,如果配置了keepAlive,且是异步创建连接,那么会提交创建任务,创建任务队列使用long数组实现,初始化为8个长度,每个位置为0则代表任务为空,不为零则就是具体的任务id,代表加入任务队列,当队列满了时,会进行扩容,大小为1.5倍,然后通过createScheduler,执行任务。
6.1,如果配置了keepAlive,且不是异步创建连接,那么会去调用empty.signal(),会去唤醒处于empty.await()状态的CreateConnectionThread,CreateConnectionThread这个线程只有在需要创建连接时才运行,否则会一直等待。
至此,初始话的流程已经完成,总结下就是初始化驱动实例 -> 加锁 -> 初始化属性 -> 初始化过滤器 -> 校验参数 -> 创建初始化连接并校验后加入池中 -> 创建logStatsThread、createConnectionThread和destroyConnectionThread -> 注册MBean,用于支持JMX -> 如果设置了keepAlive,通知createConnectionThread创建连接对象 -> 解锁
之前我们说过,DruidDataSource这个核心类其实就是一个DataSource,因为其继承了DataSource,所以它也重写了 getConnection方法,实际获取一个连接就是从这获取的
@Override public DruidPooledConnection getConnection() throws SQLException { return getConnection(maxWait); } public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { init(); if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(this); return filterChain.dataSource_connect(this, maxWaitMillis); } else { return getConnectionDirect(maxWaitMillis); } }
它调用了本身的一个getConnection方法,并以配置的maxWait作为参数传递进去。获取连接之前会进行druid连接池的初始化动作,这个上面也是说了,进入初始化流程发现已经初始化过了,就不会再初始化了。
接着如果配置了过滤器,也就是stat,wall,log4j那几个插件,那么会进行插件的初始化,这里使用到了责任链模式,很经典的用法了,在网关中也常用到,之前源码分析soul网关时也讲到这中用法。然后用filterChain调用dataSource_connect方法,进行获取连接。
在filterChain中,定义了当前的数据源以及连接池信息,责任链的列表,长度,以及当前访问的位置.
@Override public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException { if (this.pos < filterSize) { DruidPooledConnection conn = nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis); return conn; } return dataSource.getConnectionDirect(maxWaitMillis); }
如上代码,判断当前位置小于责任链长度,则使用nextFilter()获取下一个实际的过滤器获取连接,其实也就是遍历责任链,用每个过滤器插件执行下dataSource_getConnection(this, dataSource, maxWaitMillis); 这个方法在Filter中就定义了,每个过滤器都实现了具体的方法,举个例子:这是statFilter的实现
@Override public DruidPooledConnection dataSource_getConnection(FilterChain chain, DruidDataSource dataSource, long maxWaitMillis) throws SQLException { DruidPooledConnection conn = chain.dataSource_connect(dataSource, maxWaitMillis); if (conn != null) { conn.setConnectedTimeNano(); StatFilterContext.getInstance().pool_connection_open(); } return conn; }
可以看到,很骚的是,他把过滤器本身传进来了,又通过dataSource_connect调用了一次,相当于dfs递归遍历,然后又回到上面的过滤器列表的遍历,只不过这一次pos已经+1了,拿到的就是下一个过滤器。如此这样直到,pos的位置大于插件连的长度。执行这个代码dataSource.getConnectionDirect(maxWaitMillis)
这个方法字面意思就是直连获取连接:
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException { int notFullTimeoutRetryCnt = 0; for (;;) { // handle notFullTimeoutRetry DruidPooledConnection poolableConnection; try { poolableConnection = getConnectionInternal(maxWaitMillis); } catch (GetConnectionTimeoutException ex) { if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) { notFullTimeoutRetryCnt++; if (LOG.isWarnEnabled()) { LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt); } continue; } throw ex; }
起一个死循环,在循环里面调用getConnectionInternal(maxWaitMillis);获取连接,关键点就在这:
poolableConnection = getConnectionInternal(maxWaitMillis); private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException { if (closed) { connectErrorCountUpdater.incrementAndGet(this); throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis)); } if (!enable) { connectErrorCountUpdater.incrementAndGet(this); if (disableException != null) { throw disableException; } throw new DataSourceDisableException(); } final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); final int maxWaitThreadCount = this.maxWaitThreadCount; DruidConnectionHolder holder; for (boolean createDirect = false;;) { if (createDirect) { createStartNanosUpdater.set(this, System.nanoTime()); if (creatingCountUpdater.compareAndSet(this, 0, 1)) { PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection(); holder = new DruidConnectionHolder(this, pyConnInfo); holder.lastActiveTimeMillis = System.currentTimeMillis(); creatingCountUpdater.decrementAndGet(this); directCreateCountUpdater.incrementAndGet(this); if (LOG.isDebugEnabled()) { LOG.debug("conn-direct_create "); } boolean discard = false; lock.lock(); try { if (activeCount < maxActive) { activeCount++; holder.active = true; if (activeCount > activePeak) { activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } break; } else { discard = true; } } finally { lock.unlock(); } if (discard) { JdbcUtils.close(pyConnInfo.getPhysicalConnection()); } } }
上面代码逻辑就是,起一个死循环,先判断是否是直接创建,刚进来肯定不是,往下走:
try { lock.lockInterruptibly(); } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("interrupt", e); } try { if (maxWaitThreadCount > 0 && notEmptyWaitThreadCount >= maxWaitThreadCount) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + lock.getQueueLength()); } if (onFatalError && onFatalErrorMaxActive > 0 && activeCount >= onFatalErrorMaxActive) { connectErrorCountUpdater.incrementAndGet(this); StringBuilder errorMsg = new StringBuilder(); errorMsg.append("onFatalError, activeCount ") .append(activeCount) .append(", onFatalErrorMaxActive ") .append(onFatalErrorMaxActive); if (lastFatalErrorTimeMillis > 0) { errorMsg.append(", time '") .append(StringUtils.formatDateTime19( lastFatalErrorTimeMillis, TimeZone.getDefault())) .append("'"); } if (lastFatalErrorSql != null) { errorMsg.append(", sql \n") .append(lastFatalErrorSql); } throw new SQLException( errorMsg.toString(), lastFatalError); } connectCount++; if (createScheduler != null && poolingCount == 0 && activeCount < maxActive && creatingCountUpdater.get(this) == 0 && createScheduler instanceof ScheduledThreadPoolExecutor) { ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler; if (executor.getQueue().size() > 0) { createDirect = true; continue; } } if (maxWait > 0) { holder = pollLast(nanos); } else { holder = takeLast(); } if (holder != null) { if (holder.discard) { continue; } activeCount++; holder.active = true; if (activeCount > activePeak) { activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } }
先加锁,然后是对异常场景的一顿判断,如果配置了异步创建连接,且池中无可用连接,且借出去的连接小于最大活跃连接,且待创建任务的队列>0,则将createDirect置为true,跳过下面代码,回到循环上面,相当于这次连接池没有可用的连接了,通过DruidDataSource.this.createPhysicalConnection();就直接现场创建一个连接。
如果没有配置异步创建
if (maxWait > 0) { holder = pollLast(nanos); } else { holder = takeLast(); }
如果没超时,如果无可用连接,则发一个信号创建连接,直到可用连接不为0,然后取走最后一个连接对象,下面的超时的逻辑也很类似,就是多了一个对超时时间的处理。拿到链接后,活跃连接数加1,解锁。
如果拿到的连接为null,则代表出错了,组装下错误日志,抛出异常。
正常情况,则将拿到的连接包装为DruidPooledConnection,返回。
回到最上面,我们拿到连接后,通过testOnBorrow配置,对拿到的连接进行校验,这个校验的逻辑和前面创建的连接校验的逻辑是一样的,执行下配置的select 'x'。校验结果如果不合法,则代表此链接没用,就需要抛弃掉
if (testOnBorrow) { boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn); if (!validate) { if (LOG.isDebugEnabled()) { LOG.debug("skip not validate connection."); } discardConnection(poolableConnection.holder); continue; } }
抛弃的流程就是,如果连接不为null,就将连接关了,同时将该连接的holer的属性置为抛弃,活跃数量减一,如果此时使用中的连接数量小于配置的最小连接数,则发起emptySignal()信号,进行创建。不合法后,则需要继续循环再从连接池拿一个连接,直到有效。
然后是testWhileIdle配置,该配置意思是申请连接时如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效,逻辑也就这个意思
if (testWhileIdle) { final DruidConnectionHolder holder = poolableConnection.holder; long currentTimeMillis = System.currentTimeMillis(); long lastActiveTimeMillis = holder.lastActiveTimeMillis; long lastExecTimeMillis = holder.lastExecTimeMillis; long lastKeepTimeMillis = holder.lastKeepTimeMillis; if (checkExecuteTime && lastExecTimeMillis != lastActiveTimeMillis) { lastActiveTimeMillis = lastExecTimeMillis; } if (lastKeepTimeMillis > lastActiveTimeMillis) { lastActiveTimeMillis = lastKeepTimeMillis; } long idleMillis = currentTimeMillis - lastActiveTimeMillis; long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis; if (timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis || idleMillis < 0 // unexcepted branch ) { boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn); if (!validate) { if (LOG.isDebugEnabled()) { LOG.debug("skip not validate connection."); } discardConnection(poolableConnection.holder); continue; } } }
然后就成功的拿到连接了,此时代码回到了这里:
@Override public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException { if (this.pos < filterSize) { DruidPooledConnection conn = nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis); return conn; } // 此处执行完了 return dataSource.getConnectionDirect(maxWaitMillis); }
因为这里的过滤器链是dfs递归调用的,所以回去的时候就是一层一层的会去,回到每一层,执行获取连接后的逻辑,比如StatFilter会执行:
// 拿到连接后 if (conn != null) { conn.setConnectedTimeNano(); StatFilterContext.getInstance().pool_connection_open(); }
层层返回后,这个连接就算成功拿到了,交给spring集成相关的代码,去实际执行sql了。这里使用责任链,通过每个过滤器去获取连接的目的就是,让每个插件在拿到连接的时候都能做一下处理,不想做直接跳过即可,以后也可以很方便拓展插件。
到此获取连接的逻辑结束。
连接的上层获取动作,是从DataSourceUtils中的getConnection获取的,调用我们上面分析的DruidDataSource中获取一个实际的连接。
拿到连接后,将连接包装成一个ConnectionHolder
public static Connection doGetConnection(DataSource dataSource) throws SQLException { Assert.notNull(dataSource, "No DataSource specified"); ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(dataSource); if (conHolder == null || !conHolder.hasConnection() && !conHolder.isSynchronizedWithTransaction()) { logger.debug("Fetching JDBC Connection from DataSource"); //拿到一个连接 Connection con = fetchConnection(dataSource); if (TransactionSynchronizationManager.isSynchronizationActive()) { logger.debug("Registering transaction synchronization for JDBC Connection"); // 包装下 ConnectionHolder holderToUse = conHolder; if (conHolder == null) { holderToUse = new ConnectionHolder(con); } else { conHolder.setConnection(con); } holderToUse.requested(); TransactionSynchronizationManager.registerSynchronization(new DataSourceUtils.ConnectionSynchronization(holderToUse, dataSource)); holderToUse.setSynchronizedWithTransaction(true); if (holderToUse != conHolder) { TransactionSynchronizationManager.bindResource(dataSource, holderToUse); } } return con; } else { conHolder.requested(); if (!conHolder.hasConnection()) { logger.debug("Fetching resumed JDBC Connection from DataSource"); conHolder.setConnection(fetchConnection(dataSource)); } return conHolder.getConnection(); } }
此时spring的事务管理器,拿到了这个连接,然后到达了mybatis手中,mybatis开始初始化s'tatement。
// 此处为mybatis的 BaseStatementHandler private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException { Connection connection = this.getConnection(statementLog); Statement stmt = handler.prepare(connection, this.transaction.getTimeout()); handler.parameterize(stmt); return stmt; }
然后使用连接的preparedStatement(String sql)方法,初始化preparedStatement,也就进入了druid的重写的方法。创建一个实际的preparedStatement后,用一个PreparedStatementHolder包装下。并记录一些关键的指标,用来统计。
// druid重写的创建preparedStatement方法 @Override public PreparedStatement prepareStatement(String sql) throws SQLException { checkState(); PreparedStatementHolder stmtHolder = null; PreparedStatementKey key = new PreparedStatementKey(sql, getCatalog(), MethodType.M1); boolean poolPreparedStatements = holder.isPoolPreparedStatements(); if (poolPreparedStatements) { stmtHolder = holder.getStatementPool().get(key); } if (stmtHolder == null) { try { stmtHolder = new PreparedStatementHolder(key, conn.prepareStatement(sql)); holder.getDataSource().incrementPreparedStatementCount(); } catch (SQLException ex) { handleException(ex, sql); } } initStatement(stmtHolder); DruidPooledPreparedStatement rtnVal = new DruidPooledPreparedStatement(this, stmtHolder); holder.addTrace(rtnVal); return rtnVal; }
然后mybatis就拿到了这个实际执行的preparedStatement,开始实际的执行。
执行完成后,就开始归还连接了,最开始肯定是 preparedStatement 进行close,由于实际执行的是我们包装的DruidPooledPreparedStatement,所以close方法也会被重写执行,在DruidPooledConnection中closePoolableStatement(),方法实际去关闭。
public void closePoolableStatement(DruidPooledPreparedStatement stmt) throws SQLException { PreparedStatement rawStatement = stmt.getRawPreparedStatement(); final DruidConnectionHolder holder = this.holder; if (holder == null) { return; } if (stmt.isPooled()) { try { rawStatement.clearParameters(); } catch (SQLException ex) { this.handleException(ex, null); if (rawStatement.getConnection().isClosed()) { return; } LOG.error("clear parameter error", ex); } } PreparedStatementHolder stmtHolder = stmt.getPreparedStatementHolder(); stmtHolder.decrementInUseCount(); if (stmt.isPooled() && holder.isPoolPreparedStatements() && stmt.exceptionCount == 0) { holder.getStatementPool().put(stmtHolder); stmt.clearResultSet(); holder.removeTrace(stmt); stmtHolder.setFetchRowPeak(stmt.getFetchRowPeak()); stmt.setClosed(true); // soft set close } else if (stmt.isPooled() && holder.isPoolPreparedStatements()) { // the PreparedStatement threw an exception stmt.clearResultSet(); holder.removeTrace(stmt); holder.getStatementPool() .remove(stmtHolder); } else { try { //Connection behind the statement may be in invalid state, which will throw a SQLException. //In this case, the exception is desired to be properly handled to remove the unusable connection from the pool. stmt.closeInternal(); } catch (SQLException ex) { this.handleException(ex, null); throw ex; } finally { holder.getDataSource().incrementClosedPreparedStatementCount(); } } }
大致逻辑就是判断stmt的类型,以及参数,处理相应的数据,再调用DruidPooledStatement的close方法,进行实际的关闭。再去清除当前查询的结果的数据,对resultSet进行close。此时数据已经被查出来了,返回,开始对connect进行close。
由于我们这是连接池,如果连接好好的还能用,那么就不用真正关了,只需要给他放入池中就行,下次直接就用了。
// druidPooledConnection @Override public void close() throws SQLException { if (this.disable) { return; } DruidConnectionHolder holder = this.holder; if (holder == null) { if (dupCloseLogEnable) { LOG.error("dup close"); } return; } DruidAbstractDataSource dataSource = holder.getDataSource(); boolean isSameThread = this.getOwnerThread() == Thread.currentThread(); if (!isSameThread) { dataSource.setAsyncCloseConnectionEnable(true); } if (dataSource.isAsyncCloseConnectionEnable()) { syncClose(); return; } for (ConnectionEventListener listener : holder.getConnectionEventListeners()) { listener.connectionClosed(new ConnectionEvent(this)); } List filters = dataSource.getProxyFilters(); if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(dataSource); filterChain.dataSource_recycle(this); } else { recycle(); } this.disable = true; }
这里我们再次看到了这个filter,通过filter进行回收,为啥这么做呢?其实也是为了检测,监控。。这里我们两个filter的实际都没做啥大动作,直接调用了连接的recycle()方法。
@Override public void dataSource_recycle(DruidPooledConnection connection) throws SQLException { if (this.pos < filterSize) { nextFilter().dataSource_releaseConnection(this, connection); return; } connection.recycle(); } public void recycle() throws SQLException { if (this.disable) { return; } DruidConnectionHolder holder = this.holder; if (holder == null) { if (dupCloseLogEnable) { LOG.error("dup close"); } return; } if (!this.abandoned) { DruidAbstractDataSource dataSource = holder.getDataSource(); dataSource.recycle(this); } this.holder = null; conn = null; transactionInfo = null; closed = true; }
回收前,先看下这个连接是不是被抛弃了,抛弃了那么就不回收了,因为抛弃的连接都是不能用的了,而且也close了,不用担心资源没关闭问题。然后还是掉用连接的recycle方法。
到达最终的回收前,还是先做一顿检查,然后判断连接是否需要回滚
// check need to rollback? if ((!isAutoCommit) && (!isReadOnly)) { pooledConnection.rollback(); }
然后调用holder的reset方法,将holder中的一些监控参数清理重置下。
holder.reset(); public void reset() throws SQLException { // reset default settings if (underlyingReadOnly != defaultReadOnly) { conn.setReadOnly(defaultReadOnly); underlyingReadOnly = defaultReadOnly; } if (underlyingHoldability != defaultHoldability) { conn.setHoldability(defaultHoldability); underlyingHoldability = defaultHoldability; } if (underlyingTransactionIsolation != defaultTransactionIsolation) { conn.setTransactionIsolation(defaultTransactionIsolation); underlyingTransactionIsolation = defaultTransactionIsolation; } if (underlyingAutoCommit != defaultAutoCommit) { conn.setAutoCommit(defaultAutoCommit); underlyingAutoCommit = defaultAutoCommit; } connectionEventListeners.clear(); statementEventListeners.clear(); lock.lock(); try { for (Object item : statementTrace.toArray()) { Statement stmt = (Statement) item; JdbcUtils.close(stmt); } statementTrace.clear(); } finally { lock.unlock(); } conn.clearWarnings(); }
然后再判断,当前最大使用数量是否超标了,(此时可能又创建了新的连接)超标了则不回收了。
if (holder.discard) { return; } if (phyMaxUseCount > 0 && holder.useCount >= phyMaxUseCount) { discardConnection(holder); return; }
接着是testOnReturn,该参数是归还连接时执行validationQuery检测连接是否有效,如果开了,则会校验这个连接是不是还能用,不能用就不回收,然后关闭连接。
连接能用的话,则加锁,对监控参数处理,并将该holder放入连接池末尾:
lock.lock(); try { activeCount--; closeCount++; result = putLast(holder, currentTimeMillis); recycleCount++; } finally { lock.unlock(); }
放入末尾不成功则会将连接关了,同时结束整个回收的流程。
if (!result) { JdbcUtils.close(holder.conn); LOG.info("connection recyle failed."); }
放入成功,那么这个连接就被成功回收了,而且因为放入末尾,下次再来个连接就还会优先用它,这也是一个小设计。
至此,整个回收的逻辑就完了。
总体看下来,整个druid的运行逻辑就是,druid继承了datasource,statement,对这些核心参数用一个holder代理,并放入一个核心数组,这个数组就是连接池,接入spring工程后,mybatis从druid获取一个连接,并通过连接获取一个druid代理的preparedStatement,执行完成后,一层一层的关闭连接,在关闭连接时,如果连接还能用就归还给连接池,完成连接的复用。
同时使用一组责任链模式的filter,在建立连接,关闭连接等动作时,维护,监控一些数据,达到分析连接池中连接的目的。还维护了几个监听器,去监听连接数量的信号,去创建/销毁连接,保证核心连接池中随时有可用的连接。通过维护的监控的数据,我们可以通过控制台清晰的看到连接池的具体数量,以及判断大致的执行情况,分析我们业务的执行情况,以及是否健康。
源码分析基本到此就结束了,其他的也没啥核心逻辑了,后面在分析下,druid的设计上面的小心思,我们能从中学到什么来应用到我们自己的工程,光是这么看源码其实没啥意义,还是要多学别人的实现方式,以及如何写出优雅,拓展性强,可维护性强的代码。
版权声明:本文由星尘阁原创出品,转载请注明出处!