聊聊AsyncHttpClient的ListenableFuture

本文主要研究一下AsyncHttpClient的ListenableFuture

ListenableFuture

org/asynchttpclient/ListenableFuture.java

public interface ListenableFuture<V> extends Future<V> {/*** Terminate and if there is no exception, mark this Future as done and release the internal lock.*/void done();/*** Abort the current processing, and propagate the {@link Throwable} to the {@link AsyncHandler} or {@link Future}** @param t the exception*/void abort(Throwable t);/*** Touch the current instance to prevent external service to times out.*/void touch();/*** Adds a listener and executor to the ListenableFuture.* The listener will be {@linkplain java.util.concurrent.Executor#execute(Runnable) passed* to the executor} for execution when the {@code Future}'s computation is* {@linkplain Future#isDone() complete}.* <br>* Executor can be <code>null</code>, in that case executor will be executed* in the thread where completion happens.* <br>* There is no guaranteed ordering of execution of listeners, they may get* called in the order they were added and they may get called out of order,* but any listener added through this method is guaranteed to be called once* the computation is complete.** @param listener the listener to run when the computation is complete.* @param exec     the executor to run the listener in.* @return this Future*/ListenableFuture<V> addListener(Runnable listener, Executor exec);CompletableFuture<V> toCompletableFuture();//......
}  

ListenableFuture继承了java.util.concurrent.Future,它定义了done、abort、touch、addListener、toCompletableFuture方法

CompletedFailure

org/asynchttpclient/ListenableFuture.java

  class CompletedFailure<T> implements ListenableFuture<T> {private final ExecutionException e;public CompletedFailure(Throwable t) {e = new ExecutionException(t);}public CompletedFailure(String message, Throwable t) {e = new ExecutionException(message, t);}@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return true;}@Overridepublic boolean isCancelled() {return false;}@Overridepublic boolean isDone() {return true;}@Overridepublic T get() throws ExecutionException {throw e;}@Overridepublic T get(long timeout, TimeUnit unit) throws ExecutionException {throw e;}@Overridepublic void done() {}@Overridepublic void abort(Throwable t) {}@Overridepublic void touch() {}@Overridepublic ListenableFuture<T> addListener(Runnable listener, Executor exec) {if (exec != null) {exec.execute(listener);} else {listener.run();}return this;}@Overridepublic CompletableFuture<T> toCompletableFuture() {CompletableFuture<T> future = new CompletableFuture<>();future.completeExceptionally(e);return future;}}

CompletedFailure实现了ListenableFuture接口,其cancel方法返回true、isDone返回true

NettyResponseFuture

org/asynchttpclient/netty/NettyResponseFuture.java

public final class NettyResponseFuture<V> implements ListenableFuture<V> {private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "redirectCount");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "currentRetry");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_DONE_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isDone");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_CANCELLED_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isCancelled");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_AUTH_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "inAuth");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_PROXY_AUTH_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "inProxyAuth");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CONTENT_PROCESSED_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "contentProcessed");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> ON_THROWABLE_CALLED_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "onThrowableCalled");@SuppressWarnings("rawtypes")private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> TIMEOUTS_HOLDER_FIELD = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");@SuppressWarnings("rawtypes")private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> PARTITION_KEY_LOCK_FIELD = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");private final long start = unpreciseMillisTime();private final ChannelPoolPartitioning connectionPoolPartitioning;private final ConnectionSemaphore connectionSemaphore;private final ProxyServer proxyServer;private final int maxRetry;private final CompletableFuture<V> future = new CompletableFuture<>();          //......@Overridepublic V get() throws InterruptedException, ExecutionException {return future.get();}@Overridepublic V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException, ExecutionException {return future.get(l, tu);}          
}          

NettyResponseFuture实现了ListenableFuture接口

