对象池?
让任意对象实现池功能,只要结合使用两个类GenericObjectPool
和PooledObjectFactory
,这个池子可以实现:
(1)minIdle个数保证:通过配置,测试并清除池子中的空闲对象,以保证有 minIdle 对象在池子中。这个清除操作是由一个 “idle object eviction” 线程异步执行的。但配置清除功能时要十分小心:因为清除线程会和客户端线程竞争池子中对象的访问,所以如果频繁触发清除对象,可能会导致性能问题。 在清除空闲对象时,还可以配置成发现并移除 “abandoned” 状态的对象
(2)实现细节:GenericObjectPool
类是线程安全的
(3)关键参数
// info 池中多有状态的对象,是一个并发 HashMap。为的是能快速找到池子中的对象而没用 list 结构
// allObjects 中的对象个数永远 <= _maxActive
private final ConcurrentHashMap<IdentityWrapper<T>, PooledObject<T>> allObjects = new ConcurrentHashMap<>();// 已创建和创建中的对象个数,该数值可能会超过 maxActive,但 create() 方法会确保创建的对象个数不会超过 maxActivemaxActive
private final AtomicLong createCount = new AtomicLong();private long makeObjectCount;private final Object makeObjectCountLock = new Object(); // 空闲对象的 queue: 线程安全的双端队列(可以配置先进先出或后进先出,用来控制优先使用新对象还是老对象)private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
关键方法?
- 对象创建:borrowObject()时会创建对象,主要是 create() 方法完成
private PooledObject<T> create() throws Exception {System.out.println(Thread.currentThread().getName()+" => "+createdCount.get()+"|"+makeObjectCount+"|"+createCount.get());int localMaxTotal = getMaxTotal();// This simplifies the code later in this methodif (localMaxTotal < 0) {localMaxTotal = Integer.MAX_VALUE;}final Instant localStartInstant = Instant.now();final Duration maxWaitDurationRaw = getMaxWaitDuration();final Duration localMaxWaitDuration = maxWaitDurationRaw.isNegative() ? Duration.ZERO : maxWaitDurationRaw;// Flag that indicates if create should:// - TRUE: call the factory to create an object// - FALSE: return null// - null: loop and re-test the condition that determines whether to// call the factoryBoolean create = null; // 用于循环检测while (create == null) {// 同步代码块只锁计数器,没有锁后面的创建对象过程。所以创建对象是并发的。只要正在创建对象的线程数,小于设置的maxTotalCount,就可以不用等待并发创建对象synchronized (makeObjectCountLock) {final long newCreateCount = createCount.incrementAndGet(); // createCount: 线程安全的计数器,也相当于给线程编号System.out.println("【synchronized (makeObjectCountLock)】"+Thread.currentThread().getName()+":"+createCount.get());if (newCreateCount > localMaxTotal) {// The pool is currently at capacity or in the process of// making enough new objects to take it to capacity.//池子中对象个数已经达到上限,不用该线程再创建对象,所以先把创建中的个数 -1createCount.decrementAndGet();System.out.println("newCreateCount > localMaxTotal");// makeObjectCount 是非线程安全的计数器,表示调用工厂的 makeObject() 方法的个数// 因为该方法,整个在 synchronized (makeObjectCountLock) 锁的同步下,所以 makeObjectCount 就是真正的调用次数// makeObjectCount = 0 表示之前的线程全都正确的创建出了对象,因此该线程不用再去创建if (makeObjectCount == 0) {// There are no makeObject() calls in progress so the// pool is at capacity. Do not attempt to create a new// object. Return and wait for an object to be returned// makeObjectCount是0create = Boolean.FALSE;} else {// There are makeObject() calls in progress that might// bring the pool to capacity. Those calls might also// fail so wait until they complete and then re-test if// the pool is at capacity or not.// makeObjectCount != 0 表示之前的线程还没创建完对象(因为创建对象可能很耗时间),有可能会创建失败,// 所以该线程调用 wait() 释放锁,让前面的线程可以继续创建,该线程等待 notify 补位wait(makeObjectCountLock, localMaxWaitDuration);}} else {// The pool is not at capacity. Create a new object.// createCount < maxTotal,于是可以创建。makeObjectCount 是在同步代码块里增加的makeObjectCount++;create = Boolean.TRUE;}}// Do not block more if maxWaitTimeMillis is set.// create = null 的情况,发生在之前的线程创建对象失败后, notify 了该线程;或是该线程 wait 到了时间,自动唤醒// localMaxWaitDuration.compareTo(Duration.ZERO) > 0 :设置了超时时间// Duration.between(localStartInstant, Instant.now()).compareTo(localMaxWaitDuration) >= 0: 检测确实是是 wait 超时了,自动唤醒后拿到锁if (create == null && localMaxWaitDuration.compareTo(Duration.ZERO) > 0 &&Duration.between(localStartInstant, Instant.now()).compareTo(localMaxWaitDuration) >= 0) {create = Boolean.FALSE;}}// 不用创建对象,返回空if (!create.booleanValue()) {return null;}// 需要创建对象final PooledObject<T> p;try {p = factory.makeObject();// 创建出了空对象:失败,抛出异常,走 catch .. finally: createCount减2(catch那里还会把 createCount 再减一次),makeObjectCount减1,并唤醒其他等待 makeObjectCountLock 锁的线程// createCount减2:1个是这个创建对象的线程推出了,2是因为上面有 “createCount > localMaxTotal” 的判断,相当于在线程繁忙时,再让出一个计数器,呼吁新的线程代替这个线程可以并发创建对象。少等待一个成功的线程再if (PooledObject.isNull(p)) {createCount.decrementAndGet();System.out.println("【null】"+Thread.currentThread().getName()+":"+createCount.get());throw new NullPointerException(String.format("%s.makeObject() = null", factory.getClass().getSimpleName()));}System.out.println("【not null】"+Thread.currentThread().getName()+":"+createCount.get());// 创建的对象校验未通过:失败。 createCount 减 1,不走 catch ,只走 finallyif (getTestOnCreate() && !factory.validateObject(p)) {createCount.decrementAndGet();return null;}} catch (final Throwable e) {createCount.decrementAndGet();System.out.println("【catch】"+Thread.currentThread().getName()+":"+createCount.get());throw e;} finally {// 减小了锁粒度:只锁计数makeObjectCount,不锁创建对象synchronized (makeObjectCountLock) {makeObjectCount--;makeObjectCountLock.notifyAll();}System.out.println("【finally】"+Thread.currentThread().getName()+":"+createCount.get());}final AbandonedConfig ac = this.abandonedConfig;if (ac != null && ac.getLogAbandoned()) {p.setLogAbandoned(true);p.setRequireFullStackTrace(ac.getRequireFullStackTrace());}createdCount.incrementAndGet(); // (区分createdCount 和 createCount)以创建的对象数量allObjects.put(new IdentityWrapper<>(p.getObject()), p);System.out.println("【return】"+Thread.currentThread().getName()+":"+createCount.get());return p;
}
- 对象归还
/*** {@inheritDoc}* <p>* If {@link #getMaxIdle() maxIdle} is set to a positive value and the* number of idle instances has reached this value, the returning instance* is destroyed.* 当归还对象是时,如果发现 idle 实例的个数达到了 maxIdle,那么归还的对象会被销毁* </p>* <p>* If {@link #getTestOnReturn() testOnReturn} == true, the returning* instance is validated before being returned to the idle instance pool. In* this case, if validation fails, the instance is destroyed.* 如果 testOnReturn 设置为 true,那么在归还对象之前,会先验证对象的有效性* </p>* <p>* Exceptions encountered destroying objects for any reason are swallowed* but notified via a {@link SwallowedExceptionListener}.* </p>*/
@Override
public void returnObject(final T obj) {final PooledObject<T> p = getPooledObject(obj);if (p == null) {if (!isAbandonedConfig()) {throw new IllegalStateException("Returned object not currently part of this pool");}return; // Object was abandoned and removed}//(1) 从 allObjects 这个 ConcurrentHashMap<IdentityWrapper<T>, PooledObject<T>> 中找到归还的对象,// 将他的状态改为 PooledObjectState.RETURNING (DefaultPooledObject)markReturningState(p);final Duration activeTime = p.getActiveDuration();// (2) 如果配置了 testOnReturn 且归还时对象校验失败,就销毁这个对象并创建一个新对象放到 idleObjects 中if (getTestOnReturn() && !factory.validateObject(p)) {try {destroy(p, DestroyMode.NORMAL);} catch (final Exception e) {swallowException(e);}try {ensureIdle(1, false);} catch (final Exception e) {swallowException(e);}updateStatsReturn(activeTime);return;}try {// (3)对象执行失效动作(Factory中一般写个空方法)factory.passivateObject(p);} catch (final Exception e1) {swallowException(e1);try {destroy(p, DestroyMode.NORMAL);} catch (final Exception e) {swallowException(e);}try {ensureIdle(1, false);} catch (final Exception e) {swallowException(e);}updateStatsReturn(activeTime);return;}if (!p.deallocate()) {throw new IllegalStateException("Object has already been returned to this pool or is invalid");}final int maxIdleSave = getMaxIdle();// (4)归还对象时,如果超过 maxIdleSave,就销毁该对象(maxIdleSave <= idleObjects.size())if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {try {destroy(p, DestroyMode.NORMAL);} catch (final Exception e) {swallowException(e);}try {ensureIdle(1, false);} catch (final Exception e) {swallowException(e);}} else {// (5)idleObjects 中加入归还的对象if (getLifo()) {idleObjects.addFirst(p);} else {idleObjects.addLast(p);}if (isClosed()) {// Pool closed while object was being added to idle objects.// Make sure the returned object is destroyed rather than left// in the idle object pool (which would effectively be a leak)clear();}}updateStatsReturn(activeTime);
}
- 清除线程保证 minIdle
// BaseGenericObjectPool 会启动一个 Evictor 线程,确保对象池中有 minIdle 个对象
@Override
public void evict() throws Exception {assertOpen();if (!idleObjects.isEmpty()) {PooledObject<T> underTest = null;final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();synchronized (evictionLock) {final EvictionConfig evictionConfig = new EvictionConfig(getMinEvictableIdleDuration(),getSoftMinEvictableIdleDuration(),getMinIdle());final boolean testWhileIdle = getTestWhileIdle();for (int i = 0, m = getNumTests(); i < m; i++) {if (evictionIterator == null || !evictionIterator.hasNext()) {evictionIterator = new EvictionIterator(idleObjects);}if (!evictionIterator.hasNext()) {// Pool exhausted, nothing to do herereturn;}try {underTest = evictionIterator.next();} catch (final NoSuchElementException e) {// Object was borrowed in another thread// Don't count this as an eviction test so reduce i;i--;evictionIterator = null;continue;}if (!underTest.startEvictionTest()) {// Object was borrowed in another thread// Don't count this as an eviction test so reduce i;i--;continue;}// User provided eviction policy could throw all sorts of// crazy exceptions. Protect against such an exception// killing the eviction thread.boolean evict;try {evict = evictionPolicy.evict(evictionConfig, underTest,idleObjects.size());} catch (final Throwable t) {// Slightly convoluted as SwallowedExceptionListener// uses Exception rather than ThrowablePoolUtils.checkRethrow(t);swallowException(new Exception(t));// Don't evict on error conditionsevict = false;}if (evict) {destroy(underTest, DestroyMode.NORMAL);destroyedByEvictorCount.incrementAndGet();} else {if (testWhileIdle) {boolean active = false;try {factory.activateObject(underTest);active = true;} catch (final Exception e) {destroy(underTest, DestroyMode.NORMAL);destroyedByEvictorCount.incrementAndGet();}if (active) {boolean validate = false;Throwable validationThrowable = null;try {validate = factory.validateObject(underTest);} catch (final Throwable t) {PoolUtils.checkRethrow(t);validationThrowable = t;}if (!validate) {destroy(underTest, DestroyMode.NORMAL);destroyedByEvictorCount.incrementAndGet();if (validationThrowable != null) {if (validationThrowable instanceof RuntimeException) {throw (RuntimeException) validationThrowable;}throw (Error) validationThrowable;}} else {try {factory.passivateObject(underTest);} catch (final Exception e) {destroy(underTest, DestroyMode.NORMAL);destroyedByEvictorCount.incrementAndGet();}}}}underTest.endEvictionTest(idleObjects);// TODO - May need to add code here once additional// states are used}}}}final AbandonedConfig ac = this.abandonedConfig;// 清除掉被丢弃的对象if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {removeAbandoned(ac);}
}/*** Tries to ensure that {@code idleCount} idle instances exist in the pool.* 保证池子中的 idle 实例达到目标个数* <p>* Creates and adds idle instances until either {@link #getNumIdle()} reaches {@code idleCount}* or the total number of objects (idle, checked out, or being created) reaches* {@link #getMaxTotal()}. If {@code always} is false, no instances are created unless* there are threads waiting to check out instances from the pool.* </p>* <p>* If the factory returns null when creating an instance, a {@code NullPointerException}* is thrown.* </p>** @param idleCount the number of idle instances desired* @param always true means create instances even if the pool has no threads waiting* @throws Exception if the factory's makeObject throws*/
private void ensureIdle(final int idleCount, final boolean always) throws Exception {if (idleCount < 1 || isClosed() || !always && !idleObjects.hasTakeWaiters()) {return;}while (idleObjects.size() < idleCount) {final PooledObject<T> p = create();if (PooledObject.isNull(p)) {// Can't create objects, no reason to think another call to// create will work. Give up.break;}if (getLifo()) {idleObjects.addFirst(p);} else {idleObjects.addLast(p);}}if (isClosed()) {// Pool closed while object was being added to idle objects.// Make sure the returned object is destroyed rather than left// in the idle object pool (which would effectively be a leak)clear();}
}@Override
void ensureMinIdle() throws Exception {ensureIdle(getMinIdle(), true);
}