Netty—FuturePromise

Netty—Future&Promise

  • 一、JDK原生 Future
  • 二、Netty包下的 Future
  • 三、Promise
    • 1、使用Promise同步获取结果
    • 2、使用Promise异步获取结果
    • .3、使用Promise同步获取异常 - sync & get
    • 4、使用Promise同步获取异常 - await
    • 5、使用Promise异步获取异常

在异步处理时,经常用到这两个接口

首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展。

在这里插入图片描述

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果;
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
isCancellable-是否可以取消执行
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
removeListener-删除回调,异步接收结果
setSuccess--设置成功结果
setFailure--设置失败结果

一、JDK原生 Future

关于 java.util.concurrent包下的Future 接口,我想大家应该都很熟悉,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone()get() 方法。

// 尝试取消执行此任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否在正常执行完成之前取消
boolean isCancelled();
// 任务是否完成,完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法都将返回true
boolean isDone();
// 阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
// 阻塞获取执行结果,指定超时时间
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;

接下来,演示一下使用jdk原生Future获取执行结果~

@Slf4j
public class JdkFutureTest01 {public static void main(String[] args) {// 线程池ExecutorService service = newFixedThreadPool(2);// 提交任务Future<Object> future = service.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {log.info("执行计算");Thread.sleep(1000);return 50;}});try {System.out.println(future.get());service.shutdownNow();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}

二、Netty包下的 Future

原生的Future功能比较有限,Netty扩展了Future并增加了以下方法:

// 判断任务是否成功
boolean isSuccess();
// 判断是否可以取消执行
boolean isCancellable();
// 获取失败的信息
Throwable cause();
// 添加回调方法,异步接收结果
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 添加多个回调方法
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 删除回调方法,异步接收结果
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 删除多个回调方法
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待任务结束,如果任务失败,抛出异常
Future<V> sync() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> syncUninterruptibly();
// 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断
Future<V> await() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> awaitUninterruptibly();
// 等待该future在指定的时间限制内完成。
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
// 等待该future在指定的时间限制内完成。
boolean await(long timeoutMillis) throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeoutMillis);
// 获取任务结果,非阻塞,还未产生结果时返回 null
V getNow();

通过以上扩展的方法我们可以发现,Netty的Future增加了 sync()await() 方法用于阻塞等待,还提供了 addListener() 方法用于添加回调方法,异步接收结果。

sync() 方法内部会先调用 await() 方法,等待 await() 方法返回后,会检查该任务是否失败,如果失败则将失败的异常抛出来。即使用await()方法等待任务结束,如果任务失败,不会抛异常,而是需要通过 isSuccess 判断。然而 sync() 方法是直接抛出异常!

@Override
public Promise<V> sync() throws InterruptedException {await();rethrowIfFailed();return this;
}
private void rethrowIfFailed() {Throwable cause = cause();if (cause == null) {return;}PlatformDependent.throwException(cause);
}

接下来,演示一下使用Netty包下的Future获取执行结果~

@Slf4j
public class NettyFutureTest01 {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();EventLoop eventLoop = eventLoopGroup.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.info("执行计算");Thread.sleep(1000);return 66;}});// 阻塞等待future.sync();log.info("收到结果{}", future.getNow());eventLoopGroup.shutdownGracefully();}
}

又或者使用 addListener() 方法用于添加回调方法,异步接收结果。

future.addListener(new GenericFutureListener<Future<? super Integer>>() {@Overridepublic void operationComplete(Future<? super Integer> future) throws Exception {log.info("收到结果{}", future.getNow());eventLoopGroup.shutdownGracefully();}
});

三、Promise

Future支持阻塞等待、添加回调方法、判断执行状态等,而Promise主要是支持状态设置相关方法。当底层I/O操作通过Promise改变执行状态,我们可以通过同步等待的Future立即得到结果。

// 设置成功结果并回调
Promise<V> setSuccess(V result);
// 同上,区别是是否报错
boolean trySuccess(V result);
// 设置失败异常并回调
Promise<V> setFailure(Throwable cause);
// 同上,区别是是否报错
boolean tryFailure(Throwable cause);
// 设置为不可取消状态
boolean setUncancellable();

可见,Promise作为一个特殊的Future,只是增加了一些状态设置方法。所以它常用于传入I/O业务代码中,用于I/O结束后设置成功(或失败)状态,并回调方法。以下是DefaultPromise的继承关系:
在这里插入图片描述

设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒所有在阻塞等待该promise返回结果的线程。