done

  public final void done() {if (terminateAndExit())return;try {loadContent();} catch (ExecutionException ignored) {} catch (RuntimeException t) {future.completeExceptionally(t);} catch (Throwable t) {future.completeExceptionally(t);throw t;}}private boolean terminateAndExit() {releasePartitionKeyLock();cancelTimeouts();this.channel = null;this.reuseChannel = false;return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;}  private void loadContent() throws ExecutionException {if (future.isDone()) {try {future.get();} catch (InterruptedException e) {throw new RuntimeException("unreachable", e);}}// No more retryCURRENT_RETRY_UPDATER.set(this, maxRetry);if (CONTENT_PROCESSED_FIELD.getAndSet(this, 1) == 0) {try {future.complete(asyncHandler.onCompleted());} catch (Throwable ex) {if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {try {try {asyncHandler.onThrowable(ex);} catch (Throwable t) {LOGGER.debug("asyncHandler.onThrowable", t);}} finally {cancelTimeouts();}}future.completeExceptionally(ex);}}future.getNow(null);}  

done方法对于terminateAndExit返回true的直接返回,否则执行loadContent,它对于future.isDone()的执行future.get(),然后执行future.complete(asyncHandler.onCompleted())回调

abort

  public final void abort(final Throwable t) {if (terminateAndExit())return;future.completeExceptionally(t);if (ON_THROWABLE_CALLED_FIELD.compareAndSet(this, 0, 1)) {try {asyncHandler.onThrowable(t);} catch (Throwable te) {LOGGER.debug("asyncHandler.onThrowable", te);}}}

abort方法也是对于terminateAndExit返回true的直接返回,否则执行future.completeExceptionally(t),然后触发asyncHandler.onThrowable(t)回调

touch

  public void touch() {touch = unpreciseMillisTime();}

touch方法用当前时间戳更新touch属性

addListener

  public ListenableFuture<V> addListener(Runnable listener, Executor exec) {if (exec == null) {exec = Runnable::run;}future.whenCompleteAsync((r, v) -> listener.run(), exec);return this;}

addListener方法会执行future.whenCompleteAsync((r, v) -> listener.run(), exec)

toCompletableFuture

  public CompletableFuture<V> toCompletableFuture() {return future;}

toCompletableFuture方法直接返回future

newNettyResponseFuture

org/asynchttpclient/netty/request/NettyRequestSender.java

  private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request,AsyncHandler<T> asyncHandler,NettyRequest nettyRequest,ProxyServer proxyServer) {NettyResponseFuture<T> future = new NettyResponseFuture<>(request,asyncHandler,nettyRequest,config.getMaxRequestRetry(),request.getChannelPoolPartitioning(),connectionSemaphore,proxyServer);String expectHeader = request.getHeaders().get(EXPECT);if (HttpHeaderValues.CONTINUE.contentEqualsIgnoreCase(expectHeader))future.setDontWriteBodyBecauseExpectContinue(true);return future;}private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,AsyncHandler<T> asyncHandler,NettyResponseFuture<T> future,ProxyServer proxyServer,boolean performConnectRequest) {NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,performConnectRequest);Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);return Channels.isChannelActive(channel)? sendRequestWithOpenChannel(newFuture, asyncHandler, channel): sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);}  

NettyRequestSender的newNettyResponseFuture创建的是NettyResponseFuture;sendRequestWithCertainForceConnect则将NettyResponseFuture传递给sendRequestWithOpenChannel或者sendRequestWithNewChannel来发送请求

小结

AsyncHttpClient的ListenableFuture继承了java.util.concurrent.Future,它定义了done、abort、touch、addListener、toCompletableFuture方法;它有两个实现类,分别是CompletedFailure及NettyResponseFuture;NettyRequestSender的sendRequest方法将NettyResponseFuture传递给sendRequestWithOpenChannel或者sendRequestWithNewChannel来发送请求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/229589.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NBA得分数据可视化

