聊聊AsyncHttpClient的ChannelPool

本文主要研究一下AsyncHttpClient的ChannelPool

ChannelPool

org/asynchttpclient/channel/ChannelPool.java

public interface ChannelPool {/*** Add a channel to the pool** @param channel      an I/O channel* @param partitionKey a key used to retrieve the cached channel* @return true if added.*/boolean offer(Channel channel, Object partitionKey);/*** Remove the channel associated with the uri.** @param partitionKey the partition used when invoking offer* @return the channel associated with the uri*/Channel poll(Object partitionKey);/*** Remove all channels from the cache. A channel might have been associated* with several uri.** @param channel a channel* @return the true if the channel has been removed*/boolean removeAll(Channel channel);/*** Return true if a channel can be cached. A implementation can decide based* on some rules to allow caching Calling this method is equivalent of* checking the returned value of {@link ChannelPool#offer(Channel, Object)}** @return true if a channel can be cached.*/boolean isOpen();/*** Destroy all channels that has been cached by this instance.*/void destroy();/*** Flush partitions based on a predicate** @param predicate the predicate*/void flushPartitions(Predicate<Object> predicate);/*** @return The number of idle channels per host.*/Map<String, Long> getIdleChannelCountPerHost();
}

ChannelPool定义了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有两个实现类,分别是NoopChannelPool及DefaultChannelPool

NoopChannelPool

org/asynchttpclient/channel/NoopChannelPool.java

public enum NoopChannelPool implements ChannelPool {INSTANCE;@Overridepublic boolean offer(Channel channel, Object partitionKey) {return false;}@Overridepublic Channel poll(Object partitionKey) {return null;}@Overridepublic boolean removeAll(Channel channel) {return false;}@Overridepublic boolean isOpen() {return true;}@Overridepublic void destroy() {}@Overridepublic void flushPartitions(Predicate<Object> predicate) {}@Overridepublic Map<String, Long> getIdleChannelCountPerHost() {return Collections.emptyMap();}
}

NoopChannelPool是个枚举,用枚举实现了单例,其方法默认为空操作

DefaultChannelPool

/*** A simple implementation of {@link ChannelPool} based on a {@link java.util.concurrent.ConcurrentHashMap}*/
public final class DefaultChannelPool implements ChannelPool {private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class);private final ConcurrentHashMap<Object, ConcurrentLinkedDeque<IdleChannel>> partitions = new ConcurrentHashMap<>();private final ConcurrentHashMap<ChannelId, ChannelCreation> channelId2Creation;private final AtomicBoolean isClosed = new AtomicBoolean(false);private final Timer nettyTimer;private final int connectionTtl;private final boolean connectionTtlEnabled;private final int maxIdleTime;private final boolean maxIdleTimeEnabled;private final long cleanerPeriod;private final PoolLeaseStrategy poolLeaseStrategy;public DefaultChannelPool(AsyncHttpClientConfig config, Timer hashedWheelTimer) {this(config.getPooledConnectionIdleTimeout(),config.getConnectionTtl(),hashedWheelTimer,config.getConnectionPoolCleanerPeriod());}public DefaultChannelPool(int maxIdleTime,int connectionTtl,Timer nettyTimer,int cleanerPeriod) {this(maxIdleTime,connectionTtl,PoolLeaseStrategy.LIFO,nettyTimer,cleanerPeriod);}public DefaultChannelPool(int maxIdleTime,int connectionTtl,PoolLeaseStrategy poolLeaseStrategy,Timer nettyTimer,int cleanerPeriod) {this.maxIdleTime = maxIdleTime;this.connectionTtl = connectionTtl;connectionTtlEnabled = connectionTtl > 0;channelId2Creation = connectionTtlEnabled ? new ConcurrentHashMap<>() : null;this.nettyTimer = nettyTimer;maxIdleTimeEnabled = maxIdleTime > 0;this.poolLeaseStrategy = poolLeaseStrategy;this.cleanerPeriod = Math.min(cleanerPeriod, Math.min(connectionTtlEnabled ? connectionTtl : Integer.MAX_VALUE, maxIdleTimeEnabled ? maxIdleTime : Integer.MAX_VALUE));if (connectionTtlEnabled || maxIdleTimeEnabled)scheduleNewIdleChannelDetector(new IdleChannelDetector());}//......
}  

