Inside Druid: Connection Maintenance and Lifecycle
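From the earlier analysis we know that Druid keeps its connections in an array. A connection pool has to be configured with a maximum number of connections, so the array never needs to grow: it is simply allocated at the maximum size up front, and an array is also a bit more space-efficient.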
// store
private volatile DruidConnectionHolder[] connections;
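Druid also has two very important groups of threads: one creates connections and stores them in the array, and the other destroys connections and removes them from the array.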
// connection-creation classes
public class CreateConnectionTask implements Runnable
public class CreateConnectionThread extends Thread

// connection-destruction classes
public class DestroyTask implements Runnable
public class DestroyConnectionThread extends Thread
Why does each group have two classes? We will come back to that later.
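All connections are created by the two creator classes above. They share one ReentrantLock and a pair of Conditions that signal each other, which keeps the creator from creating connections nonstop while also making sure it does not stop creating them when they are needed.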
protected ReentrantLock lock;
protected Condition notEmpty;
protected Condition empty;
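To make the lock-plus-two-Conditions idea concrete, here is a minimal, self-contained sketch (not Druid's code): a hypothetical `MiniPool` whose daemon creator thread sleeps on `empty` and whose borrowers wait on `notEmpty`. The field names mirror Druid's, but the class, its `take`/`release` methods, and the string stand-ins for physical connections are assumptions for illustration; unlike Druid, the creator re-checks its wait condition in a `while` loop.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class MiniPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // "an idle connection is available"
    private final Condition empty = lock.newCondition();    // "please create a connection"
    private final Object[] connections;                     // fixed size = maxActive, never resized
    private final int maxActive;
    private int poolingCount;            // idle connections in the array
    private int activeCount;             // borrowed connections
    private int notEmptyWaitThreadCount; // borrowers blocked on notEmpty
    private final AtomicInteger created = new AtomicInteger();

    MiniPool(int maxActive) {
        this.maxActive = maxActive;
        this.connections = new Object[maxActive];
        Thread creator = new Thread(this::createLoop, "mini-create");
        creator.setDaemon(true); // background helper: does not keep the JVM alive
        creator.start();
    }

    private void createLoop() {
        for (;;) {
            lock.lock();
            try {
                // Sleep on `empty` while no borrower is short of a connection or the pool is full.
                while (poolingCount >= notEmptyWaitThreadCount || activeCount + poolingCount >= maxActive) {
                    empty.await();
                }
            } catch (InterruptedException e) {
                return;
            } finally {
                lock.unlock();
            }
            // Stand-in for opening a physical connection; done outside the lock.
            Object conn = "conn-" + created.incrementAndGet();
            lock.lock();
            try {
                connections[poolingCount++] = conn;
                notEmpty.signal(); // wake one waiting borrower
            } finally {
                lock.unlock();
            }
        }
    }

    /** Borrows an idle connection, waiting up to timeoutMillis; returns null on timeout. */
    Object take(long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (poolingCount == 0) {
                if (nanos <= 0) {
                    return null;
                }
                notEmptyWaitThreadCount++;
                empty.signal(); // ask the creator thread for a connection
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                } finally {
                    notEmptyWaitThreadCount--;
                }
            }
            Object conn = connections[--poolingCount]; // take from the tail
            connections[poolingCount] = null;
            activeCount++;
            return conn;
        } finally {
            lock.unlock();
        }
    }

    /** Returns a borrowed connection to the pool. */
    void release(Object conn) {
        lock.lock();
        try {
            activeCount--;
            connections[poolingCount++] = conn;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        MiniPool pool = new MiniPool(2);
        Object c = pool.take(1000);
        System.out.println(c); // prints conn-1
        pool.release(c);
    }
}
```

The creator only wakes when a borrower signals `empty`, and it never lets `activeCount + poolingCount` exceed `maxActive`: that is the "neither always creating nor never creating" balance described above.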
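First, a quick review of Lock, Condition, and CountDownLatch.
CountDownLatch:
- CountDownLatch was introduced in Java 1.5, together with the utility classes CyclicBarrier, Semaphore, ConcurrentHashMap, and BlockingQueue.
- It lives in the java.util.concurrent package.
- CountDownLatch lets a thread wait until a set of other threads have each finished before it proceeds.
- It is implemented with a counter whose initial value is the number of threads. Each time a thread finishes, the counter is decremented by 1; when it reaches 0, all threads have finished and the threads waiting on the latch resume.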
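A minimal sketch of the counter behaviour just described, using only `java.util.concurrent.CountDownLatch`; the class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Starts `workers` threads; the caller blocks on await() until every
    // worker has called countDown(), i.e. until the counter reaches 0.
    static int runWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers); // initial count = number of threads
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // the "work"
                done.countDown();           // counter - 1
            }).start();
        }
        try {
            done.await(); // returns once the count hits 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return finished.get(); // every worker has finished by now
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(3)); // prints 3
    }
}
```

Everything a worker does before `countDown()` is visible to the thread returning from `await()`, which is why reading `finished` afterwards is safe.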
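Lock and Condition:
- Before Lock existed, synchronization was done with synchronized, combined with Object's wait()/notify() family of methods to implement the wait/notify pattern. Since Java SE 5, Java provides the Lock interface; compared with synchronized, Lock offers Conditions, which make waiting and waking threads more fine-grained and flexible.
- Condition provides a set of methods for blocking and waking threads, most commonly await() and signal().
- A Condition is a generalized condition queue that gives threads a more flexible wait/notify pattern: a thread that calls await() is suspended until another thread signals that the condition it is waiting for may now hold.
- A Condition must be used together with a lock, because the shared state it guards is accessed from multiple threads. Every Condition instance is bound to a Lock (it is obtained through Lock.newCondition()), which is why Condition is usually implemented as part of the Lock itself.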
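A minimal await/signal example following the rules above; the one-slot `Mailbox` class is a hypothetical illustration, not part of Druid or the JDK.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class Mailbox {
    private final ReentrantLock lock = new ReentrantLock();
    // A Condition is always obtained from, and bound to, its Lock.
    private final Condition hasItem = lock.newCondition();
    private String item; // shared state: only read or written while holding `lock`

    void put(String value) {
        lock.lock();
        try {
            item = value;
            hasItem.signal(); // wake one thread blocked in take()
        } finally {
            lock.unlock();
        }
    }

    String take() throws InterruptedException {
        lock.lock();
        try {
            while (item == null) { // re-check after every wakeup (spurious wakeups are allowed)
                hasItem.await();   // atomically releases the lock while suspended
            }
            String value = item;
            item = null;
            return value;
        } finally {
            lock.unlock();
        }
    }

    /** Hands one value from a producer thread to the calling thread. */
    static String roundTrip(String value) {
        Mailbox box = new Mailbox();
        new Thread(() -> box.put(value)).start();
        try {
            return box.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello")); // prints hello
    }
}
```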
We analyzed the overall flow earlier; here is a quick look at the flowchart:
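CreateConnectionThread and DestroyConnectionThread are both daemon threads that quietly create and destroy connections in the background. The two Conditions, empty and notEmpty, control when a new connection should be created.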
public CreateConnectionThread(String name){
    super(name);
    this.setDaemon(true);
}

public DestroyConnectionThread(String name){
    super(name);
    this.setDaemon(true);
}
Both are started during Druid's init():
// a logging thread is started here as well
createAndLogThread();
createAndStartCreatorThread();
createAndStartDestroyThread();

initedLatch.await();
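During startup, initedLatch forces init() to wait until both threads have started before moving on. initedLatch is initialized to 2, and each thread decrements it once it starts. If a createScheduler is configured, no creator thread is started and the count is taken off immediately instead: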
protected void createAndStartCreatorThread() {
    if (createScheduler == null) {
        String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
        createConnectionThread = new CreateConnectionThread(threadName);
        createConnectionThread.start();
        return;
    }

    initedLatch.countDown();
}
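The handshake can be sketched in isolation. This is a hypothetical `InitHandshake` class, not Druid's `init()`; it assumes the destroy side follows the same pattern as the creator side shown above (start a daemon thread when no scheduler is given, otherwise count down directly), which this article only shows for the creator.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class InitHandshake {
    // One count per background role (creator + destroyer), like initedLatch.
    private final CountDownLatch initedLatch = new CountDownLatch(2);
    int threadsStarted; // only touched by the thread calling init()

    private void startRole(String name, ScheduledExecutorService scheduler) {
        if (scheduler == null) {
            Thread t = new Thread(() -> {
                initedLatch.countDown(); // first statement of run(): "I am up"
                // a real pool would enter its endless create/destroy loop here
            }, name);
            t.setDaemon(true);
            t.start();
            threadsStarted++;
            return;
        }
        // A scheduler was supplied: no dedicated thread, so count down right away.
        initedLatch.countDown();
    }

    /** Returns true once both roles have reported in (false after a 5 s timeout). */
    boolean init(ScheduledExecutorService createScheduler, ScheduledExecutorService destroyScheduler) {
        startRole("sketch-create", createScheduler);
        startRole("sketch-destroy", destroyScheduler);
        try {
            return initedLatch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        InitHandshake pool = new InitHandshake();
        boolean ready = pool.init(null, null);
        System.out.println(ready + ", threads started: " + pool.threadsStarted);
    }
}
```

Counting down in the scheduler branch is what keeps `await()` from hanging when a role has no dedicated thread.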
Once started, the creator thread runs an infinite loop. Let's look at this thread:
public class CreateConnectionThread extends Thread {

    public CreateConnectionThread(String name){
        super(name);
        // mark as a daemon thread
        this.setDaemon(true);
    }

    public void run() {
        // count down on start; once the destroy thread has counted down too,
        // both threads are known to be up
        initedLatch.countDown();

        long lastDiscardCount = 0;
        int errorCount = 0;
        // infinite loop
        for (;;) {
            // addLast
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e2) {
                break;
            }

            long discardCount = DruidDataSource.this.discardCount;
            boolean discardChanged = discardCount - lastDiscardCount > 0;
            lastDiscardCount = discardCount;

            try {
                // emptyWait defaults to true: wait instead of creating
                boolean emptyWait = true;

                // the last creation failed and the pool is empty: create right away
                if (createError != null
                        && poolingCount == 0
                        && !discardChanged) {
                    emptyWait = false;
                }

                // asynchronous init that has not yet reached initialSize: create as well
                if (emptyWait
                        && asyncInit && createCount < initialSize) {
                    emptyWait = false;
                }

                if (emptyWait) {
                    // only create a connection when some thread is waiting for one
                    if (poolingCount >= notEmptyWaitThreadCount
                            && (!(keepAlive && activeCount + poolingCount < minIdle))
                            && !isFailContinuous()
                    ) {
                        // nothing to create right now: wait for a signal on empty
                        empty.await();
                    }

                    // never create more than maxActive connections
                    if (activeCount + poolingCount >= maxActive) {
                        empty.await();
                        continue;
                    }
                }

            } catch (InterruptedException e) {
                lastCreateError = e;
                lastErrorTimeMillis = System.currentTimeMillis();

                if ((!closing) && (!closed)) {
                    LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
                }
                break;
            } finally {
                lock.unlock();
            }

            PhysicalConnectionInfo connection = null;

            try {
                // create the physical connection
                connection = createPhysicalConnection();
            } catch (SQLException e) {
                LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
                          + ", state " + e.getSQLState(), e);

                errorCount++;
                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    // fail over retry attempts
                    setFailContinuous(true);
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }

                    if (breakAfterAcquireFailure) {
                        break;
                    }

                    try {
                        Thread.sleep(timeBetweenConnectErrorMillis);
                    } catch (InterruptedException interruptEx) {
                        break;
                    }
                }
            } catch (RuntimeException e) {
                LOG.error("create connection RuntimeException", e);
                setFailContinuous(true);
                continue;
            } catch (Error e) {
                LOG.error("create connection Error", e);
                setFailContinuous(true);
                break;
            }

            if (connection == null) {
                continue;
            }

            // no problems: store it in the pool
            boolean result = put(connection);
            if (!result) {
                JdbcUtils.close(connection.getPhysicalConnection());
                LOG.info("put physical connection to pool failed.");
            }

            errorCount = 0; // reset errorCount

            if (closing || closed) {
                break;
            }
        }
    }
}
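The branching at the top of the loop is the subtle part, so here it is pulled out into a pure function that can be tested. The `State` holder and `shouldWait` are hypothetical names; the conditions are copied from the loop above, with the simplification noted in the comment.

```java
class CreatorDecision {
    /** Hypothetical snapshot of the fields the creator loop reads while holding the lock. */
    static class State {
        boolean createErrorPresent; // createError != null
        boolean discardChanged;
        boolean asyncInit;
        boolean keepAlive;
        boolean failContinuous;
        int createCount, initialSize;
        int poolingCount, activeCount, notEmptyWaitThreadCount;
        int minIdle, maxActive;
    }

    /**
     * True if this pass of the loop begins with empty.await() instead of creating.
     * Simplification: after waking from the first await, the real loop only
     * re-checks maxActive; this function models the decision taken before it.
     */
    static boolean shouldWait(State s) {
        boolean emptyWait = true;
        // last attempt failed and nothing is idle: retry immediately
        if (s.createErrorPresent && s.poolingCount == 0 && !s.discardChanged) {
            emptyWait = false;
        }
        // asynchronous init has not reached initialSize yet
        if (emptyWait && s.asyncInit && s.createCount < s.initialSize) {
            emptyWait = false;
        }
        if (!emptyWait) {
            return false;
        }
        // enough idle connections for every waiter, minIdle satisfied, no failure streak
        if (s.poolingCount >= s.notEmptyWaitThreadCount
                && !(s.keepAlive && s.activeCount + s.poolingCount < s.minIdle)
                && !s.failContinuous) {
            return true;
        }
        // otherwise only the maxActive cap can still stop creation
        return s.activeCount + s.poolingCount >= s.maxActive;
    }

    public static void main(String[] args) {
        State s = new State();
        s.maxActive = 8;
        System.out.println(shouldWait(s)); // prints true: nobody is waiting
        s.notEmptyWaitThreadCount = 1;
        System.out.println(shouldWait(s)); // prints false: a borrower is waiting
    }
}
```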
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
    DruidConnectionHolder holder = null;
    try {
        // wrap it in a holder
        holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
    } catch (SQLException ex) {
        lock.lock();
        try {
            if (createScheduler != null) {
                clearCreateTask(physicalConnectionInfo.createTaskId);
            }
        } finally {
            lock.unlock();
        }
        LOG.error("create connection holder error", ex);
        return false;
    }

    // put it into the pool
    return put(holder, physicalConnectionInfo.createTaskId, false);
}
Putting it into the pool:
private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
    // take the lock so concurrent puts cannot overwrite each other's slot
    lock.lock();
    try {
        if (this.closing || this.closed) {
            return false;
        }

        // the pool already holds maxActive idle connections: clear this creation task
        if (poolingCount >= maxActive) {
            if (createScheduler != null) {
                clearCreateTask(createTaskId);
            }
            return false;
        }

        // check whether the holder is already in the pool
        if (checkExists) {
            for (int i = 0; i < poolingCount; i++) {
                if (connections[i] == holder) {
                    return false;
                }
            }
        }

        // store it in the pool and increment the count
        connections[poolingCount] = holder;
        incrementPoolingCount();

        if (poolingCount > poolingPeak) {
            poolingPeak = poolingCount;
            poolingPeakTime = System.currentTimeMillis();
        }

        // tell waiting threads that the pool is no longer empty
        notEmpty.signal();
        notEmptySignalCount++;

        if (createScheduler != null) {
            // with a createScheduler this method can also be reached from a creation task,
            // so clear that task from the task list
            clearCreateTask(createTaskId);

            if (poolingCount + createTaskCount < notEmptyWaitThreadCount
                    && activeCount + poolingCount + createTaskCount < maxActive) {
                emptySignal();
            }
        }
    } finally {
        lock.unlock();
    }
    return true;
}
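A stripped-down sketch of the guard order in `put()`, assuming plain `Object`s as holders and leaving out the signalling and task bookkeeping. `PutSketch` is a hypothetical name; `checkExists` matters for holders that re-enter the pool, such as connections that pass a keep-alive check.

```java
import java.util.concurrent.locks.ReentrantLock;

class PutSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Object[] connections; // length == maxActive
    private int poolingCount;
    private int poolingPeak;
    private boolean closed;

    PutSketch(int maxActive) {
        connections = new Object[maxActive];
    }

    /** Appends holder at the tail; false when closed, full, or (with checkExists) already present. */
    boolean put(Object holder, boolean checkExists) {
        lock.lock(); // one writer at a time, so two puts can never claim the same slot
        try {
            if (closed) {
                return false;
            }
            if (poolingCount >= connections.length) {
                return false; // already maxActive idle connections
            }
            if (checkExists) {
                for (int i = 0; i < poolingCount; i++) {
                    if (connections[i] == holder) {
                        return false; // identity check, as in the code above
                    }
                }
            }
            connections[poolingCount++] = holder;
            poolingPeak = Math.max(poolingPeak, poolingCount);
            return true;
        } finally {
            lock.unlock();
        }
    }

    void close() {
        lock.lock();
        try {
            closed = true;
        } finally {
            lock.unlock();
        }
    }

    int poolingPeak() {
        lock.lock();
        try {
            return poolingPeak;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        PutSketch pool = new PutSketch(2);
        Object holder = new Object();
        System.out.println(pool.put(holder, false)); // prints true
        System.out.println(pool.put(holder, true));  // prints false: already pooled
    }
}
```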
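That completes the creation path. Besides the daemon thread that keeps creating connections in the background, though, we also mentioned CreateConnectionTask, which implements Runnable. What is it for? It creates connections in almost exactly the same way as the daemon thread. The difference is that it is not a daemon thread sitting in the background: it has to be triggered explicitly, so when a connection is needed right away, a task can be submitted to the connection-creation task queue to create one. As createAndStartCreatorThread() above shows, this is the path Druid takes when a createScheduler is configured; in that case the creator thread is never started.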
public class CreateConnectionTask implements Runnable {

    private int errorCount = 0;
    private boolean initTask = false;
    private final long taskId;

    public CreateConnectionTask() {
        taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
    }

    public CreateConnectionTask(boolean initTask) {
        taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
        this.initTask = initTask;
    }

    @Override
    public void run() {
        runInternal();
    }

    private void runInternal() {
        for (;;) {
            // addLast
            lock.lock();
            try {
                if (closed || closing) {
                    clearCreateTask(taskId);
                    return;
                }

                boolean emptyWait = true;

                if (createError != null && poolingCount == 0) {
                    emptyWait = false;
                }

                if (emptyWait) {
                    // only create a connection when some thread is waiting for one
                    if (poolingCount >= notEmptyWaitThreadCount
                            && (!(keepAlive && activeCount + poolingCount < minIdle)) // must not give up in the keepAlive case
                            && (!initTask) // tasks submitted during pool initialization must not give up
                            && !isFailContinuous() // must not give up while failContinuous, or connections could never be created again
                            && !isOnFatalError() // must not give up on a fatal error, for the same reason
                    ) {
                        // same logic as the thread; the difference is here: the daemon thread blocks,
                        // while this task clears its entry from the task list and exits
                        clearCreateTask(taskId);
                        return;
                    }

                    // never create more than maxActive connections
                    if (activeCount + poolingCount >= maxActive) {
                        clearCreateTask(taskId);
                        return;
                    }
                }
            } finally {
                lock.unlock();
            }

            PhysicalConnectionInfo physicalConnection = null;

            try {
                physicalConnection = createPhysicalConnection();
            } catch (OutOfMemoryError e) {
                LOG.error("create connection OutOfMemoryError, out memory. ", e);

                errorCount++;
                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    // fail over retry attempts
                    setFailContinuous(true);
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }

                    if (breakAfterAcquireFailure) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    this.errorCount = 0; // reset errorCount
                    if (closing || closed) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                    return;
                }
            } catch (SQLException e) {
                LOG.error("create connection SQLException, url: " + jdbcUrl, e);

                errorCount++;
                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    // fail over retry attempts
                    setFailContinuous(true);
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }

                    if (breakAfterAcquireFailure) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    this.errorCount = 0; // reset errorCount
                    if (closing || closed) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                    return;
                }
            } catch (RuntimeException e) {
                LOG.error("create connection RuntimeException", e);
                // unknow fatal exception
                setFailContinuous(true);
                continue;
            } catch (Error e) {
                lock.lock();
                try {
                    clearCreateTask(taskId);
                } finally {
                    lock.unlock();
                }
                LOG.error("create connection Error", e);
                // unknow fatal exception
                setFailContinuous(true);
                break;
            } catch (Throwable e) {
                lock.lock();
                try {
                    clearCreateTask(taskId);
                } finally {
                    lock.unlock();
                }

                LOG.error("create connection unexecpted error.", e);
                break;
            }

            if (physicalConnection == null) {
                continue;
            }

            physicalConnection.createTaskId = taskId;
            boolean result = put(physicalConnection);
            if (!result) {
                JdbcUtils.close(physicalConnection.getPhysicalConnection());
                LOG.info("put physical connection to pool failed.");
            }
            break;
        }
    }
}
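The behavioural difference from the thread (check once, then give up instead of calling `await()`) can be sketched with a plain executor. Everything here is hypothetical: `OneShotDemo`, its counters, and the single-thread `ExecutorService` standing in for Druid's `createScheduler`. Running the tasks serially keeps the example deterministic; in Druid itself, `put()` re-checking `maxActive` is what guards against over-creation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class OneShotDemo {
    final ReentrantLock lock = new ReentrantLock();
    final int maxActive;
    int poolingCount, activeCount, notEmptyWaitThreadCount; // guarded by lock
    int created;                                            // guarded by lock

    OneShotDemo(int maxActive) {
        this.maxActive = maxActive;
    }

    /** A one-shot creation task: checks once, then either creates one connection or simply returns. */
    Runnable createTask() {
        return () -> {
            lock.lock();
            try {
                if (poolingCount >= notEmptyWaitThreadCount         // nobody is short of a connection
                        || activeCount + poolingCount >= maxActive) { // or the pool is full
                    return; // give up instead of await(): nothing stays parked
                }
            } finally {
                lock.unlock();
            }
            // a physical connection would be opened here, outside the lock
            lock.lock();
            try {
                poolingCount++;
                created++;
            } finally {
                lock.unlock();
            }
        };
    }

    /** Runs n tasks one after another on a single worker and reports how many connections were made. */
    int runTasks(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            executor.submit(createTask());
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        lock.lock();
        try {
            return created;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        OneShotDemo pool = new OneShotDemo(2);
        pool.notEmptyWaitThreadCount = 1;     // one borrower is waiting
        System.out.println(pool.runTasks(3)); // prints 1
    }
}
```

Surplus tasks simply end, so no worker thread is left parked; the cost is that every new demand needs a new submission.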
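The core difference is that when no connection needs to be created, the task does not block waiting to be woken up; it clears its entry from the creation-task list and ends. Tasks are submitted from this method: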
private void submitCreateTask(boolean initTask) {
    createTaskCount++;
    CreateConnectionTask task = new CreateConnectionTask(initTask);
    if (createTasks == null) {
        createTasks = new long[8];
    }

    boolean putted = false;
    for (int i = 0; i < createTasks.length; ++i) {
        if (createTasks[i] == 0) {
            createTasks[i] = task.taskId;
            putted = true;
            break;
        }
    }
    if (!putted) {
        long[] array = new long[createTasks.length * 3 / 2];
        System.arraycopy(createTasks, 0, array, 0, createTasks.length);
        array[createTasks.length] = task.taskId;
        createTasks = array;
    }

    this.createSchedulerFuture = createScheduler.submit(task);
}
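It submits a connection-creation task and records its id in a long array (initial length 8, grown by 1.5x when full) that serves as the task list. I haven't quite figured out what this list is for: it can't act as a cache, so is it just a record of the creation tasks currently outstanding? Beyond clearCreateTask(), which presumably removes ids from it, it doesn't seem to be used anywhere.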
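One plausible reading, based on the calls visible in this article, is that the array lets `clearCreateTask()` be called from several exit paths (inside `put()` and throughout `CreateConnectionTask`) while decrementing `createTaskCount` only once per task. A sketch under that assumption: `register` copies the growth logic of `submitCreateTask()`, while `clear` is a hypothetical reconstruction, not Druid's code.

```java
class CreateTaskRegistry {
    private long[] createTasks;  // outstanding task ids; 0 marks a free slot
    private int createTaskCount; // number of outstanding tasks

    /** Mirrors the bookkeeping in submitCreateTask(): reuse a free slot or grow the array by 1.5x. */
    void register(long taskId) {
        createTaskCount++;
        if (createTasks == null) {
            createTasks = new long[8];
        }
        for (int i = 0; i < createTasks.length; ++i) {
            if (createTasks[i] == 0) {
                createTasks[i] = taskId;
                return;
            }
        }
        long[] array = new long[createTasks.length * 3 / 2];
        System.arraycopy(createTasks, 0, array, 0, createTasks.length);
        array[createTasks.length] = taskId;
        createTasks = array;
    }

    /**
     * Hypothetical clear: frees the slot and decrements the count only if the id is
     * still registered, so clearing the same task twice is harmless.
     */
    boolean clear(long taskId) {
        if (createTasks == null || taskId == 0) {
            return false; // 0 means "not from a task", e.g. put(holer, 0L, true)
        }
        for (int i = 0; i < createTasks.length; ++i) {
            if (createTasks[i] == taskId) {
                createTasks[i] = 0;
                createTaskCount--;
                return true;
            }
        }
        return false;
    }

    int count() {
        return createTaskCount;
    }

    int capacity() {
        return createTasks == null ? 0 : createTasks.length;
    }

    public static void main(String[] args) {
        CreateTaskRegistry r = new CreateTaskRegistry();
        for (long id = 1; id <= 9; id++) {
            r.register(id);
        }
        System.out.println(r.capacity()); // prints 12: the 9th id forced a 1.5x grow
        r.clear(5);
        r.clear(5);                       // second clear is a no-op
        System.out.println(r.count());    // prints 8
    }
}
```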
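Now let's look at DestroyConnectionThread. Its job is to keep scanning for connections that need to be reclaimed, destroy the ones that meet the conditions, and, if the pool has become empty, notify CreateConnectionThread to create connections.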
public class DestroyConnectionThread extends Thread {

    public DestroyConnectionThread(String name){
        super(name);
        // daemon thread
        this.setDaemon(true);
    }

    public void run() {
        initedLatch.countDown();

        for (;;) {
            // remove from the front
            try {
                if (closed || closing) {
                    break;
                }

                if (timeBetweenEvictionRunsMillis > 0) {
                    Thread.sleep(timeBetweenEvictionRunsMillis);
                } else {
                    Thread.sleep(1000);
                }

                if (Thread.interrupted()) {
                    break;
                }

                // delegate the actual work to DestroyTask; run() is a plain call on this same thread
                destroyTask.run();
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}
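The loop above boils down to "sleep, check for shutdown, run the task on the same thread". A hypothetical standalone version (`PeriodicDaemon` is our name, not Druid's), keeping the same 1-second fallback when the interval is not positive:

```java
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicDaemon extends Thread {
    private final long intervalMillis;
    private final Runnable job;
    private volatile boolean closed;

    PeriodicDaemon(String name, long intervalMillis, Runnable job) {
        super(name);
        setDaemon(true);
        // same fallback as the loop above: a non-positive interval means 1 s
        this.intervalMillis = intervalMillis > 0 ? intervalMillis : 1000;
        this.job = job;
    }

    @Override
    public void run() {
        for (;;) {
            if (closed) {
                break;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                break; // an interrupt doubles as a shutdown request
            }
            if (Thread.interrupted()) {
                break;
            }
            job.run(); // a plain method call on this thread, like destroyTask.run()
        }
    }

    void shutdown() {
        closed = true;
        interrupt();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        PeriodicDaemon d = new PeriodicDaemon("sketch-destroy", 50, runs::incrementAndGet);
        d.start();
        Thread.sleep(300);
        d.shutdown();
        d.join();
        System.out.println("job ran " + runs.get() + " times");
    }
}
```

Because the job runs on the sleeping thread itself, a slow scan simply delays the next one instead of overlapping with it.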
public class DestroyTask implements Runnable {
    public DestroyTask() {

    }

    @Override
    public void run() {
        // roughly: scan the idle connections in the pool, keep the ones that still qualify,
        // and destroy the rest (for example, those idle for too long)
        shrink(true, keepAlive);

        if (isRemoveAbandoned()) {
            removeAbandoned();
        }
    }
}
public void shrink(boolean checkTime, boolean keepAlive) {
    try {
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        return;
    }

    boolean needFill = false;
    int evictCount = 0;
    int keepAliveCount = 0;
    int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
    fatalErrorCountLastShrink = fatalErrorCount;

    try {
        if (!inited) {
            return;
        }

        final int checkCount = poolingCount - minIdle;
        final long currentTimeMillis = System.currentTimeMillis();
        // walk every idle connection in the pool
        for (int i = 0; i < poolingCount; ++i) {
            DruidConnectionHolder connection = connections[i];

            if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
                // created before the last fatal error: queue it in keepAliveConnections for re-validation
                keepAliveConnections[keepAliveCount++] = connection;
                continue;
            }

            if (checkTime) {
                if (phyTimeoutMillis > 0) {
                    long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                    if (phyConnectTimeMillis > phyTimeoutMillis) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }

                long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;

                if (idleMillis < minEvictableIdleTimeMillis
                        && idleMillis < keepAliveBetweenTimeMillis
                ) {
                    break;
                }

                // idle for longer than allowed: queue it in evictConnections
                if (idleMillis >= minEvictableIdleTimeMillis) {
                    if (checkTime && i < checkCount) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    } else if (idleMillis > maxEvictableIdleTimeMillis) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }

                if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                    keepAliveConnections[keepAliveCount++] = connection;
                }
            } else {
                if (i < checkCount) {
                    evictConnections[evictCount++] = connection;
                } else {
                    break;
                }
            }
        }

        int removeCount = evictCount + keepAliveCount;
        if (removeCount > 0) {
            System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
            Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
            poolingCount -= removeCount;
        }
        keepAliveCheckCount += keepAliveCount;

        if (keepAlive && poolingCount + activeCount < minIdle) {
            needFill = true;
        }
    } finally {
        lock.unlock();
    }

    if (evictCount > 0) {
        // close every evicted connection, then reset evictConnections
        for (int i = 0; i < evictCount; ++i) {
            DruidConnectionHolder item = evictConnections[i];
            Connection connection = item.getConnection();
            JdbcUtils.close(connection);
            destroyCountUpdater.incrementAndGet(this);
        }
        Arrays.fill(evictConnections, null);
    }

    if (keepAliveCount > 0) {
        // keep order
        for (int i = keepAliveCount - 1; i >= 0; --i) {
            DruidConnectionHolder holer = keepAliveConnections[i];
            Connection connection = holer.getConnection();
            holer.incrementKeepAliveCheckCount();

            boolean validate = false;
            try {
                // kept-alive connections are validated before going back into the pool
                this.validateConnection(connection);
                validate = true;
            } catch (Throwable error) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("keepAliveErr", error);
                }
                // skip
            }

            boolean discard = !validate;
            if (validate) {
                holer.lastKeepTimeMillis = System.currentTimeMillis();
                boolean putOk = put(holer, 0L, true);
                if (!putOk) {
                    discard = true;
                }
            }

            if (discard) {
                try {
                    connection.close();
                } catch (Exception e) {
                    // skip
                }

                lock.lock();
                try {
                    discardCount++;

                    if (activeCount + poolingCount <= minIdle) {
                        emptySignal();
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
        this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
        Arrays.fill(keepAliveConnections, null);
    }

    if (needFill) {
        lock.lock();
        try {
            // top the pool back up to minIdle: one emptySignal() per missing connection;
            // emptySignal() wakes the creator thread, or submits a one-shot CreateConnectionTask
            // when a createScheduler is configured
            int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
            for (int i = 0; i < fillCount; ++i) {
                emptySignal();
            }
        } finally {
            lock.unlock();
        }
    } else if (onFatalError || fatalErrorIncrement > 0) {
        lock.lock();
        try {
            emptySignal();
        } finally {
            lock.unlock();
        }
    }
}
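The selection logic in the `checkTime` branch can be exercised on its own. This sketch copies those conditions onto a hypothetical `Holder` that carries only the two timestamps, leaves out the fatal-error branch and the actual close/validate steps, and, as a deliberate change, compacts the array by filtering instead of the prefix `System.arraycopy` shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ShrinkSketch {
    /** Hypothetical stand-in for DruidConnectionHolder: just the two timestamps shrink() reads. */
    static class Holder {
        final long connectTimeMillis;
        final long lastActiveTimeMillis;

        Holder(long connectTimeMillis, long lastActiveTimeMillis) {
            this.connectTimeMillis = connectTimeMillis;
            this.lastActiveTimeMillis = lastActiveTimeMillis;
        }
    }

    // Settings named after the Druid properties used in shrink(); set them before calling.
    long phyTimeoutMillis = -1;
    long minEvictableIdleTimeMillis;
    long maxEvictableIdleTimeMillis;
    long keepAliveBetweenTimeMillis;
    boolean keepAlive;
    int minIdle;

    final List<Holder> evict = new ArrayList<>();         // to be closed
    final List<Holder> keepAliveList = new ArrayList<>(); // to be validated and put back

    /** Classifies idle holders (most idle first) and compacts the survivors; returns the new poolingCount. */
    int shrink(Holder[] connections, int poolingCount, long now) {
        evict.clear();
        keepAliveList.clear();
        int checkCount = poolingCount - minIdle;
        for (int i = 0; i < poolingCount; ++i) {
            Holder h = connections[i];
            if (phyTimeoutMillis > 0 && now - h.connectTimeMillis > phyTimeoutMillis) {
                evict.add(h); // the physical connection has lived too long
                continue;
            }
            long idleMillis = now - h.lastActiveTimeMillis;
            if (idleMillis < minEvictableIdleTimeMillis && idleMillis < keepAliveBetweenTimeMillis) {
                break; // everything after this was used even more recently
            }
            if (idleMillis >= minEvictableIdleTimeMillis
                    && (i < checkCount || idleMillis > maxEvictableIdleTimeMillis)) {
                evict.add(h); // idle too long and either above minIdle or past the hard limit
                continue;
            }
            if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                keepAliveList.add(h);
            }
        }
        // Compact by filtering (not Druid's prefix arraycopy), so survivors keep their order.
        int n = 0;
        for (int i = 0; i < poolingCount; ++i) {
            Holder h = connections[i];
            if (!evict.contains(h) && !keepAliveList.contains(h)) {
                connections[n++] = h;
            }
        }
        Arrays.fill(connections, n, poolingCount, null);
        return n;
    }

    public static void main(String[] args) {
        ShrinkSketch s = new ShrinkSketch();
        s.minEvictableIdleTimeMillis = 1_000;
        s.maxEvictableIdleTimeMillis = 10_000;
        s.keepAliveBetweenTimeMillis = 500;
        s.keepAlive = true;
        s.minIdle = 1;
        long now = 100_000;
        Holder[] pool = {
            new Holder(0, now - 5_000), // idle 5 s: evicted
            new Holder(0, now - 600),   // idle 0.6 s: keep-alive check
            new Holder(0, now - 100),   // idle 0.1 s: untouched
        };
        int left = s.shrink(pool, pool.length, now);
        // prints 1 left, 1 evicted, 1 to validate
        System.out.println(left + " left, " + s.evict.size() + " evicted, " + s.keepAliveList.size() + " to validate");
    }
}
```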
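A question came up here: while walking the connections, a timed-out connection seems to be closed directly rather than removed from the pool, so its holder would still exist with a closed connection inside, and it would only be discarded and replaced once someone tried to use it. Why not null it out and remove it from the pool right here, which would save checking it later? Reading the locked section again, though, they are removed: the System.arraycopy/Arrays.fill block drops evictCount + keepAliveCount entries from the front of connections and lowers poolingCount before the lock is released. Evicted holders have therefore already left the array by the time they are closed, and kept-alive holders only come back through put(holer, 0L, true) after they pass validation.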
Back to DestroyTask:
public class DestroyTask implements Runnable {
    public DestroyTask() {

    }

    @Override
    public void run() {
        shrink(true, keepAlive);

        // not covered yet
        if (isRemoveAbandoned()) {
            removeAbandoned();
        }
    }
}
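Looking at removeAbandoned(): it handles reclaiming leaked connections. A few settings are involved:
- removeAbandoned: whether leaked connections should be reclaimed; default false
- logAbandoned: whether to log a message when a leaked connection is reclaimed; default false
- removeAbandonedTimeoutMillis: how long a connection may stay borrowed before it counts as leaked; default 5 minutes
In other words, this controls whether connections that were borrowed but never returned to the pool are cleaned up automatically. Once a connection is lent out, the pool has no way of knowing whether it is being used correctly; with this setting enabled, Druid checks how long each borrowed connection has been out, treats those past the timeout as leaked, and reclaims them.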
// walk the borrowed connections
Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();

for (; iter.hasNext();) {
    DruidPooledConnection pooledConnection = iter.next();

    // skip connections that are currently running; only idle ones are reclaimed.
    // Druid marks a connection as running while it executes a query or update,
    // and clears the flag once the call completes
    if (pooledConnection.isRunning()) {
        continue;
    }

    long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);

    // how long has the connection been lent out?
    if (timeMillis >= removeAbandonedTimeoutMillis) {
        iter.remove();
        pooledConnection.setTraceEnable(false);
        abandonedList.add(pooledConnection);
    }
}

// close the leaked connections
if (abandonedList.size() > 0) {
    for (DruidPooledConnection pooledConnection : abandonedList) {
        final ReentrantLock lock = pooledConnection.lock;
        lock.lock();
        try {
            if (pooledConnection.isDisable()) {
                continue;
            }
        } finally {
            lock.unlock();
        }

        JdbcUtils.close(pooledConnection);
        pooledConnection.abandond();
        removeAbandonedCount++;
        removeCount++;
        // ... (remainder of removeAbandoned() not shown)
    }
}
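A reduced sketch of the scan above, using a hypothetical `Borrowed` class in place of `DruidPooledConnection` and an explicit `currentNanos` argument so the check is deterministic. It only collects the abandoned connections and leaves closing them to the caller; the 5-minute default follows the setting described earlier.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

class AbandonedSketch {
    /** Hypothetical stand-in for DruidPooledConnection: borrow time plus the "running" flag. */
    static class Borrowed {
        final String name;
        final long connectedTimeNano;
        volatile boolean running; // set while a statement executes

        Borrowed(String name, long connectedTimeNano) {
            this.name = name;
            this.connectedTimeNano = connectedTimeNano;
        }
    }

    final Map<Borrowed, Boolean> activeConnections = new LinkedHashMap<>(); // borrowed, not yet returned
    long removeAbandonedTimeoutMillis = TimeUnit.MINUTES.toMillis(5);

    void borrow(Borrowed c) {
        activeConnections.put(c, Boolean.TRUE);
    }

    /** Removes and returns borrowed connections that are not running and were borrowed too long ago. */
    List<Borrowed> collectAbandoned(long currentNanos) {
        List<Borrowed> abandonedList = new ArrayList<>();
        for (Iterator<Borrowed> iter = activeConnections.keySet().iterator(); iter.hasNext(); ) {
            Borrowed c = iter.next();
            if (c.running) {
                continue; // never reclaim a connection in the middle of a statement
            }
            long timeMillis = TimeUnit.NANOSECONDS.toMillis(currentNanos - c.connectedTimeNano);
            if (timeMillis >= removeAbandonedTimeoutMillis) {
                iter.remove(); // stop tracking it as borrowed
                abandonedList.add(c);
            }
        }
        return abandonedList;
    }

    public static void main(String[] args) {
        AbandonedSketch pool = new AbandonedSketch();
        pool.removeAbandonedTimeoutMillis = 1_000;
        pool.borrow(new Borrowed("leaked", 0L));
        pool.borrow(new Borrowed("recent", 1_900_000_000L));
        for (Borrowed c : pool.collectAbandoned(2_000_000_000L)) {
            System.out.println("abandoned: " + c.name); // prints abandoned: leaked
        }
    }
}
```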
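Here, too, the connection is simply closed. Note, however, that these are borrowed connections: they are tracked in activeConnections (and the iter.remove() call above already drops them from it), not in the connections array, which only holds idle connections.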
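That covers how connections are created, maintained, and destroyed. A few open questions remain for later:
1>. How does Druid shut down, and how are the pool's connections closed one by one? And if the application is killed forcibly instead of shutting down gracefully, are these connections closed on the MySQL side?
2>. When the pool has reached its maximum size and every connection is in use, so that no connection is available, what does Druid do? Does it wait for a connection, or actively reclaim the connection that was borrowed earliest?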
Copyright notice: this article is an original work by 星尘阁; please credit the source when reposting.