Java并发工具类

JDK并发包中常用并发工具类:

CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段;

Exchanger工具类则提供了在线程间交换数据的一种手段。

等待多线程完成的CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

需求:解析一个Excel里多个sheet的数据,可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。

实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用join()方法

public class JoinCountDownLatchTest {public static void main(String[] args) throws InterruptedException {Thread parser1 = new Thread(() -> {});Thread parser2 = new Thread(() -> System.out.println("parser2 finish"));parser1.start();parser2.start();parser1.join();parser2.join();System.out.println("all parser finish");}
}

join用于让当前执行线程等待join线程执行结束实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。其中,wait(0)表示永远等待下去。代码片段如下:

            while (isAlive()) {wait(0);}

直到join线程中止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是在JVM里实现的,在JDK里看不到。

CountDownLatch也可以实现join的功能,并且比join的功能更多

public class CountDownLatchTest {static CountDownLatch c = new CountDownLatch(2);public static void main(String[] args) throws InterruptedException {new Thread(() -> {System.out.println(1);c.countDown();System.out.println(2);c.countDown();}).start();c.await();System.out.println("3");}
}

CountDownLatch的构造函数接收一个int类型的参数作为计数器,等待N个点完成,这里就传入N。

调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零

countDown方法可以用在任何地方,N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。

如果有某个解析sheet的线程处理得比较慢,不可能让主线程一直等待,可以使用另外一个带指定时间的await方法——await(long time,TimeUnit unit),这个方法等待特定时间后,就会不再阻塞当前线程。join也有类似的方法。

同步屏障CyclicBarrier

让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

public class CyclicBarrierTest {static CyclicBarrier c = new CyclicBarrier(2);public static void main(String[] args) {new Thread(() -> {try {c.await();} catch (Exception e) {}System.out.println(1);}).start();try {c.await();} catch (Exception e) {}System.out.println(2);}
}

主线程和子线程的调度是由CPU决定的,两个线程都有可能先执行。

输出结果可能有两种:

一种是:

image-20230824230526154

另一种:

image-20230824230510366

如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),主线程和子线程会永远等待,因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction)用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

public class CyclicBarrierTest2 {static CyclicBarrier c = new CyclicBarrier(2, new A());public static void main(String[] args) {new Thread(() -> {try {c.await();} catch (Exception e) {}System.out.println(1);}).start();try {c.await();} catch (Exception e) {}System.out.println(2);}static class A implements Runnable {@Overridepublic void run() {System.out.println(3);}}
}

初始值设为2,等代码中的第一个线程和线程A都执行完之后,才会继续执行主线程,然后输出2。结果一定是:

image-20230824231118034

CyclicBarrier的应用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。