简介 这是上学期的一些课外活动内容&#xff0c;将 NBA 得分数据进行可视化&#xff0c;并进行后续的探索性分析和建模&#xff08;本文未介绍&#xff09;。主要研究动机来源于这篇论文&#xff1a; 该论文使用二元的伽马过程来刻画 NBA 主客场得分数据&#xff0c;并且考虑了…

@RabbitHandler和@RabbitListener的区别

RabbitHandler 和 RabbitListener 是Spring AMQP&#xff08;特别是针对RabbitMQ&#xff09;中常用的两个注解&#xff0c;它们在消息处理中扮演着不同的角色。 RabbitListener 定义&#xff1a;RabbitListener 注解用于标记一个方法&#xff0c;使其成为消息队列的监听器&am…

5.5 Linux Apache服务

1、概念介绍 a. Web 服务简介 WEB服务器也称为WWW(WORLD WIDE WEB&#xff0c;万维网)服务器&#xff0c;主要功能是提供网上信息浏览服务。 常用web服务器&#xff1a;httpd&#xff08;apache&#xff09;、nginx、tomcat、IIS 客户端&#xff1a;IE、firefox、chrome b…

高通平台开发系列讲解(AI篇)SNPE工作流程介绍

文章目录 一、转换网络模型二、量化2.1、选择量化或非量化模型2.2、使用离线TensorFlow或Caffe模型2.3、使用非量化DLC初始化SNPE2.4、使用量化DLC初始化SNPE三、准备输入数据四、运行加载网络沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇章主要介绍SNPE模型工作…

Android终端模拟器Termux上使用Ubuntu

Termux 上安装各种 Linux 系统是通过 proot-distro 工具来实现的&#xff0c;所以先安装一下 proot-distro 工具。 ~ $ pkg install proot-distro 查看Termux支持安装那些Linux ~ $ proot-distro listSupported distributions:* Alpine LinuxAlias: alpineInstalled: noComme…

学习Java第70天,过滤器Filter简介

过滤器概述 Filter,即过滤器,是JAVAEE技术规范之一,作用目标资源的请求进行过滤的一套技术规范,是Java Web项目中最为实用的技术之一 Filter接口定义了过滤器的开发规范,所有的过滤器都要实现该接口 Filter的工作位置是项目中所有目标资源之前,容器在创建HttpServletRequest和…

Cmake基础(6)

这篇文章阐述如何添加第三方库 文章目录 包含目录和依赖库注意事项target_link_libraries 基本用法&#xff1a;修饰词&#xff1a;PUBLIC、PRIVATE、INTERFACE 其他注意事项&#xff1a;optimized 和 debug 配置标识&#xff1a;示例&#xff1a; 包含目录和依赖库 把上一篇…

【C++】POCO学习总结(十八):XML

【C】郭老二博文之&#xff1a;C目录 1、XML文件格式简介 1&#xff09;XML文件的开头一般都有个声明&#xff0c;声明是可选 <&#xff1f;xml version"1.0" encoding"UTF-8"?>2&#xff09;根元素&#xff1a;XML文件最外层的元素 3&#xff…

java内置的数据结构

Java语言提供了许多内置的数据结构&#xff0c;包括&#xff1a; 1. 数组&#xff08;Array&#xff09;&#xff1a;数组是最基本的数据结构之一&#xff0c;它是一个有序的元素集合&#xff0c;每个元素都有一个对应的索引。在Java中&#xff0c;数组可以通过声明和初始化来创…

Docker使用2-Update the application

写在前面 主题是Update the application&#xff0c;这里是链接 更新项目 承接上个文章&#xff0c;这个文章主要是学习项目内容更新后重新构建image。 编辑上个项目的src/static/js/app.js文件&#xff0c;将第56行注释&#xff0c;添加下面的代码 <p className"…

【从零开始学习--设计模式--策略模式】

返回首页 前言 感谢各位同学的关注与支持&#xff0c;我会一直更新此专题&#xff0c;竭尽所能整理出更为详细的内容分享给大家&#xff0c;但碍于时间及精力有限&#xff0c;代码分享较少&#xff0c;后续会把所有代码示例整理到github&#xff0c;敬请期待。 此章节介绍策…

uniapp全局事件uni.$on,可以不相邻的组件之间传递参数

目录 传送参数页面接受参数页面最后 uniapp全局事件&#xff0c;也就是说&#xff0c;不相邻的&#xff0c;不是父子组件&#xff0c;也可以传递参数。 一个组件&#xff0c;传递项目内所有文件其中一个里面内&#xff0c;可以接受到参数。 传送参数页面 <template><…

每天五分钟计算机视觉:网络中的网络(NiN)

本文重点 前面的课程中我们学习了众多的经典网络模型&#xff0c;比如LeNet、AlexNet、VGG等等&#xff0c;这些网络模型都有共同的特点。 它们的特点是&#xff1a;先由卷积层构成的模块充分提取空间特征&#xff0c;然后再由全连接层构成的模块来输出分类结果。也就是说它们…

C练习题_3答案

一、单项选择题(本大题共20小题,每小题2分,共40分。在每小题给出的四个备选项中,选出一个正确的答案,并将所选项前的字母填写在答题纸的相应位置上。 以下正确的C语言自定义标识符是(A)A. la B. 2a C. do D. a.12 2.在C语言中,错误的常数表示是(D) A. OL B. 0x6aL C. ‘6’…

数据结构基础小结

数据结构基础小结 概述 什么是算法&#xff1f; 在计算机领域里&#xff0c;算法是一系列程序指令&#xff0c;用于处理特定的运算和逻辑问题。 衡量算法优劣的主要标准是时间复杂度和空间复杂度。 什么是数据结构&#xff1f; 数据结构&#xff0c;对应的英文单词是 dat…

Apache SeaTunne简介

Apache SeaTunne简介 文章目录 1.Apache SeaTunne是什么&#xff1f;1.1[官网](https://seatunnel.apache.org/)1.2 项目地址 2.架构3.特性3.1 丰富且可扩展的连接器和插件机制3.2 支持分布式快照算法以确保数据一致性3.3 支持流、批数据处理&#xff0c;支持全量、增量和实时数…

C#实现一个安全的事件订阅器

1.解释下什么是事件订阅器 在C#的上下文中&#xff0c;事件订阅器是一种用于处理特定事件的机制。 事件&#xff08;Event&#xff09;&#xff1a;事件是在软件应用程序中发生的事物&#xff0c;如按钮被点击、数据被更改等。在C#中&#xff0c;事件是通过使用event关键字声…

Linux_Docker图形化工具Portainer如何安装并结合内网穿透实现远程访问

文章目录 前言1. 部署Portainer2. 本地访问Portainer3. Linux 安装cpolar4. 配置Portainer 公网访问地址5. 公网远程访问Portainer6. 固定Portainer公网地址 前言 本文主要介绍如何本地安装Portainer并结合内网穿透工具实现任意浏览器远程访问管理界面。Portainer 是一个轻量级…

频谱论文:基于张量Tucker分解的频谱地图构建算法

#频谱# [1]陈智博,胡景明,张邦宁 郭道省.(2023).基于张量Tucker分解的频谱地图构建算法.电子与信息学报(11),4161-4169. &#xff08;陆军工程大学&#xff09; 研究内容 将动态电磁环境的时变频谱地图建模为3维频谱张量&#xff0c;通过张量Tucker分解提取出具有物理意义的核…

【MySQL】(DDL) 数据库操作

创建&#xff1a; create database 数据库名称; //创建数据库 create database if not exists 数据库名 ; //创建数据库并添加判断 &#xff08;如果存在就不创建不存在就创建 &#xff09; create database 数据库名 default charset 字符集 ; //创建数据库并设置字符集 查…