Preface
This article takes a look at Druid's borrow behavior, i.e. how DruidDataSource hands out pooled connections.
getConnection
com/alibaba/druid/pool/DruidDataSource.java
public DruidPooledConnection getConnection() throws SQLException {
    return getConnection(maxWait);
}

public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
    init();

    if (filters.size() > 0) {
        FilterChainImpl filterChain = new FilterChainImpl(this);
        return filterChain.dataSource_connect(this, maxWaitMillis);
    } else {
        return getConnectionDirect(maxWaitMillis);
    }
}
DruidDataSource's getConnection() delegates to getConnection(maxWait), which runs init() and then either goes through the filter chain (when filters are configured) or calls getConnectionDirect(maxWaitMillis) directly; either way the actual borrowing ends up in getConnectionDirect.
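To make the entry point concrete, here is a minimal usage sketch; the JDBC URL, credentials and pool sizes are placeholder values for illustration, not anything taken from the Druid source:

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;

public class BorrowDemo {
    public static void main(String[] args) throws Exception {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/test"); // placeholder URL
        dataSource.setUsername("root");                        // placeholder credentials
        dataSource.setPassword("secret");
        dataSource.setMaxActive(20);  // upper bound checked all along the borrow path
        dataSource.setMaxWait(3000);  // maxWait > 0 makes the borrow a timed wait

        // getConnection() -> getConnection(maxWait) -> getConnectionDirect(maxWaitMillis)
        try (DruidPooledConnection conn = dataSource.getConnection()) {
            // use the connection; close() recycles it back into the pool
        }

        dataSource.close();
    }
}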
getConnectionDirect
com/alibaba/druid/pool/DruidDataSource.java
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
    int notFullTimeoutRetryCnt = 0;
    for (; ; ) {
        // handle notFullTimeoutRetry
        DruidPooledConnection poolableConnection;
        try {
            poolableConnection = getConnectionInternal(maxWaitMillis);
        } catch (GetConnectionTimeoutException ex) {
            if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
                notFullTimeoutRetryCnt++;
                if (LOG.isWarnEnabled()) {
                    LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
                }
                continue;
            }
            throw ex;
        }

        if (testOnBorrow) {
            boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
            if (!validate) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("skip not validate connection.");
                }
                discardConnection(poolableConnection.holder);
                continue;
            }
        } else {
            if (poolableConnection.conn.isClosed()) {
                discardConnection(poolableConnection.holder); // pass null to avoid closing twice
                continue;
            }

            if (testWhileIdle) {
                final DruidConnectionHolder holder = poolableConnection.holder;
                long currentTimeMillis = System.currentTimeMillis();
                long lastActiveTimeMillis = holder.lastActiveTimeMillis;
                long lastExecTimeMillis = holder.lastExecTimeMillis;
                long lastKeepTimeMillis = holder.lastKeepTimeMillis;

                if (checkExecuteTime
                        && lastExecTimeMillis != lastActiveTimeMillis) {
                    lastActiveTimeMillis = lastExecTimeMillis;
                }

                if (lastKeepTimeMillis > lastActiveTimeMillis) {
                    lastActiveTimeMillis = lastKeepTimeMillis;
                }

                long idleMillis = currentTimeMillis - lastActiveTimeMillis;

                long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;

                if (timeBetweenEvictionRunsMillis <= 0) {
                    timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
                }

                if (idleMillis >= timeBetweenEvictionRunsMillis
                        || idleMillis < 0 // unexcepted branch
                ) {
                    boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                    if (!validate) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("skip not validate connection.");
                        }

                        discardConnection(poolableConnection.holder);
                        continue;
                    }
                }
            }
        }

        if (removeAbandoned) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            poolableConnection.connectStackTrace = stackTrace;
            poolableConnection.setConnectedTimeNano();
            poolableConnection.traceEnable = true;

            activeConnectionLock.lock();
            try {
                activeConnections.put(poolableConnection, PRESENT);
            } finally {
                activeConnectionLock.unlock();
            }
        }

        if (!this.defaultAutoCommit) {
            poolableConnection.setAutoCommit(false);
        }

        return poolableConnection;
    }
}

public boolean isFull() {
    lock.lock();
    try {
        return this.poolingCount + this.activeCount >= this.maxActive;
    } finally {
        lock.unlock();
    }
}
getConnectionDirect obtains a connection inside a for loop: it first calls getConnectionInternal(maxWaitMillis); if that throws a GetConnectionTimeoutException, then as long as the pool is not full and notFullTimeoutRetryCnt is less than or equal to this.notFullTimeoutRetryCount, it increments notFullTimeoutRetryCnt and continues the loop; otherwise the GetConnectionTimeoutException is rethrown and the loop ends.
Once a connection is obtained, the behavior depends on testOnBorrow: if it is enabled, testConnectionInternal is executed, and when validation fails the connection is discarded via discardConnection and the loop continues; if testOnBorrow is disabled, it first checks whether the connection is closed (discarding it and continuing the loop if so), and otherwise enters the testWhileIdle logic (running testWhileIdle right inside getConnection is a somewhat puzzling choice).
Finally there is removeAbandoned: when enabled, the connect stack trace and connectedTimeNano are recorded and the current connection is put into activeConnections.
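The switches that drive this borrow-time logic are plain configuration; a sketch of the relevant setters (the concrete values are illustrative, not recommendations):

import com.alibaba.druid.pool.DruidDataSource;

class BorrowCheckConfig {
    static DruidDataSource build() {
        DruidDataSource dataSource = new DruidDataSource();
        // validate on every borrow (the testOnBorrow branch above)
        dataSource.setTestOnBorrow(false);
        // otherwise validate only when the connection has been idle longer than
        // timeBetweenEvictionRunsMillis (the testWhileIdle branch above)
        dataSource.setTestWhileIdle(true);
        dataSource.setTimeBetweenEvictionRunsMillis(60 * 1000);
        dataSource.setValidationQuery("select 1");
        // retries used when getConnectionInternal times out while the pool is not full
        dataSource.setNotFullTimeoutRetryCount(2);
        // track borrowed connections so leaked ones can be reclaimed
        dataSource.setRemoveAbandoned(true);
        dataSource.setRemoveAbandonedTimeout(300); // seconds
        return dataSource;
    }
}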
getConnectionInternal
com/alibaba/druid/pool/DruidDataSource.java
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
    if (closed) {
        connectErrorCountUpdater.incrementAndGet(this);
        throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
    }

    if (!enable) {
        connectErrorCountUpdater.incrementAndGet(this);

        if (disableException != null) {
            throw disableException;
        }

        throw new DataSourceDisableException();
    }

    final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
    final int maxWaitThreadCount = this.maxWaitThreadCount;

    DruidConnectionHolder holder;

    for (boolean createDirect = false; ; ) {
        if (createDirect) {
            createStartNanosUpdater.set(this, System.nanoTime());
            if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
                PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
                holder = new DruidConnectionHolder(this, pyConnInfo);
                holder.lastActiveTimeMillis = System.currentTimeMillis();

                creatingCountUpdater.decrementAndGet(this);
                directCreateCountUpdater.incrementAndGet(this);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("conn-direct_create ");
                }

                boolean discard;
                lock.lock();
                try {
                    if (activeCount < maxActive) {
                        activeCount++;
                        holder.active = true;
                        if (activeCount > activePeak) {
                            activePeak = activeCount;
                            activePeakTime = System.currentTimeMillis();
                        }
                        break;
                    } else {
                        discard = true;
                    }
                } finally {
                    lock.unlock();
                }

                if (discard) {
                    JdbcUtils.close(pyConnInfo.getPhysicalConnection());
                }
            }
        }

        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            connectErrorCountUpdater.incrementAndGet(this);
            throw new SQLException("interrupt", e);
        }

        try {
            if (maxWaitThreadCount > 0
                    && notEmptyWaitThreadCount >= maxWaitThreadCount) {
                connectErrorCountUpdater.incrementAndGet(this);
                throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
                        + lock.getQueueLength());
            }

            if (onFatalError
                    && onFatalErrorMaxActive > 0
                    && activeCount >= onFatalErrorMaxActive) {
                connectErrorCountUpdater.incrementAndGet(this);

                StringBuilder errorMsg = new StringBuilder();
                errorMsg.append("onFatalError, activeCount ")
                        .append(activeCount)
                        .append(", onFatalErrorMaxActive ")
                        .append(onFatalErrorMaxActive);

                if (lastFatalErrorTimeMillis > 0) {
                    errorMsg.append(", time '")
                            .append(StringUtils.formatDateTime19(lastFatalErrorTimeMillis, TimeZone.getDefault()))
                            .append("'");
                }

                if (lastFatalErrorSql != null) {
                    errorMsg.append(", sql \n")
                            .append(lastFatalErrorSql);
                }

                throw new SQLException(errorMsg.toString(), lastFatalError);
            }

            connectCount++;

            if (createScheduler != null
                    && poolingCount == 0
                    && activeCount < maxActive
                    && creatingCountUpdater.get(this) == 0
                    && createScheduler instanceof ScheduledThreadPoolExecutor) {
                ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
                if (executor.getQueue().size() > 0) {
                    createDirect = true;
                    continue;
                }
            }

            if (maxWait > 0) {
                holder = pollLast(nanos);
            } else {
                holder = takeLast();
            }

            if (holder != null) {
                if (holder.discard) {
                    continue;
                }

                activeCount++;
                holder.active = true;
                if (activeCount > activePeak) {
                    activePeak = activeCount;
                    activePeakTime = System.currentTimeMillis();
                }
            }
        } catch (InterruptedException e) {
            connectErrorCountUpdater.incrementAndGet(this);
            throw new SQLException(e.getMessage(), e);
        } catch (SQLException e) {
            connectErrorCountUpdater.incrementAndGet(this);
            throw e;
        } finally {
            lock.unlock();
        }

        break;
    }

    if (holder == null) {
        long waitNanos = waitNanosLocal.get();

        final long activeCount;
        final long maxActive;
        final long creatingCount;
        final long createStartNanos;
        final long createErrorCount;
        final Throwable createError;
        try {
            lock.lock();
            activeCount = this.activeCount;
            maxActive = this.maxActive;
            creatingCount = this.creatingCount;
            createStartNanos = this.createStartNanos;
            createErrorCount = this.createErrorCount;
            createError = this.createError;
        } finally {
            lock.unlock();
        }

        StringBuilder buf = new StringBuilder(128);
        buf.append("wait millis ")
                .append(waitNanos / (1000 * 1000))
                .append(", active ").append(activeCount)
                .append(", maxActive ").append(maxActive)
                .append(", creating ").append(creatingCount);

        if (creatingCount > 0 && createStartNanos > 0) {
            long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000);
            if (createElapseMillis > 0) {
                buf.append(", createElapseMillis ").append(createElapseMillis);
            }
        }

        if (createErrorCount > 0) {
            buf.append(", createErrorCount ").append(createErrorCount);
        }

        List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();
        for (int i = 0; i < sqlList.size(); ++i) {
            if (i != 0) {
                buf.append('\n');
            } else {
                buf.append(", ");
            }
            JdbcSqlStatValue sql = sqlList.get(i);
            buf.append("runningSqlCount ").append(sql.getRunningCount());
            buf.append(" : ");
            buf.append(sql.getSql());
        }

        String errorMessage = buf.toString();

        if (createError != null) {
            throw new GetConnectionTimeoutException(errorMessage, createError);
        } else {
            throw new GetConnectionTimeoutException(errorMessage);
        }
    }

    holder.incrementUseCount();

    DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
    return poolalbeConnection;
}
getConnectionInternal first checks whether the data source is closed, throwing a DataSourceClosedException if so, then checks enable, throwing a DataSourceDisableException if it is disabled; after that it enters a for loop whose behavior hinges on createDirect, which defaults to false on the first iteration.
When createDirect is false, an SQLException is thrown if notEmptyWaitThreadCount has reached maxWaitThreadCount; if poolingCount is 0, activeCount is below maxActive, and the createScheduler's queue is non-empty, createDirect is set to true and the loop continues; otherwise pollLast(nanos) is called when maxWait is greater than 0, and takeLast() when it is not.
When createDirect is true, a physical connection is created via DruidDataSource.this.createPhysicalConnection(); if activeCount is below maxActive the counters are updated and the loop is exited, otherwise discard is set to true and the connection is closed via JdbcUtils.close(pyConnInfo.getPhysicalConnection()).
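The createDirect branch lets at most one borrower create the connection itself, guarded by a compareAndSet on a volatile counter (creatingCountUpdater in Druid). A stripped-down sketch of that idiom, not Druid code:

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class DirectCreateGuard {
    // same idiom as DruidDataSource's creatingCountUpdater
    private static final AtomicIntegerFieldUpdater<DirectCreateGuard> CREATING =
            AtomicIntegerFieldUpdater.newUpdater(DirectCreateGuard.class, "creatingCount");

    private volatile int creatingCount;

    /** Only the thread that wins the 0 -> 1 CAS performs the expensive creation. */
    boolean tryCreateDirect(Runnable createPhysicalConnection) {
        if (!CREATING.compareAndSet(this, 0, 1)) {
            return false; // another thread is already creating, fall back to waiting
        }
        try {
            createPhysicalConnection.run(); // stands in for createPhysicalConnection()
            return true;
        } finally {
            CREATING.decrementAndGet(this); // release the guard
        }
    }
}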
pollLast
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
    long estimate = nanos;

    for (; ; ) {
        if (poolingCount == 0) {
            emptySignal(); // send signal to CreateThread create connection

            if (failFast && isFailContinuous()) {
                throw new DataSourceNotAvailableException(createError);
            }

            if (estimate <= 0) {
                waitNanosLocal.set(nanos - estimate);
                return null;
            }

            notEmptyWaitThreadCount++;
            if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
            }

            try {
                long startEstimate = estimate;
                estimate = notEmpty.awaitNanos(estimate); // signal by
                                                          // recycle or
                                                          // creator
                notEmptyWaitCount++;
                notEmptyWaitNanos += (startEstimate - estimate);

                if (!enable) {
                    connectErrorCountUpdater.incrementAndGet(this);

                    if (disableException != null) {
                        throw disableException;
                    }

                    throw new DataSourceDisableException();
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                notEmptySignalCount++;
                throw ie;
            } finally {
                notEmptyWaitThreadCount--;
            }

            if (poolingCount == 0) {
                if (estimate > 0) {
                    continue;
                }

                waitNanosLocal.set(nanos - estimate);
                return null;
            }
        }

        decrementPoolingCount();
        DruidConnectionHolder last = connections[poolingCount];
        connections[poolingCount] = null;

        long waitNanos = nanos - estimate;
        last.setLastNotEmptyWaitNanos(waitNanos);

        return last;
    }
}
pollLast calls emptySignal() when poolingCount is 0, then mainly deals with the notEmpty condition (a timed wait via awaitNanos), and finally takes connections[poolingCount] after decrementing poolingCount.
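The heart of pollLast is the standard deadline loop on a java.util.concurrent Condition; here is a stripped-down sketch of that pattern, not Druid code, just the idiom it relies on:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedPool<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> items = new ArrayDeque<>();

    /** Mirrors pollLast: wait up to maxWaitMillis for an item, otherwise return null. */
    T pollLast(long maxWaitMillis) throws InterruptedException {
        long estimate = TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        lock.lock();
        try {
            while (items.isEmpty()) {
                if (estimate <= 0) {
                    return null; // deadline exhausted, the caller reports a timeout
                }
                // awaitNanos returns the remaining wait; spurious wakeups just loop again
                estimate = notEmpty.awaitNanos(estimate);
            }
            return items.pollLast();
        } finally {
            lock.unlock();
        }
    }

    /** Mirrors recycle/creator: hand an item back and wake one waiter. */
    void offer(T item) {
        lock.lock();
        try {
            items.addLast(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
}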
takeLast
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
    try {
        while (poolingCount == 0) {
            emptySignal(); // send signal to CreateThread create connection

            if (failFast && isFailContinuous()) {
                throw new DataSourceNotAvailableException(createError);
            }

            notEmptyWaitThreadCount++;
            if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
            }
            try {
                notEmpty.await(); // signal by recycle or creator
            } finally {
                notEmptyWaitThreadCount--;
            }
            notEmptyWaitCount++;

            if (!enable) {
                connectErrorCountUpdater.incrementAndGet(this);

                if (disableException != null) {
                    throw disableException;
                }

                throw new DataSourceDisableException();
            }
        }
    } catch (InterruptedException ie) {
        notEmpty.signal(); // propagate to non-interrupted thread
        notEmptySignalCount++;
        throw ie;
    }

    decrementPoolingCount();
    DruidConnectionHolder last = connections[poolingCount];
    connections[poolingCount] = null;

    return last;
}
takeLast likewise calls emptySignal() when poolingCount is 0, blocks on notEmpty.await() until a connection becomes available, and finally returns connections[poolingCount] after decrementing poolingCount.
emptySignal
private void emptySignal() {
    if (createScheduler == null) {
        empty.signal();
        return;
    }

    if (createTaskCount >= maxCreateTaskCount) {
        return;
    }

    if (activeCount + poolingCount + createTaskCount >= maxActive) {
        return;
    }
    submitCreateTask(false);
}
emptySignal calls empty.signal() when createScheduler is null; otherwise it checks the task count (createTaskCount against maxCreateTaskCount) and the maxActive bound, and then calls submitCreateTask(false).
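When no createScheduler is configured, empty.signal() wakes the dedicated CreateConnectionThread, which waits on the empty condition and hands new connections back to the pool. Continuing the TimedPool sketch above, this is roughly the shape of that pairing; createPhysical is a hypothetical factory, and in Druid the waiter calls emptySignal() right before parking, just as pollLast/takeLast do when poolingCount == 0:

// additional members of the TimedPool sketch above
private final Condition empty = lock.newCondition();

// what emptySignal() boils down to when there is no createScheduler;
// assumes the caller already holds the lock, as pollLast/takeLast do
void emptySignal() {
    empty.signal();
}

// the CreateConnectionThread shape: park on empty, create outside the lock, hand back via offer()
void runCreateLoop(java.util.function.Supplier<T> createPhysical) throws InterruptedException {
    for (; ; ) {
        lock.lock();
        try {
            while (!items.isEmpty()) {
                empty.await(); // parked here until emptySignal()
            }
        } finally {
            lock.unlock();
        }
        offer(createPhysical.get()); // adds the new item and signals notEmpty
    }
}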
submitCreateTask
private void submitCreateTask(boolean initTask) {
    createTaskCount++;
    CreateConnectionTask task = new CreateConnectionTask(initTask);
    if (createTasks == null) {
        createTasks = new long[8];
    }

    boolean putted = false;
    for (int i = 0; i < createTasks.length; ++i) {
        if (createTasks[i] == 0) {
            createTasks[i] = task.taskId;
            putted = true;
            break;
        }
    }
    if (!putted) {
        long[] array = new long[createTasks.length * 3 / 2];
        System.arraycopy(createTasks, 0, array, 0, createTasks.length);
        array[createTasks.length] = task.taskId;
        createTasks = array;
    }

    this.createSchedulerFuture = createScheduler.submit(task);
}
submitCreateTask creates a CreateConnectionTask, records its taskId in the createTasks array, and submits the task to createScheduler for execution.
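createScheduler is optional and set from outside the pool; when one is present, creation goes through these submitted tasks instead of the dedicated CreateConnectionThread. A small wiring sketch (the thread count is illustrative):

import java.util.concurrent.ScheduledThreadPoolExecutor;
import com.alibaba.druid.pool.DruidDataSource;

class SchedulerConfig {
    static DruidDataSource build() {
        DruidDataSource dataSource = new DruidDataSource();
        // a ScheduledThreadPoolExecutor matters here: getConnectionInternal peeks at its
        // queue size to decide whether to fall back to the createDirect path
        dataSource.setCreateScheduler(new ScheduledThreadPoolExecutor(2));
        return dataSource;
    }
}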
CreateConnectionTask
com/alibaba/druid/pool/DruidDataSource.java
public class CreateConnectionTask implements Runnable {
    private int errorCount;
    private boolean initTask;
    private final long taskId;

    public CreateConnectionTask() {
        taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
    }

    public CreateConnectionTask(boolean initTask) {
        taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
        this.initTask = initTask;
    }

    @Override
    public void run() {
        runInternal();
    }

    private void runInternal() {
        for (; ; ) {
            // addLast
            lock.lock();
            try {
                if (closed || closing) {
                    clearCreateTask(taskId);
                    return;
                }

                boolean emptyWait = true;

                if (createError != null && poolingCount == 0) {
                    emptyWait = false;
                }

                if (emptyWait) {
                    // only create a connection when there are threads waiting for one
                    if (poolingCount >= notEmptyWaitThreadCount //
                            && (!(keepAlive && activeCount + poolingCount < minIdle)) // must not skip creation in the keepAlive scenario
                            && (!initTask) // tasks submitted during pool init must not skip creation
                            && !isFailContinuous() // must not skip creation while failContinuous, otherwise connections could never be created
                            && !isOnFatalError() // must not skip creation on onFatalError, otherwise connections could never be created
                    ) {
                        clearCreateTask(taskId);
                        return;
                    }

                    // prevent creating more connections than maxActive
                    if (activeCount + poolingCount >= maxActive) {
                        clearCreateTask(taskId);
                        return;
                    }
                }
            } finally {
                lock.unlock();
            }

            PhysicalConnectionInfo physicalConnection = null;

            try {
                physicalConnection = createPhysicalConnection();
            } catch (OutOfMemoryError e) {
                LOG.error("create connection OutOfMemoryError, out memory. ", e);

                errorCount++;
                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    // fail over retry attempts
                    setFailContinuous(true);
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }

                    if (breakAfterAcquireFailure) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    this.errorCount = 0; // reset errorCount
                    if (closing || closed) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                    return;
                }
            } catch (SQLException e) {
                LOG.error("create connection SQLException, url: " + jdbcUrl, e);

                errorCount++;
                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    // fail over retry attempts
                    setFailContinuous(true);
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }

                    if (breakAfterAcquireFailure) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    this.errorCount = 0; // reset errorCount
                    if (closing || closed) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                    return;
                }
            } catch (RuntimeException e) {
                LOG.error("create connection RuntimeException", e);
                // unknow fatal exception
                setFailContinuous(true);
                continue;
            } catch (Error e) {
                lock.lock();
                try {
                    clearCreateTask(taskId);
                } finally {
                    lock.unlock();
                }
                LOG.error("create connection Error", e);
                // unknow fatal exception
                setFailContinuous(true);
                break;
            } catch (Throwable e) {
                lock.lock();
                try {
                    clearCreateTask(taskId);
                } finally {
                    lock.unlock();
                }

                LOG.error("create connection unexecpted error.", e);
                break;
            }

            if (physicalConnection == null) {
                continue;
            }

            physicalConnection.createTaskId = taskId;
            boolean result = put(physicalConnection);
            if (!result) {
                JdbcUtils.close(physicalConnection.getPhysicalConnection());
                LOG.info("put physical connection to pool failed.");
            }
            break;
        }
    }
}
CreateConnectionTask runs a for loop: it first takes the lock and, based on minIdle, maxActive and the number of waiting threads, decides whether a connection still needs to be created; it then creates the physical connection via createPhysicalConnection and puts it into the pool. Creation failures are handled by rescheduling the task after timeBetweenConnectErrorMillis once errorCount exceeds connectionErrorRetryAttempts.
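The error-handling branches above are controlled by a handful of retry parameters; a configuration sketch with illustrative values:

import com.alibaba.druid.pool.DruidDataSource;

class CreateRetryConfig {
    static DruidDataSource build() {
        DruidDataSource dataSource = new DruidDataSource();
        // once consecutive creation failures exceed this count, failContinuous is set
        dataSource.setConnectionErrorRetryAttempts(3);
        // delay before the CreateConnectionTask is rescheduled after repeated failures
        dataSource.setTimeBetweenConnectErrorMillis(500);
        // true: stop creating once the retry budget is exhausted
        dataSource.setBreakAfterAcquireFailure(false);
        // true: waiters are woken immediately and fail while the pool is failContinuous
        dataSource.setFailFast(true);
        return dataSource;
    }
}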
createPhysicalConnection
com/alibaba/druid/pool/DruidAbstractDataSource.java
public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {
    String url = this.getUrl();
    Properties connectProperties = getConnectProperties();

    String user;
    if (getUserCallback() != null) {
        user = getUserCallback().getName();
    } else {
        user = getUsername();
    }

    String password = getPassword();
    PasswordCallback passwordCallback = getPasswordCallback();

    if (passwordCallback != null) {
        if (passwordCallback instanceof DruidPasswordCallback) {
            DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;

            druidPasswordCallback.setUrl(url);
            druidPasswordCallback.setProperties(connectProperties);
        }

        char[] chars = passwordCallback.getPassword();
        if (chars != null) {
            password = new String(chars);
        }
    }

    Properties physicalConnectProperties = new Properties();
    if (connectProperties != null) {
        physicalConnectProperties.putAll(connectProperties);
    }

    if (user != null && user.length() != 0) {
        physicalConnectProperties.put("user", user);
    }

    if (password != null && password.length() != 0) {
        physicalConnectProperties.put("password", password);
    }

    Connection conn = null;

    long connectStartNanos = System.nanoTime();
    long connectedNanos, initedNanos, validatedNanos;

    Map<String, Object> variables = initVariants
            ? new HashMap<String, Object>()
            : null;
    Map<String, Object> globalVariables = initGlobalVariants
            ? new HashMap<String, Object>()
            : null;

    createStartNanosUpdater.set(this, connectStartNanos);
    creatingCountUpdater.incrementAndGet(this);
    try {
        conn = createPhysicalConnection(url, physicalConnectProperties);
        connectedNanos = System.nanoTime();

        if (conn == null) {
            throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
        }

        initPhysicalConnection(conn, variables, globalVariables);
        initedNanos = System.nanoTime();

        validateConnection(conn);
        validatedNanos = System.nanoTime();

        setFailContinuous(false);
        setCreateError(null);
    } catch (SQLException ex) {
        setCreateError(ex);
        JdbcUtils.close(conn);
        throw ex;
    } catch (RuntimeException ex) {
        setCreateError(ex);
        JdbcUtils.close(conn);
        throw ex;
    } catch (Error ex) {
        createErrorCountUpdater.incrementAndGet(this);
        setCreateError(ex);
        JdbcUtils.close(conn);
        throw ex;
    } finally {
        long nano = System.nanoTime() - connectStartNanos;
        createTimespan += nano;
        creatingCountUpdater.decrementAndGet(this);
    }

    return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
}
createPhysicalConnection assembles the connect properties (including user and password, which may come from callbacks) and creates the physical connection inside a try/catch; if an exception occurs, the connection is closed via JdbcUtils.close(conn) and the exception is rethrown.
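Because connectProperties is merged into physicalConnectProperties, driver-level settings can be passed straight through to the physical connection. A small sketch, assuming a MySQL driver where connectTimeout/socketTimeout are meaningful driver properties (the values are illustrative):

import java.util.Properties;
import com.alibaba.druid.pool.DruidDataSource;

class ConnectPropsConfig {
    static DruidDataSource build() {
        DruidDataSource dataSource = new DruidDataSource();
        Properties props = new Properties();
        props.setProperty("connectTimeout", "3000");  // illustrative driver-level property
        props.setProperty("socketTimeout", "30000");
        dataSource.setConnectProperties(props);       // copied into physicalConnectProperties
        // username/password set on the data source are merged in as "user"/"password"
        dataSource.setUsername("root");
        dataSource.setPassword("secret");
        return dataSource;
    }
}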
testConnectionInternal
protected boolean testConnectionInternal(DruidConnectionHolder holder, Connection conn) {
    String sqlFile = JdbcSqlStat.getContextSqlFile();
    String sqlName = JdbcSqlStat.getContextSqlName();

    if (sqlFile != null) {
        JdbcSqlStat.setContextSqlFile(null);
    }
    if (sqlName != null) {
        JdbcSqlStat.setContextSqlName(null);
    }
    try {
        if (validConnectionChecker != null) {
            boolean valid = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);
            long currentTimeMillis = System.currentTimeMillis();
            if (holder != null) {
                holder.lastValidTimeMillis = currentTimeMillis;
                holder.lastExecTimeMillis = currentTimeMillis;
            }

            if (valid && isMySql) { // unexcepted branch
                long lastPacketReceivedTimeMs = MySqlUtils.getLastPacketReceivedTimeMs(conn);
                if (lastPacketReceivedTimeMs > 0) {
                    long mysqlIdleMillis = currentTimeMillis - lastPacketReceivedTimeMs;
                    if (lastPacketReceivedTimeMs > 0 //
                            && mysqlIdleMillis >= timeBetweenEvictionRunsMillis) {
                        discardConnection(holder);
                        String errorMsg = "discard long time none received connection. "
                                + ", jdbcUrl : " + jdbcUrl
                                + ", version : " + VERSION.getVersionNumber()
                                + ", lastPacketReceivedIdleMillis : " + mysqlIdleMillis;
                        LOG.warn(errorMsg);
                        return false;
                    }
                }
            }

            if (valid && onFatalError) {
                lock.lock();
                try {
                    if (onFatalError) {
                        onFatalError = false;
                    }
                } finally {
                    lock.unlock();
                }
            }

            return valid;
        }

        if (conn.isClosed()) {
            return false;
        }

        if (null == validationQuery) {
            return true;
        }

        Statement stmt = null;
        ResultSet rset = null;
        try {
            stmt = conn.createStatement();
            if (getValidationQueryTimeout() > 0) {
                stmt.setQueryTimeout(validationQueryTimeout);
            }
            rset = stmt.executeQuery(validationQuery);
            if (!rset.next()) {
                return false;
            }
        } finally {
            JdbcUtils.close(rset);
            JdbcUtils.close(stmt);
        }

        if (onFatalError) {
            lock.lock();
            try {
                if (onFatalError) {
                    onFatalError = false;
                }
            } finally {
                lock.unlock();
            }
        }

        return true;
    } catch (Throwable ex) {
        // skip
        return false;
    } finally {
        if (sqlFile != null) {
            JdbcSqlStat.setContextSqlFile(sqlFile);
        }
        if (sqlName != null) {
            JdbcSqlStat.setContextSqlName(sqlName);
        }
    }
}
testConnectionInternal validates the connection mainly through validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout); when validConnectionChecker is null, it falls back to executing validationQuery over plain JDBC.
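Which check actually runs is configurable: validationQuery feeds both the checker and the plain-JDBC fallback, and a vendor ValidConnectionChecker can be set explicitly (Druid normally resolves one automatically from the driver class). A sketch with illustrative values:

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.vendor.MySqlValidConnectionChecker;

class ValidationConfig {
    static DruidDataSource build() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setValidationQuery("select 1");
        dataSource.setValidationQueryTimeout(2); // seconds, applied via setQueryTimeout
        // optional: usually picked automatically based on the JDBC driver
        dataSource.setValidConnectionChecker(new MySqlValidConnectionChecker());
        return dataSource;
    }
}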
discardConnection
public void discardConnection(DruidConnectionHolder holder) {
    if (holder == null) {
        return;
    }

    Connection conn = holder.getConnection();
    if (conn != null) {
        JdbcUtils.close(conn);
    }

    lock.lock();
    try {
        if (holder.discard) {
            return;
        }

        if (holder.active) {
            activeCount--;
            holder.active = false;
        }
        discardCount++;

        holder.discard = true;

        if (activeCount <= minIdle) {
            emptySignal();
        }
    } finally {
        lock.unlock();
    }
}
discardConnection mainly closes the connection and then, under the lock, updates the discard/active counters and flags, calling emptySignal() when activeCount drops to minIdle or below.
Summary
DruidDataSource's getConnection method ultimately ends up calling getConnectionDirect(maxWaitMillis).
getConnectionDirect obtains a connection inside a for loop: it first calls getConnectionInternal(maxWaitMillis); if that throws a GetConnectionTimeoutException, then as long as the pool is not full and notFullTimeoutRetryCnt is less than or equal to this.notFullTimeoutRetryCount, it increments notFullTimeoutRetryCnt and continues the loop, otherwise the exception is rethrown and the loop ends.
Once a connection is obtained, if testOnBorrow is enabled testConnectionInternal is executed, and when validation fails the connection is discarded via discardConnection and the loop continues; if testOnBorrow is disabled, the connection is discarded and the loop continues when it is already closed, otherwise the testWhileIdle logic is entered.
Finally there is removeAbandoned: connectedTimeNano is maintained and the current connection is put into activeConnections.
Reading the code as a whole, compared with commons-pool, Druid's implementation feels a bit rough: the level of abstraction is not very high and the code is littered with statistics counters and state flags that must be maintained carefully; on top of that, running testWhileIdle directly inside getConnection is a somewhat puzzling choice.