java parallelstream_关于Java8 parallelStream并发安全的深入讲解

背景

Java8的stream接口极大地减少了for循环写法的复杂性,stream提供了map/reduce/collect等一系列聚合接口,还支持并发操作:parallelStream。

在爬虫开发过程中,经常会遇到遍历一个很大的集合做重复的操作,这时候如果使用串行执行会相当耗时,因此一般会采用多线程来提速。Java8的paralleStream用fork/join框架提供了并发执行能力。但是如果使用不当,很容易陷入误区。

Java8的paralleStream是线程安全的吗

一个简单的例子,在下面的代码中采用stream的forEach接口对1-10000进行遍历,分别插入到3个ArrayList中。其中对第一个list的插入采用串行遍历,第二个使用paralleStream,第三个使用paralleStream的同时用ReentryLock对插入列表操作进行同步:

private static List list1 = new ArrayList<>();

private static List list2 = new ArrayList<>();

private static List list3 = new ArrayList<>();

private static Lock lock = new ReentrantLock();

public static void main(String[] args) {

IntStream.range(0, 10000).forEach(list1::add);

IntStream.range(0, 10000).parallel().forEach(list2::add);

IntStream.range(0, 10000).forEach(i -> {

lock.lock();

try {

list3.add(i);

}finally {

lock.unlock();

}

});

System.out.println("串行执行的大小:" + list1.size());

System.out.println("并行执行的大小:" + list2.size());

System.out.println("加锁并行执行的大小:" + list3.size());

}

执行结果:

串行执行的大小:10000

并行执行的大小:9595

加锁并行执行的大小:10000

并且每次的结果中并行执行的大小不一致,而串行和加锁后的结果一直都是正确结果。显而易见,stream.parallel.forEach()中执行的操作并非线程安全。

那么既然paralleStream不是线程安全的,是不是在其中的进行的非原子操作都要加锁呢?我在stackOverflow上找到了答案:

https://codereview.stackexchange.com/questions/60401/using-java-8-parallel-streams

https://stackoverflow.com/questions/22350288/parallel-streams-collectors-and-thread-safety

在上面两个问题的解答中,证实paralleStream的forEach接口确实不能保证同步,同时也提出了解决方案:使用collect和reduce接口。

http://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html

在Javadoc中也对stream的并发操作进行了相关介绍:

The Collections Framework provides synchronization wrappers, which add automatic synchronization to an arbitrary collection, making it thread-safe.

Collections框架提供了同步的包装,使得其中的操作线程安全。

所以下一步,来看看collect接口如何使用。

stream的collect接口

闲话不多说直接上源码吧,Stream.java中的collect方法句柄:

R collect(Collector super T, A, R> collector);

在该实现方法中,参数是一个Collector对象,可以使用Collectors类的静态方法构造Collector对象,比如Collectors.toList(),toSet(),toMap(),etc,这块很容易查到API故不细说了。

除此之外,我们如果要在collect接口中做更多的事,就需要自定义实现Collector接口,需要实现以下方法:

Supplier supplier();

BiConsumer accumulator();

BinaryOperator combiner();

Function finisher();

Set characteristics();

要轻松理解这三个参数,要先知道fork/join是怎么运转的,一图以蔽之:

eac370d76c28539821dca5f577353c48.png

上图来自:http://www.infoq.com/cn/articles/fork-join-introduction

简单地说就是大任务拆分成小任务,分别用不同线程去完成,然后把结果合并后返回。所以第一步是拆分,第二步是分开运算,第三步是合并。这三个步骤分别对应的就是Collector的supplier,accumulator和combiner。talk is cheap show me the code,下面用一个例子来说明:

输入是一个10个整型数字的ArrayList,通过计算转换成double类型的Set,首先定义一个计算组件:

Compute.java:

public class Compute {

public Double compute(int num) {

return (double) (2 * num);

}

}

接下来在Main.java中定义输入的类型为ArrayList的nums和类型为Set的输出结果result:

private List nums = new ArrayList<>();

private Set result = new HashSet<>();

定义转换list的run方法,实现Collector接口,调用内部类Container中的方法,其中characteristics()方法返回空set即可:

public void run() {

// 填充原始数据,nums中填充0-9 10个数

IntStream.range(0, 10).forEach(nums::add);

//实现Collector接口

result = nums.stream().parallel().collect(new Collector>() {

@Override

public Supplier supplier() {

return Container::new;

}

@Override

public BiConsumer accumulator() {

return Container::accumulate;

}

@Override

public BinaryOperator combiner() {

return Container::combine;

}

@Override

public Function> finisher() {

return Container::getResult;

}

@Override

public Set characteristics() {

// 固定写法

return Collections.emptySet();

}

});

}

