目录
一、基础概念
进程和线程
进程
线程
Java 线程的无处不在
进程间的通信
进程间通信有几种方式?
CPU 核心数和线程数的关系
上下文切换(Context switch)
并行和并发
二、认识 Java 里的线程
Java 程序天生就是多线程的
线程的启动与中止
启动
Thread 和 Runnable 的区别
Callable、Future 和 FutureTask
新启线程有几种方式?
中止
线程自然终止
stop()、suspend()、resume()
中断
深入理解run()和start()
多次调用一个线程的start方法会怎么样?
三、深入学习 Java 的线程
线程的状态/生命周期
其他的线程相关方法
线程的优先级
线程的调度
线程和协程
内核线程实现
用户线程实现
混合实现
Java线程的实现--为什么 Java 线程调度是抢占式调度?
协程
出现的原因
协程简介
线程和协程使用场景?
纤程-Java 中的协程
Quasar
JDK19 的虚拟线程
守护线程
四、线程间的通信和协调、协作
管道输入输出流
join 方法
join()
synchronized 内置锁
对象锁和类锁
错误的加锁和原因分析
volatile,最轻量的通信/同步机制
等待/通知机制
notify():
notifyAll():
notify()、notifyAll():
wait()
等待和通知的标准范式
notify 和notifyAll 应该用谁
等待超时模式实现一个连接池
方法和锁
调用 yield() 、sleep()、wait()、notify()等方法对锁有何影响?
为什么 wait 和 notify 方法要在同步块中调用?
为什么应该在循环中检查等待条件?while(count<=0) wait()
五、CompleteableFuture
一、基础概念
进程和线程
进程
我们常听说的是应用程序,也就是 app,由指令和数据组成。但是当我们不运行一个具体的 app 时,这些应用程序就是放在磁盘(也包括 U 盘、远程网络 存储等等)上的一些二进制的代码。一旦我们运行这些应用程序,指令要运行, 数据要读写,就必须将指令加载至 CPU,数据加载至内存。在指令运行过程中 还需要用到磁盘、网络等设备,从这种角度来说,进程就是用来加载指令、管理内存、管理 IO 的。
当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。 进程就可以视为程序的一个实例。
大部分程序可以同时运行多个实例进程 (例如记事本、画图、浏览器 等),也有的程序只能启动一个实例进程(例如 网易云音乐、360 安全卫士等)。显然,程序是死的、静态的,进程是活的、动态的。进程可以分为系统进程和用户进程。凡是用于完成操作系统的各种功能的进程就是系统进程,它们就是处于运行状态下的操作系统本身,用户进程就是所有由用户启动的进程 。
站在操作系统的角度,进程是程序运行资源分配(以内存为主)的最小单位。
线程
一个机器中肯定会运行很多的程序,CPU 又是有限的,怎么让有限的 CPU 运行这么多程序呢?就需要一种机制在程序之间进行协调,也就所谓 CPU 调度。线程则是 CPU 调度的最小单位。
线程必须依赖于进程而存在,线程是进程中的一个实体,是 CPU 调度和分派的基本单位,它是比进程更小的、能独立运行的基本单位。线程自己基本上不拥有系统资源,,只拥有在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。一个进程可以拥有多个线程,一个线程必须有一个父进程。线程,有时也被称为轻量级进程 (Lightweight Process,LWP),早期 Linux 的线程实现几乎就是复用的进程,后来才独立出自己的 API。
Java 线程的无处不在
Java 中不管任何程序都必须启动一个 main 函数的主线程;Java Web 开发里面的定时任务、定时器、JSP 和 Servlet、异步消息处理机制,远程访问接口 RM 等, 任何一个监听事件,onclick 的触发事件等都离不开线程和并发的知识。
进程间的通信
同一台计算机的进程通信称为 IPC(Inter-process communication),不同计算机之间的进程通信被称为 R(mote)PC,需要通过网络,并遵守共同的协议,比如大家熟悉的 Dubbo 就是一个 RPC 框架,而 Http 协议也经常用在 RPC 上,比如SpringCloud 微服务。
进程间通信有几种方式?
1.管道
分为匿名管道(pipe)及命名管道(named pipe):匿名管道可用于具有亲缘关系的父子进程间的通信,命名管道除了具有管道所具有的功能外,它还允许无亲缘关系进程间的通信。
2.信号(signal)
信号是在软件层次上对中断机制的一种模拟,它是比较复杂的通信方式,用于通知进程有某事件发生,一个进程收到一个信号与处理器收到一个中断请求效果上可以说是一致的。
3.消息队列(message queue)
消息队列是消息的链接表,它克服了上两种通信方式中信号量有限的缺点,具有写权限得进程可以按照一定得规则向消息队列中添加新信息;对消息队列有读权限得进程则可以从消息队列中读取信息。
4.共享内存(shared memory)
可以说这是最有用的进程间通信方式。它使得多个进程可以访问同一块内存空间,不同进程可以及时看到对方进程中对共享内存中数据得更新。这种方式需要依靠某种同步操作,如互斥锁和信号量等。
5.信号量(semaphore)
主要作为进程之间及同一种进程的不同线程之间得同步和互斥手段。
6.套接字(socket)
这是一种更为一般得进程间通信机制,它可用于网络中不同机器之间的进程间通信,应用非常广泛。同一机器中的进程还可以使用Unix domain socket(比如同一机器中 MySQL 中的控制台 mysql shell 和 MySQL 服务程序的连接),这种方式不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,比纯粹基于网络的进程间通信肯定效率更高。
CPU 核心数和线程数的关系
目前主流 CPU 都是多核的,线程是 CPU 调度的最小单位。同一时刻,一个 CPU 核心只能运行一个线程,也就是 CPU 内核和同时运行的线程数是 1:1 的关系,也就是说 8 核 CPU 同时可以执行 8 个线程的代码。但 Intel 引入超线程技术后,产生了逻辑处理器的概念,使核心数与线程数形成 1:2 的关系。
如下 Windows 任务管理器贴图就能看出来,内核数是 16 而逻辑处理器数是 24。(13代cpu采用大小核架构,8大核+8小核,只有大核开启超线程,所以逻辑处理器数为8*2+8=24)
在 Java 中提供了 Runtime.getRuntime().availableProcessors(),可以让我们获取当前的 CPU 核心数,注意这个核心数指的是逻辑处理器数。获得当前的 CPU 核心数在并发编程中很重要,并发编程下的性能优化往往和 CPU 核心数密切相关。
上下文切换(Context switch)
既然操作系统要在多个进程(线程)之间进行调度,而每个线程在使用 CPU 时总是要使用 CPU 中的资源,比如 CPU 寄存器和程序计数器。这就意味着,操作系统要保证线程在调度前后的正常执行,所以,操作系统中就有上下文切换的概念,它是指 CPU(中央处理单元)从一个进程或线程到另一个进程或线程的切换。
上下文是CPU寄存器和程序计数器在任何时间点的内容。
寄存器是CPU内部的一小部分非常快的内存(相对于CPU内部的缓存和CPU 外部较慢的RAM主内存),它通过提供对常用值的快速访问来加快计算机程序的执行。
程序计数器是一种专门的寄存器,它指示CPU在其指令序列中的位置,并保存着正在执行的指令的地址或下一条要执行的指令的地址,这取决于具体的系统。
上下文切换可以更详细地描述为内核(即操作系统的核心)对 CPU 上的进程(包括线程)执行以下活动:
- 暂停一个进程的处理,并将该进程的 CPU 状态(即上下文)存储在内存中的某个地方
- 从内存中获取下一个进程的上下文,并在 CPU 的寄存器中恢复它
- 返回到程序计数器指示的位置(即返回到进程被中断的代码行)以恢复进程。
从数据来说:
- 以程序员的角度来看, 是方法调用过程中的各种局部的变量与资源;
- 以线程的角度来看, 是方法的调用栈中存储的各类信息。
引发上下文切换的原因一般包括:
线程、进程切换、系统调用等等。上下文切换通常是计算密集型的,因为涉及一系列数据在各种寄存器、 缓存中的来回拷贝。就 CPU 时间而言,一次上下文切换大概需要 5000~20000 个时钟周期,相对一个简单指令几个乃至十几个左右的执行时钟周期,可以看出这个成本的巨大。
并行和并发
举个例子,如果有条高速公路 A 上面并排有 8 条车道,那么最大的并行车辆就是 8 辆此条高速公路 A 同时并排行走的车辆小于等于 8 辆的时候,车辆就可以并行运行。CPU 也是这个原理,一个 CPU 相当于一个高速公路 A,核心数或者线程数就相当于并排可以通行的车道;而多个 CPU 就相当于并排有多条高速公路,而每个高速公路并排有多个车道。
当谈论并发的时候一定要加个单位时间,也就是说单位时间内并发量是多少?离开了单位时间其实是没有意义的。
综合来说:
并发 Concurrent:指应用能够交替执行不同的任务,比如单 CPU 核心下执行多线程并非是同时执行多个任务,如果你开两个线程执行,就是在你几乎不可能察觉到的速度不断去切换这两个任务,已达到"同时执行效果",其实并不是的,只是计算机的速度太快,我们无法察觉到而已。
并行 Parallel:指应用能够同时执行不同的任务,例:吃饭的时候可以边吃饭边打电话,这两件事情可以同时执行
两者区别:一个是交替执行,一个是同时执行,如下图所示:
二、认识 Java 里的线程
Java 程序天生就是多线程的
一个 Java 程序从 main()方法开始执行,然后按照既定的代码逻辑执行,看似没有其他线程参与,但实际上 Java 程序天生就是多线程程序,因为执行 main() 方法的是一个名称为 main 的线程。
而一个 Java 程序的运行就算是没有用户自己开启的线程,实际也有有很多,JVM 自行启动的线程,一般来说有:
//类说明:只有一个main方法的程序
public class OnlyMain {public static void main(String[] args) {//Java 虚拟机线程系统的管理接口ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();// 不需要获取同步的monitor和synchronizer信息,仅仅获取线程和线程堆栈信息ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);// 遍历线程信息,仅打印线程ID和线程名称信息for (ThreadInfo threadInfo : threadInfos) {System.out.println("[" + threadInfo.getThreadId() + "] "+ threadInfo.getThreadName());}}
}//运行结果:
[6] Monitor Ctrl-Break
[5] Attach Listener
[4] Signal Dispatcher
[3] Finalizer
[2] Reference Handler
[1] main
[6] Monitor Ctrl-Break //监控 Ctrl-Break 中断信号的
[5] Attach Listener //内存 dump,线程 dump,类信息统计,获取系统属性等
[4] Signal Dispatcher // 分发处理发送给 JVM 信号的线程
[3] Finalizer // 调用对象 finalize 方法的线程
[2] Reference Handler//清除 Reference 的线程
[1] main //main 线程,用户程序入口
尽管这些线程根据不同的 JDK 版本会有差异,但是依然证明了 Java 程序天生就是多线程的。
线程的启动与中止
刚刚看到的线程都是 JVM 启动的系统线程,我们学习并发编程希望的自己能操控线程,所以我们先来看看如何启动线程。
启动
启动线程的方式有:
1、X extends Thread,然后 X.start
2、X implements Runnable,然后交给 Thread 运行
//类说明:新启线程的方式
public class NewThread {/*扩展自Thread类*/private static class UseThread extends Thread{@Overridepublic void run() {super.run();SleepTools.second(1);// do my work;System.out.println("I am extendec Thread");}}///实现Runnable接口private static class UseRunnable implements Runnable{@Overridepublic void run() {// do my work;System.out.println("I am implements Runnable");}}public static void main(String[] args) throws InterruptedException, ExecutionException {UseThread useThread = new UseThread();useThread.start();//useThread.start();UseRunnable useRunnable = new UseRunnable();new Thread(useRunnable).start();System.out.println("main end");}
}//运行结果
main end
I am implements Runnable
I am extendec Thread
Thread 和 Runnable 的区别
Thread 才是 Java 里对线程的唯一抽象,Runnable 只是对任务(业务逻辑)的抽象。Thread 可以接受任意一个 Runnable 的实例并执行。
Callable、Future 和 FutureTask
Runnable 是一个接口,在它里面只声明了一个 run()方法,由于 run()方法返回值为 void 类型,所以在执行完任务之后无法返回任何结果。
Callable 位于 java.util.concurrent 包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做 call(),这是一个泛型接口,call()函数返回的类型就是传递进来的 V 类型。
Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。
因为 Future 只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的 FutureTask。
FutureTask 类实现了 RunnableFuture 接口,RunnableFuture 继承了 Runnable 接口和 Future 接口,而 FutureTask 实现了 RunnableFuture 接口。所以它既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。
因此我们可以通过一个线程运行 Callable,但是 Thread 不支持构造方法中传递Callable 的实例,所以我们需要通过 FutureTask 把一个 Callable 包装成 Runnable,然后再通过这个 FutureTask 拿到 Callable 运行后的返回值。
要 new 一个 FutureTask 的实例,有两种方法
//类说明:演示Future等的使用(Callable、Future 和 FutureTask)
public class UseFuture {/*实现Callable接口,允许有返回值*/private static class UseCallable implements Callable<Integer>{private int sum;@Overridepublic Integer call() throws Exception {System.out.println("Callable子线程开始计算!");
// Thread.sleep(1000);for(int i=0 ;i<5000;i++){if(Thread.currentThread().isInterrupted()) {System.out.println("Callable子线程计算任务中断!");return null;}sum=sum+i;System.out.println("sum="+sum);} System.out.println("Callable子线程计算结束!结果为: "+sum); return sum; }}public static void main(String[] args) throws InterruptedException, ExecutionException {UseCallable useCallable = new UseCallable();//包装FutureTask<Integer> futureTask = new FutureTask<>(useCallable);Random r = new Random();new Thread(futureTask).start();Thread.sleep(1);if(r.nextInt(100)>50){System.out.println("Get UseCallable result = "+futureTask.get());}else{System.out.println("Cancel................. ");futureTask.cancel(true);}}}//运行结果
Callable子线程开始计算!
sum=0
sum=1
sum=3
sum=6
sum=10
sum=15
//.......................省略sum=15到sum=51040之间的输出结果
sum=51040
sum=51360
sum=51681
sum=52003
Cancel.................
sum=52326
Callable子线程计算任务中断!
新启线程有几种方式?
这个问题的答案其实众说纷纭,有 2 种,3 种,4 种等等答案,建议比较好的回答是:
按照 Java 源码中 Thread 上的注释:
官方说法是在 Java 中有两种方式创建一个线程用以执行,一种是派生自Thread 类,另一种是实现 Runnable 接口。
当然本质上 Java 中实现线程只有一种方式,都是通过 new Thread()创建线程对象,调用 Thread#start 启动线程。
至于基于 callable 接口的方式,因为最终是要把实现了 callable 接口的对象通过 FutureTask 包装成 Runnable,再交给 Thread 去执行,本质上是和实现 Runnable 接口是同一类。而线程池的方式,本质上是池化技术,是资源的复用(和数据库的连接池一样),和新启线程没什么关系。
中止
线程自然终止
要么是 run 执行完成了,要么是抛出了一个未处理的异常导致线程提前结束。
stop()、suspend()、resume()
暂停、恢复和停止操作对应在线程 Thread 的 API 就是 suspend()、resume() 和 stop()。但是这些 API 是过期的,也就是不建议使用的。
不建议使用的原因主要有:
- suspend()方法为例,在调用后,线程不会释放已经占有的资源(比如锁),而是占有着资源进入睡眠状态,这样容易引发死锁问题。
- 同样,stop()方法在终结一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会,因此会导致程序可能工作在不确定状态下。
- resume()方法会导致死锁风险、状态不一致、安全性问题。
正因为 suspend()、resume()和 stop()方法带来的副作用,这些方法才被标注为不建议使用的过期方法。
中断
安全的中止则是其他线程通过调用某个线程 A 的 interrupt()方法对其进行中断操作, 中断好比其他线程对该线程打了个招呼,“A,你要中断了”,不代表线程 A 会立即停止自己的工作,同样的 A 线程完全可以不理会这种中断请求。线程通过检查自身的中断标志位是否被置为 true 来进行响应,线程通过方法 isInterrupted()来进行判断是否被中断,也可以调用静态方法 Thread.interrupted()来进行判断当前线程是否被中断,不过 Thread.interrupted() 会同时将中断标识位改写为 false。
如果一个线程处于了阻塞状态(如线程调用了 thread.sleep、thread.join、 thread.wait 等),则在线程在检查中断标示时如果发现中断标示为 true,则会在这些阻塞方法调用处抛出 InterruptedException 异常(目标线程将会被唤醒并抛出 InterruptedException 异常,以响应中断信号),并且在抛出异常后会立即将线程的中断标示位清除,即重新设置为 false。
不建议自定义一个取消标志位来中止线程的运行。因为 run 方法里有阻塞调用时会无法很快检测到取消标志,线程必须从阻塞调用返回后,才会检查这个取消标志。这种情况下,使用中断会更好,因为:
一般的阻塞方法,如 sleep 等本身就支持中断的检查
检查中断位的状态和检查取消标志位没什么区别,用中断位的状态还可以避免声明取消标志位,减少资源的消耗。
注意:处于死锁状态的线程无法被中断(不会理会外面的任何请求)
//类说明:如何安全中断线程
public class EndThread {private static class UseThread extends Thread{public UseThread(String name) {super(name);}@Overridepublic void run() {String threadName = Thread.currentThread().getName();System.out.println(threadName+" interrrupt flag ="+isInterrupted());
// while(!isInterrupted()){while(!Thread.interrupted()){System.out.println(threadName+" is running");System.out.println(threadName+"inner interrrupt flag =" +isInterrupted());}System.out.println(threadName+" interrrupt flag ="+isInterrupted());}}public static void main(String[] args) throws InterruptedException {UseThread endThread = new UseThread("endThread");endThread.start();Thread.sleep(20);endThread.interrupt();//中断线程,只是设置线程的中断标识位=true,不会真正中断线程}
}//运行结果
endThread interrrupt flag =false
endThread is running
endThreadinner interrrupt flag =false
endThread is running
endThreadinner interrrupt flag =false
//省略n多个endThread is running endThreadinner interrrupt flag =false
endThread is running
endThreadinner interrrupt flag =true
endThread interrrupt flag =false
//类说明:阻塞方法中抛出InterruptedException异常后,如果需要继续中断,需要手动再中断一次
public class HasInterrputException {private static class UseThread extends Thread{public UseThread(String name) {super(name);}@Overridepublic void run() {while(!isInterrupted()) {try {Thread.sleep(100);} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName()+" in InterruptedException interrupt flag is "+isInterrupted());interrupt();//把false改为truee.printStackTrace();}System.out.println(Thread.currentThread().getName() + " I am extends Thread.");}//while 循环结束后才会打印本句System.out.println(Thread.currentThread().getName()+" interrupt flag is "+isInterrupted());}}public static void main(String[] args) throws InterruptedException {Thread endThread = new UseThread("HasInterrputEx");endThread.start();Thread.sleep(500);endThread.interrupt();}}//运行结果
HasInterrputEx I am extends Thread.
HasInterrputEx I am extends Thread.
HasInterrputEx I am extends Thread.
HasInterrputEx I am extends Thread.
HasInterrputEx in InterruptedException interrupt flag is false
HasInterrputEx I am extends Thread.
HasInterrputEx interrupt flag is true
java.lang.InterruptedException: sleep interruptedat java.lang.Thread.sleep(Native Method)at cn.study.base.abc.safeend.HasInterrputException$UseThread.run(HasInterrputException.java:18)
深入理解run()和start()
Thread类是Java里对线程概念的抽象,可以这样理解:我们通过new Thread() 其实只是 new 出一个 Thread 的实例,还没有操作系统中真正的线程挂起钩来。
只有执行了 start()方法后,才实现了真正意义上的启动线程(操作系统级别的线程)。
从 Thread 的源码可以看到,Thread 的 start 方法中调用了 start0()方法,而 start0()是个 native 方法,这就说明 Thread#start 一定和操作系统是密切相关的。 start()方法让一个线程进入就绪队列等待分配 cpu,分到 cpu 后才调用实现的 run()方法,start()方法不能重复调用,如果重复调用会抛出异常。
而 run 方法是业务逻辑实现的地方,本质上和任意一个类的任意一个成员方法并没有任何区别,可以重复执行,也可以被单独调用。
多次调用一个线程的start方法会怎么样?
不能调用两次Thread.start,否则会抛出 IllegalThreadStateException 异常,表示线程状态非法。这是由于线程的生命周期规定,一旦线程启动,它的执行流程会被独立地调度和管理(一个对象线程实例只能与一个线程对应挂钩),不能再次启动同一个线程对象。如果希望再次执行线程的任务,需要创建一个新的线程对象,并对其调用 start() 方法。
Thread.run()调用的是主线程,和任意一个类的任意一个成员方法并没有任何区别,可以多次调用。
三、深入学习 Java 的线程
线程的状态/生命周期
Java 中线程的状态分为 6 种:
- 初始(NEW):新创建了一个线程对象,但还没有调用 start()方法。
- 运行(RUNNABLE):Java 线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。线程对象创建后,其他线程(比如 main 线程)调用了该对象的 start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取 CPU 的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得 CPU 时间片后变为运行中状态(running)。
- 阻塞(BLOCKED):表示线程阻塞于锁。线程在运行过程中,可能因为某些原因被阻塞。因获取不到锁对象而进入阻塞状态。在被阻塞期间,线程不会占用CPU资源。
- 等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断),线程在运行过程中,主动调用了 wait() 、join() 方法时,线程进入等待状态。在等待状态下,线程会释放持有的锁,直到其他线程唤醒它。
- 超时等待(TIMED_WAITING):该状态不同于 WAITING,它可以在指定的时间后自行返回。线程在运行过程中,调用了带有超时参数的 sleep() 、 wait() 方法或者 join() 方法,或者执行了 LockSupport.parkNanos() 或 LockSupport.parkUntil() 方法,线程进入超时等待状态。
- 终止(TERMINATED):表示该线程已经执行完毕。线程的 run() 方法执行结束,或者线程被强制终止(例如调用 stop() 方法,但不推荐使用),线程进入终止状态。
可以使用jstack <pid> 进程id 查看线程状态
状态之间的变迁如下图所示,掌握这些状态可以让我们在进行 Java 程序调优时可以提供很大的帮助。
- 线程运行过程中条件不满足进入等待waiting,条件满足被唤醒继续运行(Object.wait() Thread.join() LockSupport.park()无参数与超时等待TIME_WARITING不同: 可以被唤醒即使条件不满足)
- 被synchronized关键字内置锁机制所阻塞的才会进入塞(BLOCKED)状态
- 被其他显示锁可重入锁等等的只会进入等待(WARITING)或者超时等待(TIME_WARITING)
- 只有在(runnable、running、ready)状态的线程才会去竞争cpu资源
其他的线程相关方法
yield()方法:使当前线程让出 CPU 占有权,但让出的时间是不可设定的。也不会释放锁资源。同时执行 yield()的线程有可能在进入到就绪状态后会被操作系统再次选中马上又被执行。(可以增加其他线程运行的概率,调用yield()进入就绪状态READY后,与其他线程一起竞争CPU资源调度)
比如,ConcurrentHashMap#initTable 方法中就使用了这个方法:
new currentHashMap()的时候并没有被分配空间,当put数据的时候currentHashMap()才会给数组分配空间,进行初始化操作,currentHashMap() 数组只有一个,而执行put()方法的线程有多个,对应数组初始化只需要一个线程就可以了,其他线程应该被阻塞或等待
这是因为 ConcurrentHashMap 中可能被多个线程同时初始化 table,但是其实这个时候只允许一个线程进行初始化操作,其他的线程就需要被阻塞或等待,但是初始化操作其实很快,这里 Doug Lea 为了避免阻塞或者等待这些操作引发的上下文切换等等开销,就让其他不执行初始化操作的线程干脆执行 yield() 方法,以让出 CPU 执行权,让执行初始化操作的线程可以更快的执行完成。
线程的优先级
在 Java 线程中,通过一个整型成员变量 priority 来控制优先级,优先级的范围从 1~10,在线程构建的时候可以通过 setPriority(int)方法来修改优先级,默认优先级是 5,优先级高的线程分配时间片的数量要多于优先级低的线程。但是优先级高的线程不一定会比优先级低的线程先执行
设置线程优先级时,针对频繁阻塞(休眠或者 I/O 操作)的线程需要设置较高优先级,而偏重计算(需要较多 CPU 时间或者偏运算)的线程则设置较低的优先级,确保处理器不会被独占。在不同的 JVM 以及操作系统上,线程规划会存在差异,有些操作系统甚至会忽略对线程优先级的设定。
线程的调度
线程调度是指系统为线程分配 CPU 使用权的过程,主要调度方式有两种:
- 协同式线程调度(Cooperative Threads-Scheduling)
- 抢占式线程调度(Preemptive Threads-Scheduling)、
使用协同式线程调度的多线程系统,线程执行的时间由线程本身来控制,线程把自己的工作执行完之后,要主动通知系统切换到另外一个线程上。使用协同式线程调度的最大好处是实现简单,由于线程要把自己的事情做完后才会通知系统进行线程切换,所以没有线程同步的问题,但是坏处也很明显,如果一个线程出了问题,则程序就会一直阻塞。
使用抢占式线程调度的多线程系统,每个线程执行的时间以及是否切换都由系统决定。在这种情况下,线程的执行时间不可控,所以不会有「一个线程导致整个进程阻塞」的问题出现。
在 Java 中,Thread.yield()可以让出 CPU 执行时间,但是对于获取执行时间,线程本身是没有办法的。对于获取 CPU 执行时间,线程唯一可以使用的手段是设置线程优先级,Java 设置了 10 个级别的程序优先级,当两个线程同时处于Ready 状态时,优先级越高的线程越容易被系统选择执行。
线程和协程
线程其实是操作系统层面的实体,Java 中的线程怎么和操作系统层面对应起来呢?
任何语言实现线程主要有三种方式:使用内核线程实现(1:1 实现),使用用户线程实现(1:N 实现),使用用户线程加轻量级进程混合实现(N:M 实现)。
内核态:处于内核态的 CPU 可以访问任意的数据,包括外围设备,比如网卡、硬盘等,处于内核态的 CPU 可以从一个程序切换到另外一个程序,并且占用 CPU 不会发生抢占情况,一般处于特权级 0 的状态我们称之为内核态。
用户态:处于用户态的 CPU 只能受限的访问内存,并且不允许访问外围设备,用户态下的 CPU 不允许独占,也就是说 CPU 能够被其他程序获取。
内核线程实现
使用内核线程实现的方式也被称为 1: 1 实现。 内核线程(Kernel-Level Thread, KLT) 就是直接由操作系统内核(Kernel, 下称内核) 支持的线程,这种线程由内核来完成线程切换, 内核通过操纵调度器(Scheduler) 对线程进行调度(发生上下文切换), 并负责将线程的任务映射到各个处理器上。
由于内核线程的支持,每个线程都成为一个独立的调度单元,即使其中某一个在系统调用中被阻塞了,也不会影响整个进程继续工作,相关的调度工作也不需要额外考虑,已经由操作系统处理了。
局限性:首先,由于是基于内核线程实现的,所以各种线程操作,如创建、析构及同步,都需要进行系统调用。而系统调用的代价相对较高, 需要在用户态(User Mode)和内核态(Kernel Mode)中来回切换。其次,每个语言层面的线程都需要有一个内核线程的支持,因此要消耗一定的内核资源(如内核线程的栈空间),因此一个系统支持的线程数量是有限的。
用户线程实现
严格意义上的用户线程指的是完全建立在用户空间的线程库上,系统内核不能感知到用户线程的存在及如何实现的。用户线程的建立、同步、销毁和调度完全在用户态中完成,不需要内核的帮助。 如果程序实现得当, 这种线程不需要切换到内核态,因此操作可以是非常快速且低消耗的,也能够支持规模更大的线程数量, 部分高性能数据库中的多线程就是由用户线程实现的。
用户线程的优势在于不需要系统内核支援,劣势也在于没有系统内核的支援,所有的线程操作都需要由用户程序自己去处理。线程的创建、销毁、切换和调度都是用户必须考虑的问题,而且由于操作系统只把处理器资源分配到进程,那诸如“阻塞如何处理”“多处理器系统中如何将线程映射到其他处理器上”这类问题解决起来将会异常困难,甚至有些是不可能实现的。因为使用用户线程实现的程序通常都比较复杂,所以一般的应用程序都不倾向使用用户线程。Java 语言曾经使用过用户线程,最终又放弃了。 但是近年来许多新的、以高并发为卖点的编程语言又普遍支持了用户线程,譬如 Golang。
混合实现
线程除了依赖内核线程实现和完全由用户程序自己实现之外, 还有一种将内核线程与用户线程一起使用的实现方式, 被称为 N:M 实现。 在这种混合实现下, 既存在用户线程, 也存在内核线程。
用户线程还是完全建立在用户空间中, 因此用户线程的创建、 切换、 析构等操作依然廉价, 并且可以支持大规模的用户线程并发。
同样又可以使用内核提供的线程调度功能及处理器映射,并且用户线程的系统调用要通过内核线程来完成。在这种混合模式中, 用户线程与轻量级进程的数量比是不定的,是 N:M 的关系。
Java线程的实现--为什么 Java 线程调度是抢占式调度?
Java 线程在早期的 Classic 虚拟机上(JDK 1.2 以前),是用户线程实现的,但从 JDK 1.3 起, 主流商用 Java 虚拟机的线程模型普遍都被替换为基于操作系统原生线程模型来实现,即采用 1: 1 的线程模型。
以 HotSpot 为例,它的每一个 Java 线程都是直接映射到一个操作系统原生线程来实现的,而且中间没有额外的间接结构, 所以 HotSpot 自己是不会去干涉线程调度的,全权交给底下的操作系统去处理。
所以,这就是我们说 Java 线程调度是抢占式调度的原因。而且 Java 中的线程优先级是通过映射到操作系统的原生线程上实现的,所以线程的调度最终取决于操作系统,操作系统中线程的优先级有时并不能和 Java 中的一一对应,所以Java 优先级并不是特别靠谱。
协程
出现的原因
随着互联网行业的发展,目前内核线程实现在很多场景已经有点不适宜了。比如,互联网服务架构在处理一次对外部业务请求的响应, 往往需要分布在不同机器上的大量服务共同协作来实现,,也就是我们常说的微服务,这种服务细分的架构在减少单个服务复杂度、增加复用性的同时,也不可避免地增加了服务的数量, 缩短了留给每个服务的响应时间。这要求每一个服务都必须在极短的时间内完成计算, 这样组合多个服务的总耗时才不会太长;也要求每一个服务提供者都要能同时处理数量更庞大的请求, 这样才不会出现请求由于某个服务被阻塞而出现等待。
Java 目前的并发编程机制就与上述架构趋势产生了一些矛盾,1:1 的内核线程模型是如今 Java 虚拟机线程实现的主流选择, 但是这种映射到操作系统上的线程天然的缺陷是切换、调度成本高昂,系统能容纳的线程数量也很有限。以前处理一个请求可以允许花费很长时间在单体应用中,具有这种线程切换的成本也是无伤大雅的,但现在在每个请求本身的执行时间变得很短、数量变得很多的前提下,用户本身的业务线程切换的开销甚至可能会接近用于计算本身的开销,这就会造成严重的浪费。
另外我们常见的 Java Web 服务器,比如 Tomcat 的线程池的容量通常在几十个到两百之间,当把数以百万计的请求往线程池里面灌时,系统即使能处理得过来,但其中的切换损耗也是相当可观的。
这样的话,对 Java 语言来说,用户线程的重新引入成为了解决上述问题一个非常可行的方案。
其次,Go 语言等支持用户线程等新型语言给 Java 带来了巨大的压力,也使得 Java 引入用户线程成为了一个绕不开的话题。
协程简介
为什么用户线程又被称为协程呢?
内核线程的切换开销是来自于保护和恢复现场的成本,那如果改为采用用户线程,这部分开销就能够省略掉吗?
答案还是“不能”。 但是,一旦把保护、恢复现场及调度的工作从操作系统交到程序员手上,则可以通过很多手段来缩减这些开销。
由于最初多数的用户线程是被设计成协同式调度(Cooperative Scheduling)的,所以它有了一个别名——“协程”(Coroutine) 完整地做调用栈的保护、恢复工作,所以今天也被称为“有栈协程”(Stackfull Coroutine)。协程的主要优势是轻量,无论是有栈协程还是无栈协程,都要比传统内核线程要轻量得多。如果进行量化的话, 那么如果不显式设置,则在 64 位 Linux上 HotSpot 的线程栈容量默认是 1MB,此外内核数据结构(Kernel Data Structures)还会额外消耗 16KB 内存。与之相对的, 一个协程的栈通常在几百个字节到几 KB 之间, 所以 Java 虚拟机里线程池容量达到两百就已经不算小了, 而很多支持协程的应用中, 同时并存的协程数量可数以十万计。
协程当然也有它的局限, 需要在应用层面实现的内容(调用栈、 调度器这些)特别多,同时因为协程基本上是协同式调度,则协同式调度的缺点自然在协程上也存在。
总的来说,协程机制适用于被阻塞的,且需要大量并发的场景(网络 io),不适合大量计算的场景,因为协程提供规模(更高的吞吐量),而不是速度(更低的延迟)。
线程和协程使用场景?
协程适用:高IO密集型,高并发,高吞吐量(网络编程、高并发的服务器程序)(对比线程,增强了规模,速度并没有增强)
线程适用:计算密集型(cpu不与网络打交道,从内存中取数据计算)
纤程-Java 中的协程
在 JVM 的实现上,以 HotSpot 为例,协程的实现会有些额外的限制,Java调用栈跟本地调用栈是做在一起的。如果在协程中调用了本地方法,还能否正常切换协程而不影响整个线程? 另外,如果协程中遇传统的线程同步措施会怎样? 譬如 Kotlin 提供的协程实现, 一旦遭遇 synchronize 关键字, 那挂起来的仍将是整个线程。
所以 Java 开发组就 Java 中协程的实现也做了很多努力,OpenJDK 在 2018 年创建了 Loom 项目,这是 Java 的官方解决方案, 并用了“纤程(Fiber)”这个名字。
Loom 项目背后的意图是重新提供对用户线程的支持,但这些新功能不是为了取代当前基于操作系统的线程实现, 而是会有两个并发编程模型在 Java 虚拟机中并存,可以在程序中同时使用。新模型有意地保持了与目前线程模型相似的 API 设计, 它们甚至可以拥有一个共同的基类, 这样现有的代码就不需要为了使用纤程而进行过多改动, 甚至不需要知道背后采用了哪个并发编程模型。根据 Loom 团队在 2018 年公布的他们对 Jetty 基于纤程改造后的测试结果,同样在 5000QPS 的压力下, 以容量为 400 的线程池的传统模式和每个请求配以一个纤程的新并发处理模式进行对比, 前者的请求响应延迟在 10000 至 20000 毫秒之间, 而后者的延迟普遍在 200 毫秒以下,
目前 Java 中比较出名的协程库是 Quasar[ˈkweɪzɑː(r)](Loom 项目的 Leader 就是 Quasar 的作者 Ron Pressler), Quasar 的实现原理是字节码注入,在字节码层面对当前被调用函数中的所有局部变量进行保存和恢复。这种不依赖 Java 虚拟机的现场保护虽然能够工作,但影响性能。
Quasar
Quasar 的使用其实并不复杂,首先引入 Maven 依赖
<dependency><groupId>co.paralleluniverse</groupId><artifactId>quasar-core</artifactId><version>0.7.9</version>
</dependency>
在执行 Quasar 的代码前,还需要配置 VM 参数(Quasar 的实现原理是字节码注入,所以,在运行应用前,需要配置好 quasar-core 的 java agent 地址)
本机的 Maven 仓库路径
-javaagent:E:\Dev\dep\repository\co\paralleluniverse\quasar-core\0.7.9\quasar-core-0.7.9.jar
在具体的业务场景上,我们模拟调用某个远程的服务,假设远程服务处理耗时需要 1S,使用休眠 1S 来代替。为了比较,用多线程和协程分别调用这个服务
10000 次,来看看两者所需的耗时。
//Quasar
public class FiberExample {public static void main(String[] args) throws Exception{CountDownLatch count = new CountDownLatch(10000);//线程间进行协同工作的工具类StopWatch stopWatch = new StopWatch();stopWatch.start();//模拟10000个用户任务IntStream.range(0,10000).forEach(i-> new Fiber() {@Overrideprotected String run() throws SuspendExecution, InterruptedException {//Quasar中Thread和Fiber都被称为Strand,Fiber不能调用Thread.sleep休眠Strand.sleep(1000 );count.countDown();return "aa";}}.start());count.await();stopWatch.stop();System.out.println("结束了: " + stopWatch.prettyPrint());}}//运行结果
结束了: StopWatch '': running time = 1276237900 ns
---------------------------------------------
ns % Task name
---------------------------------------------
1276237900 100%
//线程
public class Standard {public static void main(String[] args) throws Exception{CountDownLatch count = new CountDownLatch(10000);StopWatch stopWatch = new StopWatch();stopWatch.start();ExecutorService executorService = Executors.newCachedThreadPool();//无限量,任务来一个开启一个线程//ExecutorService executorService = Executors.newFixedThreadPool(200);//花费时间:50470955300 nsIntStream.range(0,10000).forEach(i-> executorService.submit(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException ex) { }count.countDown();}));count.await();stopWatch.stop();System.out.println("结束了: " + stopWatch.prettyPrint());executorService.shutdownNow();}
}//运行结果ExecutorService executorService = Executors.newCachedThreadPool();
结束了: StopWatch '': running time = 2564249800 ns
---------------------------------------------
ns % Task name
---------------------------------------------
2564249800 100% //运行结果ExecutorService executorService = Executors.newFixedThreadPool(200);
结束了: StopWatch '': running time = 50470955300 ns
---------------------------------------------
ns % Task name
---------------------------------------------
50470955300 100%
从代码层面来看,两者的代码高度相似,忽略两者的公共部分,代码不同的地方也就 2、3 行。
其中的 Fiber 就是 Quasar 为我们提供的协程相关的类,可以类比为 Java 中的 Thread 类。
其他的 CountDownLatch(闭锁,线程的某种协调工具类)、Executors.newCachedThreadPool(线程池)是并发编程后面的课程将要学习的知识。StopWatch 是 Spring 的一个工具类,一个简单的秒表工具,可以计时指定代码段的运行时间以及汇总这个运行时间。
这些代码的业务意义:调用远程服务,10000 次,每次耗时 1S,然后统计总耗时。
可以看到性能的提升还是非常明显的。而且上面多线程编程时,并没有指定线程池的大小,在实际开发中是绝不允许的。一般我们会设置一个固定大小的线程池,因为线程资源是宝贵,线程多了费内存还会带来线程切换的开销。上面的场景在设置 200 个固定大小线程池时(Executors.newFixedThreadPool(200)),在本机的测试结果达到了 50 多秒,几乎是数量级的增加。
由这个结果也可以看到协程在需要处理大量 IO 的情况下非常具有优势,基于固定的几个线程调度,可以轻松实现百万级的协程处理,而且内存消耗非常平稳。
JDK19 的虚拟线程
2022 年 9 月 22 日,JDK19(非 LTS 版本)正式发布,引入了协程,并称为轻量级虚拟线程。但是这个特性目前还是预览版,还不能引入生成环境。因为环境所限,本课程不提供实际的范例,只讲述基本用法和原理。
要使用的话,需要通过使用 javac --release 19 --enable-preview XXX.java 编译程序,并使用 java --enable-preview XXX 运行该程序
在具体使用上和原来的 Thread API 差别不大:java.lang.Thread.Builder,可以创建和启动虚拟线程,例如:
Thread thread = Thread.ofVirtual().name("duke").unstarted(runnable);// Thread.ofPlatform() 则创建传统意义的实际或者Thread.startVirtualThread(Runnable)
并通过 Executors.newVirtualThreadPerTaskExecutor()提供了虚拟线程池功能。在具体实现上,虚拟线程当然是基于用户线程模式实现的,JDK 的调度程序不直接将虚拟线程分配给处理器,而是将虚拟线程分配给实际线程,是一个 M: N 调度,具体的调度程序由已有的 ForkJoinPool 提供支持。但是虚拟线程不是协同调度的,JDK 的虚拟线程调度程序通过将虚拟线程挂载到平台线程上来分配要在平台线程上执行的虚拟线程。在运行一些代码之后,虚拟线程可以从其载体卸载。此时平台线程是空闲的,因此调度程序可以在其上挂载不同的虚拟线程,从而使其再次成为载体。
通常,当虚拟线程阻塞 I/O 或 JDK 中的其他阻塞操作(如 BlockingQueue.take ())时,它将卸载。当阻塞操作准备完成时(例如,在套接字上已经接收到字节) ,它将虚拟线程提交回调度程序,调度程序将在运营商上挂载虚拟线程以恢复执行。虚拟线程的挂载和卸载频繁且透明,并且不会阻塞任何 OS 线程。
守护线程
Daemon(守护)线程是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作。这意味着,当一个 Java 虚拟机中不存在非 Daemon 线程的时候,Java 虚拟机将会退出。可以通过调用 Thread.setDaemon(true)将线程设置为 Daemon 线程。我们一般用不上,比如垃圾回收线程就是 Daemon 线程。
Daemon 线程被用作完成支持性工作,但是在 Java 虚拟机退出时 Daemon 线程中的 finally 块并不一定会执行。在构建 Daemon 线程时,不能依靠 finally 块中的内容来确保执行关闭或清理资源的逻辑。
public class DaemonThread {private static class UseThread extends Thread{@Overridepublic void run() {try {//while(!isInterrupted()){while (true) {System.out.println(Thread.currentThread().getName() + " I am extends Thread.");}
// System.out.println(Thread.currentThread().getName()
// + " interrupt flag is " + isInterrupted());} finally {//守护线程中finally不一定起作用System.out.println(" .............finally");}}}// static{
// UseThread useThread = new UseThread();
// //useThread.setDaemon(true);
// useThread.start();
// }public static void main(String[] args) throws InterruptedException, ExecutionException {UseThread useThread = new UseThread();//注释这个,程序死循环一直运行并输出Thread-0 I am extends Thread.useThread.setDaemon(true);useThread.start();Thread.sleep(1000);System.out.println("main end");}
}//
//运行结果
.
Thread-0 I am extends Thread.
//省略n多个Thread-0 I am extends Thread.
Thread-0 I am extends Thread.
Thread-0 I am extends Thread.
Thread-0 I am extends Thread.
main end
Thread-0 I am extends Thread.
Thread-0 I am extends Thread.
Thread-0 I am extends Thread.
//省略n多个Thread-0 I am extends Thread.
//能运行结束,最后输出Thread-0 I am extends Thread.
四、线程间的通信和协调、协作
很多的时候,孤零零的一个线程工作并没有什么太多用处,更多的时候,我们是很多线程一起工作,而且是这些线程间进行通信,或者配合着完成某项工作,这就离不开线程间的通信和协调、协作。
管道输入输出流
已经知道,进程间有好几种通信机制,其中包括了管道,其实 Java 的线程里也有类似的管道机制,用于线程之间的数据传输,而传输的媒介为内存。设想这么一个应用场景:通过 Java 应用生成文件,然后需要将文件上传到云端,比如:
- 页面点击导出后,后台触发导出任务,然后将 mysql 中的数据根据导出条件查询出来,生成 Excel 文件,然后将文件上传到 oss,最后发布一个下载文件的链接。
- 和银行以及金融机构对接时,从本地某个数据源查询数据后,上报 xml 格式的数据,给到指定的 ftp、或是 oss 的某个目录下也是类似的。
一般的做法是,先将文件写入到本地磁盘,然后从文件磁盘读出来上传到云盘,但是通过 Java 中的管道输入输出流一步到位,则可以避免写入磁盘这一步。
Java 中的管道输入/输出流主要包括了如下 4 种具体实现:
面向字节:PipedOutputStream、PipedInputStream
面向字符:PipedReader 、 PipedWriter
public class Piped {public static void main(String[] args) throws Exception {PipedWriter out = new PipedWriter();PipedReader in = new PipedReader();//将输出流和输入流进行连接,否则在使用时会抛出IOExceptionout.connect(in);Thread printThread = new Thread(new Print(in), "PrintThread");printThread.start();int receive = 0;try {//将键盘的输入,用输出流接受,在实际的业务中,可以将文件流导给输出流while ((receive = System.in.read()) != -1){out.write(receive);}} finally {out.close();}}static class Print implements Runnable {private PipedReader in;public Print(PipedReader in) {this.in = in;}@Overridepublic void run() {int receive = 0;try {//输入流从输出流接收数据,并在控制台显示//在实际的业务中,可以将输入流直接通过网络通信写出while ((receive = in.read()) != -1){System.out.print((char) receive);}} catch (IOException ex) {}}}
}//运行结果->在控制台输入什么就输出什么
join 方法
现在有 T1、T2、T3 三个线程,你怎样保证 T2 在 T1 执行完后执行,T3 在 T2 执行完后执行?
答:用 Thread#join 方法即可,在 T3 中调用 T2.join,在 T2 中调用 T1.join。
join()
把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行(串行)。比如在线程 B 中调用了线程 A 的 Join()方法,直到线程 A 执行完毕后,才会继续执行线程 B 剩下的代码。
synchronized 内置锁
线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价值很少,如果多个线程能够相互配合完成工作,包括数据之间的共享,协同处理事情。这将会带来巨大的价值。
Java 支持多个线程同时访问一个对象或者对象的成员变量,但是多个线程同时访问同一个变量,会导致不可预料的结果。关键字 synchronized 可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,使多个线程访问同一个变量的结果正确,它又称为内置锁机制。
对象锁和类锁
对象锁是用于对象实例方法,或者一个对象实例上的,类锁是用于类的静态方法或者一个类的 class 对象上的。
类锁是指使用 synchronized 关键字修饰静态方法或静态代码块,从而实现对类的锁定,而不是对对象的锁定。类锁可以防止多个线程同时执行被类锁定的方法或代码块。以下是类锁的示例:
// 类锁修饰静态方法,可以直接new SynClass().increment调用
public static synchronized void increment() {count++;
}private static final Object lock = new Object();
public static void increment() {synchronized (lock) { // 类锁修饰静态代码块count++;}
}//increment() 方法被使用 synchronized 关键字修饰,且为静态方法。
//这样,无论多少个线程同时调用 increment() 方法,都会被类锁定,只有一个线程可以执行该方法,从而保证线程安全。
注意:类锁是针对整个类的,而不是针对类的每个实例。当一个线程获取了类锁后,其他线程必须等待该线程释放锁才能执行类锁定的方法或代码块。因此,类锁适用于需要对整个类进行同步的情况。
类的对象实例可以有很多个,所以当对同一个变量操作时,用来做锁的对象必须是同一个,否则加锁毫无作用。比如下面的示例代码:
但是有一点必须注意的是,其实类锁只是一个概念上的东西,并不是真实存在的,类锁其实锁的是每个类的对应的 class 对象,但是每个类只有一个 class 对象,所以每个类只有一个类锁。
//类说明:synchronized关键字的使用方法
public class SynTest {private long count =0;private Object obj = new Object();//作为一个锁public long getCount() {return count;}public void setCount(long count) {this.count = count;}//无锁时public void incCountNoSyn(){count++;}//用在同步块上、对象锁public void incCountBlock(){synchronized (this){count++;}}//syn直接放在方法上、对象锁,incCountMethod是成员方法和对象密切相关public synchronized void incCountMethod(){count++;}//syn直接放在静态方法上=>类锁,可以直接new SynTest().incCountMethod1调用public synchronized static void incCountMethod1(){//count++;}private static synchronized void sysClass(){}//用在同步块上,但是锁的是单独的对象实例、对象锁public void incCountObj(){synchronized (obj){count++;}}//线程private static class Count extends Thread{private SynTest simplOper;public Count(SynTest simplOper) {this.simplOper = simplOper;}@Overridepublic void run() {for(int i=0;i<10000;i++){//simplOper.incCountBlock();//运行结果正确(有锁)//simplOper.incCountObj();//运行结果正确(有锁)simplOper.incCountNoSyn();//运行结果错误(无锁)}}}public static void main(String[] args) throws InterruptedException {SynTest simplOper = new SynTest();//启动两个线程Count count1 = new Count(simplOper);Count count2 = new Count(simplOper);count1.start();count2.start();Thread.sleep(50);System.out.println(simplOper.count);//20000}
}
同样的,当对同一个变量操作时,类锁和对象(非 class 对象)锁混用也同样毫无用处。
错误的加锁和原因分析
//类说明:错误的加锁和原因分析
public class TestIntegerSyn {public static void main(String[] args) throws InterruptedException {Worker worker=new Worker(1);//Thread.sleep(50);for(int i=0;i<5;i++) {new Thread(worker).start();}}private static class Worker implements Runnable{private Integer i;private Object o = new Object();public Worker(Integer i) {this.i=i;}@Overridepublic void run() {synchronized (i) {Thread thread=Thread.currentThread();System.out.println(thread.getName()+"--@"+System.identityHashCode(i));i++;System.out.println(thread.getName()+"-------[i="+i+"]-@"+System.identityHashCode(i));SleepTools.ms(3000);System.out.println(thread.getName()+"-------[i="+i+"]--@"+System.identityHashCode(i));}}}}
/*
System.identityHashCode(obj) 是 Java 中用于获取对象的唯一标识哈希码(identity hash code)的方法。
这个哈希码不同于对象的 hashCode() 方法返回的哈希码。
System.identityHashCode(obj) 方法的参数是一个对象,它返回该对象的唯一标识哈希码。
这个哈希码由 JVM 根据对象的内存地址计算而来,因此它在同一运行时环境下是唯一的。
需要注意的是,System.identityHashCode(obj) 方法返回的哈希码不会考虑对象的 hashCode() 方法的重写。
因此,不同的对象在内存中具有不同的地址,即使它们的 hashCode() 方法返回相同的值,它们的唯一标识哈希码也是不同的。
*///运行结果
Thread-0--@1636039581
Thread-0-------[i=2]-@1866734045
Thread-4--@1866734045
Thread-4-------[i=3]-@847059392
Thread-0-------[i=3]--@847059392
Thread-4-------[i=3]--@847059392
Thread-3--@847059392
Thread-3-------[i=4]-@646782137
Thread-3-------[i=4]--@646782137
Thread-2--@646782137
Thread-2-------[i=5]-@415803797
Thread-2-------[i=5]--@415803797
Thread-1--@415803797
Thread-1-------[i=6]-@1929761341
Thread-1-------[i=6]--@1929761341
可以看到 i 的取值会出现乱序或者重复取值的现象,原因:虽然我们对 i 进行了加锁,但是
但是当我们反编译这个类的 class 文件后,可以看到 i++实际是:
本质上是返回了一个新的 Integer 对象。也就是每个线程实际加锁的是不同的 Integer 对象,所以说到底,还是当对同一个变量操作时,用来做锁的对象必须是同一个,否则加锁毫无作用。
volatile,最轻量的通信/同步机制
volatile 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
//演示Volatile的提供的可见性
public class VolatileCase {private volatile static boolean ready;private static int number;private static class PrintThread extends Thread{@Overridepublic void run() {System.out.println("PrintThread is running.......");while(!ready){//加上下面这两个输入结果也正确//System.out.println("lll");//内部使用了synchronized//Thread.sleep();//可能在内部引发了线程对缓存的刷新};//无限循环System.out.println("number = "+number);}}public static void main(String[] args) throws InterruptedException {new PrintThread().start();SleepTools.second(1);number = 51;ready = true;SleepTools.second(5);System.out.println("main is ended!");}
}//运行结果
PrintThread is running.......
number = 51
main is ended!
ready不加 volatile 时,子线程无法感知主线程修改了 ready 的值,从而不会退出循环,而加了 volatile 后,子线程可以感知主线程修改了 ready 的值,迅速退出循环。但是 volatile 不能保证数据在多个线程下同时写时的线程安全,volatile 最适用的场景:一个线程写,多个线程读。
等待/通知机制
线程之间相互配合,完成某项工作,比如:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了 “做什么”(what)和“怎么做”(How),简单的办法是让消费者线程不断地循环检查变量是否符合预期在 while 循环中设置不满足的条件,如果条件满足则退出 while 循环,从而完成消费者的工作。却存在如下问题:
1.难以确保及时性。
2. 难以降低开销。如果降低睡眠的时间,比如休眠 1 毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。
等待/通知机制则可以很好的避免,这种机制是指一个线程 A 调用了对象 O 的 wait()方法进入等待状态,而另一个线程 B 调用了对象 O 的 notify()或者 notifyAll()方法,线程 A 收到通知后从对象 O 的 wait()方法返回,进而执行后续操作。上述两个线程通过对象 O 来完成交互,而对象上的 wait()和 notify/notifyAll() 的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。
notify():
通知一个在对象上正在等待(Waiting)状态的线程,选择是由系统决定的,不能指定唤醒的具体线程,使其从 wait()方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入 Waiting 状态。
如果有多个线程在等待,系统会随机选择一个线程进行唤醒,选择哪个线程由底层的线程调度器决定。
notifyAll():
用于唤醒所有在该对象上正在等待(Waiting)状态的线程,使它们都重新进入就绪状态(Runnable)。
如果有多个线程在等待,notifyAll() 方法会唤醒所有等待的线程,让它们有机会去竞争执行。
notify()、notifyAll():
这两个方法必须在同步块或同步方法中调用,因为它们都依赖于对象的内部锁(监视器锁)。调用 notify() 或 notifyAll() 方法会释放当前线程持有的锁,以便等待的线程能够获得锁并继续执行。
wait()
wait() :调用该方法使当前线程进入WAITING 等待状态,并释放当前对象的锁。等待状态的线程不能继续执行,直到其他线程调用相同对象的 notify() 或notifyAll() 方法唤醒它,或被中断才会返回。
wait(long timeout) :调用该方法使当前线程进入等待状态,释放当前对象的锁,并设定等待时间。如果在等待时间内没有被其他线程唤醒就超时返回,或者等待时间为0,线程会自动苏醒。
wait (long,int):对于超时时间更细粒度的控制,可以达到纳秒
等待和通知的标准范式
等待方遵循如下原则:
- 获取对象的锁。
- 如果条件不满足,那么调用对象的 wait()方法,被通知后仍要检查条件。
- 条件满足则执行对应的逻辑。
//等待方
synchronized加锁(对象){while(条件不满足){对象.wait()方法}进入后面的业务逻辑
}
通知方遵循如下原则:
- 获得对象的锁。
- 改变条件。
- 通知所有等待在对象上的线程。
//通知方
synchronized加锁(对象){业务逻辑=>改变条件:对象.notify()方法(通知方法)
}
注意:wait()、notify() 必须将它们放在获取了目标对象的锁(synchronized 关键字修饰的同步块或同步方法)内部,不然会抛出IllegalMonitorStateException异常
/*
当在多线程环境中使用 wait() 和 notify() 方法时,通常会配合使用 while 循环来进行条件检查。
这是为了避免虚假唤醒(spurious wakeup)的情况,即在等待线程醒来时,条件可能并未实际满足,因此需要重新检查条件是否满足。
*/
public class Example {private final Object lock = new Object();private boolean condition = false;public void doSomething() {synchronized (lock) { // 获取目标对象的锁try {// 执行业务逻辑// ...while (!condition) {// 进入等待状态,并释放锁lock.wait();}// 被唤醒后继续执行// ...} catch (InterruptedException e) {e.printStackTrace();}}}public void doSomethingElse() {synchronized (lock) { // 获取目标对象的锁// 执行业务逻辑// ...// 设置条件为满足condition = true;// 唤醒等待的线程lock.notify();//一般都放在逻辑代码最后执行,代表业务逻辑已经做完,不放在最后也不会有问题,也会等待业务逻辑执行完才会通知}}
}
当通知方调用notify()的时候,等待方持有的锁将被释放,通知方拿到锁去做业务逻辑
在调用 wait()、notify()系列方法之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用 wait()方法、notify()系列方法,进入 wait()方法后,当前线程释放锁,在从 wait()返回前,线程与其他线程竞争重新获得锁,执行 notify()系列方法的线程退出调用了 notifyAll 的 synchronized 代码块的时候后,他们就会去竞争。如果其中一个线程获得了该对象锁,它就会继续往下执行,在它退出 synchronized 代码块,释放锁后,其他的已经被唤醒的线程将会继续竞争获取该锁,一直进行下去,直到所有被唤醒的线程都执行完毕。
notify 和notifyAll 应该用谁
尽可能用 notifyall(),谨慎使用 notify(),因为 notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程。
//类说明:快递实体类
public class Express {public final static String DIST_CITY = "ShangHai";public final static int TOTAL = 500;private int km ;//快递运输里程数private String site;//快递到达地点public Express() {}public Express(int km, String site) {this.km = km;this.site = site;}public void change(){if (km < TOTAL){km = km +100;System.out.println("the Km is "+this.km);}if(km >= TOTAL){site = DIST_CITY;System.out.println("the Express is arrived");}}//线程等待公里的变化public synchronized void waitKm(){while(this.km <= TOTAL){try {wait();System.out.println("Map thread["+Thread.currentThread().getId() +"] wake,I will change db");} catch (InterruptedException e) {e.printStackTrace();}}}//线程等待目的地的变化public synchronized void waitSite(){while(!this.site.equals(DIST_CITY)){try {wait();System.out.println("Notice User thread["+Thread.currentThread().getId() +"] wake");} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("the site is "+this.site+",I will call user");}
}
//类说明:测试wait/notify/notifyAll
public class TestWN {private static Express express = new Express(0,"WUHAN");//检查里程数变化的线程,不满足条件,线程一直等待private static class CheckKm extends Thread{@Overridepublic void run() {express.waitKm();}}//检查地点变化的线程,不满足条件,线程一直等待private static class CheckSite extends Thread{@Overridepublic void run() {express.waitSite();}}public static void main(String[] args) throws InterruptedException {for(int i=0;i<2;i++){//启用两个线程new CheckSite().start();}for(int i=0;i<2;i++){//启用两个线程new CheckKm().start();}SleepTools.ms(500);for(int i=0; i<5; i++){synchronized (express){express.change();express.notifyAll();}SleepTools.ms(500);}}
}//notify()运行结果
the Km is 100
Notice User thread[20] wake
the Km is 200
Notice User thread[21] wake
the Km is 300
Map thread[22] wake,I will change db
the Km is 400
Map thread[23] wake,I will change db
the Km is 500
the Express is arrived
Notice User thread[20] wake
the site is ShangHai,I will call user//notifyAll()运行结果
the Km is 100
Map thread[23] wake,I will change db
Map thread[22] wake,I will change db
Notice User thread[21] wake
Notice User thread[20] wake
the Km is 200
Notice User thread[20] wake
Notice User thread[21] wake
Map thread[22] wake,I will change db
Map thread[23] wake,I will change db
the Km is 300
Map thread[23] wake,I will change db
Map thread[22] wake,I will change db
Notice User thread[21] wake
Notice User thread[20] wake
the Km is 400
Notice User thread[20] wake
Notice User thread[21] wake
Map thread[22] wake,I will change db
Map thread[23] wake,I will change db
the Km is 500
the Express is arrived
Map thread[23] wake,I will change db
Map thread[22] wake,I will change db
Notice User thread[21] wake
the site is ShangHai,I will call user
Notice User thread[20] wake
the site is ShangHai,I will call user
等待超时模式实现一个连接池
调用场景:调用一个方法时等待一段时间(一般来说是给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果。
/*假设等待时间段是 T,那么可以推断出在当前时间 now+T 之后就会超时
等待持续时间:REMAINING=T。
超时时间:FUTURE=now+T。
// 对当前对象加锁*/// 在mills内无法获取到连接,将会返回null 1s
public Connection fetchConnection(long mills) throws InterruptedException {synchronized (pool){//永不超时if(mills<=0){while(pool.isEmpty()){pool.wait();}return pool.removeFirst();}else{//超时时刻long future = System.currentTimeMillis()+mills;//等待时长long remaining = mills;while(pool.isEmpty()&&remaining>0){pool.wait(remaining);//notifyAll()唤醒一次,重新计算等待时长remaining = future-System.currentTimeMillis();}Connection connection = null;if(!pool.isEmpty()){connection = pool.removeFirst();}return connection;}}}/*客户端获取连接的过程被设定为等待超时的模式,也就是在 1000 毫秒内如果无法获取到可用连接,将会返回给客户端一个 null。
设定连接池的大小为 10 个,然后通过调节客户端的线程数来模拟无法获取连接的场景。
它通过构造函数初始化连接的最大上限,通过一个双向队列来维护连接,
调用方需要先调用 fetchConnection(long)方法来指定在多少毫秒内超时获取连接,
当连接使用完成后,需要调用 releaseConnection(Connection)方法将连接放回线程池
*/
方法和锁
调用 yield() 、sleep()、wait()、notify()等方法对锁有何影响?
yield() 、sleep()被调用后,都不会释放当前线程所持有的锁。
调用 wait()方法后,会释放当前线程持有的锁,而且当前被唤醒后,会重新去竞争锁,锁竞争到后才会执行 wait 方法后面的代码。
调用 notify()系列方法后,对锁无影响,线程只有在 syn 同步代码执行完后才会自然而然的释放锁,所以 notify()系列方法一般都是 syn 同步代码的最后一行。
yield() 、sleep()是在Thread线程调用
wait()、notify()是对象调用
为什么 wait 和 notify 方法要在同步块中调用?
主要是因为 Java API 强制要求这样做,如果你不这么做,你的代码会抛出 IllegalMonitorStateException 异常。
其实真实原因是:
这个问题并不是说只在 Java 语言中会出现,而是会在所有的多线程环境下出现。 假如我们有两个线程,一个消费者线程,一个生产者线程。生产者线程的任务可以简化成将 count 加一,而后唤醒消费者;消费者则是将 count 减一,而后在减到 0 的时候陷入睡眠:
生产者伪代码:count+1;notify();消费者伪代码:while(count<=0)wait()count--
这里面有问题:
生产者是两个步骤:
1. count+1;
2. notify();
消费者也是两个步骤:
1. 检查 count 值;
2. 睡眠或者减一;
万一这些步骤混杂在一起呢?比如说,初始的时候 count 等于 0,这个时候 消费者检查 count 的值,发现 count 小于等于 0 的条件成立;就在这个时候,发 生了上下文切换,生产者进来了,噼噼啪啪一顿操作,把两个步骤都执行完了, 也就是发出了通知,准备唤醒一个线程。这个时候消费者刚决定睡觉,还没睡呢, 所以这个通知就会被丢掉。紧接着,消费者就睡过去了……
这就是所谓的 lost wake up 问题。 无法唤醒问题
那么怎么解决这个问题呢?
现在我们应该就能够看到,问题的根源在于,消费者在检查 count 到调用 wait()之间,count 就可能被改掉了。这就是一种很常见的竞态条件。
很自然的想法是,让消费者和生产者竞争一把锁,竞争到了的,才能够修改 count 的值。
为什么应该在循环中检查等待条件?while(count<=0) wait()
处于等待状态的线程可能会收到错误警报和伪唤醒,如果不在循环中检查等待条件,程序就会在没有满足结束条件的情况下退出。因此,当一个等待线程醒来时,不能认为它原来的等待状态仍然是有效的,在 notify()方法调用之后和等待线程醒来之前这段时间它可能会改变。这就是在循环中使用 wait()方法效果更好的原因。
五、CompleteableFuture
Java并发/多线程CompleteableFuture详解-CSDN博客