scala 异步调用_非阻塞异步Java 8和Scala的Try / Success / Failure

scala 异步调用

受Heinz Kabutz最近的时事通讯以及我在最近的书中研究的Scala的期货的启发,我着手使用Java 8编写了一个示例,该示例如何将工作提交给执行服务并异步地响应其结果,并使用了回调。无需阻止任何线程等待执行服务的结果。

理论认为,调用拦截方法,如getjava.util.concurrent.Future是坏的,因为系统会需要超过线程的最佳数量,如果它是不断做工作,并在浪费时间与结果上下文切换 。

在Scala世界中,像Akka这样的框架都使用编程模型,这意味着这些框架永远不会阻塞-线程阻塞的唯一时间是用户对阻塞的对象进行编程时,他们不愿意这样做。 通过永不阻塞,该框架可以避免每个内核使用大约一个线程,这比说说标准JBoss Java EE Application Server(在启动后最多拥有400个线程)要少得多。 很大程度上归功于Akka框架的工作,Scala 2.10添加了Futures和Promises ,但是Java中还不存在这些东西。

以下代码显示了我的预期目标。 它分为三个部分。 首先,使用在类ch.maxant.async.Future找到的static future方法将新任务添加到执行服务中。 它返回一个Future ,但不是从java.util.concurrent包中返回一个Future ,而是从ch.maxant.async包中返回其子类。 其次, Future具有一种名为map的方法,该方法遵循Scala或新的Java 8 Stream类的功能样式。 map方法使您可以注册回调,或更准确地说,可以将第一个future包含的值映射(转换)为新值。 在第一个Future完成后,映射将在将来的其他时间执行,因此会产生新的Future 。 第三,我们在Future类中使用另一种方法注册一个回调,一旦我们创建的所有期货都完成,该回调将运行。 任何时候都不会使用Future API的任何阻止方法!

final Random random = new Random();
int numTasks = 10;
List<Future<Integer>> futures = new ArrayList<>();for(int i = 0; i < numTasks; i++){final int j = i;log("adding future " + i);// PART 1//start some work async / in the futureFuture<String> f = future(new Task<String>( () -> {sleep(random.nextInt(1000));if(j < 5){log("working success");return "20";}else{log("working failure");throw new Exception();}}));// PART 2//register a callback, to be called when the work is donelog("adding mapping callback to future");final Future<Integer> f2 = f.map( (Try<String> stringNumber) -> {return stringNumber.map( (String s) -> {log("mapping '" + s + "' to int");return Integer.parseInt(s);}).recover( (Exception e) -> {log("recovering");return -10;}).get(); //wont throw an exception, because we provided a recovery!});futures.add(f2);
}// PART 3
log("registering callback for final result");
Future.registerCallback(futures, (List<Try<Integer>> results) -> {Integer finalResult = results.stream().map( (Try<Integer> t) -> {log("mapping " + t);try {return t.get();} catch (Exception e) {return 0;}}).reduce(0, (Integer i1, Integer i2) -> {log("reducing " + i1 + " and " + i2);return i1 + i2;});log("final result is " + finalResult);Future.shutdown();if(finalResult != 50){throw new RuntimeException("FAILED");}else{log("SUCESS");}
});System.out.println("Completed submitting all tasks on thread " + Thread.currentThread().getId());//this main thread will now die, but the Future executor is still up and running.  the callback will shut it down and with it, the jvm.

第11行调用了future方法来注册一个新Task ,该Task是使用Work实例构造的,在这里是使用Java 8 lambda构造的。 工作会睡一会儿,然后要么返回数字20(作为字符串),要么抛出异常,以演示如何处理错误。

使用第11行从执行服务返回的Future ,第25行将其值从字符串映射为整数,从而生成Future<Integer>而不是Future<String> 。 该结果将添加到第35行的Future列表中,第3部分在第40行中使用该列表registerCallback方法将确保在最后一个future完成后调用给定的回调。

第25-33行的映射使用传递给Try对象的lambda完成。 Try有点像Java 8 Optional ,它是SuccessFailure类的抽象(超类),我是根据对Scala的了解而实现的。 与必须显式检查错误相比,它可使程序员更轻松地处理故障。 我对Try接口的实现如下:

public interface Try<T> {/** returns the value, or throws an exception if its a failure. */T get() throws Exception;/** converts the value using the given function, resulting in a new Try */<S> Try<S> map(Function1<T, S> func);/** can be used to handle recovery by converting the exception into a {@link Try} */Try<T> recover(Recovery<T> r);}

