但是,有些对象在创建时肯定会付出高昂的代价。 诸如线程,数据库连接对象等对象不是轻量级对象,并且创建起来会稍微贵一些。 在任何应用程序中,我们都需要使用上述多种对象。 因此,如果有一种非常容易的方法可以轻松创建和维护该类型的对象池,那么可以动态使用和重用对象,而不必担心客户端代码对对象的生存周期的影响,那就太好了。
在实际编写对象池的代码之前,让我们首先确定任何对象池必须回答的主要要求。
- 池必须允许客户端使用对象(如果有)。
- 一旦客户端将对象返回到池中,它必须重新使用对象。
- 如果需要,它必须能够创建更多对象以满足客户不断增长的需求。
- 它必须提供适当的关闭机制,以便在关闭时不会发生内存泄漏。
毋庸置疑,以上几点将构成我们向客户公开的界面的基础。
因此,我们的接口声明如下:
package com.test.pool;/*** Represents a cached pool of objects.* * @author Swaranga** @param < T > the type of object to pool.*/
public interface Pool< T >
{/*** Returns an instance from the pool. * The call may be a blocking one or a non-blocking one * and that is determined by the internal implementation.* * If the call is a blocking call, * the call returns immediately with a valid object * if available, else the thread is made to wait * until an object becomes available.* In case of a blocking call, * it is advised that clients react * to {@link InterruptedException} which might be thrown* when the thread waits for an object to become available.* * If the call is a non-blocking one, * the call returns immediately irrespective of * whether an object is available or not.* If any object is available the call returns it * else the call returns < code >null< /code >.* * The validity of the objects are determined using the* {@link Validator} interface, such that * an object < code >o< /code > is valid if * < code > Validator.isValid(o) == true < /code >.* * @return T one of the pooled objects.*/T get();/*** Releases the object and puts it back to the pool.* * The mechanism of putting the object back to the pool is* generally asynchronous, * however future implementations might differ.* * @param t the object to return to the pool*/void release(T t);/*** Shuts down the pool. In essence this call will not * accept any more requests * and will release all resources.* Releasing resources are done * via the < code >invalidate()< /code >* method of the {@link Validator} interface.*/void shutdown();
}
特意使上述接口非常简单通用,以支持任何类型的对象。 它提供了从池中获取对象或将对象返回池中的方法。 它还提供了一种关闭机制来处理对象。
现在,我们尝试创建上述接口的实现。 但在此之前,必须注意,理想的release()方法将首先尝试检查客户端返回的对象是否仍然可重用,这一点很重要。 如果是,则它将返回到池中,否则必须丢弃该对象。 我们希望Pool接口的每个实现都遵循此规则。 因此,在创建具体的实现之前,我们创建一个抽象的实现帽子,将这种限制强加给后续的实现。 我们的抽象实现将被称为Surprise,AbstractPool,其定义如下:
package com.test.pool;/*** Represents an abstract pool, that defines the procedure* of returning an object to the pool.* * @author Swaranga** @param < T > the type of pooled objects.*/
abstract class AbstractPool < T > implements Pool < T >
{/*** Returns the object to the pool. * The method first validates the object if it is* re-usable and then puts returns it to the pool.* * If the object validation fails, * some implementations* will try to create a new one * and put it into the pool; however * this behaviour is subject to change * from implementation to implementation* */@Overridepublic final void release(T t){if(isValid(t)){returnToPool(t);}else{handleInvalidReturn(t);}}protected abstract void handleInvalidReturn(T t);protected abstract void returnToPool(T t);protected abstract boolean isValid(T t);
}
在上面的类中,我们强制对象池必须先验证对象,然后再将其返回到池中。 为了自定义其池的行为,实现可以自由选择它们实现三种抽象方法的方式。 他们将决定使用自己的逻辑,如何检查对象是否对重用有效[validate()方法,如果客户端返回的对象无效[该方法,handleInvalidReturn()方法]以及实际逻辑,该怎么办。将有效对象返回到池中[returnToPool()方法]。
现在有了上面的类集,我们几乎可以进行具体的实现了。 但是要注意的是,由于上述类旨在支持通用对象池,因此上述类的通用实现将不知道如何验证对象[因为对象将是通用的:-)。 因此,我们需要其他可以帮助我们的东西。
我们实际上需要的是一种验证对象的常用方法,这样,具体的Pool实现就不必担心正在验证的对象的类型。 因此,我们引入了一个新的接口Validator,该接口定义了验证对象的方法。 我们对Validator接口的定义如下:
package com.test.pool;/*** Represents the functionality to * validate an object of the pool* and to subsequently perform cleanup activities.* * @author Swaranga** @param < T > the type of objects to validate and cleanup.*/public static interface Validator < T >{/*** Checks whether the object is valid.* * @param t the object to check.* * @returntrue
* if the object is valid elsefalse
.*/public boolean isValid(T t);/*** Performs any cleanup activities * before discarding the object.* For example before discarding * database connection objects,* the pool will want to close the connections. * This is done via the *invalidate()
method.* * @param t the object to cleanup*/public void invalidate(T t);}
上面的接口定义了检查对象是否有效的方法,以及使对象和对象无效的方法。 当我们要丢弃对象并清除该实例使用的任何内存时,应使用invalidate方法。 请注意,此接口本身没有什么意义,仅在对象池的上下文中使用时才有意义。 因此,我们在顶级Pool接口中定义此接口。 这类似于Java Collections库中的Map和Map.Entry接口。 因此,我们的Pool接口如下所示:
package com.test.pool;/*** Represents a cached pool of objects.* * @author Swaranga** @param < T > the type of object to pool.*/ public interface Pool< T > {/*** Returns an instance from the pool. * The call may be a blocking one or a non-blocking one * and that is determined by the internal implementation.* * If the call is a blocking call, * the call returns immediately with a valid object * if available, else the thread is made to wait * until an object becomes available.* In case of a blocking call, * it is advised that clients react * to {@link InterruptedException} which might be thrown* when the thread waits for an object to become available.* * If the call is a non-blocking one, * the call returns immediately irrespective of * whether an object is available or not.* If any object is available the call returns it * else the call returns < code >null< /code >.* * The validity of the objects are determined using the* {@link Validator} interface, such that * an object < code >o< /code > is valid if * < code > Validator.isValid(o) == true < /code >.* * @return T one of the pooled objects.*/T get();/*** Releases the object and puts it back to the pool.* * The mechanism of putting the object back to the pool is* generally asynchronous, * however future implementations might differ.* * @param t the object to return to the pool*/void release(T t);/*** Shuts down the pool. In essence this call will not * accept any more requests * and will release all resources.* Releasing resources are done * via the < code >invalidate()< /code >* method of the {@link Validator} interface.*/void shutdown();/*** Represents the functionality to * validate an object of the pool* and to subsequently perform cleanup activities.* * @author Swaranga** @param < T > the type of objects to validate and cleanup.*/public static interface Validator < T >{/*** Checks whether the object is valid.* * @param t the object to check.* * @returntrue
* if the object is valid elsefalse
.*/public boolean isValid(T t);/*** Performs any cleanup activities * before discarding the object.* For example before discarding * database connection objects,* the pool will want to close the connections. * This is done via the *invalidate()
method.* * @param t the object to cleanup*/public void invalidate(T t);} }
我们几乎准备好具体实施了。 但是在此之前,我们需要一种最终的武器,它实际上是对象池中最重要的武器。 这被称为“创建新对象的能力”。c我们的对象池将是通用的,它们必须具有如何创建新对象以填充其池的知识。 此功能也必须不依赖于对象池的类型,并且必须是创建新对象的常用方法。 完成此操作的方法将是一个称为ObjectFactory的接口,该接口仅定义一种方法,即“如何创建新对象”。 我们的ObjectFactory接口如下:
package com.test.pool;/*** Represents the mechanism to create * new objects to be used in an object pool.* * @author Swaranga** @param < T > the type of object to create. */
public interface ObjectFactory < T >
{/*** Returns a new instance of an object of type T.* * @return T an new instance of the object of type T*/public abstract T createNew();
}
最后,我们完成了我们的帮助程序类,现在我们将创建Pool接口的具体实现。 因为我们想要一个可以在并发应用程序中使用的池,所以我们将创建一个阻塞池,如果池中没有可用的对象,它将阻塞客户端。 阻塞机制将无限期阻塞,直到对象可用为止。 这种实现方式催生了另一个方法,该方法将仅在给定的超时时间段内阻塞,如果在返回该对象的超时之前有任何对象可用,否则在超时之后而不是永远等待,则返回一个空对象。 此实现类似于Java Concurrency API的LinkedBlockingQueue实现,因此在实现实际的类之前,我们公开另一个实现BlockingPool,该实现类似于Java Concurrency API的BlockingQueue接口。
因此,Blockingpool接口声明如下:
package com.test.pool;import java.util.concurrent.TimeUnit;/*** Represents a pool of objects that makes the * requesting threads wait if no object is available.* * @author Swaranga** @param < T > the type of objects to pool.*/ public interface BlockingPool < T > extends Pool < T > {/*** Returns an instance of type T from the pool.* * The call is a blocking call, * and client threads are made to wait* indefinitely until an object is available. * The call implements a fairness algorithm * that ensures that a FCFS service is implemented.* * Clients are advised to react to InterruptedException. * If the thread is interrupted while waiting * for an object to become available,* the current implementations * sets the interrupted state of the thread * totrue
and returns null. * However this is subject to change * from implementation to implementation.* * @return T an instance of the Object * of type T from the pool.*/T get();/*** Returns an instance of type T from the pool, * waiting up to the* specified wait time if necessary * for an object to become available..* * The call is a blocking call, * and client threads are made to wait* for time until an object is available * or until the timeout occurs. * The call implements a fairness algorithm * that ensures that a FCFS service is implemented.* * Clients are advised to react to InterruptedException. * If the thread is interrupted while waiting * for an object to become available,* the current implementations * set the interrupted state of the thread * totrue
and returns null. * However this is subject to change * from implementation to implementation.* * * @param time amount of time to wait before giving up, * in units of unit* @param unit a TimeUnit determining * how to interpret the* timeout parameter* * @return T an instance of the Object * of type T from the pool.* * @throws InterruptedException * if interrupted while waiting*/T get(long time, TimeUnit unit) throws InterruptedException; }
我们的BoundedBlockingPool实现将如下所示:
package com.test.pool;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;public final class BoundedBlockingPool < T > extends AbstractPool < T >implements BlockingPool < T >
{private int size;private BlockingQueue < T > objects;private Validator < T > validator;private ObjectFactory < T > objectFactory;private ExecutorService executor = Executors.newCachedThreadPool();private volatile boolean shutdownCalled;public BoundedBlockingPool(int size, Validator < T > validator, ObjectFactory < T > objectFactory){super();this.objectFactory = objectFactory;this.size = size;this.validator = validator;objects = new LinkedBlockingQueue < T >(size);initializeObjects();shutdownCalled = false;}public T get(long timeOut, TimeUnit unit){if(!shutdownCalled){T t = null;try{t = objects.poll(timeOut, unit);return t;}catch(InterruptedException ie){Thread.currentThread().interrupt();}return t;}throw new IllegalStateException('Object pool is already shutdown');}public T get(){if(!shutdownCalled){T t = null;try{t = objects.take();}catch(InterruptedException ie){Thread.currentThread().interrupt();}return t;}throw new IllegalStateException('Object pool is already shutdown');}public void shutdown(){shutdownCalled = true;executor.shutdownNow();clearResources();}private void clearResources(){for(T t : objects){validator.invalidate(t);}}@Overrideprotected void returnToPool(T t){if(validator.isValid(t)){executor.submit(new ObjectReturner(objects, t));}}@Overrideprotected void handleInvalidReturn(T t){}@Overrideprotected boolean isValid(T t){return validator.isValid(t);}private void initializeObjects(){for(int i = 0; i < size; i++){objects.add(objectFactory.createNew());}}private class ObjectReturner < E > implements Callable < Void >{private BlockingQueue < E > queue;private E e;public ObjectReturner(BlockingQueue < E > queue, E e){this.queue = queue;this.e = e;}public Void call(){while(true){try{queue.put(e);break;}catch(InterruptedException ie){Thread.currentThread().interrupt();}}return null;}}
}
上面是一个非常基本的对象池,在内部由LinkedBlockingQueue支持。 唯一感兴趣的方法是returnToPool()方法。 由于内部存储是一个阻塞池,因此,如果我们尝试将返回的元素直接放入LinkedBlockingPool中,则如果队列已满,它可能会阻塞客户端。 但是我们不希望对象池的客户端仅为了执行普通任务(例如将对象返回到池)而阻塞。 因此,我们完成了将对象作为异步任务插入LinkedBlockingQueue的实际任务,并将其提交给Executor实例,以便客户端线程可以立即返回。
现在,我们将上述对象池用于代码中。 我们将使用对象池来池一些数据库连接对象。 因此,我们将需要一个验证器来验证我们的数据库连接对象。
我们的JDBCConnectionValidator将如下所示:
package com.test;import java.sql.Connection;
import java.sql.SQLException;import com.test.pool.Pool.Validator;public final class JDBCConnectionValidator implements Validator < Connection >
{public boolean isValid(Connection con){ if(con == null){return false;}try{return !con.isClosed();}catch(SQLException se){return false;}}public void invalidate(Connection con){try{con.close();}catch(SQLException se){}}
}
我们的JDBCObjectFactory将使对象池能够创建新对象,如下所示:
package com.test;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;import com.test.pool.ObjectFactory;public class JDBCConnectionFactory implements ObjectFactory < Connection >
{private String connectionURL;private String userName;private String password;public JDBCConnectionFactory(String driver, String connectionURL, String userName, String password){super();try{Class.forName(driver);}catch(ClassNotFoundException ce){throw new IllegalArgumentException('Unable to find driver in classpath', ce);}this.connectionURL = connectionURL;this.userName = userName;this.password = password;}public Connection createNew(){ try{return DriverManager.getConnection(connectionURL, userName, password);}catch(SQLException se){throw new IllegalArgumentException('Unable to create new connection', se);}}
}
现在,我们使用上面的Validator和ObjectFactory创建一个JDBC对象池:
package com.test;
import java.sql.Connection;import com.test.pool.Pool;
import com.test.pool.PoolFactory;public class Main
{public static void main(String[] args){Pool < Connection > pool = new BoundedBlockingPool < Connection > (10, new JDBCConnectionValidator(),new JDBCConnectionFactory('', '', '', ''));//do whatever you like}
}
作为阅读整个帖子的奖励。 我将提供Pool接口的另一种实现,它实际上是一个非阻塞对象池。 此实现与上一个实现的唯一区别在于,如果某个元素不可用,则此实现不会阻塞客户端,而是返回null。 它去了:
package com.test.pool;import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;public class BoundedPool < T > extends AbstractPool < T >
{private int size;private Queue < T > objects;private Validator < T > validator;private ObjectFactory < T > objectFactory;private Semaphore permits;private volatile boolean shutdownCalled;public BoundedPool(int size, Validator < T > validator, ObjectFactory < T > objectFactory){super();this.objectFactory = objectFactory;this.size = size;this.validator = validator;objects = new LinkedList < T >();initializeObjects();shutdownCalled = false;}@Overridepublic T get(){T t = null;if(!shutdownCalled){if(permits.tryAcquire()){t = objects.poll();}}else{throw new IllegalStateException('Object pool already shutdown');}return t;}@Overridepublic void shutdown(){shutdownCalled = true;clearResources();}private void clearResources(){for(T t : objects){validator.invalidate(t);}}@Overrideprotected void returnToPool(T t){boolean added = objects.add(t);if(added){permits.release();}}@Overrideprotected void handleInvalidReturn(T t){}@Overrideprotected boolean isValid(T t){return validator.isValid(t);}private void initializeObjects(){for(int i = 0; i < size; i++){objects.add(objectFactory.createNew());}}
}
考虑到我们现在有两个强大的实现,最好让用户通过带有有意义名称的工厂创建我们的池。 这是工厂:
package com.test.pool;import com.test.pool.Pool.Validator;/*** Factory and utility methods for * {@link Pool} and {@link BlockingPool} classes * defined in this package. * This class supports the following kinds of methods:**
- *
- 创建并返回{@link Pool}接口的默认非阻塞*实现的方法。 *
- * *
- 创建并返回{@link BlockingPool}接口的默认实现的方法。 *
- *
因此,我们的客户现在可以以更易读的方式创建对象池:
package com.test;
import java.sql.Connection;import com.test.pool.Pool;
import com.test.pool.PoolFactory;public class Main
{public static void main(String[] args){Pool < Connection > pool = PoolFactory.newBoundedBlockingPool(10, new JDBCConnectionFactory('', '', '', ''), new JDBCConnectionValidator());//do whatever you like}
}
这样就结束了我们的长篇文章。 这个早就该了。 随时使用,更改,添加更多实现。
祝您编程愉快,别忘了分享!
参考: The Java HotSpot博客上的JCG合作伙伴 Sarma Swaranga 提供的通用并发对象池 。
翻译自: https://www.javacodegeeks.com/2012/09/a-generic-and-concurrent-object-pool.html