public class BankWaterService implements Runnable {/*** 创建4个屏障,处理完之后执行当前类的run方法*/private CyclicBarrier c = new CyclicBarrier(4, this);/*** 假设只有4个sheet,只启动4个线程*/private final Executor executor = Executors.newFixedThreadPool(4);/*** 保存每个sheet计算出的银流结果*/private final ConcurrentHashMap<String, Integer> sheetBankWaterCount = newConcurrentHashMap<>();private void count() {for (int i = 0; i < 4; i++) {executor.execute(() -> {// 计算当前sheet的银流数据,计算代码省略sheetBankWaterCount.put(Thread.currentThread().getName(), 1);// 银流计算完成,插入一个屏障try {c.await();} catch (InterruptedException |BrokenBarrierException e) {e.printStackTrace();}});}}@Overridepublic void run() {int result = 0;// 汇总每个sheet计算出的结果for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {result += sheet.getValue();}// 将结果输出sheetBankWaterCount.put("result", result);System.out.println(result);}public static void main(String[] args) {BankWaterService bankWaterCount = new BankWaterService();bankWaterCount.count();}
}

计算银行流水,一个sheet开启一个线程,所有线程执行完毕,将所有计算结果相加得银行总流水。

最后输出结果为4。

CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。

CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量isBroken()方法用来了解阻塞的线程是否被中断

控制并发线程数的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

应用场景

Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。

需求:要读取几万个文件的数据,因为都是IO密集型任务,可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。此时就使用Semaphore来做流量控制。

public class SemaphoreTest {private static final int THREAD_COUNT = 30;private static final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);private static final Semaphore s = new Semaphore(10);public static void main(String[] args) {for (int i = 0; i < THREAD_COUNT; i++) {threadPool.execute(() -> {try {s.acquire();System.out.println("save data");s.release();} catch (InterruptedException e) {}});}threadPool.shutdown();}
}

有30个线程在执行,但是只允许10个并发执行。

构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。

一些其他方法:

image-20230824233007777

线程间交换数据的Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类。

用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。

两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

Exchanger可以用于遗传算法。遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。

Exchanger也可以用于校对工作

需求:将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致。

两个线程间数据传递。

public class ExchangerTest {private static final Exchanger<String> exgr = new Exchanger<>();private static final ExecutorService threadPool = Executors.newFixedThreadPool(2);public static void main(String[] args) {threadPool.execute(() -> {try {// A录入银行流水数据String A = "银行流水A";String B = exgr.exchange(A);System.out.println("B----- " + B);} catch (InterruptedException e) {}});threadPool.execute(() -> {try {// B录入银行流水数据String B = "银行流水B";String A = exgr.exchange("B");System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"+ A + ",B录入是:" + B);} catch (InterruptedException e) {}});threadPool.shutdown();}
}

输出结果:

image-20230824233759768

如果两个线程有一个没有执行exchange()方法,则会一直等待,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/51467.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Electron学习3 使用serialport操作串口

Electron学习3 使用serialport操作串口 一、准备工作二、 SerialPort 介绍1. 核心软件包(1) serialport(2) serialport/stream(3) serialport/bindings-cpp(4) serialport/binding-mock(5) serialport/bindings-interface 2. 解析器包3. 命令行工具 三、创建一个demo程序1. 创建…

Dapper

介绍 dapper是一款轻量级的ORM Dapper 被称为 ORM 之王。 以下是 Dapper 的主要功能&#xff1a; 速度快&#xff0c;性能快。 更少的代码行。 对象映射器。 静态对象绑定。 动态对象绑定。 轻松处理 SQL 查询。 易于处理存储过程。 直接对 IDBConnection 类进行操作&#xf…

重注微电子产业,“三大齿轮”能否带起香港经济的“第三轮”

文 | 智能相对论 作者 | 佘凯文 众所周知&#xff0c;微电子产业早已成为现代科技领域的关键钥匙&#xff0c;谁能掌握微电子产业&#xff0c;谁就能拥有全球科技领域的话语权。 从上世纪开始&#xff0c;微电子产业曾经历过几次重大转移&#xff0c;如70年代从美国转向日本…

零拷贝技术详解

当涉及到网络编程和IO操作时&#xff0c;数据拷贝是一个常见的性能瓶颈。传统的数据拷贝过程中&#xff0c;数据需要从内核缓冲区复制到用户空间缓冲区&#xff0c;然后再从用户空间缓冲区复制到内核缓冲区&#xff0c;这个过程会耗费大量的CPU时间和内存带宽&#xff0c;降低系…

tensorRT安装

官方指导文档&#xff1a;Installation Guide :: NVIDIA Deep Learning TensorRT Documentation 适配很重要&#xff01;&#xff01;&#xff01;&#xff01; 需要cuda, cuDNN, tensorRT三者匹配。我的cuda11.3 所以对应的cuDNN和tensorRT下载的是如下版本&#xff1a; cud…

【业务功能篇77】微服务-OSS对象存储-上传下载图片

3. 图片管理 文件存储的几种方式 单体架构可以直接把图片存储在服务器中 但是在分布式环境下面直接存储在WEB服务器中的方式就不可取了&#xff0c;这时我们需要搭建独立的文件存储服务器。 3.1 开通阿里云服务 针对本系统中的相关的文件&#xff0c;图片&#xff0c;文本等…

07-Numpy基础-伪随机数生成

numpy.random模块对Python内置的random进行了补充&#xff0c;增加了一些用于高效生成多种概率分布的样本值的函数。 例如&#xff0c;你可以用normal来得到一个标准正态分布的44样本数组&#xff1a; 而Python内置的random模块则只能一次生成一个样本值。从下面的测试结果中可…

Mybatis查询一条数据

上一篇我们介绍了在pom文件中引入mybatis依赖&#xff0c;配置了mybatis配置文件&#xff0c;通过读取配置文件创建了会话工厂&#xff0c;使用会话工厂创建会话获取连接对象读取到了数据库的基本信息。 如果您需要对上面的内容进行了解&#xff0c;可以参考Mybatis引入与使用…

Python爬虫(十五)_案例:使用bs4的爬虫

本章将从Python案例讲起&#xff1a;所使用bs4做一个简单的爬虫案例&#xff0c;更多内容请参考:Python学习指南 案例&#xff1a;使用BeautifulSoup的爬虫 我们已腾讯社招页面来做演示&#xff1a;http://hr.tencent.com/position.php?&start10#a 使用BeautifulSoup4解析…

[MyBatis系列④]核心配置文件

目录 1、简介 2、DTD 3、typeHandlers 3.1、默认类型处理器 3.2、自定义类型处理器 4、plugins ⭐MyBatis系列①&#xff1a;增删改查 ⭐MyBatis系列②&#xff1a;两种Dao开发方式 ⭐MyBatis系列③&#xff1a;动态SQL 1、简介 MyBatis的核心配置文件&#xff08;通常命…

基于IDEA使用maven创建hibernate项目

1、创建maven项目 2、导入hibernate需要的jar包 <!--hibernate核心依赖--><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-core</artifactId><version>5.4.1.Final</version></dependency><!--…

基于亚马逊云科技服务,构建大语言模型问答知识库

随着大语言模型效果明显提升&#xff0c;其相关的应用不断涌现呈现出越来越火爆的趋势。其中一种比较被广泛关注的技术路线是大语言模型&#xff08;LLM&#xff09;知识召回&#xff08;Knowledge Retrieval&#xff09;的方式&#xff0c;在私域知识问答方面可以很好的弥补通…

ARM64函数调用流程分析

ARM64函数调用流程分析 1 ARM64 函数调用实例2 对应代码的分析2.1 main函数及其对应的汇编程序2.1.1 main的C代码实现2.1.2 main函数对应汇编及其分析2.1.3 执行完成之后栈的存放情况 2.2 test_fun_a函数及其对应的汇编程序2.2.1 test_fun_a函数的C实现2.2.2 test_fun_a函数对应…

Oracle的学习心得和知识总结(二十八)|Oracle数据库数据库回放功能之论文二翻译及学习

目录结构 注&#xff1a;提前言明 本文借鉴了以下博主、书籍或网站的内容&#xff0c;其列表如下&#xff1a; 1、参考书籍&#xff1a;《Oracle Database SQL Language Reference》 2、参考书籍&#xff1a;《PostgreSQL中文手册》 3、EDB Postgres Advanced Server User Gui…

MAC电脑外放没有声音解决方案

烦人呐&#xff0c;我的mac外接显示屏幕&#xff0c;显示器没有音频输出&#xff0c;需要mac笔记本的音频输出&#xff0c;但是经常打开后&#xff0c;mac没有声音输出&#xff0c;需要重启电脑才能生效。亲测一下方法有效&#xff0c;请参考&#xff1a; 文章目录 一、短期方案…

基于mha+mycat2+gtid的半同步主从复制双vip高可用MySQL集群

目录 项目名称 项目架构图 项目概述 项目准备 项目步骤 一、使用ansible编写palybook实现4台二进制安装MySQL环境的部署&#xff0c;并把master上的基础数据下发到所有slave服务器上 1. 建立免密通道 2.安装ansible在ansible服务器上&#xff0c;并写好主机清单 3.将…

汽车电子笔记之:AUTOSA架构下的OS概述

目录 1、实时操作系统&#xff08;RTOS&#xff09; 2、OSEK操作系统 2.1、OSEK概述 2.2、OSEK处理等级 2.3、OSEK任务符合类 2.4、OSEK优先级天花板模式 3、AUTOSAR OS 3.1、 AUTOSAR OS对OSEK OS的继承和扩展 3.2、AUTOSAR OS的调度表 3.3、AUTOSAR OS的时间保护 3…

冷冻冷藏自动化立体库|HEGERLS四向穿梭车助力打造冷链智能仓储新力量

随着中国仓储物流整体规模和低温产品消费需求的稳步增长&#xff0c;冷链市场应用潜力不断释放。而在实际运行中&#xff0c;由于冷库容量不足、基础设施落后、管理机制欠缺等原因&#xff0c;经常出现“断链”现象&#xff0c;严重威胁到产品质量和消费者安全。 河北沃克金属…

尚硅谷大数据项目《在线教育之离线数仓》笔记004

视频地址&#xff1a;尚硅谷大数据项目《在线教育之离线数仓》_哔哩哔哩_bilibili 目录 第9章 数仓开发之DWD层 P049 P050 P051 P052 P053 P054 P055 P056 P057 P058 P059 P060 P061 P062 P063 P064 P065 P066 P067 P068 P069 P070 第9章 数仓开发之DWD…

Wlan——锐捷零漫游网络解决方案以及相关配置

目录 零漫游介绍 一代零漫游 二代单频率零漫游 二代双频率零漫游 锐捷零漫游方案总结 锐捷零漫游方案的配置 配置无线信号的信道 开启关闭5G零漫游 查看配置 零漫游介绍 普通的漫游和零漫游的区别 普通漫游 漫游是由一个AP到另一个AP或者一个射频卡到另一个射频卡的漫…