发生的情况是SuccessFailure的实现可以优雅地处理错误。 例如,如果第一个清单的第11行上的Future完成但有例外,则将第一个清单的第25行上的lambda传递给Failure对象,并且在Failure上调用map方法绝对没有任何作用。 没有例外,没有任何问题。 为了补偿,您可以调用recover方法,例如在第一个清单的第29行,该方法允许您处理异常并返回程序可以继续使用的值,例如默认值。

另一方面, Success类以不同的方式实现Try接口的maprecover方法,这样,调用map会导致给定的函数被调用,但是调用recover绝对不会执行任何操作。 maprecover方法无需显式编码try / catch块,而是提供了一种更好的语法,该语法在读取或查看代码时更容易验证(与编写相比,这种情况在代码中更常见)。

由于maprecover方法将函数的结果包装在Try s中,因此您可以将调用链接在一起,例如第Try和32行。Scala的Try API具有比我在这里实现的三种方法更多的方法。 请注意,我选择在Try API java.util.function.Function不使用java.util.function.Function ,因为它的apply方法不会throw Exception ,这意味着第一个清单中显示的代码不像现在那样好。 相反,我写了
Function1接口。

难题的第3部分是如何使程序在所有Future完成之后做一些有用的事情,而又不会像对Future#get()方法那样讨厌调用。 解决方案是注册一个回调,如第40行所示。该回调与此处显示的所有其他回调一样,都已提交给执行服务。 这意味着我们无法保证哪个线程将运行它,这会带来副作用,即线程本地存储(TLS)不再起作用-某些框架((的较旧版本?)Hibernate依赖TLS,而它们只会胜任)。在这里工作。 Scala有一个很好的方法可以使用implicit关键字来解决该问题,而Java还没有(但是…?),因此需要使用其他机制。 我在提到它,只是为了让您知道它。

因此,当最后一个Future完成时,将调用40-60行,并传递包含Integer而不是FutureTryListregisterCallback方法将期货转换为适当的SuccessFailure 。 但是,我们如何将它们转换成有用的东西呢? 幸运的是,Java 8现在有了一个简单的map / reduce,就支持了Stream类,该类通过调用stream()方法从第42行的Try集合中Try了。 首先,我将Try映射(转换)为它们的值,然后在第49行上将流减少为单个值。我本可以使用不使用自己的求和值的lambda实现
Integer::sum ,例如someStream.reduce(0, Integer::sum)

我上次运行该程序时,它输出以下内容:

Thread-1 says: adding future 0
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 1
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 2
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 3
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 4
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 5
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 6
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 7
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 8
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 9
Thread-1 says: adding mapping callback to future
Thread-1 says: registering callback for final result
Thread-10 says: working success
Completed submitting all tasks on thread 1
Thread-14 says: working success
Thread-10 says: working failure
Thread-14 says: working failure
Thread-12 says: working success
Thread-10 says: working failure
Thread-10 says: mapping '20' to int
Thread-10 says: mapping '20' to int
Thread-10 says: recovering
Thread-10 says: recovering
Thread-10 says: mapping '20' to int
Thread-10 says: recovering
Thread-11 says: working success
Thread-11 says: mapping '20' to int
Thread-13 says: working success
Thread-10 says: mapping '20' to int
Thread-12 says: working failure
Thread-12 says: recovering
Thread-14 says: working failure
Thread-14 says: recovering
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: final result is 50
Thread-14 says: SUCESS

如您所见,主线程添加了所有任务并注册了所有映射功能(第1-20行)。 然后,它注册回调(输出的第21行,与清单的第39行相对应),最后从清单的第63行输出文本,此后它死了,因为它无事可做。 然后,输出的第22行和第24-42行显示了池中的各个线程(包含5个线程),这些线程处理工作以及从String到Integer的映射或从异常中恢复。 这是第一个清单的第1部分和第2部分中的代码。 您可以看到它是完全异步的,在所有初始工作完成之前会发生一些映射/恢复(将第38行或第40行分别映射和恢复到输出的第41行,此行随后发生并且是最后一行)最初的工作)。 第43-52行是map / reduce的输出,它是主列表的第3部分。 请注意,没有记录reduce,因为我运行的代码(位于Github上)使用上面提到的Integer::sum快捷方式,而不是上面显示的第一个清单的第50-51行。

