AsyncHttpClient的连接池结构很简单, NettyConnectionsPool内部重要的几个变量如下
// 连接池, 通过 host 区分不同的池private final ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>> connectionsPool = new ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>>();// 原生channel跟IdleChannel对象的映射, IdleChannel主要是包含一些请求信息, 请求url以及请求开始时间private final ConcurrentHashMap<Channel, IdleChannel> channel2IdleChannel = new ConcurrentHashMap<Channel, IdleChannel>();// 记录了Channel的创建时间, 用于做Channel生命周期检测, 如果生命周期是-1, 此Map无用private final ConcurrentHashMap<Channel, Long> channel2CreationDate = new ConcurrentHashMap<Channel, Long>();
主要逻辑都位于NettyAsyncHttpProvider下
1. 取出连接池连接(doConnection阶段)
先从连接池取出连接, 取出连接后会将连接从connectionsPool的数量会减少
synchronized (idleConnectionForHost) {idleChannel = idleConnectionForHost.poll();if (idleChannel != null) {channel2IdleChannel.remove(idleChannel.channel);} }
如果连接存在, 取出来以后直接就会返回future. 否则进入下列流程
2. 对池内连接的控制 (doConnect阶段)
在doConnect的时候会判断connectionsPool是否可cache, 如下
public boolean canCacheConnection() {if (!isClosed.get() && maxTotalConnections != -1 && channel2IdleChannel.size() >= maxTotalConnections) {return false;} else {return true;}}
其中channel2IdleChannel在连接池poll的时候会remove channel, 也就是说判断的连接数是在池内的channel数
加入返回false, 则会调用asyncHandler的onThrowable()方法, 并抛出 "Too many connections " 异常
// Do not throw an exception when we need an extra connection for a redirect.if (!reclaimCache && !connectionsPool.canCacheConnection()) {IOException ex = new IOException(String.format("Too many connections %s", config.getMaxTotalConnections()));try {asyncHandler.onThrowable(ex);} catch (Throwable t) {log.warn("!connectionsPool.canCacheConnection()", t);}throw ex;}
provider对这一步的判断在 3) 的判断之前
3. 对池外连接的控制 (doConnect阶段)
池外连接使用
private Semaphore freeConnections = null;
进行控制, 他的值为 MaxTotalConnections, 这个值和连接池的是一样的, 逻辑如下
if (trackConnections) {if (!reclaimCache) {if (!freeConnections.tryAcquire()) {IOException ex = new IOException(String.format("Too many connections %s", config.getMaxTotalConnections()));try {asyncHandler.onThrowable(ex);} catch (Throwable t) {log.warn("!connectionsPool.canCacheConnection()", t);}throw ex;} else {acquiredConnection = true;}}}
默认调用的
public <T> ListenableFuture<T> execute(Request request, AsyncHandler<T> handler) throws IOException;
方法, reclaimCache 都为 false
4. 向连接池添加连接逻辑 (Protocol handle()阶段)
在provider的HttpProtocol类里会调finishUpdate()方法, 这里会执行向连接池添加连接的操作, 调用offer方法
private void finishUpdate(final NettyResponseFuture<?> future, final ChannelHandlerContext ctx, boolean lastValidChunk) throws IOException {if (lastValidChunk && future.getKeepAlive()) {drainChannel(ctx, future);} else {if (future.getKeepAlive() && ctx.getChannel().isReadable() && connectionsPool.offer(getPoolKey(future), ctx.getChannel())) {markAsDone(future, ctx);return;}finishChannel(ctx);}markAsDone(future, ctx);}
连接池的offer方法没有对maxTotalConnections的判断, 只对maxConnectionPerHost做判断