构造内部类Container,该类的作用是一个存放输入的容器,定义了三个方法:

accumulate方法对输入数据进行处理并存入本地的结果

combine方法将其他容器的结果合并到本地的结果中

getResult方法返回本地的结果

Container.java:

class Container {

// 定义本地的result

public Set set;

public Container() {

this.set = new HashSet<>();

}

public Container accumulate(int num) {

this.set.add(compute.compute(num));

return this;

}

public Container combine(Container container) {

this.set.addAll(container.set);

return this;

}

public Set getResult() {

return this.set;

}

}

在Main.java中编写测试方法:

public static void main(String[] args) {

Main main = new Main();

main.run();

System.out.println("原始数据:");

main.nums.forEach(i -> System.out.print(i + " "));

System.out.println("\n\ncollect方法加工后的数据:");

main.result.forEach(i -> System.out.print(i + " "));

}

输出:

原始数据:

0 1 2 3 4 5 6 7 8 9

collect方法加工后的数据:

0.0 2.0 4.0 8.0 16.0 18.0 10.0 6.0 12.0 14.0

我们将10个整型数值的list转成了10个double类型的set,至此验证成功~

本程序参考 http://blog.csdn.net/io_field/article/details/54971555。

一言蔽之

总结就是paralleStream里直接去修改变量是非线程安全的,但是采用collect和reduce操作就是满足线程安全的了。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/346737.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jakarta EE,EE4J和Java EE之间的关系

Jakarta EE的名称已经存在了一个多月&#xff0c;即使Mike Milinkovich在他的博客文章“名称是……”中很好地解释了名称和概念&#xff0c;但对于它们之间的关系仍然有些困惑&#xff0c;我也提出了疑问只要有话题就可以围绕它。 我试图在这里总结一下。 希望能帮助到你&#…

java readline 超时_跳过Java中的BufferedReader readLine()方法

是否有一种简单的方法可以跳过java中的readLine()方法(如果它花费的时间超过2秒)&#xff1f;这是我问这个问题的背景&#xff1a;public void run(){boolean looping true;while(looping) {for(int x 0; xtry {Comm s clientList.get(x);String str s.recieve();// code t…

minwindow java_Java经典算法:最小窗口子字符串

给定一个字符串S和一个字符串T&#xff0c;找到S中的最小窗口&#xff0c;其中将包含T中所有字符的复杂度为O(n)。例如&#xff0c;S “ ADOBECODEBANC”&#xff0c;T “ ABC”&#xff0c;最小窗口为“ BANC”。Java解决方案public String minWindow(String s, String t) {Ha…

j2ee可以用于前端开发吗_用于J2EE开发的Cloud IDE

j2ee可以用于前端开发吗随着许多传统软件工具进入云计算&#xff0c;我想看看它们与传统工具的比较。 我的兴趣是J2EE技术​​&#xff0c;并开始寻找一种云服务&#xff0c;该服务使我能够开发&#xff0c;测试和部署J2EE应用程序。 我很快发现&#xff0c;许多云IDE是为Web前…

java为什么全是乱码_java一切乱码的解释 以及源头

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼InputStreamReader in new InputStreamReader(new FileInputStream(“demo.txt”),”UTF-8”);给InputStreamReader指定解码编码&#xff0c;这样二者统一就不会出现乱码了。下面说说字符输出流。字符输出流的原理和字符输入流的原…

JAVA连接数据库ij_Derby 客户端 ij使用

Derby是开源的、嵌入式的Java数据库程序&#xff0c;ij是Derby提供的客户端工具&#xff0c;相当于其他数据库提供的sqlplus工具。ij是纯Java的程序&#xff0c;不用安装&#xff0c;使用起来就像运行普通的Java应用程序一样。接下来说下&#xff1a;如何使用ij工具去操作数据库…

使用Google Cloud Storage托管您的Maven工件

如果您使用Google Cloud并将Java用于项目&#xff0c;那么Google Cloud Storage是托管团队工件的理想场所。 它很容易设置&#xff0c;而且很便宜。 如果您对它们的功能不特别感兴趣&#xff0c;那么它比设置现有存储库选项&#xff08;jfrog&#xff0c;nexus&#xff0c;arc…

java+map对象判断空值_java判断map中是否存在指定对象