尽管使用Java 6(甚至5?)可以实现所有这些功能,例如通过获取提交到池中的任务来自己提交回调,但是一旦完成,执行该操作所需的代码量就会更大,并且该代码本身将比此处显示的代码更丑陋。 可以使用回调进行映射的Java 8 lambda, Future以及具有简洁错误处理功能的Try API都可以使此处所示的解决方案更具可维护性。

上面显示的代码以及ch.maxant.async包中类的代码在Apache License Version 2.0下可用,并且可以从我的Github帐户下载。

参考: Zoo博客The Kitchen中来自JCG合作伙伴 Ant Kutschera 的非阻塞式异步Java 8和Scala的Try / Success / Failure 。

翻译自: https://www.javacodegeeks.com/2013/10/non-blocking-asynchronous-java-8-and-scalas-trysuccessfailure.html

scala 异步调用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/346377.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java简单的事务单元_junit 单元测试事务自动回滚(亲测有效)

junit 单元测试事务会自动回滚。通过Rollback(true)注解来实现&#xff0c;默认是true&#xff0c;事务会回滚&#xff0c;可以不写。false时事务不会回滚&#xff0c;数据会写到数据库中。实例&#xff1a;package com.xiaolyuh.service;import java.util.Date;import org.jun…

linux系统如何打开python_手把手教你在Linux系统下使用Python虚拟环境

点击上方“Python爬虫与数据挖掘”&#xff0c;进行关注回复“书籍”即可获赠Python从入门到进阶共10本电子书今日鸡汤夕阳无限好&#xff0c;只是近黄昏。/1 前言/Hello小伙伴们&#xff0c;你们好&#xff0c;又是日常get新技能的一天&#xff0c;今天&#xff0c;咱们来整一…

Java EE 7中的资源和依赖注入

1.概述 上下文和依赖注入&#xff08;CDI&#xff09;是Java EE的一项功能&#xff0c;可帮助融合Java EE 6和更高版本中包含的平台的Web层和事务层。 从技术角度来看&#xff0c;这意味着CDI提供了依赖项注入框架&#xff0c;并且还管理了依赖项的生命周期。 今天在本教程中&…

java wrap方法_Java WritableCellFormat.setWrap方法代码示例

import jxl.write.WritableCellFormat; //导入方法依赖的package包/类/*** param workSheet to add the help to* param table to fetch metadata from* param startRow to start adding rows at* param helpTextRowNumbers - map to insert row numbers for each help field i…

python创建tcp socket_Python Socket如何建立TCP连接

在 Python 程序中创建 TCP 服务器时&#xff0c;创建通用 TCP 服务器的一般演示代码如下。需要记住的是&#xff0c;这仅是设计服务器的一种方式。一旦熟悉了服务器设计&#xff0c;可以修改下面的代码来操作服务器。ss socket() #创建服务器套接字ss.bind() #绑定套接字与地址…

ubantu java编辑器_Linux Ubuntu中最好的代码编辑器 程序员都这么看吗?

【IT168 应用】Linux Ubuntu 中最好的代码编辑器是哪个 ? 这要看程序员的使用习惯和喜好了&#xff0c;Atom、Brackets、Sublime Text 是最常用的三个&#xff0c;也许程序员们还有更喜欢的。AtomAtom 是流行&#xff0c;而且十分友好的文本编辑器&#xff0c;而且还可以嵌入到…

apache camel_Apache Camel –从头开始开发应用程序(第1部分/第2部分)

apache camel开始之前 前段时间&#xff0c;我写了一篇关于Spring Integration的教程&#xff0c;以演示如何在受现实发票处理系统启发的示例应用程序中使用Spring Integration。 我对此非常满意&#xff0c;因此我决定向您展示如何使用Apache Camel&#xff08;Spring Integra…

python websocket异步高并发_高并发异步uwsgi+web.py+gevent

为什么用web.py&#xff1f;python的web框架有很多&#xff0c;比如webpy、flask、bottle等&#xff0c;但是为什么我们选了webpy呢&#xff1f;想了好久&#xff0c;未果&#xff0c;硬要给解释&#xff0c;我想可能原因有两个&#xff1a;第一个是兄弟项目组用webpy&#xff…

提示:通过URL激活并发送参数

世界上最安全的密码是不存在的密码。 使用完全随机的密钥从等式中删除用户。 公平地说&#xff0c;这有一些缺点&#xff0c;并且密码仍然存在于某个地方&#xff08;在您的电话/电子邮件中&#xff09;&#xff0c;但通常效果很好。 诀窍很简单&#xff0c;如果我们想对用户进…

weblogic创建域后启动不了_摩托车淋雨后启动不了什么原因?如何解决?

