Java并发中常用同步工具类

为什么80%的码农都做不了架构师?>>>   hot3.png

同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程控制流。阻塞队列(BlockingQueue)可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore),栅栏(Barrier)以及闭锁(Latch)。在平台类库中还包含其他一些同步工具类的类,如果这些类还无法满足需要,那么可以创建自己的同步工具类。

闭锁Latch

闭锁可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关着的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须在这个闭锁上等待。
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S以来的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。
  • 等待直到某个操作的所有参与者(例如:在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有的玩家都准备就绪时,闭锁将到达结束状态。

CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况下使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法将递减计数器,表示有一个事件已经发生了,而await方法等待计数器到达零,这表示所有需要的事件都已经发生。如果计数器的值为非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

在下面的TestHarness中给出了闭锁的两种常见用法。TestHarness创建一定数量的线程,利用它们并发的执行指定的任务。它使用两个闭锁,分别表示“起始门(Start Gate)”和“结束门(End Gate)”。起始门计数器的初始值为1,而结束们计数器的初始值为工作线程数量。每个工作线程首先要做的就是在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情是调用countDown方法使事件数量减1,这能使主线程高效的等待直到所有工作线程都执行完成,因此可以统计所消耗的时间:

public class TestHarness {public long timeTasks(int nThread, final Runnable task) throws InterruptedException {final CountDownLatch startGate = new CountDownLatch(1);final CountDownLatch endGate = new CountDownLatch(nThread);for (int i = 0; i < nThread; i++) {Thread t = new Thread() {public void run() {try {startGate.await();try {task.run();} finally {endGate.countDown();}} catch(InterruptedException ignored) {}}};t.start();}long start = System.nanoTime();startGate.countDown();long end = System.nanoTime();return end - start;}
}

为什么要在TestHarness中使用闭锁,而不是在线程创建后就立即启动?或许,我们希望测试n个线程并发执行某个任务时需要的时间。如果在创建线程之后立即启动它们,那么先启动的线程将“领先”后启动的线程,并且活跃线程数量会随着时间的推移而增加或减少,竞争程度也在不断发生变化。起始门将使得主线程能够同时释放所有的工作线程,而结束门则使主线程能够等待最后一个线程执行完成,而不是顺序的等待每个线程执行完成。

FutureTask

FutureTask也可以用作闭锁。(FutureTask实现了Future语义,表示一种抽象的可生成结果的计算)。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和完成运行(Completed)。“执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态以后,它会永远停止在这个状态上。

Future.get的行为取决于任务的状态。如果任务已经执行完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。

FutureTask在Executor框架中表示异步任务,此外还可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。下面的程序就使用了FutureTask来执行一个高开销的计算,并且计算结果将在稍后使用。通过提前启动计算,可以减少在等待结果时需要的时间:

public class Preloader{private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(new Callable<ProductInfo>(){public ProductInfo call() throws DataLoadException{return loadProductInfo();}});private final Thread thread = new Thread(future);public void start() {thread.start();}public ProductInfo get() throws DataLoadException, InterruptedException{try {return future.get();} catch(ExecutionException e) {Throwable cause = e.getCause();if (cause instance of DataLoadException) {throw (DataLoadException)cause;} else {throw launderThrowable(cause);}}}
}

Preloader创建了一个FutureTask,其中包含从数据库加载产品信息的任务,以及一个执行运算的线程。由于在构造函数或静态初始化方法中启动线程并不是一种好方法,因此提供了一个start方法来启动线程。当程序虽有需要ProductInfo时,可以调用get方法,如果数据已经加载,那么将返回这些数据,否则将等待加载完成后再返回。

Callable表示的任务可以抛出受检查的或未受检查的异常,并且任何代码都可能抛出一个Error。无论任务代码抛出什么异常,都会被封装到一个ExecutionException中,并在Future.get中被重新抛出。这将使调用get的代码变得复杂,因为它不仅需要处理可能出现的ExecutionException(以及未检查的CancellationException),而且还由于ExecutionException是作为一个Throwable类返回的,因此处理起来并不容易。

信号量Semaphore

计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界

Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放即可。如果没有许可,那么aquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。

Semaphore可以用于实现资源池,例如数据库连接池。我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为是阻塞而不是失败,并且当池非空时解除阻塞。如果将Semaphore的计数值初始化为池的大小,并在从池中获取一个资源之前先调用aquire方法获得一个许可,在将资源返回给池之后调用release释放许可,这样就实现了固定长度了。

同样,你可以使用Semaphore将任何一种容器变成有界阻塞容器,如下代码所示。信号量的计数值会初始化为容器容量的最大值。add操作在向底层容器中添加一个元素之前,首先需要获得一个许可。如果add操作没有添加任何元素,那么会立刻释放许可。同样remove操作释放一个许可,使更多的元素能够添加到容器中。底层的Set实现并不知道关于边界的任何信息,这是由BoundedHashSet来处理的。

public class BoundedHashSet<T> {private final Set<T> set;private final Semaphore sem;public BoundedHashSet(int bound) {this.set = Collections.synchronizedSet(new HashSet<T>());sem = new Semaphore(bound);}public boolean add(T o) throws InterruptedException {sem.aquire();boolean wasAdded = false;try {wasAdded = set.add(o);return wasAdded;} finally {if (!wasAdded) {sem.release();}}}public boolean remove(Object o) {boolean wasRemoved = set.remove(o);if (wasRemoved) {sem.release();}return wasRemoved;}
}

栅栏Barrier

我们已经看到通过闭锁来启动一组相关的操作,或者等待也组相关的操作结束。闭锁是一次性对象,一旦进入终止状态,就不能被重置。

栅栏(Barrier)类似于闭锁,它能阻塞线程直到某个事件发生。栅栏与闭锁的关键区别在于:所有线程必须同时到达栅栏位置,才能继续执行。闭锁也用于等待事件,而栅栏也用于等待其他线程。栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:“所有人6:00在麦当劳碰头,到了以后要等其他人,之后再讨论下一步要做的事情。”

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将也一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时,将调用await方法,这个方法将阻塞直到所有线程都到达阻塞位置。如果所有线程都到达了栅栏的位置,那么栅栏将打开,此时所有线程都将被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏参数传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在下一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。

下面的例子中给出了如何通过栅栏来计算细胞的自动化模拟。在把模拟过程并行化时,为每个元素(这里为每个细胞)分配一个独立的线程是不现实的,也因为这将产生过多的线程,而在协调这些线程上导致的开销将降低计算性能。合理的做法是,将问题分解成一定数量的子问题,为每个子问题分配一个线程来进行求解,之后再将所有的结果合并起来。CellularAutomata将问题分解为Ncpu(可用的CPU数量)个子问题,并将每个子问题分配给一个线程。在每个步骤中,工作线程都为各自子问题中的所有细胞计算新值。当所有工作线程都到达栅栏时,栅栏会把这些新值交给数据模型。在栅栏的操作执行完成以后,工作线程将开始下一步的计算,包括调用isDone方法来判断是否需要进行下一次迭代。

public class CellularAutomata {private final Board mainBoard;private final CyclicBarrier barrier;private final Worker[] workers;public CellularAutomata (Board board) {this.mainBoard = board;int count = Runtime.getRuntime().availableProcessors();this.barrier = new CyclicBarrier(count, new Runnable(){public void run() {mainBoard.commitNewValues();}});this.workers = new Worker[count];for (int i = 0; i < count; i++) {workers[i] = new Worker(mainBoard.getSubBoard(count, i));}}private class Worker implements Runnable {private final Board board;public Worker(Board board) {this.board = board;}public void run() {while(!board.hasConverged()) {for (int x = 0; x < board.getMaxX(); x++) {for (int y = 0; y < board.getMaxY(); y++) {board.setNewValue(x, y, computeValue(x, y));}}try {barrier.await();} catch(InterruptedException ex) {return;} catch(BrokenBarrierException ex) {return;}}}}public void start() {for (int i = 0; i < workers.length; i++) {new Thread(workers[i]).start();}mainBoard.waitForConvergence();}
}

另一种形式的栅栏就是Exchanger,它是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操作时,Exchanger会非常有用,例如当一个线程向缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用Exchanger来汇合,并将满的缓冲区与空的缓冲区交换。当两个线程通过Exchanger交换对象时,这种交换就把这两个对象安全地发布给另一方。

数据交换的时机取决于应用程序的响应需求。最简单的方案是,当缓冲区被填满时,由填充任务进行交换,当缓冲区为空时,由清空任务进行交换。这样会把需要交换的次数降至最低,但如果新数据的到达率不可预测,那么一些数据的处理过程就将也延迟。另一个方法是,不仅当缓冲被填满时进行交换,并且当缓冲被填充到一定程度并保持也一定时间也以后,也进行交换。

转载于:https://my.oschina.net/itblog/blog/775918

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/260267.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux平台Oracle多个实例启动说明

环境说明:oracle实例1的SID为orcl(为默认启动的实例),第二个实例的SID为orcl2 启动步骤&#xff1a; 1&#xff09;启动数据库实例完成后&#xff0c;启动数据库监听服务 #lsnrctl start 2&#xff09;切换到需要启动的数据库实例下&#xff0c;如下表示启动的是orcl数据库…

RTMP协议发送H.264编码及AAC编码的音视频,实现摄像头直播

RTMP协议发送H.264编码及AAC编码的音视频&#xff0c;实现摄像头直播 摘要: RTMP协议发送H.264编码及AAC编码的音视频&#xff0c;实现摄像头直播  RTMP&#xff08;Real Time Messaging Protocol&#xff09;是专门用来传输音视频数据的流媒体协议&#xff0c;最初由Macrome…

java消息顺序执行_Apache Flink:如何并行执行但保持消息顺序?

请在下面找到使用侧输出和插槽组进行本地扩展的示例 .package org.example/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regardi…

python的字符串定界符可以使用_使用Template格式化Python字符串的方法

对Python字符串&#xff0c;除了比较老旧的%&#xff0c;以及用来替换掉%的format&#xff0c;及在python 3.6中加入的f这三种格式化方法以外&#xff0c;还有可以使用Template对象来进行格式化。from string import Template&#xff0c;可以导入Template类。实例化Template类…

【ES实战】ES6.7的tar包离线安装帮助手册

Elasticsearch6.7部署帮助手册 校验时间&#xff1a;2023年12月19日 文章目录 Elasticsearch6.7部署帮助手册安装前准备安装包安装要求锁定内存,修改最大文件描述符,最大线程数内核参数 部署规划端口规划用户规划目录规划 安装步骤每个服务器配置JDK配置文件master角色node角色…

jenkins 部署文档

Jenkins是一个非常出色的持续集成服务器&#xff0c;本文主要介绍在CentOS系统中Jenkins的基本安装配置方法&#xff0c;供参考。一. 软件包&#xff1a;1. 下载apache-maven-2.2.1-bin.tarhttp://www.apache.org/dyn/closer.cgi/maven/binaries/apache-maven-2.2.1-bin.tar.gz…

牛人,多看看他们写的东西

计算机大师 Donald E. Knuth&#xff08;高德纳&#xff09; 算法大师&#xff0c;我最崇拜的计算机科学家&#xff0c;没有之一&#xff01;不认识高爷爷的人别说自己是学计算机的。《The Art of Computer Programming》绝对是计算机科学的圣经。对高爷爷的崇敬&#xff0c;对…

System.Math.Min(System.Threading.Interlocked.Increment(i), i - 1)

System.Math.Min(System.Threading.Interlocked.Increment(i), i - 1) 在vb里面 等价于ii-1 在C#里面 等价于i-- 是有C#自动转VB时转换的转载于:https://www.cnblogs.com/YaDi/archive/2012/11/08/2759802.html

java快速查找中位数_用QuickSort快速查找中位数(median)

中位数(median)是一个排好序的元素中中间位置的元素&#xff0c;如果元素个数为偶数&#xff0c;则是中间两个元素的平均值。例如(3,1,5)的中位数是3&#xff0c;而(2,1,3,5)的中位数是2.5。查找中位数属于SelectionAlgorithms的一种。用快速排序可以做到每次divide之后&#x…

python安装mysql数据库_windows10安装mysql-8.0.13(zip安装)~Python安装mysql

windows10安装mysql-8.0.13(zip安装)安装环境说明系统版本&#xff1a;windows10mysql版本&#xff1a;mysql-8.0.13-winx64.zip下载地址&#xff1a;http://mirrors.163.com/mysql/Downloads/MySQL-8.0/mysql-8.0.13-winx64.zip解压安装包解压路径&#xff1a;D:\develop\soft…

centos 下使用sublime

CentOS 之 Sublime text3 安装及配置&#xff08;不支持中文输入&#xff09; sublime text 的界面友好&#xff0c;自动补全功能也不错。 &#xff08;本来用vimphp_function.txt的形式进行补全的&#xff0c;但是配置后的补全不太满意&#xff0c;放弃了。 具体参见&#xff…

20121108团队博客(苏若)

PS&#xff1a;这本是属于昨晚的帖子&#xff0c;对不住忠仔。现在补上。 忠仔&#xff0c;终于交给了我一个实实在在的任务&#xff0c;很是欣喜&#xff0c;也很是忐忑&#xff0c;生怕自己不能及时完成任务。 好了&#xff0c;废话不多说&#xff0c;步入正题。 接下任务【画…

python 倒排索引 性能_python 实现倒排索引的方法

代码如下&#xff1a;#encoding:utf-8fin open(1.txt, r)建立正向索引:“文档1”的ID > 单词1&#xff1a;出现位置列表&#xff1b;单词2&#xff1a;出现位置列表&#xff1b;…………“文档2”的ID > 此文档出现的关键词列表。forward_index {}for line in fin:line…

pythonnet下载_Python for .NET

Python for .NET 是一个可以让 Python 程序员近乎无缝的集成 .NET 通用语言环境 CLR 和以及为 .NET 开发者提供一个强大的应用脚本工具。通过这个项目你可在 .NET 中完全使用 Python 来编写整个应用&#xff0c;使用 .NET 服务和组件。这个包并没有用 CLR 语言实现一个 Python&…

webService详解

什么是webService WebService&#xff0c;顾名思义就是基于Web的服务。它使用Web(HTTP)方式&#xff0c;接收和响应外部系统的某种请求。从而实现远程调用. 1:从WebService的工作模式上理解的话&#xff0c;它跟普通的Web程序&#xff08;比如ASP、JSP等&#xff09;并没有本…

读《有人负责,才有质量:写给在集市中迷失的一代》总结与感想

在大伙都在吹捧“市集”开发软件的方式的大浪潮下&#xff0c;作者看到了其中的不当之处&#xff0c;发现其中有许多的问题&#xff0c;因此写下这篇文章给予吹捧“市集”的人一个提醒&#xff0c;甚至警告。 在该文章里&#xff0c;作者认为“市集”里的“农民”不可能建造出和…

php 判断是否文件,利用PHP判断文件是否为图片的方法总结

前言在网页设计中&#xff0c;如果需要图片&#xff0c;我们通常拿到的是一个图片的文件名。仅仅通过文件名是无法判断该文件是否是一个图片文件的。或许有的人以为通过后缀名就可以判断&#xff0c;别忘了文件的后缀名是可以随便改动的。更何况&#xff0c;在 Linux 系统下是不…

textedit怎么插入数据_还在手动插入Excel交叉空白行?这个小技巧10秒搞定

导读&#xff1a;前几天有同学在后台提问&#xff0c;怎么快速在Excel中隔行插入一行或者多行空白行&#xff0c;其实在早期我们分享的小视频中有利用过类似的小技巧来制作工资条&#xff0c;今天我们用它来插入空白行。文/ 芒种学院指北针Hello&#xff0c;大家好&#xff0c;…

python制作安装包(setup.py)

1.制作setup.py from distutils.core import setupsetup(nameMyblog,version1.0,descriptionMy Blog Distribution Utilities,authorlujianxing,author_emaillujianxinglujianxing.com,urlhttp://blog.lujianxing.com,py_modules[foo] ) py_modules 定义 需要打包的模块名 2.创…

[Ruby]$: 是什么意思?

ruby comes with a set of predefined variables$: default search path (array of paths)其他Ruby特殊变量&#xff1a; $! 最近一次的错误信息 $ 错误产生的位置 $_ gets最近读的字符串 $. 解释器最近读的行数(line number) $& 最近一次与正则表达式匹配的字符串 $~ 作为…