CountDownLatch和CyclicBarrier源码详解

 其他系列文章导航

Java基础合集

设计模式合集

多线程合集

分布式合集

ES合集


文章目录

其他系列文章导航

文章目录

前言

一、CountDownLatch和CyclicBarrier

二、CountDownLatch源码分析

三、CyclicBarrier源码分析

四、总结


前言

我现在有个场景:现在我有50个任务,这50个任务在完成之后,才能执行下一个函数,要是你,你怎么设计?

可以用JDK给我们提供的线程工具类,CountDownLatch和CyclicBarrier都可以完成这个需求。


一、CountDownLatch和CyclicBarrier

CountDownLatch允许一个或多个线程直等待,直到这些线程完成它们的操作。

而CyclicBarrier不一样,它往往是当线程到达某状态后,暂停下来等待其他线程等到所有线程均到达以后,才继续执行。

可以发现这两者的等待主体是不一样的。

CountDownLatch调用await()通常是主线程/调用线程,而CyclicBarrier调用await()是在任务线程调用的。

所以,CyclicBarrier中的阻塞的是任务的线程,而主线程是不受影响的。


二、CountDownLatch源码分析

CountDownLatch也是基于AQS实现的,它的实现机制很简单。

Java 中 AQS(AbstractQueuedSynchronizer)的 state 一般有以下值:

  1. 0:表示当前没有任何线程占有锁或者资源。

  2. 1:表示当前有一个线程占有锁或者资源。

  3. 大于等于 2:表示当前有多个线程占有锁或者资源,即存在竞争状态。

  4. 负数:表示当前有等待获取锁或者资源的线程,这些线程处于阻塞状态。

  5. 特殊值:AQS 实现类可以根据自己的需求自定义特殊的 state 值,例如 ReentrantLock 就使用了一个表示重入次数的 state 值。

当我们在构建CountDownLatch对象时,传入的值其实就会赋值给AQS的关键变量state。

如下:

    Sync(int count) {setState(count);}/*** Constructs a {@code CountDownLatch} initialized with the given count.** @param count the number of times {@link #countDown} must be invoked*        before threads can pass through {@link #await}* @throws IllegalArgumentException if {@code count} is negative*/public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}

执行countDown方法时,其实就是利用CAS将state -1。

如下:

/*** Decrements the count of the latch, releasing all waiting threads if* the count reaches zero.** <p>If the current count is greater than zero then it is decremented.* If the new count is zero then all waiting threads are re-enabled for* thread scheduling purposes.** <p>If the current count equals zero then nothing happens.*/public void countDown() {sync.releaseShared(1);}

执行await方法时,其实就是判断state是否为0,不为0则加入到队列中,将该线程阻塞掉(除了头结点)。

如下:

    public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}

因为头节点会一直自旋等待state为0,当state为0时,头节点把剩余的在队列中阻塞的节点也一并唤醒。

如下:

    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return true;}}nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

三、CyclicBarrier源码分析

回到CyclicBarrier代码也不难,重点也就是await方法。

从源码不难发现的是,它没有像CountDownLatch和ReentrantLock使用AQS的state变量,而CyclicBarrier是直接借助ReentrantLock加上Condition 等待唤醒的功能进而实现的。

如下:

    /** The lock for guarding barrier entry */private final ReentrantLock lock = new ReentrantLock();/** Condition to wait on until tripped */private final Condition trip = lock.newCondition();

在构建CyclicBarrier时,传入的值会赋值给CyclicBarrier内部维护count变量,也会赋值给parties变量(这是可以复用的关键)。

如下:

    /*** Creates a new {@code CyclicBarrier} that will trip when the* given number of parties (threads) are waiting upon it, and which* will execute the given barrier action when the barrier is tripped,* performed by the last thread entering the barrier.** @param parties the number of threads that must invoke {@link #await}*        before the barrier is tripped* @param barrierAction the command to execute when the barrier is*        tripped, or {@code null} if there is no action* @throws IllegalArgumentException if {@code parties} is less than 1*/public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}

每次调用await时,会将count -1,操作count值是直接使用ReentrantLock来保证线程安全性。

如下:

    public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {...int index = --count;   ...          }

如果count不为0,则添加则condition队列中。

如下:

            for (;;) {try {if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}...}