DefaultChannelPool基于ConcurrentHashMap实现了ChannelPool接口,主要的参数为connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod会取connectionTtl、maxIdleTime、传入的cleanerPeriod的最小值;开启connectionTtl或者maxIdleTime的话,会往nettyTimer添加IdleChannelDetector,延后cleanerPeriod时间执行

offer

  public boolean offer(Channel channel, Object partitionKey) {if (isClosed.get())return false;long now = unpreciseMillisTime();if (isTtlExpired(channel, now))return false;boolean offered = offer0(channel, partitionKey, now);if (connectionTtlEnabled && offered) {registerChannelCreation(channel, partitionKey, now);}return offered;}private boolean isTtlExpired(Channel channel, long now) {if (!connectionTtlEnabled)return false;ChannelCreation creation = channelId2Creation.get(channel.id());return creation != null && now - creation.creationTime >= connectionTtl;}  private boolean offer0(Channel channel, Object partitionKey, long now) {ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);if (partition == null) {partition = partitions.computeIfAbsent(partitionKey, pk -> new ConcurrentLinkedDeque<>());}return partition.offerFirst(new IdleChannel(channel, now));}  private void registerChannelCreation(Channel channel, Object partitionKey, long now) {ChannelId id = channel.id();if (!channelId2Creation.containsKey(id)) {channelId2Creation.putIfAbsent(id, new ChannelCreation(now, partitionKey));}}  

offer接口先判断isTtlExpired,如果channel的存活时间超过connectionTtl则返回false,否则执行offer0,往ConcurrentLinkedDeque添加,若添加成功且connectionTtlEnabled则执行registerChannelCreation,维护创建时间

