目录
- 【JAVA】:万字长篇带你了解JAVA并发编程
- 1. 并发编程基础
- 并发与并行
- 并发(Concurrency)
- 并行(Parallelism)
- 线程与进程
- 线程的状态与生命周期
- 线程同步与锁
- 2. Java并发工具类
- 准备:多线程测试工具类
- synchronized关键字
- ReentrantLock
- 基本语法
- 可中断 `lockInterruptibly()`
- 设置超时时间 tryLock()
- 公平锁 new ReentrantLock(true)
- 条件变量 Condition
- ReadWriteLock
- CountDownLatch
- CyclicBarrier
- await方法
- Semaphore
- Exchanger
- Phaser
- 结语
个人主页: 【⭐️个人主页】
需要您的【💖 点赞+关注】支持 💯
【JAVA】:万字长篇带你了解JAVA并发编程
1. 并发编程基础
并发与并行
并行
是在不同实体上的多个事件,并发
是在同一实体上的多个事件。
不过这种解释非常晦涩难懂,对于基础和涉世未深的开发人员可能来说如同天书一样。
那我们就回溯本源,来讲一下他们的区别。
并发(Concurrency)
早期计算机的 CPU 都是单核的,一个 CPU 在同一时间只能执行一个进程/线程(任务),当系统中有多个进程/线程任务等待执行时,CPU 只能执行完一个再执行下一个。
计算机在运行过程中,有很多指令会涉及 I/O 操作,而 I/O 操作又是相当耗时的,速度远远低于 CPU,这导致 CPU 经常处于空闲状态,只能等待 I/O 操作完成后才能继续执行后面的指令。
为了提高 CPU 利用率,减少等待时间,人们提出了一种 CPU 并发工作的理论。
所以谈起并发,我们可以想象成,一条质检员(CPU)想要质检多条流水线(线程/进程)的物品(任务)。这个工人来回在多个生产线上质检货品的操作就是并发
.
将 CPU 资源合理地分配给多个任务共同使用,有效避免了 CPU 被某个任务长期霸占的问题,极大地提升了 CPU 资源利用率。
并行(Parallelism)
并发是针对单核 CPU 提出的,而并行则是针对多核 CPU 提出的。和单核 CPU 不同,多核 CPU 真正实现了“同时执行多个任务”。
多核 CPU 的每个核心都可以独立地执行一个任务,而且多个核心之间不会相互干扰。在不同核心上执行的多个任务,是真正地同时运行,这种状态就叫做并行
。
理解:
并行
的话,就更好理解了。直接增加质检员。每个质检员之间的工作独立。每个质检员都有自己处理的流水线。极大的提高了整个系统的性能。相当于直接是物理层面的改变。
并发针对单核 CPU 而言,它指的是 CPU 交替执行不同任务的能力;并行针对多核 CPU 而言,它指的是多个核心同时执行多个任务的能力。
单核 CPU 只能并发,无法并行;换句话说,并行只可能发生在多核 CPU 中。
在多核 CPU 中,并发和并行一般都会同时存在,它们都是提高 CPU 处理任务能力的重要手段。
线程与进程
【JAVA】多线程:一文快速了解多线程
线程的状态与生命周期
【JAVA】多线程:一文快速了解多线程
线程同步与锁
Java中的同步和锁是多线程编程中重要的概念,用于保证线程安全,避免竞态条件。
在多线程编程中,如果多个线程同时访问共享资源,就可能出现竞态条件,导致数据不一致或其他问题。因此,需要采取措施来保证线程安全,这就是同步和锁的作用。
同步
:是指在多线程中,为了保证线程安全,使得一组线程按照一定的顺序执行,不会出现竞态条件。在Java中,可以使用synchronized
关键字实现同步。
锁
:是指对共享资源的访问控制,同一时刻只能有一个线程持有锁,并且其他线程无法访问该资源。在Java中,可以使用synchronized
关键字、Lock
接口及其实现类等方式实现锁。
可以阅读另一篇文章
❤️🔥 一文让你看懂并发编程中的锁
2. Java并发工具类
准备:多线程测试工具类
public class SimpleThreadUtils {/*** 创建循环测试线程** @param threadCount 线程数量* @param threadName 线程名* @param loopCount 循环次数* @param consumer 执行代码 参数*/public static List<Thread> newLoopThread(int threadCount, String threadName, long loopCount, Consumer<Thread> consumer) {List<Thread> list = new ArrayList<>();for (int i = 0; i < threadCount; i++) {final int threadNo = i;Thread thread = new Thread(() -> {System.out.println("当前线程-"+ Thread.currentThread().getName()+": 开始");int j = 0;while (j++ < loopCount) {try {Thread.sleep(100);} catch (InterruptedException e) {}consumer.accept(Thread.currentThread());}System.out.println("当前线程-"+ Thread.currentThread().getName()+": 结束");}, threadName + ":" + i);thread.start();list.add(thread);}return list;}public static void wait(long gap , Runnable runnable) {while (true) {try {Thread.sleep(gap);runnable.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}
synchronized关键字
和其他的线程安全技术一样,synchronized
关键字的作用也是为了保障数据的原子性、可见性和有序性,只是相比于其他技术,synchronized
资历更老,历史更久,而且也更基础,基本上我们在学习线程相关内容的时候,就会学习这个关键字。
特点:
- 互斥性(共享锁)
- 可重入
在用法上,synchronized
关键字可以修饰变量、方法和代码块,修饰不同的对象最终产生的影响范围也有所不同,下面我们通过一些简单示例,来看下synchronized修饰不同的对象所产生的效果:
加在方法上,默认的共享锁变量是当前对象实例this
public synchronized void updateSafe(int value) {int sum = this.value + value;try {Thread.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}this.value = sum;}
加载代码块中
public void updateSafeBlock(int value) {synchronized (this) {int sum = this.value + value;try {Thread.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}this.value = sum;}}
ReentrantLock
相对于synchronized
它具备如下特点
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
- 与 synchronized 一样,都支持可重入
ReentrantLock是可重入的互斥锁,虽然具有与synchronized相同功能,但是会比synchronized更加灵活
基本语法
// 获取ReentrantLock对象ReentrantLock lock = new ReentrantLock();public void saveSafe(int money) {// 加锁 获取不到锁一直等待直到获取锁lock.lock();try {// 临界区save(money);// 需要执行的代码} finally {// 释放锁 如果不释放其他线程就获取不到锁lock.unlock();}}
可中断 lockInterruptibly()
synchronized
和reentrantlock.lock()
的锁, 是不可被打断的; 也就是说别的线程已经获得了锁, 线程就需要一直等待下去,不能中断,直到获得到锁才运行。
一个线程等待锁的时候,是可以被中断。通过中断异常报出。处理中断使其继续执行逻辑
@Testpublic void testLockInterupted() {ReentrantLock lock = new ReentrantLock();// 主线程获取锁lock.lock();AtomicReference<Thread> t1 = new AtomicReference<>();SimpleThreadUtils.newLoopThread(2, "Kx", 1000, (t) -> {if (t.getName().contains("0")) {if (t1.get() == null){t1.set(t);}try {System.out.println("获取锁 线程:" + t.getName() + ":::");// 获取锁: 阻塞 科中断lock.lockInterruptibly();// 不可中断lock.lock();System.out.println("获取锁成功 线程:" + t.getName() + ":::");}catch (InterruptedException e){System.out.println(" 被打断 线程:" + t.getName() + ":::");}} else {if (t1.get() != null) {t1.get().interrupt();System.out.println("中断 线程:" + t.getName() + ":::");}else {System.out.println("中断 线程 没有准备:" + t.getName() + ":::");}}});SimpleThreadUtils.wait(10000, () -> {System.out.println("监控:" + reentrantLockDemo.getMoney());});}
设置超时时间 tryLock()
使用 lock.tryLock() 方法会返回获取锁是否成功。如果成功则返回true,反之则返回false。
并且tryLock方法可以设置指定等待时间,参数为:tryLock(long timeout, TimeUnit unit) , 其中timeout为最长等待时间,TimeUnit为时间单位
获取锁的过程中, 如果超过等待时间, 或者被打断, 就直接从阻塞队列移除, 此时获取锁就失败了, 不会一直阻塞着 ! (可以用来实现死锁问题)
tryLock适合我们再预期判断是否可以获得锁的前提下进行业务处理。而不需要一直等待,占用线程资源。
当我们tryLock的时候,表示我们试着获取锁,如果已经被其他线程占用,那么就可以直接跳过处理,提示用户资源被处理中。
@Testpublic void testLockTryLock() {ReentrantLock lock = new ReentrantLock();// SimpleThreadUtils.SimpleThreadUtils.newLoopThread(10, "threadname", 1000, (t) -> {boolean b = lock.tryLock();try {try {if(b) {System.out.println(t.getName() + "获取到锁,处理一分钟");Thread.sleep(1000);}else {System.out.println(t.getName() + "获取到锁失败");Thread.sleep(1000);}} catch (InterruptedException e) {}} finally {if (b) {lock.unlock();}}});SimpleThreadUtils.wait(10000, () -> {System.out.println("监控:" + reentrantLockDemo.getMoney());});}}
结果:
当前线程-t:1: 开始
当前线程-t:0: 开始
当前线程-t:2: 开始
t:1获取到锁失败
t:2获取到锁,处理一分钟
t:0获取到锁失败
t:0获取到锁,处理一分钟
t:2获取到锁失败
t:1获取到锁失败
t:2获取到锁失败
t:0获取到锁失败
t:1获取到锁,处理一分钟
公平锁 new ReentrantLock(true)
ReentrantLoc
k默认是非公平锁
, 可以指定为公平锁new ReentrantLock(true)
。
在线程获取锁失败,进入阻塞队列时,先进入的会在锁被释放后先获得锁。这样的获取方式就是公平的。
一般不设置ReentrantLock为公平的, 会降低并发度.
Synchronized底层的Monitor锁就是不公平的, 和谁先进入阻塞队列是没有关系的。
条件变量 Condition
传统对象等待集合只有一个 waitSet, Lock可以通过newCondition()方法 生成多个等待集合Condition对象。 Lock和Condition 是一对多的关系
配合方法await()
和signal()
使用流程
await
前需要 获得锁
- await 执行后,会释放锁,进入 conditionObject (条件变量)中等待
- await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
- 竞争 lock 锁成功后,从 await 后继续执行
- signal 方法用来唤醒条件变量(等待室)汇总的某一个等待的线程
- signalAll方法, 唤醒条件变量(休息室)中的所有线程
@Testpublic void testCondition() {ReentrantLock lock = new ReentrantLock();// 等待 A(条件变量)Condition ac = lock.newCondition();// 等外 B(条件变量)Condition bc = lock.newCondition();SimpleThreadUtils.newLoopThread(2, "t", 1000, (t) -> {try {print("Lock..................");lock.lock();if (t.getNo() == 0){print("等待...");ac.await();print("等待结束..");}else if(t.getNo() == 1){print("通知取消等待...");ac.signal();print("通知取消等待执行...");}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}});SimpleThreadUtils.wait(10000, () -> {});}
ReadWriteLock
ReadWriteLock也是一个接口,提供了readLock和writeLock两种锁的操作机制,一个资源可以被多个线程同时读,或者被一个线程写,但是不能同时存在读和写线程。
读锁:共享锁 readLock
写锁: 独占锁 writeLock
读写锁一定要注意几点。
- 一定要记得读锁和写锁之间的竞争关系。只有读锁与读锁之间是不存在竞争,其他都会产生锁竞争,锁阻塞。
- 对于读锁而言,由于同一时刻可以允许多个线程访问共享资源,进行读操作,因此称它为共享锁;而对于写锁而言,同一时刻只允许一个线程访问共享资源,进行写操作,因此称它为排他锁。
- 记住锁的通用范式,一定要释放锁
public class ReadWriteLockDemo {private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();private int value;public int getValue() {System.out.println("读:" + value);return value;}public void update(int value) {int sum = this.value + value;try {Thread.sleep(10);} catch (InterruptedException e) {}this.value = sum;}public int read() {Lock lock = readWriteLock.readLock();try {lock.lock();Thread.sleep(10);System.out.println("读锁:after");return getValue();} catch (InterruptedException e) {throw new IllegalArgumentException();} finally {lock.unlock();}}public void updateSafe(int value) {Lock lock = readWriteLock.writeLock();try {System.out.println("writeLock:before");lock.lock();Thread.sleep(1000);System.out.println("writeLock:after");update(value);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}}}
CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作。这就是一个计数器一样。
CountDownLatch定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器。
例子: 游戏加载资源
只有5个资源线程同时加载完成,游戏才能开始
public class CountDownLatchTest {@Testpublic void test() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(5);SimpleThreadUtils.newLoopThread(5, "cd", 1, (t) -> {// 资源加载逻辑int i = t.getNo() * 1000;try {// 模拟加载时间Thread.sleep(i);} catch (InterruptedException e) {}System.out.println(t.getName()+": 花费了 " + i + "s");// 计数器-1countDownLatch.countDown();});// 阻塞等待 计数器为0. 所有资源加载完成countDownLatch.await();System.out.println("开始游戏");}
}
CyclicBarrier
CyclicBarrier可以理解为一个循环栅栏,其实和CountDownLatch有一定的相同点。就是都是需要进行计数,CyclicBarrier是等待所有人都准备就绪了,才会进行下一步。不同点在于,CyclicBarrier结束了一次计数之后会自动开始下一次计数。而CountDownLatch只能完成一次。
CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。
CyclicBarrier内部使用了ReentrantLock和Condition两个类。它有两个构造函数
public CyclicBarrier(int parties) {this(parties, null);
}public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;
}
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier的另一个构造函数CyclicBarrier(int parties, Runnable barrierAction),用于线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
await方法
调用await方法的线程告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。直到parties个参与线程调用了await方法,CyclicBarrier同样提供带超时时间的await和不带超时时间的await方法
@Testpublic void test() throws InterruptedException {CyclicBarrier cyclicBarrier = new CyclicBarrier(5);SimpleThreadUtils.newLoopThread(5, "cd", 1, (t) -> {while (true) {int i = t.getNo() * 1000;try {Thread.sleep(i);} catch (InterruptedException e) {}System.out.println(":" + t.getName() + ": 准备好了");try {cyclicBarrier.await();System.out.println(" 冲。。。 :" + t.getName());} catch (InterruptedException e) {throw new RuntimeException(e);} catch (BrokenBarrierException e) {throw new RuntimeException(e);}}});SimpleThreadUtils.wait(10000,()->{});}
Semaphore
Semaphore主要是用来控制资源数量的工具,可以理解为信号量。
初始化时给定信号量的个数,其他线程可以来尝试获得许可,当完成任务之后需要释放响应的许可。
这样同时并行/并发处理的数量最大为我们指定的数量。约束住同时访问资源的数量限制。
@Testpublic void test() throws InterruptedException {Semaphore semaphore = new Semaphore(5);SimpleThreadUtils.newLoopThread(10, "cd", 10, (t) -> {try {semaphore.acquire();int i = t.getNo() * 1000;try {Thread.sleep(i);} catch (InterruptedException e) {}System.out.println("加载资源线程:"+t.getName()+": 花费了 " + i + "s");} catch (InterruptedException e) {throw new RuntimeException(e);}finally {semaphore.release();}});SimpleThreadUtils.wait(10000,()->{});}
Exchanger
Exchanger是一个用于线程间协作的工具类。它提供了一个交换的同步点,在这个交换点两个线程能够交换数据。具体交换数据的方式是通过exchange函数实现的,如果一个线程先执行exchange函数,那么它会等待另一个线程也执行exchange方法。当两个线程都到达了同步交换点,两个线程就可以交换数据。
两个人同时到达指定的地点,然后留下信息,exchanger把信息交换传输。
@Testpublic void test() {Exchanger<String> exchanger = new Exchanger<>();SimpleThreadUtils.newLoopThread(2,"ex",2,(t)->{if (t.getNo() == 0){try {String exchange = exchanger.exchange("我先到了,你死定了");System.out.println(t.getName()+":"+exchange);} catch (InterruptedException e) {}}else {try {Thread.sleep(5000);String exchange = exchanger.exchange("先到了吧,去死吧");System.out.println(t.getName()+":"+exchange);} catch (InterruptedException e) {throw new RuntimeException(e);}}});SimpleThreadUtils.wait(1000,()->{});}
}
Phaser
Phaser
是Java7
新引入的并发API
,我们可以将Phaser的概念看成是一个个的阶段, 每个阶段都需要执行的线程任务,任务执行完毕后就进入下一阶段。这里和CyclicBarrier 和CountDownLatch的概念类似, 实际上也确实可以用Phaser代替CyclicBarrier和CountDownLatch。
Phaser也是通过计数器来控制, 在Phaser中叫parties, 我们在指定了parties之后, Phaser可以根据需要动态增加或减少parties的值
register()//添加一个新的注册者
bulkRegister(int parties)//添加指定数量的多个注册者
arrive()// 到达栅栏点直接执行,无须等待其他的线程
arriveAndAwaitAdvance()//到达栅栏点,必须等待其他所有注册者到达
arriveAndDeregister()//到达栅栏点,注销自己无须等待其他的注册者到达
onAdvance(int phase, int registeredParties)//多个线程达到注册点之后,会调用该方法。
结语
并发针对单核 CPU 而言,它指的是 CPU 交替执行不同任务的能力;并行针对多核 CPU 而言,它指的是多个核心同时执行多个任务的能力。
单核 CPU 只能并发,无法并行;换句话说,并行只可能发生在多核 CPU 中。
在多核 CPU 中,并发和并行一般都会同时存在,它们都是提高 CPU 处理任务能力的重要手段。