如果count等于0时,则把节点从condition队列添加至AQS的队列中进行全部唤醒并且将parties的值重新赋值为count的值(实现复用)。

如下:

            if (index == 0) {  // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}/*** Updates state on barrier trip and wakes up everyone.* Called only while holding lock.*/private void nextGeneration() {// signal completion of last generationtrip.signalAll();// set up next generationcount = parties;generation = new Generation();}

四、总结

CountDownlatch基于AQS实现,会将构造CountDownLatch的入参传递至statecountDown()就是在利用CAS将state减1,await)实际就是让头节点一直在等待state为0时,释放所有等待的线程。

CyclicBarrier则利用ReentrantLock和Condition,自身维护了count和parties变量。每次调用await将count-1,并将线程加入到condition队列上。等到count为0时,则将condition队列的节点移交至AQS队列,并全部释放。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/178403.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Web】NewStarCtf Week2 个人复现

目录 ①游戏高手 ②include 0。0 ③ez_sql ④Unserialize&#xff1f; ⑤Upload again! ⑥ R!!C!!E!! ①游戏高手 经典前端js小游戏 检索与分数相关的变量 控制台直接修改分数拿到flag ②include 0。0 禁了base64和rot13 尝试过包含/var/log/apache/access.log,ph…

Git 入门指南

什么是 Git&#xff1f; Git 的目前最流行的分布式版本控制软件&#xff0c;可以帮助我们高效敏捷的处理任何项目。 版本管理 要理解 Git 我们首先要理解版本管理。 版本管理就是开发过程中用于管理对文件、目录或者工程等内容的修改历史&#xff0c;可以让我们方便的查看历史…

java学习part20内部类

116-面向对象(高级)-类的成员之五&#xff1a;内部类_哔哩哔哩_bilibili 1.内部类

在Anaconda中用命令行安装环境以及安装包

一、下载Anaconda 下载地址 二、创建环境 1. 打开Anaconda命令行 2.创建环境 conda create -n 环境名称 python3.10(需要的python版本号) 3.激活环境 activate 环境名4.下载安装包 pip install 模块名 -i https://pypi.tuna.tsinghua.edu.cn/simple5.下载torch 官网&…

Python语言学习笔记之三(字符编码)

本课程对于有其它语言基础的开发人员可以参考和学习&#xff0c;同时也是记录下来&#xff0c;为个人学习使用&#xff0c;文档中有此不当之处&#xff0c;请谅解。 什么是字符编码 计算机从本质上来说只认识二进制中的0和1&#xff0c;字符编码(Character Encoding) 是一种将…

大数据-之LibrA数据库系统告警处理(ALM-37006 Coordinator进程异常)

告警解释 当出现如下情况时&#xff0c;产生该告警&#xff1a; CN所在机器发生硬件故障&#xff08;断电、硬盘损坏等&#xff09;。CN实例数据目录中的postgresql.conf配置文件不存在或者其中某个配置参数不正确。CN实例线程无法监听IP&#xff0c;或者无法绑定监听端口。C…

【Makefile】和【CMake】的区别

Makefile 和 CMake 的区别 Makefile 和 CMake 都是用于构建项目的工具&#xff0c;但它们有一些区别。 Makefile: 语法&#xff1a; Makefile 使用自己的语法规则&#xff0c;包括规则、目标、依赖等。这是一种特定于 make 工具的语法。 平台依赖&#xff1a; Makefile 是平…

【JavaScript框架】Vue与React中的组件框架概念

组件框架是用于构建应用程序的工具&#xff0c;以便将UI和逻辑划分为单独的可重用组件。目前的组件框架包括React、Vue、Angular、Ember、Svelte等。 Vue和React使用了常见的框架概念&#xff0c;如处理状态、道具、引用、生命周期挂钩、事件等。这两个框架在当今的web开发中被…

案例:某电子产品电商平台借助监控易保障网络正常运行

一、背景介绍 某电子产品电商平台是一家专注于电子产品销售的电商平台&#xff0c;拥有庞大的用户群体和丰富的产品线。随着业务规模的不断扩大&#xff0c;网络设备的数量和复杂性也不断增加&#xff0c;网络故障和性能问题时有发生&#xff0c;给平台的稳定运行带来了很大的挑…

项目中高并发如何处理