// result 字段的原子更新器
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 缓存执行结果的字段
private volatile Object result;
// Promise所在的线程
private final EventExecutor executor;
// 一个或多个回调方法
private Object listeners;
// 阻塞线程数量计数器
private short waiters;@Override
public Promise<V> setSuccess(V result) {if (setSuccess0(result)) {return this;}throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {// 原子修改result字段为 objResultif (RESULT_UPDATER.compareAndSet(this, null, objResult) ||RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {if (checkNotifyWaiters()) {notifyListeners();}return true;}return false;
}
private synchronized boolean checkNotifyWaiters() {if (waiters > 0) {// 唤醒其他等待线程notifyAll();}return listeners != null;
}

1、使用Promise同步获取结果

@Slf4j
public class PromiseDemo01 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}log.info("set success");promise.setSuccess(10);});log.info("start...");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}

2、使用Promise异步获取结果

@Slf4j
public class PromiseDemo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.debug("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}

.3、使用Promise同步获取异常 - sync & get

Slf4j
public class PromiseDemo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.debug("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}

4、使用Promise同步获取异常 - await

@Slf4j
public class PromiseDemo04 {public static void main(String[] args) throws InterruptedException, ExecutionException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.info("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());promise.await();if (promise.isSuccess()) {log.info("{}", promise.getNow());} else {log.error("{}", promise.cause().toString());}}
}

5、使用Promise异步获取异常

@Slf4j
public class PromiseDemo05 {public static void main(String[] args) throws InterruptedException, ExecutionException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);promise.addListener(future -> {if (promise.isSuccess()) {log.info("{}", promise.getNow());} else {log.error("{}", promise.cause().toString());}});eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.info("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/68754.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Img标签的src地址自动拼接本地域名(localhost:8080)导致图片不显示问题

摘要&#xff1a;做Vueelement ui项目的时候&#xff0c;发现使用element ui的upload上传图片时&#xff0c;不显示的问题。我项目的图片是上传到七牛云&#xff0c;长传成功后返回存储在七牛云中的地址。后面发现是因为返回的地址是外部地址&#xff0c;需要完整的URL&#xf…

Android大厂需要刷的(999道)面试题

想必大家都在为今年的金九银十做准备&#xff0c;今年也是最为艰难的一年。作为程序员从未感觉到如此艰难&#xff0c;身边不是被辞退就是找不到工作。先不说2023年应届生毕业即失业&#xff0c;作为开发15年的老Android程序员&#xff0c;现在也在和300个人挣一个岗位。 肉少…

使用Dbeaver连接GaussDB

1.下载DBeaver&#xff0c;官网地址 2.安装软件&#xff0c;打开软件&#xff0c;点击数据库->驱动管理器&#xff0c;具体操作如下图&#xff1a; 3、选择新建后进行参数设置&#xff0c;如下图&#xff1a; 具体参数如下图 驱动名称: GS #随便定义 驱动类型&#…

Docker-安装(Linux,Windows)

目录 前言安装版本Docker版本说明前提条件Linux安装使用YUM源部署获取阿里云开源镜像站YUM源文件安装Docker-ce配置Docker Daemon启动文件启动Docker服务并查看已安装版本 使用二进制文件部署 Windows安装实现原理安装步骤基本使用 参考说明 前言 本文主要说明Docker及其相关组…

如何使用代理配置快速定位接口测试脚本问题?

在调试接口用例过程中&#xff0c;如果响应结果和预期结果不一致&#xff0c;则需要检查请求信息。通过代理获取自动化测试中的请求响应信息&#xff0c;对比与正常请求响应的区别&#xff0c;就能够更直观的排查请求错误&#xff0c;相当于编写代码时的 debug 功能。 实战练习…

软件上线测评报告怎么做?

软件上线测试 软件上线前必须经过一个整体的测评&#xff0c;从而帮助企业了解软件的运行情况。软件上线测评检测报告&#xff08;软件产品测试报告&#xff09;也通常被称为&#xff1a;科技项目验收测试报告、&#xff08;软件类&#xff09;科技成果鉴定测试、软件检测报告…

cron介绍

cron表达式在线生成 在使用定时调度任务的时候&#xff0c;我们最常用的&#xff0c;就是cron表达式了。通过cron表达式来指定任务在某个时间点或者周期性的执行。 cron表达式的组成 cron表达式是一个字符串&#xff0c;由6到7个字段组成&#xff0c;用空格分隔。其中前6个字…

解决uniapp下拉框 内容被覆盖的问题

1. 下拉框 内容被覆盖的问题 场景: 现在是下拉框被表格覆盖了 解决办法: 在表格上添加css 样式来解决这个问题 .add-table{display: static;overflow: visible; } display: static: 将元素会按照默认的布局方式进行显示&#xff0c;不会分为块状或行内元素。 overflow: vi…

NAT地址转换,路由器作为出口设备,实现负载分担

路漫漫其修远兮&#xff0c;吾将上下而求索 一个善于创造的人&#xff0c;一定是一个善于分享的人。 今天整理了一个实验&#xff0c;具备NAT地址转换&#xff0c;路由器作为出口设备&#xff0c;实现负载分担&#xff0c;实现路由策略 目录 实验图 实验要求 实验配置 基…

Linux--I/O复用之select

目录 一&#xff1a;概念 二&#xff1a;使用 三&#xff1a;参数介绍&#xff1a; 1.ndfs&#xff1a; 2.fd_set类型&#xff1a; 3.readfds&#xff1a; 4.writefds&#xff1a; 5.exceptfds&#xff1a; 6.timeout&#xff1a; 7.返回值&#xff1a; 四&#xff1…

2023年7月京东投影仪行业品牌销售排行榜(京东大数据)

鲸参谋监测的京东平台7月份投影仪行业销售数据已出炉&#xff01; 7月份&#xff0c;投影仪市场呈现增长趋势。根据鲸参谋平台的数据可知&#xff0c;7月京东平台投影仪的销量将近20万&#xff0c;同比增长约16%&#xff1b;销售额将近3.8亿&#xff0c;同比增长约4%。 ​*数据…

django.core.exceptions.AppRegistryNotReady: Apps aren‘t loaded yet.

运行django测试用例报错django.core.exceptions.AppRegistryNotReady: Apps arent loaded yet. 解决&#xff1a;在测试文件上方加上 django.setup() django.setup()是Django框架中的一个函数。它用于在非Django环境下使用Django的各种功能、模型和设置。 在常规的Django应用…

Tequila Works x Incredibuild

关于 Tequila Works Tequila Works 是一家位于西班牙马德里的电子游戏开发商&#xff0c;由劳尔鲁比奥 (Raul Rubio) 和卢兹桑乔 (Luz Sancho) 于2009年创立。该公司著名的游戏产品包括《死亡曙光》(Deadlight)、《霜华》(Rime)、《联盟外传&#xff1a;努努之歌》(Song of Nu…

韶音的耳机怎么样,韶音骨传导耳机值得入手吗

韶音关于骨传导耳机的产品在质量方面还是有着不错的表现&#xff0c;其最具代表性的骨传导耳机就是韶音OpenRun Pro&#xff0c;在国产骨传导耳机中是具备了一定的知名度&#xff0c;有着自主研发的声学技术。 最突出的点就在于颜色上多样化&#xff0c;有着经典的黑色&#xf…

第十八章、【Linux】认识与分析登录文件

18.1 什么是登录文件 什么是登录文件&#xff1f;简单地说&#xff0c;就是记录系统活动信息的几个文件&#xff0c;例如&#xff1a;何时何地何人&#xff0c;做了什么工作。换句话说就是&#xff1a;记录系统在什么时候由哪个程序做了什么样的行为时&#xff0c;发生了什么事…

个人炒伦敦银方法大公开

个人炒伦敦银的方法与机构投资者炒这个品种的方法是有不同的&#xff0c;但是双方可能会借鉴一些相同的分析工具&#xff0c;比方说有的机构可能也会使用技术分析&#xff0c;当然&#xff0c;个人投资者对技术分析这个词更是不会陌生。今天我们就从个人投资者的角度出发&#…

LeetCode(力扣)669. 修剪二叉搜索树Python

LeetCode669. 修剪二叉搜索树 题目链接代码 题目链接 https://leetcode.cn/problems/trim-a-binary-search-tree/ 代码 递归 # Definition for a binary tree node. # class TreeNode: # def __init__(self, val0, leftNone, rightNone): # self.val val # …

如何在IPhone 14、14 Pro和14 Pro Max上添加屏幕锁定

当你第一次获得iPhone时&#xff0c;系统会提示你为它创建一个密码&#xff0c;这样只有你才能访问它。你应该使用一个必须输入的密码&#xff0c;以便在iPhone 14被唤醒或打开时解锁它。这将提供更高级别的保护。当你打开数据保护时&#xff0c;iPhone上的数据会被加密&#x…

Unity Android 之 在Unity 中引入 OkHttp的操作注意(OKHttp4.xx- kotlin 的包)简单记录

Unity Android 之 在Unity 中引入 OkHttp的操作注意(OKHttp4.xx- kotlin 的包)简单记录 目录 Unity Android 之 在Unity 中引入 OkHttp的操作注意(OKHttp4.xx- kotlin 的包)简单记录 一、简单介绍 二、OKHttp 4.xx 的 SDK 封装 aar 给 Unity 的使用注意 三、附录 OKHttp 的…

Linux gdb单步调试的原理

文章目录 一、demo演示二、原理分析参考资料 一、demo演示 .section .data message:.string "Hello, World!\n" len . - message.section .text .globl _start _start:# 调用 write() 函数输出 "Hello, World!"mov $1, %rax # 系统调用号为 1…