poll

  /*** {@inheritDoc}*/public Channel poll(Object partitionKey) {IdleChannel idleChannel = null;ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);if (partition != null) {while (idleChannel == null) {idleChannel = poolLeaseStrategy.lease(partition);if (idleChannel == null)// pool is emptybreak;else if (!Channels.isChannelActive(idleChannel.channel)) {idleChannel = null;LOGGER.trace("Channel is inactive, probably remotely closed!");} else if (!idleChannel.takeOwnership()) {idleChannel = null;LOGGER.trace("Couldn't take ownership of channel, probably in the process of being expired!");}}}return idleChannel != null ? idleChannel.channel : null;}

poll方法是根据partitionKey找到对应的ConcurrentLinkedDeque,然后循环执行poolLeaseStrategy.lease(partition),若idleChannel为null直接break,若isChannelActive为false则重置为null继续循环,若idleChannel.takeOwnership()为false也重置为null继续循环

removeAll

  /*** {@inheritDoc}*/public boolean removeAll(Channel channel) {ChannelCreation creation = connectionTtlEnabled ? channelId2Creation.remove(channel.id()) : null;return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(new IdleChannel(channel, Long.MIN_VALUE));}

removeAll方法会将指定的channel从channelId2Creation及ConcurrentLinkedDeque中移除

isOpen

  /*** {@inheritDoc}*/public boolean isOpen() {return !isClosed.get();}

isOpen则取的isClosed变量

destroy

  /*** {@inheritDoc}*/public void destroy() {if (isClosed.getAndSet(true))return;partitions.clear();if (connectionTtlEnabled) {channelId2Creation.clear();}}

destroy会设置isClosed为true,然后清空partitions及channelId2Creation

flushPartitions

  public void flushPartitions(Predicate<Object> predicate) {for (Map.Entry<Object, ConcurrentLinkedDeque<IdleChannel>> partitionsEntry : partitions.entrySet()) {Object partitionKey = partitionsEntry.getKey();if (predicate.test(partitionKey))flushPartition(partitionKey, partitionsEntry.getValue());}}private void flushPartition(Object partitionKey, ConcurrentLinkedDeque<IdleChannel> partition) {if (partition != null) {partitions.remove(partitionKey);for (IdleChannel idleChannel : partition)close(idleChannel.channel);}}private void close(Channel channel) {// FIXME pity to have to do this hereChannels.setDiscard(channel);if (connectionTtlEnabled) {channelId2Creation.remove(channel.id());}Channels.silentlyCloseChannel(channel);}    

flushPartitions会遍历partitions,然后执行predicate.test,为true则执行flushPartition,它将从partitions移除指定的partitionKey,然后遍历idleChannels挨个执行close

getIdleChannelCountPerHost

  public Map<String, Long> getIdleChannelCountPerHost() {return partitions.values().stream().flatMap(ConcurrentLinkedDeque::stream).map(idle -> idle.getChannel().remoteAddress()).filter(a -> a.getClass() == InetSocketAddress.class).map(a -> (InetSocketAddress) a).map(InetSocketAddress::getHostName).collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));}

getIdleChannelCountPerHost则遍历partitions,然后map出remoteAddress获取hostName,然后进行groupBy

PoolLeaseStrategy

  public enum PoolLeaseStrategy {LIFO {public <E> E lease(Deque<E> d) {return d.pollFirst();}},FIFO {public <E> E lease(Deque<E> d) {return d.pollLast();}};abstract <E> E lease(Deque<E> d);}

PoolLeaseStrategy是个枚举,定义了LIFO及FIFO两个枚举,LIFO则是对Deque执行pollFirst,FIFO则是对Deque执行pollLast

IdleChannelDetector

  private final class IdleChannelDetector implements TimerTask {private boolean isIdleTimeoutExpired(IdleChannel idleChannel, long now) {return maxIdleTimeEnabled && now - idleChannel.start >= maxIdleTime;}private List<IdleChannel> expiredChannels(ConcurrentLinkedDeque<IdleChannel> partition, long now) {// lazy createList<IdleChannel> idleTimeoutChannels = null;for (IdleChannel idleChannel : partition) {boolean isIdleTimeoutExpired = isIdleTimeoutExpired(idleChannel, now);boolean isRemotelyClosed = !Channels.isChannelActive(idleChannel.channel);boolean isTtlExpired = isTtlExpired(idleChannel.channel, now);if (isIdleTimeoutExpired || isRemotelyClosed || isTtlExpired) {LOGGER.debug("Adding Candidate expired Channel {} isIdleTimeoutExpired={} isRemotelyClosed={} isTtlExpired={}", idleChannel.channel, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired);if (idleTimeoutChannels == null)idleTimeoutChannels = new ArrayList<>(1);idleTimeoutChannels.add(idleChannel);}}return idleTimeoutChannels != null ? idleTimeoutChannels : Collections.emptyList();}private List<IdleChannel> closeChannels(List<IdleChannel> candidates) {// lazy create, only if we hit a non-closeable channelList<IdleChannel> closedChannels = null;for (int i = 0; i < candidates.size(); i++) {// We call takeOwnership here to avoid closing a channel that has just been taken out// of the pool, otherwise we risk closing an active connection.IdleChannel idleChannel = candidates.get(i);if (idleChannel.takeOwnership()) {LOGGER.debug("Closing Idle Channel {}", idleChannel.channel);close(idleChannel.channel);if (closedChannels != null) {closedChannels.add(idleChannel);}} else if (closedChannels == null) {// first non closeable to be skipped, copy all// previously skipped closeable channelsclosedChannels = new ArrayList<>(candidates.size());for (int j = 0; j < i; j++)closedChannels.add(candidates.get(j));}}return closedChannels != null ? closedChannels : candidates;}public void run(Timeout timeout) {if (isClosed.get())return;if (LOGGER.isDebugEnabled())for (Object key : partitions.keySet()) {int size = partitions.get(key).size();if (size > 0) {LOGGER.debug("Entry count for : {} : {}", key, size);}}long start = unpreciseMillisTime();int closedCount = 0;int totalCount = 0;for (ConcurrentLinkedDeque<IdleChannel> partition : partitions.values()) {// store in intermediate unsynchronized lists to minimize// the impact on the ConcurrentLinkedDequeif (LOGGER.isDebugEnabled())totalCount += partition.size();List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start));if (!closedChannels.isEmpty()) {if (connectionTtlEnabled) {for (IdleChannel closedChannel : closedChannels)channelId2Creation.remove(closedChannel.channel.id());}partition.removeAll(closedChannels);closedCount += closedChannels.size();}}if (LOGGER.isDebugEnabled()) {long duration = unpreciseMillisTime() - start;if (closedCount > 0) {LOGGER.debug("Closed {} connections out of {} in {} ms", closedCount, totalCount, duration);}}scheduleNewIdleChannelDetector(timeout.task());}}

IdleChannelDetector实现了netty的TimerTask接口,其run方法主要是遍历partitions,通过expiredChannels取出过期的IdleChannel,这里isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都算在内,然后挨个执行takeOwnership及close,再从channelId2Creation及partition中移除,最后再次调度一下IdleChannelDetector

小结

AsyncHttpClient的ChannelPool定义了offer、poll、removeAll、isOpen、destroy、flushPartitions、getIdleChannelCountPerHost方法,它有两个实现类,分别是NoopChannelPool及DefaultChannelPool;DefaultChannelPool基于ConcurrentHashMap实现了ChannelPool接口,主要的参数为connectionTtl、maxIdleTime、cleanerPeriod、poolLeaseStrategy;cleanerPeriod会取connectionTtl、maxIdleTime、传入的cleanerPeriod的最小值;开启connectionTtl或者maxIdleTime的话,会往nettyTimer添加IdleChannelDetector,延后cleanerPeriod时间执行。

poll方法会判断是active,不是的话继续循环lease,而IdleChannelDetector则会定期检查,isIdleTimeoutExpired、isRemotelyClosed、isTtlExpired都会被close,offer的时候还会判断isTtlExpired,这样子来保证连接的活性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/206539.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java对文件夹,文件操作

1、使用Java自带的File类来创建文件夹 String dirName "/data/file/temporaryfiles/"; File dir new File(dirName); if (!dir.exists()) {try {dir.mkdir();} catch (SecurityException se) {System.out.println("文件创建失败");}} else {System.out.…

视频处理关键知识

1 引言 视频技术发展到现在已经有100多年的历史&#xff0c;虽然比照相技术历史时间短&#xff0c;但在过去很长一段时间之内都是最重要的媒体。由于互联网在新世纪的崛起&#xff0c;使得传统的媒体技术有了更好的发展平台&#xff0c;应运而生了新的多媒体技术。而多媒体技术…

【android开发-21】android中调用系统摄像头camera拍照和相册的用法详解

1&#xff0c;调用摄像头 在Android中&#xff0c;调用系统摄像头拍照需要使用Intent来启动Camera应用&#xff0c;并在应用中设置相应的权限。下面是一个简单的例子&#xff1a; // 创建一个Intent对象&#xff0c;指定要执行的动作是拍照 Intent intent new Intent(Medi…

FITC-Dextran标记的抗体-科研

FITC-Dextran标记的抗体是一种将FITC&#xff08;荧光素异硫氰酸酯&#xff09;共价连接到抗体分子上的生物标记方法。这种标记使抗体具有荧光性质&#xff0c;可以用于免疫组化、流式细胞仪分析、荧光显微镜观察等各种应用中。以下是制备FITC-Dextran标记的抗体的基本步骤&…

php 导入excel

if($_FILES[files]){ $uploadfile$_FILES[files]; $name$uploadfile[name];//文件原名 $type$uploadfile[type]; $tmp_name$uploadfile[tmp_name]; $size$uploadfile[size]; $error$uploadfile[error]; // $uploadurl../../../d/uploads/; //上传路径 $path./uploads/; //上传路…

【教程】逻辑回归怎么做多分类

目录 一、逻辑回归模型介绍 1.1 逻辑回归模型简介 1.2 逻辑回归二分类模型 1.3 逻辑回归多分类模型 二、如何实现逻辑回归二分类 2.1 逻辑回归二分类例子 2.2 逻辑回归二分类实现代码 三、如何实现一个逻辑回归多分类 3.1 逻辑回归多分类问题 3.1 逻辑回归多分类的代…

Leetcode—198.打家劫舍【中等】

2023每日刷题&#xff08;五十二&#xff09; Leetcode—198.打家劫舍 算法思想 具体思路 首先&#xff0c;我们从上面的题目描述中抽象出题意。 ● 从一个非负整数数组中找到一个子序列&#xff0c;并且该子序列的和最大 ● 子序列中每个数的位置不能够相邻。举例来讲&…

Leetcode—1466.重新规划路线【中等】

2023每日刷题&#xff08;五十二&#xff09; Leetcode—1466.重新规划路线 算法思想 实现代码 class Solution { public:int minReorder(int n, vector<vector<int>>& connections) {vector<pair<int, int>> g[n];for(auto e: connections) {in…

JS的变量提升ES6基础

JS的变量提升&ES6基础 变量var关键字var声明作用域实例一实例二多个变量 变量提升 let关键字暂时性死区全局声明for循环中使用let const关键字 变量 ECMAScript变量时松散类型的&#xff0c;意思是变量可以用于保存任何类型的数据。 声明变量&#xff1a;var 、const、let …

阶梯电价1_分支结构 C语言xdoj27

题目&#xff1a;阶梯电价计费 类别&#xff1a;流程控制 时间限制&#xff1a;2S 内存限制&#xff1a;10000Kb 问题描述&#xff1a; 电价分三个档次&#xff0c;[0,110]度电&#xff0c;每度电0.5元&#xff1b;(110,210]度电&#xff0c;超出110部分每度电0.55元&…

git-vscode

git-vscode ctrlshiftp 创建分支 create branch 直接切到新的分支了 切换分支 直接点左下角自己选择 vscode中配置仓库 https://blog.csdn.net/zora_55/article/details/129709251 推送tag tag作用就是在 Git 中&#xff0c;标记存储库历史记录中特定提交的一种方式。t…

【Linux】无法使用 screenfetch 查看系统信息,报错 command not found: screenfetch

问题描述 screenfetch是一个命令行工具&#xff0c;用于在终端显示系统的硬件和软件信息。它会收集各种系统和环境的信息&#xff0c;并以彩色 ASCII 艺术的形式在终端中展示出来。 当你在终端中运行screenfetch命令时&#xff0c;它会检测你的操作系统、主机名、内核版本、C…

IntelliJ IDEA 2023.3发布,更新AI助手,运行相当流畅,再也不卡了

这两天Jetbrains来了一波大的更新&#xff0c;推出了2023.3正式版&#xff0c;均做了不少优化&#xff0c;最重要的是大家期待已久的Ai Assistant插件本次更新也正式推出&#xff0c;助力大家提高Coding效率。但是很遗憾&#xff0c;目前我们无法使用&#xff0c;因为该插件底层…

HTTPS加密协议:保护你的网络安全

引言&#xff1a; 随着互联网的普及&#xff0c;我们越来越依赖网络来获取信息、进行交流和完成各种任务。然而&#xff0c;网络的开放性和便利性也带来了一些安全隐患&#xff0c;如数据泄露、身份盗窃等。为了保护用户的隐私和安全&#xff0c;https加密协议应运而生。本文将…

[架构之路-256]:目标系统 - 设计方法 - 软件工程 - 软件设计 - 架构设计 - 软件系统不同层次的复用与软件系统向越来越复杂的方向聚合

目录 前言&#xff1a; 一、CPU寄存器级的复用&#xff1a;CPU寄存器 二、指令级复用&#xff1a;二进制指令 三、过程级复用&#xff1a;汇编语言 四、函数级复用&#xff1a;C语言 五、对象级复用&#xff1a;C, Java, Python 六、组件级复用 七、服务级复用 八、微…

计算机视觉-03-使用U-Net实现肾脏CT分割(包含数据和代码)

文章目录 0. 数据获取1. 介绍1.1 简介1.2 任务介绍1.3 数据集介绍1.3.1 介绍1.3.2 数据预处理建议 1.4 代码实现参考1.5 训练过程1.5.1 参数设置1.5.2 可视化1.5.3 结果分析 0. 数据获取 关注公众号&#xff1a;『AI学习星球』 回复&#xff1a;肾脏CT分割 即可获取数据下载。…

华为、华三、锐捷、思科巡检命令大全

思科、华为、华三、锐捷网络设备巡检命令 思科01 思科交换机巡检命令02 思科交换机基本配置命令 华三华为锐捷 思科 01 思科交换机巡检命令 show interface stats&#xff1a;查看交换机所有接口当前接口流量show running-config&#xff1a;查看当前设备配置show version&am…

高精度时钟芯片SD2405

概要 SD2405是一款非常优秀的RTC解决方案&#xff0c;为了能让用户在Arduino上有一款方便易用的时钟模块。该模块是一款内置晶振&#xff0c;支持IIC串行接口的高精度时钟模块&#xff1b;内置一次性工业级电池&#xff0c;可保证外部掉电的情况下&#xff0c;可以继续工作5~8…

实例分割 Mask-RCNN

参考文章 使用LabelMe标注目标检测数据集并转换为COCO2017格式_labelme转coco-CSDN博客 数据集选择 voc 这次不选择voc&#xff0c;因为文件组织太难了 voc2012文件夹组织 COCO COCO介绍 MC COCO2017年主要包含以下四个任务&#xff1a;目标检测与分割、图像描述、人体关…

【扩散模型】深入理解图像的表示原理:从像素到张量

【扩散模型】深入理解图像的表示原理&#xff1a;从像素到张量 在深度学习中&#xff0c;图像是重要的数据源之一&#xff0c;而图像的表示方式对于算法的理解和处理至关重要。本文将带你深入探讨图像的底层表示原理&#xff0c;从像素到张量&#xff0c;让你对图像表示有更清…