在项目中处理高并发主要需要考虑以下几个方面的策略&#xff1a; 优化数据库设计&#xff1a;使用合适的数据结构、索引和查询优化技术可以显著提高数据库的响应性能&#xff1b;分库分表使用缓存&#xff1a;缓存是一种非常有效的处理高并发的方法。通过将常用的数据或结果保…

[原创][第I部分][编程基础]我的C++ 98复习并升级到C++20的复习旅途

[简介] 常用网名: 猪头三 出生日期: 1981.XX.XXQQ: 643439947 个人网站: 80x86汇编小站 https://www.x86asm.org 编程生涯: 2001年~至今[共22年] 职业生涯: 20年 开发语言: C/C、80x86ASM、PHP、Perl、Objective-C、Object Pascal、C#、Python 开发工具: Visual Studio、Delphi…

python激活python38

py38是我建立的anaconda下的一个python3.8环境。 命令行里使用conda activate py38会出现&#xff1a; conda activate error: argument COMMAND: invalid choice 因为condat已经移除了该命令。 执行 activate py38没报错&#xff0c;但是没有转换到py38环境。 使用 sour…

Spring源码解读之创建bean

本文章我们会解读一下Spring如何根据beanDefinition创建bean的&#xff1b; 代码入口&#xff1a; AnnotationConfigApplicationContext applicationContext new AnnotationConfigApplicationContext(AppConfig.class);applicationContext.refresh(); 当spring执行refresh(…

关于ai大模型是否开源的讨论

开源和闭源&#xff0c;两种截然不同的开发模式&#xff0c;对于大模型的发展有着重要影响。开源让技术共享&#xff0c;吸引了众多人才加入&#xff0c;推动了大模的创新。而闭源则保护了商业利益和技术优势&#xff0c;为大模型的商业应用提供了更好的保障。 一、开源和闭源的…

人工智能学习1

一.人工智能概述 1.AI的基础学科包括&#xff1a;数学&#xff08;离散、模糊&#xff09;、思维科学&#xff08;认知心理、逻辑思维学、形象思维学&#xff09;和计算机&#xff08;硬件、软件&#xff09;等。 2.新一代人工智能呈现出“深度学习、跨界融合、人机协同、群智…

Jmeter+influxdb+grafana监控平台在windows环境的搭建

原理&#xff1a;Jmeter采集的数据存储在infuxdb数据库中&#xff0c;grafana将数据库中的数据在界面上进行展示 一、grafana下载安装 Download Grafana | Grafana Labs 直接选择zip包下载&#xff0c;下载后解压即可&#xff0c;我之前下载过比较老的版本&#xff0c;这里就…

单机多卡训练

参考几个不错的帖子&#xff08;还没来得及整理&#xff09;&#xff1a; 基于pytorch多GPU单机多卡训练实践_多卡训练效果不如单卡-CSDN博客 关于PyTorch单机多卡训练_能用torch.device()实现多卡训练吗-CSDN博客 Pytorch多机多卡分布式训练 - 知乎 (zhihu.com) 当代研究生…

在 The Sandbox 设置总部,SCB 10X 和 T-POP 为 4EVE 元宇宙音乐会揭幕

协作学习为全球粉丝提供了无限的可能性&#xff0c;让他们通过革命性的元宇宙体验沉浸在泰国流行文化中。 作为 SCBX 集团背后的创新力量&#xff0c;SCB 10X 很高兴宣布与 T-POP Incorporation 展开开创性合作&#xff0c;T-POP Incorporation 是泰国流行文化在全球舞台上的领…

【算法】快速选择算法

目录 1.概述2.代码实现2.1.基于简单交换排序2.2.基于堆排序2.3.基于快速排序 3.应用 更多数据结构与算法的相关知识可以查看数据结构与算法这一专栏。 1.概述 &#xff08;1&#xff09;快速选择算法 (Quick Select Algorithm) 是一种用于在无序数组中寻找第 k 小&#xff08;…

鸿蒙开发已成新趋势

随着华为鸿蒙操作系统的快速崭露头角&#xff0c;鸿蒙开发已然成为当前技术领域的热门新趋势。本文将深入探讨鸿蒙开发的重要性和独特优势&#xff0c;并详细介绍一些关键的鸿蒙开发技术和工具&#xff0c;以及它们对开发者个人和整个行业带来的深远影响。 首先&#xff0c;鸿蒙…