Map判断是否包含指定的value使用containsValue方法。(推荐&#xff1a;java视频教程)定义containsValue(Object value) 如果此映射将一个或多个键映射到指定值&#xff0c;则返回 true示例&#xff1a;/**** Map集合判断是否包含value**/public class MapDemo{public static vo…

mysql 主主模式优缺点_mysql主主同步模式

主192.168.56.20 和 从都新建数据库db1 db2 db3(如果数据库在用&#xff0c;需要上锁后手动从主备份&#xff0c;然后在从恢复)mysql> create database db1;Query OK, 1 row affected (0.00 sec)mysql> create database db2;Query OK, 1 row affected (0.00 sec)mysql>…

spring 类型转换器_Spring中的类型转换

spring 类型转换器以下是一些需要类型转换的简单情况&#xff1a; 情况1。 为了帮助简化bean配置&#xff0c;Spring支持属性值与文本值之间的转换。 每个属性编辑器仅设计用于某些类型的属性。 为了使用它们&#xff0c;我们必须在Spring容器中注册它们。 案例2。 同样&…

java+语音识别+谷歌_JAVA使用谷歌语音识别API

我正在尝试使用谷歌语音识别API.这是我写的代码&#xff1a;有用.我从服务器得到答案&#xff1a;{"status":5,"id":"8803471b14a2310dfcf917754e8bd4a7-1","hypotheses":[]}现在的问题是“状态&#xff1a;5”.事实上,这里的状态代码…

java中对象字节数_JAVA中求解对象所占字节大小

该类为cache4j缓存框架中的工具类方法&#xff0c;该方法实现了两个接口接口1&#xff1a;计算对象在内存中所占字节数接口2&#xff1a;复制对象&#xff0c;实现深度克隆效果&#xff0c;实现原理为先序列化对象&#xff0c;然后在反序列化对象&#xff1b;返回一个新的对象&…

excel查重复_毕业季 | 如何降低论文的查重率

毕业季吾日三省吾身实验做完了吗&#xff1f;论文写完了吗&#xff1f;查重能通过吗&#xff1f;学术圈的前辈告诉我们&#xff0c;只有站在巨人的肩膀上才能看得更远。在撰写一篇论文时&#xff0c;为保证质量和可靠性&#xff0c;难免需要引用前人的成果&#xff0c;这也反映…

Sun过去的世界中的JDK 11和代理

使用JDK 11后&#xff0c;就sun.misc.Unsafe的第一种方法。 其中&#xff0c; defineClass方法已删除。 代码生成框架通常使用此方法在现有的类加载器中定义新的类。 尽管此方法易于使用&#xff0c;但它的存在也使JVM本质上不安全&#xff0c;正如其定义类的名称所暗示的那样。…

java中 private final_Java笔记:final与private关键字

记录一个有趣的现象&#xff0c;private修饰的方法子类是访问不了的&#xff0c;且类中所有private修饰的方法都隐式的指定为final(可以对private方法添加final修饰词&#xff0c;但是这并不能给该方法增加任何额外的意义)&#xff0c;final修饰的方法是不可以被重写的。但是如…

Java,JavaFX的流畅设计风格进度栏

按照承诺&#xff0c;刚刚发布的Java JavaFX主题JMetro 4.6版为进度栏带来了新样式。 进度栏有两种可能的状态&#xff1a;确定和不确定&#xff0c;新的JMetro版本具有这两种状态。 在本文中&#xff0c;我还将详细介绍一些我在JMetro中遵守的API设计原则。 JMetro API设计原…

安卓最新系统_成纺移动校园(移动办公系统)V3.2.1 安卓最新版

成纺移动校园(移动办公系统)是额一个非常实用的办公工具。您可以使用该软件及时浏览最新的校园信息&#xff0c;同时涵盖许多功能&#xff0c;例如时间表查询&#xff0c;会议安排&#xff0c;校园地图&#xff0c;校车等。有需要的用户欢迎来绿色先锋网下载。 成纺移动校园简介…

java时间日期格式器_JAVA基础类库(二)-----日期、时间类和格式器

Date类public classDateTest{public static voidmain(String[] args){Date d1 newDate();//获取当前时间之后100ms的时间Date d2 new Date(System.currentTimeMillis() 10000);System.out.println(d1);System.out.println(d2);//比较d1,d2是否相等,相等返回0&#xff0c;大于…

pyqt 获取 UI 中组件_你想知道的React组件设计模式这里都有(上)

本文梳理了容器与展示组件、高阶组件、render props这三类React组件设计模式往期回顾&#xff1a;HBaseCon Asia 2019 Track 3 概要回顾随着 React 的发展&#xff0c;各种组件设计模式层出不穷。React 官方文档也有不少相关文章&#xff0c;但是组织稍显凌乱&#xff0c;本文就…

jvm上的随机数与熵_向您的JVM添加一些熵

jvm上的随机数与熵能否生成真正的随机数取决于系统中的熵。 有人声称&#xff0c;这可以通过掷骰子来保证。 其他人认为&#xff0c;用此主体替换OpenJDK的java.math.Random.nextInt&#xff08;&#xff09;方法将有所帮助&#xff1a; public int nextInt() {return 14; }资…