图文是工作&#xff0c;视频是生活。大家好&#xff0c;我是 骑士分享 欢迎您的关注&#xff01;摩托车淋雨后启动不了什么原因&#xff1f;如何解决&#xff1f;这种现象对于电喷车型来说发生的几率并不大&#xff0c;原因就在于电喷车型的线路防水能力会更强&#xff0c;供油…

前端开始学java_[Java教程]开启前端学习之路

[Java教程]开启前端学习之路0 2014-06-10 17:00:06前言第一次在博客园写博客&#xff0c;写写自己开启前端学习之路。应该是受邢师兄的影响吧&#xff0c;不得不说邢师兄人很好&#xff0c;学习也很认真&#xff0c;师兄的前端也是自学的&#xff0c;但是学的很好&#xff0c;大…

python 傅里叶_基于python的图像傅里叶处理

import numpy as npimport matplotlib.pyplot as pltx np.linspace(-10, 10, 1000)a np.cos(x)b a np.cos(3 * x)# d np.log(x)c b np.cos(7 * x)d c - np.cos(10 * x)plt.subplot(2, 2, 1)plt.plot(x, a, label‘$cos(x)$‘, color‘green‘, linewidth1)plt.title(&q…

xalan_如何以10倍速加速Apache Xalan的XPath处理器

xalan一段时间以来&#xff0c; Apache Xalan中存在一个令人尴尬的错误&#xff0c;该错误是XALANJ-2540 。 此错误的后果是Xalan每次XPath表达式求值将内部SPI配置文件加载数千次 &#xff0c;可以很容易地进行如下测量&#xff1a; 这个&#xff1a; Element e (Element)do…

EMUI10安装java_linux ubuntu系统安装java jdk和配置环境,pycharm安装

最近想使用pycharm,可是要想搭建java 环境&#xff0c;搞了很久才搞定&#xff0c;网上很多资料都是没用的。记录下来以后有用。首先加下载jdk安装包。我下的是jdk-6u37-linux-x64.bin&#xff0c;我把jdk安装在usr/lib/jvm1.sudo cp jdk-6u37-linux-x64.bin /usr/lib/jvm #将安…

python gevent async_python的异步初体验(gevent、async、await)

网络爬虫&#xff0c;这种io高密集型的应用由于大部分的时间在等待响应方面&#xff0c;所以CPU的使用率一直不高&#xff0c;速度也不快&#xff0c;为了解决这些问题&#xff0c;我们使用异步的方式来进行爬虫程序。串行的时候&#xff0c;如果我们要爬一个网站&#xff0c;那…

JEP 181不兼容,嵌套类/ 2

JEP 181是基于嵌套的访问控制https://openjdk.java.net/jeps/181 。 它是在Java 11中引入的&#xff0c;它故意引入了与先前版本的不兼容性。 这是一个很好的例子&#xff0c;与Java的先前版本兼容并不是刻板的规则&#xff0c;而是保持语言的一致性和稳定发展。 在本文中&…

abap 导入队列末尾_在C#中将对象添加到队列的末尾-排队操作

要将对象添加到队列的末尾&#xff0c;代码如下-示例using System;using System.Collections.Generic;public class Demo {public static void Main(){Queue queue new Queue();queue.Enqueue("Electronics");queue.Enqueue("Accessories");queue.Enqueue…

vim循环下表复制_Vimrc Init.vim太长了?不存在的

精简配置刚开始接触vim&#xff0c;你会被它各种好看的外观以及实用的插件吸引&#xff0c;各种折腾&#xff0c;不知不觉你的vimrc或者init.vim变得特别长&#xff0c;我之前的init.vim有多长&#xff1f;596行&#xff1f;wtf&#xff1f;每次维护的时候不知道有多麻烦&#…

C语言与JAVA内存管理_C语言内存管理

本章将介绍C语言动态内存管理. C语言编程语言提供了多种功能的内存分配和管理。这些函数可以在头文件中找到。S.N.函数与说明1void *calloc(int num, int size);此函数分配num元素其中每一个字节大小为(size)的数组2void free(void *address);此函数释放由地址指定的存储器块的…

使用LocalDate,LocalTime和LocalDateTime

Java 8对日期和时间API进行了重大更改&#xff0c;这是在JSR 310&#xff1a;日期和时间API的 JDK中包括了Joda Time API 。 此JSR由Joda Time的创建者Stephen Colebourne领导。 有许多惊人的API可用于日期和时间。 在本文中&#xff0c;我将介绍最常用的&#xff1a; java.ti…