同步代码和异步代码
Quasar是一个将真正的轻量级线程(纤维)添加到JVM的库。 它们非常便宜且非常快-实际上,光纤的行为就像Erlang进程或Go goroutines-并允许您编写简单的阻塞代码,同时享受与复杂异步代码相同的性能优势。
在本文中,我们将学习如何将任何基于回调的异步API转换为漂亮的(光纤)阻塞API。 它适用于希望将自己的或第三方库与Quasar光纤集成的用户。 如果您仅将Quasar光纤与通道或演员一起使用,或者利用Comsat项目中已经提供的许多集成功能,则不需要了解这些知识(下面提供的代码是应用程序开发人员从未看到的代码)。 但是,即使您不这样做,您也可能会发现这篇文章对理解Quasar如何发挥其魔力很有帮助。
为什么异步?
首先,许多库提供异步API的原因是OS可以处理的正在运行的1个线程的数量远远少于OS可以维护的开放TCP连接的数量。 也就是说,您的机器可以支持比线程所提供的更高的并发性,因此库以及使用它们的开发人员会放弃线程作为用于软件并发2单元的抽象。 异步API不会阻塞线程,并且会导致显着的性能提升(通常在吞吐量和服务器容量方面,而在延迟方面却没有那么多)。
但是,使用异步API也会创建正确获得“回调地狱”名称的代码。 在缺乏多核处理的环境(例如Javascript)中,回调地狱已经很糟糕了。 在诸如JVM之类的地方,您需要关心内存可见性和同步性会变得更糟。
编写在光纤上运行的阻塞代码具有与异步代码相同的优点,但没有缺点:您使用了不错的阻塞API(甚至可以继续使用现有的API),但是却获得了非阻塞代码的所有性能优势。
可以肯定的是,异步API还有一个优势:它们使您可以同时分派多个IO操作(例如HTTP请求)。 因为这些操作通常需要很长时间才能完成,而且通常是独立的,所以我们可以同时等待其中的几个完成。 但是,Java期货也可以使用此有用的功能,而无需回调。 稍后,我们将看到如何制作博克期货。
光纤异步
许多现代的Java IO /数据库库/驱动程序都提供两种API:一种是同步(线程)阻塞的API,另一种是基于回调的异步API(对于NIO,JAX-RS客户端,Apache HTTP客户端以及更多的API来说都是如此。 )。 同步API更好。
Quasar有一个编程工具,可以将任何基于回调的异步API转换为一个很好的阻止光纤的API: FiberAsync
。 本质上, FiberASync
作用是阻止当前光纤,安装异步回调,并在触发该回调时,它将再次唤醒光纤,并返回操作结果(如果失败,则引发异常)。
为了了解如何使用FiberAsync
,我们将看一个API示例: FooClient
。 FooClient
是一种现代的IO API,因此有两种形式,一种是同步的,线程阻塞的一种,另一种是异步的。 他们来了:
interface FooClient {String op(String arg) throws FooException, InterruptedException;
}interface AsyncFooClient {Future<String> asyncOp(String arg, FooCompletion<String> callback);
}interface FooCompletion<T> {void success(T result);void failure(FooException exception);
}
请注意异步操作(如许多现代库中的情况)如何都需要回调并返回前途。 现在,让我们忽略未来。 我们稍后再讲。
FooClient
比AsyncFooClient
更好,更简单,但是它阻塞了线程并大大降低了吞吐量。 我们想要创建一个FooClient
接口的实现,该接口可以在光纤中运行并阻塞光纤,因此我们获得了简单的代码和出色的吞吐量。 为此,我们将在AsyncFooClient
使用AsyncFooClient
,但将其转换为阻止光纤的FooClient
。 这是我们需要的所有代码(我们将进一步对其进行简化):
public class FiberFooClient implements FooClient {private final AsyncFooClient asyncClient;public FiberFooClient(AsyncFooClient asyncClient) {this.asyncClient = asyncClient;}@Override@SuspendableString op(final String arg) throws FooException, InterruptedException {try {return new FiberAsync<String, FooException>() {@Overrideprotected void requestAsync() {asyncClient.asyncOp(arg, new FooCompletion<String>() {public void success(String result) {FiberAsync.this.asyncCompleted(result);}public void failure(FooException exception) {FiberAsync.this.asyncFailed(exception);}});}}.run();} catch(SuspendExecution e) {throw new AssertionError(e);}}
}
现在,这是怎么回事? 我们正在实施的FooClient
接口,但我们正在做op
纤维粘连,而不是线程阻塞。 我们需要告诉Quasar我们的方法是光纤阻塞(或“可挂起”),因此我们使用@Suspendable
对其进行@Suspendable
。
然后,我们将FiberAsync
子类FiberAsync
并实现requestAsync
方法( FiberAsync
接受的两个通用类型参数是返回类型和操作可能抛出的已检查异常的类型(如果有的话);对于未检查的异常,第二个通用参数应为RuntimeException
)。 requestAsync
负责启动异步操作并注册回调。 然后,回调需要调用asyncCompleted
(如果操作成功)并将其传递给我们希望返回的结果,或者asyncFailed
(如果操作失败)并将失败原因的异常传递给它。
最后,我们调用FiberAsync.run()
。 这将阻止当前光纤,并调用requestAsync
以安装回调。 纤维将保持阻塞,直到回调被触发,它会释放出FiberAsync
通过调用或者asyncCompleted
或asyncFailed
。 run
方法还具有一个带超时参数的版本,如果我们想对阻塞操作进行时间限制(通常是个好主意),该方法很有用。
需要解释的另一件事是try/catch
块。 有两种方法可声明为可@Suspendable
的方法:用@Suspendable
对其进行注释,或声明其引发已检查的异常SuspendExecution
。 FiberAsync
的run
方法使用了后者,因此为了编译代码,我们需要捕获SuspendExecution
,但是由于它不是真正的异常,因此我们永远无法真正捕获它(嗯,至少在Quasar运行正常的情况下,至少不是这样) –因此为AssertionError
。
完成后,您可以在任何光纤中使用op
,如下所示:
new Fiber<Void>(() ->{// ...String res = client.op();// ...
}).start();
顺便说一句,所有的要短很多与脉冲星 (类星体的Clojure的API),其中异步操作:
(async-op arg #(println "result:" %))
使用Pulsar的await
宏将其转换为以下同步的光纤阻塞代码:
(println "result:" (await (async-op arg)))
简化和批量生产
通常,像FooClient
这样的接口将具有许多方法,并且通常, AsyncFooClient
大多数方法将采用相同类型的回调( FooCompletion
)。 如果是这种情况,我们可以将我们已经看到的许多代码封装到FiberAsync
的命名子类中:
abstract class FooAsync<T> extends FiberAsync<T, FooException> implements FooCompletion<T> {@Overridepublic void success(T result) {asyncCompleted(result);}@Overridepublic void failure(FooException exception) {asyncFailed(exception);}@Override@Suspendablepublic T run() throws FooException, InterruptedException {try {return super.run();} catch (SuspendExecution e) {throw new AssertionError();}}@Override@Suspendablepublic T run(long timeout, TimeUnit unit) throws FooException, InterruptedException, TimeoutException {try {return super.run(timeout, unit);} catch (SuspendExecution e) {throw new AssertionError();}}
}
请注意,我们如何使FiberAsync
直接实现FooCompletion
回调–不是必需的,但这是一个有用的模式。 现在,我们的光纤阻塞op
方法要简单得多,并且该接口中的其他操作也可以轻松实现:
@Override
@Suspendable
public String op(final String arg) throws FooException, InterruptedException {return new FooAsync<String>() {protected void requestAsync() {asyncClient.asyncOp(arg, this);}}.run();
}
有时,我们可能希望在常规线程而不是光纤上调用op
方法。 默认情况下,如果在线程上调用FiberAsync.run()
, FiberAsync.run()
引发异常。 为了解决这个问题,我们要做的就是实现另一个FiberAsync
方法requestSync
,如果在光纤上调用run
,它将调用原始的同步API。 我们的最终代码如下(我们假设FiberFooClass
具有类型为FooClient
的syncClient
字段):
@Override
@Suspendable
public String op(final String arg) throws FooException, InterruptedException {return new FooAsync<String>() {protected void requestAsync() {asyncClient.asyncOp(arg, this);}public String requestSync() {return syncClient.op(arg);}}.run();
}
就是这样!
期货
期货是一种方便的方式,可以在我们等待所有独立的IO操作完成时同时开始。 我们希望我们的纤维能够阻挡期货。 许多Java库通过其异步操作返回期货,因此用户可以在完全异步,基于回调的用法和采用期货的“半同步”用法之间进行选择。 我们的AsyncFooClient
接口就是这样。
这是我们实现AsyncFooClient
版本的AsyncFooClient
,该版本返回阻塞光纤的期货:
import co.paralleluniverse.strands.SettableFuture;public class FiberFooAsyncClient implements FooClient {private final AsyncFooClient asyncClient;public FiberFooClient(AsyncFooClient asyncClient) {this.asyncClient = asyncClient;}@Overridepublic Future<String> asyncOp(String arg, FooCompletion<String> callback) {final SettableFuture<T> future = new SettableFuture<>();asyncClient.asyncOp(arg, callbackFuture(future, callback))return future;}private static <T> FooCompletion<T> callbackFuture(final SettableFuture<T> future, final FooCompletion<T> callback) {return new FooCompletion<T>() {@Overridepublic void success(T result) {future.set(result);callback.completed(result);}@Overridepublic void failure(Exception ex) {future.setException(ex);callback.failed(ex);}@Overridepublic void cancelled() {future.cancel(true);callback.cancelled();}};}
}
如果返回, co.paralleluniverse.strands.SettableFuture
返回co.paralleluniverse.strands.SettableFuture
,如果我们在光纤或普通线程(即任何类型的绞线上 )上对其进行阻塞,则效果同样良好。
JDK 8的CompletableFuture和Guava的ListenableFuture
可以使用预先构建的FiberAsync
使返回CompletionStage
(或实现它的CompletableFuture
)的API(在JDK 8中添加到Java中)变得更容易实现光纤阻塞。 例如,
CompletableFuture<String> asyncOp(String arg);
通过以下方式变成光纤阻塞呼叫:
String res = AsyncCompletionStage.get(asyncOp(arg));
返回Google Guava的方法类似地转换为光纤阻塞同步,因此:
ListenableFuture<String> asyncOp(String arg);
通过以下方式变成光纤阻塞:
String res = AsyncListenableFuture.get(asyncOp(arg));
期货的替代品
尽管期货是有用且熟悉的,但我们实际上并不需要使用纤维时返回它们的特殊API。 产生的纤维是如此便宜( Fiber
类实现了Future
,因此纤维本身可以代替“手工”的期货。 这是一个例子:
void work() {Fiber<String> f1 = new Fiber<>(() -> fiberFooClient.op("first operation"));Fiber<String> f2 = new Fiber<>(() -> fiberFooClient.op("second operation"));String res1 = f1.get();String res2 = f2.get();
}
因此,即使我们使用的API不提供,光纤也可以为我们提供期货。
如果没有异步API怎么办?
有时我们很不幸地遇到一个仅提供同步的线程阻塞API的库。 JDBC是此类API的主要示例。 尽管Quasar不能提高使用此类库的吞吐量,但仍然值得使API光纤兼容(实际上非常容易)。 为什么? 因为调用同步服务的光纤也可能做其他事情。 实际上,它们可能很少调用该服务(仅当发生高速缓存未命中时,才考虑从RDBMS读取数据的光纤)。
实现此目的的方法是通过在专用线程池中执行实际的调用,然后通过FiberAsync
封装该假的异步API,将阻塞API转变为异步API。 这个过程是如此机械, FiberAsync
有一些静态方法可以为我们处理所有事情。 因此,假设我们的服务仅公开了阻塞的FooClient
API。 要使其成为光纤阻塞,我们要做的是:
public class SadFiberFooClient implements FooClient {private final FooClient client;private static final ExecutorService FOO_EXECUTOR = Executors.newCachedThreadPool();public FiberFooClient(FooClient client) {this.client = client;}@Override@SuspendableString op(final String arg) throws FooException, InterruptedException {try {return FiberAsync.runBlocking(FOO_EXECUTOR, () -> client.op());} catch(SuspendExecution e) {throw new AssertionError(e);}}
}
FooClient
此实现可以安全地用于线程和光纤。 实际上,当在普通线程上调用该方法时,该方法将不会麻烦将操作分配给提供的线程池,而是在当前线程上执行该操作-就像我们使用原始FooClient
实现时那样。
结论
此处显示的技术FiberAsync
和cpstrands.SettableFuture
正是构成Comsat项目的集成模块的工作方式。 Comsat包括Servlet,JAX-RS(服务器和客户端),JDBC,JDBI,jOOQ,MongoDB,Retrofit和Dropwizard的集成。
重要的是要查看如何-创建简单且高性能的光纤阻塞API-我们确实重新实现了API 接口 ,但没有实现其内部工作:仍然仅通过其异步API来使用原始库代码,其丑陋之处在于现在对图书馆用户隐藏了。
额外信用:单子怎么样?
除了纤程外,还有其他方法可以处理回调地狱。 JVM世界中最著名的机制是Scala的可组合期货,RxJava的可观察对象以及JDK 8的CompletionStage
/ CompletableFuture
。 这些都是单子和单子组成的例子。 Monad可以工作,有些人喜欢使用它们,但是我认为对于大多数编程语言而言,它们是错误的方法。
您会看到,单子是从基于lambda演算的编程语言中借用的。 Lambda演算是一种理论计算模型,与Turing机器完全不同,但完全类似。 但是与图灵机模型不同,lambda演算计算没有步骤,动作或状态的概念。 这些计算没有做任何事情; 他们只是。 那么,Monads是Haskell等基于LC的语言将动作,状态,时间等描述为纯计算的一种方式。 它们是LC语言告诉计算机“先执行然后再执行”的一种方法。
问题是,命令式语言已经有了“先做然后再做”的抽象,而这种抽象就是线程。 不仅如此,而且是必须的语言通常有一个非常简单的符号“这样做,然后做”:声明此后跟该语句。 命令性语言甚至考虑采用这种外来概念的唯一原因是因为(通过OS内核)线程的实现不令人满意。 但是,与其采用一个陌生,陌生的概念(并且该概念需要完全不同的API类型),不如采用一个相似但细微不同的抽象,最好是修复(线程)的实现。 光纤保留抽象并修复实现。
Java和Scala等语言中的monad的另一个问题是,这些语言不仅势在必行,而且还允许不受限制的共享状态突变和副作用-Haskell却没有。 无限制的共享状态突变和“线程”单核的结合可能是灾难性的。 在纯FP语言中-由于副作用是受控的-计算单位(即函数)也是并发单位:您可以安全地同时执行任何一对函数。 当您不受限制的副作用时,情况并非如此。 函数执行的顺序,两个函数是否可以同时执行以及一个函数是否以及何时可以观察到另一个函数执行的共享状态突变都是非常重要的问题。 结果,作为“线程” monad的一部分运行的函数要么必须是纯函数(没有任何副作用),要么必须非常小心如何执行这些副作用。 这正是我们要避免的事情。 因此,尽管单子组合确实比回调地狱生成了更好的代码,但它们不能解决异步代码引入的任何并发问题。
聚苯乙烯
上一节不应理解为像Haskell这样的纯“ FP”语言的认可,因为我实际上认为它们带来了太多其他问题。 我相信(不久的将来)命令性语言3将允许共享状态变异,但具有一些事务语义。 我相信那些未来的语言将主要从Clojure和Erlang等语言中获得灵感。
- 通过运行我的意思是线程往往不够可运行↩
- 见利特尔法则,可扩展性和容错 ↩
- 它们是否“功能性”是一个难题,因为没有人对功能性编程语言是什么以及它与非功能性语言的区别提出了很好的定义。 ↩
翻译自: https://www.javacodegeeks.com/2015/04/farewell-to-asynchronous-code.html
同步代码和异步代码