文章目录
- 零 基本概念
- 1 CAS、ABA 问题和原子变量
- 2 this 引用逸出
- 3 不变性 immutable
- 4 同步、异步、阻塞、非阻塞
- 5 JMM
- 6 同步方案演示:计数器 demo*
- 一 进程与线程
- 1 区别与联系
- 2 Java内存区域
- 3 线程组
- 4 线程的上下文切换
- 5 并发与并行
- 6 线程的生命周期与状态
- 二 线程间的通信和同步
- 1 线程同步:锁(synchronized / Lock)
- 2 线程同步:wait-notify
- 3 线程同步:volatile 信号量
- 4 线程通信:管道
- 三 线程死锁
- 1 死锁的四个必要条件
- 2 死锁解决:预防死锁、避免死锁、检测与解除死锁
- 3 避免死锁
- 开放调用
- 限制锁顺序*
- 四 并发编程的相关方法
- 1 sleep() 与 wait()
- 2 run() 与 start()
- 3 join()
- 4 yield()
- 5 线程创建的四种方法
- 6 获取与设置优先级、守护线程
- 五 synchronized
- 1 简介
- 2 使用方法
- 3 注意事项
- 六 volatile
- 1 作用
- 2 双重校验锁实现 单例模式(线程安全)
- 3 与 synchronized 的关系
- 4 volatile 的实现——内存屏障*
- 5 使用 volatile 必须满足的条件*
- 七 ReentrantLock
- 1 可重入锁
- 2 和 synchronized 异同
- 八 并发容器
- 1 ConcurrentHashMap
- 扩容问题**
- 扩容需要解决的问题
- 扩容流程
- sizeCtl
- transferIndex
- ForwardingNode
- 2 写时复制:CopyOnWriteArrayList / CopyOnWriteArraySet
- 3 并发队列
- 4 基于跳表:ConcurrentSkipListMap / ConcurrentSkipListSet
- 九 异步任务执行服务、线程池
- 0 基本接口和类 & 提交任务的方法
- 1 为什么使用线程池
- 2 ThreadPoolExecutor 构造方法
- 3 ThreadPoolExecutor 的状态 & 扩展
- 4 ThreadPoolExecutor 任务处理流程
- 5 线程池大小的设置
- 6 ForkJoinPool
- 7 按完成顺序获取任务返回结果
- 8 线程池异常处理
- 9 获取执行结果:Future
- 实例 Memorizer
- 十 同步工具类
- 1 CountDownLatch
- 2 循环栅栏 CyclicBarrier
- 3 信号量 Semaphore
- 4 ThreadLocal
- 十一 线程中断
- 1 中断方法
- 2 示例
- 十二 性能与可伸缩性
- 1 相关概念
- 2 多线程引入的开销
- 3 减少锁竞争
零 基本概念
- 对象的线程安全
- 当 多个线程 访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获得正确的结果,那这个 对象 就是线程安全的
- 并发编程的三个特性:原子性、有序性、可见性
- 关于有序性
- 如果在本线程内观察,所有操作都是有序的(线程内表现为串行)
- 如果从一个线程观察另一个线程,所有的操作都是无序的(指令重排序、工作内存和主内存同步延迟)
- 无状态的对象一定是线程安全的:计算过程中的临时状态(传入的参数)仅存在于栈上的局部变量中,并且只能由正在执行的线程访问
- 多线程常见的两个问题
- 竞态条件:计算的正确性取决于多个线程交替执行的时序,即结果是否正确具有偶然性
- 内存可见性:一个线程对一个共享变量的修改,另一个线程不能马上看到(甚至永远不能看到)
因为除了内存,数据还会被缓存在寄存器和各级缓存中
- 访问变量时,不一定从内存中取
- 修改变量时,可能先写到缓存,稍后才同步到内存中
1 CAS、ABA 问题和原子变量
CAS CompareAndSwap
- 是一种非阻塞同步方式,采用基于冲突检测的乐观并发策略
- 包含三个操作数:需要读写的内存位置V、预期值A、拟写入的新值B
- 当且仅当V=A时,CAS才会通过原子方式用B更新V的值,否则不会执行任何操作
- 无论是否操作成功,都返回V原有的值
- 多个线程尝试使用CAS同时更新一个变量时,只有一个线程可以成功变更,其它都会失败,但是失败的线程不会被挂起,而是被告知失败,可以再次尝试(通常在失败时不执行任何操作)
- CAS 的整个过程是原子的,依赖于硬件指令集
- CAS 是 Java 并发包的基础,基于 CAS 可以实现高效、乐观、非阻塞的数据结构和算法,它也是并发包的锁、同步工具和各种容器的基础
java.util.concurrent.atomic
包含了一些原子变量类,也是基于 CAS 实现的,使用示例:
// 线程不安全
public class ThreadSafeCounter {private final long count = 0L;public void plusOne() {// 自增操作包含 读取-修改-写入 三个步骤,且非原子操作count++;}
}// 线程安全
public class ThreadSafeCounter {private final AtomicLong count = new AtomicLong(0);public void plusOne() {// 原子操作count.incrementAndGet(); }
}
- CAS 常见的 ABA 问题
- 一种简单的解决方法是,为值加上版本号:两次看到相同的值,版本号是不同的,
AtomicStampedReference
维护一个引用和对应的版本号,且修改操作是原子的 - 大部分情况下 ABA 问题不会影响并发的正确性,如果需要解决该问题,采用互斥同步可能会更高效
- 一种简单的解决方法是,为值加上版本号:两次看到相同的值,版本号是不同的,
private void test() {AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 111);// 所有的修改操作都是原子的atomicStampedReference.attemptStamp(2, 222);atomicStampedReference.attemptStamp(1, 333); // 值为1,但版本号不同}
2 this 引用逸出
this
引用逸出的后果是:如果发生逸出时外部类未初始化完成,会导致不可预料的结果this
引用逸出产生的条件:在构造函数中 创建 并 发布 内部类(即使发布的语句是构造函数的最后一行)- 内部类可以访问外部类的对象的域,是因为内部类构造的时候,会把外部类的对象
this
隐式的作为一个参数传递给内部类
// 逸出示例
public class ThisEscape {public final int id;public final String name;public ThisEscape(EventSource<EventListener> source) {id = 1;name = "xjpking";// 在此创建了内部类,内部类具有外部类的this引用,但外部类的构造方法并未执行完成source.registerListener(new EventListener() {public void onEvent(Object obj) {System.out.println("id: "+ThisEscape.this.id);System.out.println("name: "+ThisEscape.this.name);}});}
}// 正确示例:保证创建内部类时,外部类已经初始化完成
// 破坏了this逸出的第二个条件:没有在构造方法中发布内部类
public class ThisSafe {public final int id;public final String name;private ThisSafe(EventSource<EventListener> source) {id = 1;listener = new EventListener(){public void onEvent(Object obj) {System.out.println("id: "+ThisSafe.this.id);System.out.println("name: "+ThisSafe.this.name);}};name = "xjpisking";}// 只有构造方法返回之后,外部类初始化完成public static ThisSafe getInstance(EventSource<EventListener> source) {ThisSafe safe = new ThisSafe();source.registerListener(safe.listener);return safe;}
}
3 不变性 immutable
- 不可变对象满足以下条件:
- 对象创建后状态不能修改
- 所有属性都由
final
修饰 - 对象是正确创建的(不发生
this
逸出)
- 不可变对象一定是线程安全的
- 可以将多个与一致性相关的属性绑定在一个不可变对象中(让它们看起来是原子的,避免部分被修改),从而保证这些属性的一致性
4 同步、异步、阻塞、非阻塞
- 同步、异步指请求发送方的行为
- 同步:发送方等待接收方返回处理后的结果
- 异步:发送方不等待接收方返回处理后的结果
- 阻塞、非阻塞指请求接收方的行为
- 阻塞:接收方直到处理请求后才返回结果
- 非阻塞:接收方即时返回结果,如果未处理完请求也返回
组合 | 行为 |
---|---|
同步阻塞 | 发送方发送请求之后一直等待响应;接收方处理请求如果不能马上等到返回结果,就一直等到返回结果后,才响应发送方 |
同步非阻塞 | 发送方发送请求之后一直等待响应;接收方立即返回结果 |
异步阻塞 | 发送方向接收方请求后,不等待响应,可以继续其他工作;接收方处理请求如果不能马上等到返回结果,就一直等到返回结果后,才响应发送方 |
异步非阻塞 | 发送方向接收方请求后,不等待响应,可以继续其他工作;接收方立即返回结果 |
5 JMM
- 对应关系
- 从 JVM 运行时内存来看,工作内存对应栈,主内存对应堆中的对象实例
- 从硬件角度来看,工作内存对应寄存器或高速缓存,主内存对应物理硬件的内存
- 工作内存保存了变量的主内存副本,线程对变量的所有操作都必须在工作内存中执行,不能直接读写主内存的数据
volatile
变量同样遵循,只是看起来如同直接在主内存中读写访问
6 同步方案演示:计数器 demo*
- 同时面临竞态条件和内存可见性问题
- 演示四种同步方案
volatile
关键字synchronized
关键字- 原子类型变量
- 显式锁
ReentrantLock
class Counter {public int nonSyncCount = 0;public volatile int volatileCount = 0;public int synchronizedCount = 0;public AtomicInteger atomicCount = new AtomicInteger(0);public int lockCount = 0;private final ReentrantLock lock = new ReentrantLock();public void nonSyncIncrease() {nonSyncCount++;}public void volatileIncrease() {volatileCount++;}public synchronized void synchronizedIncrease() {synchronizedCount++;}public void atomicIncrease() {atomicCount.incrementAndGet();}public void lockIncrease() {lock.lock();lockCount++;lock.unlock();}
}public static void main(String[] args) throws InterruptedException {Counter counter = new Counter();// 创建10个线程,每个线程调用5个计数值的自增方法for (int i = 0; i < 10; i++) {new Thread(() -> {for (int j = 0; j < 10000; j++) {counter.nonSyncIncrease();counter.volatileIncrease();counter.synchronizedIncrease();counter.atomicIncrease();counter.lockIncrease();}}).start();}while (Thread.activeCount() > 1) {Thread.yield();}System.out.println(counter.nonSyncCount); // 未同步,结果随机System.out.println(counter.volatileCount); // 仅使用volatile修饰变量,结果随机,因为执行结果依赖于旧值System.out.println(counter.synchronizedCount); // 使用synchronized同步,100000System.out.println(counter.atomicCount); // 使用原子变量,100000System.out.println(counter.lockCount); // 使用显式锁,100000}
一 进程与线程
1 区别与联系
- 进程是系统分配资源的基本单位,线程是 CPU 调度的基本单位
- 进程和线程本质的区别是是否单独占有内存地址空间及其它系统资源(比如I/O)
- 线程是轻量级进程,进程在其执行的过程中可以产生多个线程
- 与进程不同的是属于同一进程的多个线程共享进程的堆和方法区资源 (JDK1.8 之后的元空间),但每个线程有自己的程序计数器、虚拟机栈和本地方法栈
- 所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多
- 使用多线程而非多进程实现并发的优势:
- 进程间的通信比较复杂,而线程间的通信比较简单
- 进程是重量级的,而线程是轻量级的,故多线程方式的系统开销更小
2 Java内存区域
- 线程私有程序计数器的目的是,线程切换后能恢复到正确的执行位置
- 线程私有虚拟机栈和本地方法栈的目的是,保证线程中的 局部变量(存放在栈帧中的局部变量表) 不被别的线程访问到
- 堆和方法区是所有 线程共享的资源,其中堆是进程中最大的一块内存,主要用于存放新创建的对象 (几乎所有对象都在这里分配内存),方法区主要用于存放已被加载的类信息、常量、静态变量、即时编译器编译后的代码等数据
3 线程组
- 线程组是一个树状的结构,每个线程组下面可以有多个线程或者 线程组
- 每个 Thread 必然存在于一个 ThreadGroup 中,Thread 不能独立于 ThreadGroup 存在
- 如果在
new Thread
时没有显式指定,那么默认将父线程(当前执行new Thread的线程)线程组设置为自己的线程组 - ThreadGroup 是一个标准的向下引用的树状结构,这样设计的原因是防止"上级"线程被"下级"线程引用而无法有效地被GC回收
- 线程组可以起到统一控制线程的优先级和检查线程的权限的作用
4 线程的上下文切换
- 线程在执行过程中会有自己的运行条件和状态(也称上下文),比如上文所说到过的程序计数器,栈信息等
- 线程切换时,需要保存当前线程的上下文,留待线程下次占用 CPU 的时候恢复现场,并加载下一个将要占用 CPU 的线程上下文
- 上下文切换会使部分缓存失效
举例说明:线程A切换到线程B
1.先挂起线程A,将其在CPU中的状态保存在内存中
2.在内存中检索下一个线程B的上下文,并将其在 CPU 的寄存器中恢复,开始执行B线程
3.当B执行完,根据程序计数器中指向的位置恢复线程A
5 并发与并行
- 并发: 同一时间段,多个任务都在执行 (单位时间内不一定同时执行,可以来回切换);
- 并行: 单位时间内,多个任务同时执行
6 线程的生命周期与状态
- 线程创建之后它将处于 NEW(新建) 状态
- 调用
start()
方法后开始运行,线程这时候处于 READY(可运行) 状态。可运行状态的线程获得了 CPU 时间片后就处于 RUNNING(运行) 状态。以上两种状态都属于 RUNNABLE - 当线程执行
wait()
或join()
方法之后,线程进入 WAITING(等待) 状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态,而 TIMED_WAITING(超时等待) 状态相当于在等待状态的基础上增加了超时限制,比如通过sleep(long millis)
、join(long millis)
方法或wait(long millis)
方法可以将 Java 线程置于 TIMED_WAITING 状态。当超时时间到达后 Java 线程将会返回到 RUNNABLE 状态 - 当线程调用同步方法时,在没有获取到锁的情况下,线程将会进入到 BLOCKED(阻塞) 状态
- 线程在执行 Runnable 的
run()
方法之后将会进入到 TERMINATED(终止) 状态
反复调用同一个线程的start()方法是否可行?假如一个线程执行完毕(此时处于TERMINATED状态),再次调用这个线程的 start() 方法是否可行?
两个问题的答案都是不可行,因为 threadStatus 的值会改变,调用 start() 的前提是 threadStatus==0 ,此时再次调用 start() 方法会抛 IllegalThreadStateException异常
二 线程间的通信和同步
另参考 十一、同步工具类
1 线程同步:锁(synchronized / Lock)
- 线程同步的根本目的是让线程按照一定的顺序执行
- 多个线程同时访问的可变数据才需要通过锁保护
- 每个共享、可变的变量都只由一个锁保护
- 对于包含多个变量的不变性条件,每个变量都由同一个锁保护
2 线程同步:wait-notify
- 基于
Object
类的wait()
方法和notify()
(随机叫醒一个正在等待的线程) ,notifyAll()
(叫醒所有正在等待的线程) 方法实现 - 每个对象除了拥有用于锁的等待队列,还有一个条件队列用于线程间的协作(两个队列存放的都是线程对象)
wait()
和notify()
只能在对应的synchronized
代码块内被调用,否则会抛出异常- 对应指的是只能调用
synchronized
保护的对象的wait()
和notify()
- 下面的例子中,
synchronized
保护的是子线程对象this
,所以调用的也是this
的wait()
和notify()
方法
- 对应指的是只能调用
- 调用
wait()
的具体过程- 将当前线程放入条件等待队列,释放对象锁,阻塞等待线程状态变为
WAITING/TIMED_WAITING
- 到达等待时间后,或者被其它线程调用
notify()/notifyAll()
,从条件等待队列中移除,尝试竞争对象锁- 如果获得锁,线程状态变为
RUNNABLE
,并从wait()
调用中返回 - 否则加入对象锁的等待队列,线程状态变为
BLOCKED
,只有获得锁后才从wait()
调用中返回
- 如果获得锁,线程状态变为
- 从
wait()
返回后,线程重新获得锁,但不代表等待的条件一定成立,需要重新检查条件
synchronized(obj) {while(条件不成立) {obj.wait();}满足条件后的操作 }
- 将当前线程放入条件等待队列,释放对象锁,阻塞等待线程状态变为
- 调用
notify()
会唤醒条件队列的线程并将其移除,但不会立刻释放锁,只有所在的同步代码块执行完后才会释放锁,被唤醒的线程获得锁后才会从wait()
返回 wait-notify
DEMO
public static void main(String[] args) throws InterruptedException {WaitThread waitThread = new WaitThread();waitThread.start(); // 子线程启动Thread.sleep(2000L);waitThread.setFlagTrue(); // 主线程改变标志位}class WaitThread extends Thread {private volatile boolean flag = false;@Overridepublic void run() {System.out.println("sub thread running");try {synchronized (this) {// 子线程执行循环while (!flag) {wait(); // 子线程加入子线程对象的条件队列,释放锁,等待唤醒}// 直到主线程将flag修改为true,并调用notify()唤醒子线程,跳出循环...System.out.println("loop end");}} catch (InterruptedException ignored) {}}public synchronized void setFlagTrue() {// 主线程修改flag,并唤醒在子线程对象的条件队列上的子线程flag = true;System.out.println("main thread call notify()");notify();}
}输出结果:
sub thread running
main thread call notify()
loop end
3 线程同步:volatile 信号量
- 参见
volatile
部分
4 线程通信:管道
- JDK提供了
PipedWriter
、PipedReader
、PipedOutputStream
、PipedInputStream
- 前面两个是基于字符的(处理单元为2字节),后面两个是基于字节流的(处理单元为1字节)
三 线程死锁
1 死锁的四个必要条件
互斥条件
:该资源任意一个时刻只由一个线程占用请求与保持条件
:一个进程因请求资源而阻塞时,对已获得的资源保持不放不剥夺条件
:线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只有自己使用完毕后才释放资源循环等待条件
:若干进程之间形成一种头尾相接的循环等待资源关系
2 死锁解决:预防死锁、避免死锁、检测与解除死锁
- 预防死锁
- 破坏请求与保持条件 :一次性申请所有的资源,或者使用超时机制主动释放锁
- 破坏不剥夺条件 :占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源
- 破坏循环等待条件 :获取一系列锁时,按某一顺序申请资源,反序释放
- 避免死锁——银行家算法
- 检测与解除死锁
操作系统:死锁
3 避免死锁
开放调用
- 利用 开放调用(不将方法设置为
synchronized
) 避免同时持有多个锁,但这种做法也削弱了原子性- 这种方式适用于 业务无需原子性 的场景
- 如果在持有锁的情况下调用某个外部方法,就需要警惕可能出现死锁
- 示例:车队和出租车
// 不使用开放调用class Taxi {private Position location;private Position destination;private Dispatcher dispatcher;public synchronized void setLocation(Position location) {this.location = location;if (location.equals(destination)) {dispatcher.notifyAvailable(this); // 持有本车的锁,请求车队的锁}}public synchronized Position getLocation() {return location;}}class Dispatcher {private Set<Taxi> taxis;private Set<Taxi> availableTaxis;public synchronized void notifyAvailable(Taxi taxi) {availableTaxis.add(taxi);}public synchronized void showAllLocations() {for (Taxi t : taxis) {Position location = t.getLocation(); // 持有车队的锁,获取车的锁// ...}}}
// 使用开放调用class Taxi {private Position location;private Position destination;private Dispatcher dispatcher;public void setLocation(Position location) {boolean reach = false;// 仅持有车的锁synchronized (this) {this.location = location;if (location.equals(destination)) {reach = true;}}if (reach) {// 仅持有车队的锁dispatcher.notifyAvailable(this);}}public synchronized Position getLocation() {return location;}}class Dispatcher {private Set<Taxi> taxis;private Set<Taxi> availableTaxis;public synchronized void notifyAvailable(Taxi taxi) {availableTaxis.add(taxi);}public void showAllLocations() {Set<Taxi> copy;// 仅持有车队的锁synchronized (this) {copy = new HashSet<>(this.taxis);}for (Taxi t : copy) {// 仅持有车的锁Position location = t.getLocation();// ...}}}
限制锁顺序*
- 如果需要同时持有多个锁,确保线程在获取多个锁的时候采用一致的顺序
- 示例:转账,总是先获得ID更小的账户的锁
class LockByOrderDemo {/*** 加时锁,只有两个转账账户无法比较时使用* 如果ID不重复则不会使用*/private static final Object lock = new Object();public void transferMoney(Account from, Account to, double amount) {// 优先加锁id小的账户if (from.getId() < to.getId()) {synchronized (from) {synchronized (to) {if (from.getBalance() > 0) {from.decrease(amount);to.increase(amount);}}}} else if (from.getId() > to.getId()) {synchronized (to) {synchronized (from) {if (from.getBalance() > 0) {from.decrease(amount);to.increase(amount);}}}} else {// 如果重复则需要加时锁,保证每次只有一个线程以未知的顺序获取两个账户的锁synchronized (lock) {synchronized (from) {synchronized (to) {if (from.getBalance() > 0) {from.decrease(amount);to.increase(amount);}}}}}}
}
四 并发编程的相关方法
1 sleep() 与 wait()
sleep()
方法没有释放锁,而wait()
方法释放了锁(从wait()
返回后线程又重新获得锁)- 都可以暂停线程的执行
wait()
通常被用于线程间交互/通信,sleep()
通常被用于暂停执行wait()
无参方法被调用后,线程不会自动苏醒,需要别的线程调用同一个对象上的notify()
或者notifyAll()
方法,且线程竞争到锁之后,才能从方法中返回;sleep()
方法执行完成后,线程会自动苏醒wait()
可以指定时间,也可以不指定;而sleep()
必须指定时间wait()
释放CPU资源,同时释放锁;sleep()
释放CPU资源,但是不释放锁,所以易死锁
为什么 sleep 函数的精度很低?
- sleep函数并不能起到定时的作用,主要作用是延时
- 在一些多线程中可能会看到sleep(0),其主要目的是让出时间片
- 当系统越繁忙的时候它精度也就越低,因为它的精度取决于线程自身优先级、其他线程的优先级,以及线程的数量等因素
2 run() 与 start()
- 调用
start()
方法,会启动一个线程并使线程进入了就绪状态,当分配到时间片后就可以开始运行了。start()
会执行线程的相应准备工作,然后自动执行run()
方法的内容,实现多线程工作 - 直接执行
run()
方法,会把run()
方法当成一个 main 线程下的普通方法去执行,并不会在某个线程中执行它,所以不是多线程工作
3 join()
- 在 a 线程中调用 b 线程的
join()
方法,线程 a 进入阻塞状态,直到线程 b 完全执行完,线程 a 从阻塞状态中恢复 - 如果主线程想等待子线程执行完毕后,获得子线程中的处理完的某个数据,就使用
join()
public static void main(String[] args) {Thread t = new MyThread();thread.start();thread.join();// 主线程的其它功能...(会在子线程执行完后开始执行)
}
4 yield()
- 静态方法,通知操作系统调度器,当前线程可以让出 CPU 时间片
- 仅作为建议,也可能被完全忽略
5 线程创建的四种方法
Runnable
接口不会返回结果或抛出检查异常,Callable
接口可以- 如果任务不需要返回结果或抛出异常推荐使用
Runnable
,这样代码看起来会更加简洁
- 通过继承
Thread
类,并重写run()
方法,使用时直接创建该类的对象
public class Demo {public static class MyThread extends Thread {@Overridepublic void run() {System.out.println("MyThread");}}public static void main(String[] args) {Thread myThread = new MyThread();myThread.start();}
}
- 实现
Runnable
接口并重写run()
方法,使用时将该类的对象作为参数传递到Thread
类的构造方法中- 这种方法不受单继承的限制,更适合处理多线程共享数据的情况
public class Demo {public static class MyThread implements Runnable {@Overridepublic void run() {System.out.println("MyThread");}}public static void main(String[] args) {new Thread(new MyThread()).start();}
}
- 实现
Callable
接口,并重写call()
方法call()
可以有返回值- 可以抛出异常,被外面的操作捕获,获取异常的信息
- 支持泛型,泛型的类型即返回值的类型
//1.创建一个实现Callable的实现类
class NumThread implements Callable{//2.实现call方法,将此线程需要执行的操作声明在call()中@Overridepublic Object call() throws Exception {int sum = 0;for (int i = 1; i <= 100; i++) {if(i % 2 == 0){System.out.println(i);sum += i;}}return sum;}
}public class ThreadNew {public static void main(String[] args) {//3.创建Callable接口实现类的对象NumThread numThread = new NumThread();//4.将此Callable接口实现类的对象作为传递到FutureTask构造器中,创建FutureTask的对象FutureTask futureTask = new FutureTask(numThread);//5.将FutureTask的对象作为参数传递到Thread类的构造器中,创建Thread对象,并调用start()new Thread(futureTask).start();try {//6.获取Callable中call方法的返回值//get()返回值即为FutureTask构造器参数Callable实现类重写的call()的返回值。Object sum = futureTask.get();System.out.println("总和为:" + sum);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
- 通过线程池,工程中的唯一合法方法
- 提高响应速度(减少创建新线程的时间)
- 降低资源消耗(重复利用线程池中线程,不需要每次都创建)
- 便于线程管理
public class ThreadPool {public static void main(String[] args) {//1. 提供指定线程数量的线程池ExecutorService service = Executors.newFixedThreadPool(10);ThreadPoolExecutor service1 = (ThreadPoolExecutor) service;//2.执行指定的线程的操作。需要提供实现Runnable接口或Callable接口实现类的对象service.execute(new NumberThread());//适合适用于Runnable// service.submit(Callable callable);//适合使用于Callable//3.关闭连接池service.shutdown();}}
6 获取与设置优先级、守护线程
- Java 只是给操作系统一个优先级的 参考值,线程最终在操作系统的调用顺序由操作系统的线程调度算法决定的
- 优先级获取:
thread_instance.getPriority()
- 优先级设置:
setPriority(int LEVEL)
,默认5,最高10,最低1,优先级越高,先执行的 概率 更大 - 线程组也具有优先级,如果某个线程优先级大于线程所在线程组的最大优先级,那么该线程的优先级被线程组的最大优先级取代
- 守护线程默认的优先级比较低
- 优先级获取:
- 避免使用线程优先级,否则会增加平台依赖性
- 所有的非守护线程都结束了,守护线程也会自动结束(直接抛弃,不会执行
finally
块) - 新创建的线程会继承创建者的守护状态,因此默认主线程创建的所有线程都是非守护线程
五 synchronized
详细内容
1 简介
- Java 多线程的锁都是基于对象的,Java中的每一个对象都可以作为一个锁
- 阻塞、悲观(CAS 非阻塞、乐观)
- 解决多个线程之间访问资源的同步性
- 保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行(线程间互斥)
- 同时保证内存可见性(同一个锁同步的不同线程,看到的都是共享变量的最新值)
Happens-Before
- 上图展示了两个线程使用同一个锁时,它们之间的
Happens-Before
关系- JMM为程序中的所有操作定义的
偏序关系
- 要保证执行操作B的线程看到操作A的结果(无论A和B是否由同一个线程执行),A和B之间必须满足
Happens-Before
关系- 如果不满足,则JVM可以对操作A和B进行任意重排序
- 在 Java 早期版本中,
synchronized
属于 重量级锁,效率低下- 因为操作系统实现线程之间的切换时需要从用户态转换到内核态(通过
trap
指令)
- 因为操作系统实现线程之间的切换时需要从用户态转换到内核态(通过
2 使用方法
- 修饰实例方法(锁定实例):作用于当前对象实例加锁,进入同步代码前要获得当前对象实例的锁
synchronized void method() {// TODO
}
- 修饰静态方法(锁定类):给当前类加锁,会作用于类的所有对象实例 ,进入同步代码前要获得当前 class 的锁。如果一个线程 A 调用一个实例对象的非静态
synchronized
方法,而线程 B 需要调用这个实例对象所属类的静态synchronized
方法,是允许的,不会发生互斥现象,因为访问静态synchronized
方法占用的锁是当前类的锁,而访问非静态synchronized
方法占用的锁是当前实例对象锁。
synchronized static void method() {// TODO
}
- 修饰代码块(指定锁定类型):指定加锁对象,对给定对象/类加锁
synchronized(this|object)
表示进入同步代码库前要获得给定对象的锁synchronized(类.class)
表示进入同步代码前要获得 当前 class 的锁
synchronized(this) {// TODO
}
3 注意事项
- 不要使用
synchronized(String a)
,因为字符串常量池具有缓存功能 - 构造方法不能使用
synchronized
关键字修饰。因为构造方法本身就属于线程安全的
六 volatile
1 作用
- 禁止 JVM 的指令重排
- 指令重排可以保证串行语义一致,但是没有义务保证多线程间的语义也一致
- 指示 JVM,变量是共享且不稳定的,每次使用它都到主存中进行读取(保证变量的可见性),并在使用完毕后写回内存
- 在当前的 Java 内存模型下,线程可以把变量保存本地内存(比如机器的寄存器,CPU cache)中,而不是直接在主存中进行读写
- 可能造成一个线程在主存中修改了一个变量的值,而另外一个线程还继续使用它在寄存器中的变量值的拷贝,造成数据的不一致
2 双重校验锁实现 单例模式(线程安全)
public class Singleton {private volatile static Singleton uniqueInstance; // 对象实例,需要用volatile修饰private Singleton() { // 构造方法,设置为private}public static Singleton getUniqueInstance() { // 创建实例//先判断对象是否已经实例过,没有实例化过才进入加锁代码if (uniqueInstance == null) {//类对象加锁,保证只能创建一个实例synchronized (Singleton.class) {if (uniqueInstance == null) {uniqueInstance = new Singleton();// 执行过程:// 1.为 uniqueInstance 分配内存空间// 2.初始化 uniqueInstance// 3.将 uniqueInstance 指向分配的内存地址}}}return uniqueInstance;}
}
- 必须使用
volatile
关键字的原因:- 由于 JVM 具有指令重排的特性,
uniqueInstance = new Singleton()
执行顺序有可能变成注释中的 1->3->2。指令重排在单线程环境下不会出现问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例(仅仅是刚分配了内存空间) - 线程 T1 执行了 1 和 3,此时 T2 调用
getUniqueInstance()
后发现 uniqueInstance 不为空,因此返回 uniqueInstance,但此时 uniqueInstance 还未被初始化 - 使用
volatile
禁止 JVM 的指令重排,保证在多线程环境下也能正常运行
- 由于 JVM 具有指令重排的特性,
3 与 synchronized 的关系
synchronized
关键字和volatile
关键字是互补而非对立的关系volatile
关键字是线程同步的 轻量级 实现,性能比synchronized
关键字好volatile
关键字只能用于变量(仅仅保证对单个volatile变量的读/写具有原子性),而synchronized
关键字可以修饰方法以及代码块volatile
保证变量在多个线程之间的 可见性 ,而synchronized
可以保证 变量的可见性 和 操作的原子性
4 volatile 的实现——内存屏障*
屏障类型 | 作用 |
---|---|
LoadLoad | 对于语句 Load1; LoadLoad; Load2 ,在 Load2 及后续读取操作要读取的数据被访问前,保证 Load1 要读取的数据被读取完毕 |
StoreStore | 对于语句 Store1; StoreStore; Store2 ,在 Store2 及后续写入操作执行前,保证 Store1 的写入操作对其它处理器可见 |
LoadStore | 对于语句 Load1; LoadStore; Store2 ,在 Store2 及后续写入操作被刷出前,保证 Load1 要读取的数据被读取完毕 |
StoreLoad | 对于语句 Store1; StoreLoad; Load2 ,在 Load2 及后续所有读取操作执行前,保证 Store1 的写入对所有处理器可见 |
- JVM 的实现会在
volatile
读写前后均加上内存屏障,在一定程度上保证有序性 - 内存屏障的保守策略:
在每个 写入操作前 插入 StoreStore 屏障,写入操作后 插入 StoreLoad 屏障
在每个 读操作后 插入 LoadLoad 屏障和 LoadStore 屏障
5 使用 volatile 必须满足的条件*
- 变量修改后的值不依赖原值,或能够确保同一时间只有单一线程修改变量值
- 变量不需要和其它状态变量共同构成不变性约束
- 违反上述条件时,证明操作变量同时需要满足原子性,因此仅
volatile
不满足线程安全的要求,需要加锁
class VolatileThreadUnsafeDemo {private volatile int count = 0;private void increase() {count++; // 新值依赖原值,违背了条件1}public void test() {int threadCount = 20;for (int i = 0; i < threadCount; i++) {new Thread(() -> {for (int j = 0; j < 10000; j++) {increase();}}).start();}while (Thread.activeCount() > 1) {Thread.yield();}System.out.println(count); // 结果是随机的}
}
七 ReentrantLock
该部分的参考
详细内容
1 可重入锁
- 可重入锁,指的是一个线程能够对一个临界资源重复加锁
- AQS 有一个变量 state 用于记录同步状态:
- 初始情况下,
state = 0
,表示ReentrantLock
目前处于解锁状态 - 如果有线程调用
lock()
方法进行加锁,state = 1
,如果该线程再次调用lock()
方法加锁,就执行state++
- 线程每调用一次
unlock()
方法释放锁,会让state--
- 通过查询
state
的数值,即可知道ReentrantLock
被重入的次数了
- 初始情况下,
- Demo
- 现在有方法 m1 和 m2,两个方法均使用了同一把锁对方法进行同步控制,同时方法 m1 会调用 m2
- 线程 t 进入方法 m1 成功获得了锁,此时线程 t 要在没有释放锁的情况下,调用 m2 方法
- 由于 m1 和 m2 使用的是同一把可重入锁,所以线程 t 可以进入方法 m2,并再次获得锁,而不会被阻塞住
- 假如 lock 是不可重入锁,那么上面的示例代码必然会引起死锁情况的发生
void m1() {lock.lock();try {// 调用 m2,因为可重入,所以并不会被阻塞m2();} finally {lock.unlock()}
}void m2() {lock.lock();try {// do something} finally {lock.unlock()}
}
2 和 synchronized 异同
- 都是基于互斥同步的方法:通过互斥达到同步
synchronized
使用的是对象或类进行加锁,而ReentrantLock
内部是通过AQS(AbstractQueuedSynchronizer)
中的同步队列进行加锁- 公平与非公平指的是线程获取锁的方式:
- 公平模式下,线程在同步队列中通过 FIFO 的方式获取锁,每个线程最终都能获取锁,缺点是效率低
- 在非公平模式下,线程会通过“插队”的方式去抢占锁,抢不到的则进入同步队列进行排队,缺点是可能出现线程饥饿
class Window implements Runnable{private int ticket = 100;//1.实例化ReentrantLockprivate ReentrantLock lock = new ReentrantLock();@Overridepublic void run() {while(true){try{//2.调用锁定方法lock()lock.lock();if(ticket > 0){try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + ":售票,票号为:" + ticket);ticket--;}else{break;}}finally {//3.调用解锁方法:unlock()lock.unlock();}}}
}
八 并发容器
1 ConcurrentHashMap
- 数据结构
类型 | 数据结构 | 使用的锁 |
---|---|---|
ConcurrentHashMap JDK1.7 | Segment 数组 + HashEntry 数组 + 链表 | Segment (本质是 ReentrantLock ),每次锁若干 HashEntry |
ConcurrentHashMap JDK1.8 | Node 数组 + 链表/红黑树 | synchronized ,每次锁一个 Node |
Hashtable | 数组+链表 | synchronized ,每次锁全表 |
- 在 JDK1.7 的时候,
ConcurrentHashMap
采用分段锁机制,对整个桶数组进行了分割分段(Segment,每个 Segment 都是一个可重入锁),每一个 Segment 只锁容器其中一部分数据,多线程访问容器里不同数据段的数据不会存在锁竞争,提高并发访问率
static class Segment<K,V> extends ReentrantLock implements Serializable {...}
- JDK1.8 的时候已经摒弃了
Segment
的概念,synchronized
只锁定当前链表或红黑二叉树的首节点,并发控制使用synchronized
和 CAS 来操作,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本
- Hashtable(同一把锁) :使用
synchronized
来保证线程安全,效率非常低下- 当一个线程访问同步方法时,其他线程也访问同步方法,可能会进入阻塞或轮询状态,如使用 put 添加元素,另一个线程不能使用 put 添加元素,也不能使用 get,竞争会越来越激烈效率越低
- 当一个线程访问同步方法时,其他线程也访问同步方法,可能会进入阻塞或轮询状态,如使用 put 添加元素,另一个线程不能使用 put 添加元素,也不能使用 get,竞争会越来越激烈效率越低
扩容问题**
扩容需要解决的问题
- 创建新数组的时候,只能由一个线程创建
- 拷贝数据的时候,已经拷贝过的数据不能重复拷贝
- 拷贝数据的时候,一个桶只能由一个线程负责
- 多个线程如何协作
扩容流程
- 计算每个CPU核一轮处理桶的个数,最小是16
- 修改
transferIndex
标志位,每个线程领取完任务就减去多少 - 领取完任务之后就开始处理,如果桶为空就设置为
ForwardingNode
,如果不为空就加锁拷贝,拷贝完成之后也设置为ForwardingNode
节点 - 如果某个线程分配的桶处理完了之后,再去申请,发现
transferIndex = 0
,这个时候就说明所有的桶都领取完了,但是别的线程领取任务之后有没有处理完并不知道,该线程会将sizeCtl
的值减1 - 最后一个线程处理完,发现
sizeCtl - 2 = rs<< RESIZE_STAMP_SHIFT
(此时的sizeCtl
高16位是本次扩容的标识,低16位值为2,即只有当前线程正在参与扩容),才会将旧数组用新数组覆盖,并且会重新设置sizeCtl
的值为0.75n,作为新数组的扩容阈值
sizeCtl
private transient volatile int sizeCtl;
- 用于标识
ConcurrentHashMap
的状态 sizeCtl
的高16位是标志位(每一轮扩容生成的一个唯一的标志),低16位等于 参与扩容的线程数+1- 如果最后一个帮助扩容的线程在结束时,发现
sizeCtl - 2 == rs<<RESIZE_STAMP_SHIFT(值为16)
,说明所有参与扩容的线程都执行完,即扩容完毕,rs 是本次扩容的标志
sizeCtl | 含义 |
---|---|
-1 | 正在初始化 |
0.75n | 正常状态,代表扩容阈值 |
其它负值 | 有其他线程正在扩容 |
- 为什么在扩容时为负值:因为
sizeCtl
最初是rs
带符号左移16位的结果,它的符号位为1,所以数值为负
transferIndex
- 扩容时用于标志,下一个线程要处理的桶的开始位置,每个线程每次领取指定数目的桶进行数据的拷贝
- 在刚开始扩容时值为旧数组的长度
- 比如初始大小是
transferIndex = table.length = 64
,每个线程领取的桶个数是16,第一个线程领取完任务后transferIndex = 48
,也就是说第二个线程这时进来是从第48个桶开始处理,再减去16,依次类推,这就是多线程协作处理的原理
ForwardingNode
- 在旧数组中起标记作用,表示其他线程正在扩容,并且此节点已经扩容完毕
- 关联了新数组
nextTable
,扩容期间可以通过find
方法,访问已经迁移到了nextTable
中的数据
2 写时复制:CopyOnWriteArrayList / CopyOnWriteArraySet
- 线程安全,以优化读为目标,牺牲了写的性能,适用于读多写少的场景
CopyOnWriteArrayList
基于ReentrantLock
实现;CopyOnWriteArraySet
基于CopyOnWriteArrayList
实现- 读读、读写可以并行,但多个线程不能同时写,每个写操作需要先获得锁
- 写时复制:对一块内存进行修改时,不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后将指向原来内存指针指向新的内存(整个过程是原子的),原来的内存就可以被回收掉了
3 并发队列
- 无锁非阻塞队列:
ConcurrentLinkedQueue
、ConcurrentLinkedDeque
- 通过 CAS 操作实现非阻塞
- 基于链表实现
- 普通阻塞队列:
ArrayBlockingQueue
、LinkedBlockingQueue
、LinkedBlockingDeque
- 都实现了
BlockingQueue
接口,适合用于作为数据共享的通道 - 都基于
ReentrantLock
和Condition
实现 - 广泛使用在 “生产者-消费者” 问题中,其原因是
BlockingQueue
提供了可阻塞的插入和移除的方法:当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止
- 都实现了
- 优先级阻塞队列:
PriorityBlockingQueue
- 延时阻塞队列:
DelayQueue
- 其它阻塞队列:
SynchronousQueue
- 没有存储元素的空间,如果没有其它线程在等待从队列中接受元素,put 操作就会等待
- take 操作同样需要等待其它线程向队列中放元素,如果没有也会等待
4 基于跳表:ConcurrentSkipListMap / ConcurrentSkipListSet
- 关于跳表
- 跳表基于链表,在链表的基础上增加了多层索引的结构
- 高层的索引节点一定同时是底层的索引节点,每个索引节点有两个指针:一个向右指向同层的索引节点;另一个指向下层的索引节点或基本链表节点
- 每层的节点有 1/2 的概率成为更高一层的节点,基于此可以实现类似二分查找的操作
- 跳表内所有的元素都是排序的,对跳表进行遍历会得到有序的结果,适用于数据需要有序的环境
ConcurrentSkipListMap / ConcurrentSkipListSet
数据有序,没有使用锁,所有操作都是非阻塞的,包括写
九 异步任务执行服务、线程池
参考链接
0 基本接口和类 & 提交任务的方法
接口 | 功能 |
---|---|
Runnable, Callable | 要执行的异步任务 |
Executor, ExecutorService | 执行服务 |
Future | 异步执行的结果,包含返回值或异常 |
Executor
是一个顶层接口,在它里面只声明了一个方法execute(Runnable)
,返回值为void
,参数为Runnable
类型ExecutorService
接口继承了Executor
接口,并声明了一些方法:submit
、invokeAll
、invokeAny
以及shutDown
等- 抽象类
AbstractExecutorService
实现了ExecutorService
接口,基本实现了ExecutorService
中声明的所有方法execute(task)
异步执行,无返回值submit(task)
异步执行,返回Future
对象,可通过task.get()
同步到主线程invoke(task)
一直阻塞到任务执行完成返回
1 为什么使用线程池
- 创建/销毁线程需要消耗系统资源,线程池可以 复用已创建的线程
- 控制并发的数量:并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃
- 统一管理线程,通过线程工厂可以指定创建线程的行为(如命名)
- 只有提交给线程池的任务是 同类型 且 相互独立 时,线程池的性能才能达到最佳
- 如果将运行时间较长和较短的任务提交给一个线程池,可能会产生饥饿
- 如果一些任务依赖于其它任务的执行结果,可能会产生死锁
2 ThreadPoolExecutor 构造方法
参数 | 含义 | 说明 | 是否必须 |
---|---|---|---|
int corePoolSize | 线程池中核心线程数最大值 | 一旦创建,核心线程默认情况下会一直存在于线程池中(核心线程不会预先创建,有任务时才会创建);非核心线程如果长时间的闲置就会被销毁 | 是 |
int maximumPoolSize | 线程池中线程总数最大值 | 核心线程数量 + 非核心线程数量 | 是 |
long keepAliveTime | 非核心线程闲置超时时长 | 非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置 allowCoreThreadTimeOut(true) ,则会也作用于核心线程 | 是 |
TimeUnit unit | keepAliveTime 的单位 | 枚举类型 | 是 |
BlockingQueue workQueue | 阻塞队列,维护着等待执行的 Runnable 任务对象 | 下面补充说明 | 是 |
ThreadFactory threadFactory | 线程创建工厂 | 用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂 | 否 |
RejectedExecutionHandler handler | 拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略 | 下面补充说明 | 否 |
-
常用的阻塞队列
LinkedBlockingQueue
:底层数据结构是链表,默认大小是Integer.MAX_VALUE
,也可以指定大小ArrayBlockingQueue
:底层数据结构是数组,需要指定队列的大小SynchronousQueue
:同步队列(容量为0),要将一个任务放到该队列中,必须有一个线程等待接收这个任务,否则执行拒绝策略DelayQueue
:延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素
- 只有任务相互独立时,设置最大线程数、最大队列长度才是合理的,否则可能会出现死锁
- 如果任务之间存在依赖,应该使用无界的线程池
-
有关拒绝处理策略
ThreadPoolExecutor.AbortPolicy
:默认策略,丢弃任务并抛出RejectedExecutionException
异常ThreadPoolExecutor.DiscardPolicy
:丢弃新来的任务,但是不抛出异常ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列头部(也就是最旧的,如果是优先队列则丢弃优先级最高的)任务,然后重新尝试执行程序,如果再次失败,重复此过程ThreadPoolExecutor.CallerRunsPolicy
:由主线程处理该任务,此时主线程暂停向队列中提交新任务
ThreadPoolExecutor.CallerRunsPolicy
在执行时,主线程处于忙碌状态,新到达的请求会保存在 TCP 队列中- 如果持续下去可能造成 TCP 队列丢弃请求,这种现象最终会蔓延到客户端,导致性能降低
3 ThreadPoolExecutor 的状态 & 扩展
状态 | 说明 |
---|---|
RUNNING | 线程池创建后处于 RUNNING 状态 |
SHUTDOWN | 调用 shutdown() 方法后处于 SHUTDOWN 状态,线程池不能接受新的任务,正在执行的线程不中断,完成阻塞队列的任务(存疑) |
STOP | 调用 shutdownNow() 方法后处于 STOP 状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,工作线程全部停止,阻塞队列为空 |
TIDYING | 当所有的任务已终止,线程池会变为 TIDYING 状态,接着会执行 terminated() 函数 |
TERMINATED | 线程池处在 TIDYING 状态时,并且执行完 terminated() 方法之后 , 线程池被设置为 TERMINATED 状态 |
class ThreadPoolExecutorExtension extends ThreadPoolExecutor {public ThreadPoolExecutorExtension(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}@Overrideprotected void beforeExecute(Thread t, Runnable r) {// 任务执行前...}@Overrideprotected void afterExecute(Runnable r, Throwable t) {// 任务执行后...}@Overrideprotected void terminated() {// 线程池关闭...}
}
4 ThreadPoolExecutor 任务处理流程
- 线程总数量 <
corePoolSize
,无论当前已经创建的线程是否空闲,都会新建一个核心线程执行任务(在核心线程数量 <corePoolSize
时,让核心线程数量快速达到corePoolSize
) - 线程总数量 >=
corePoolSize
时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(线程复用)。在加入队列前后,都进行线程池状态是否是RUNNING
的检查 - 当缓存队列满了,说明这个时候任务非常多,创建非核心线程去执行这个任务
- 缓存队列满了, 且总线程数达到了
maximumPoolSize
,则会采取拒绝策略进行处理
为什么在步骤2中,要二次检查线程池的状态?
- 在多线程的环境下,线程池的状态是时刻发生变化的
- 有可能刚获取线程池状态后线程池状态就改变了。判断是否将 command 加入workqueue 是线程池之前的状态
- 倘若没有二次检查,万一线程池处于非 RUNNING 状态(在多线程环境下很有可能发生),那么 command 永远不会执行
- 类似于单例模式的双重校验
5 线程池大小的设置
- 假设机器有N个CPU
- 对于计算密集型任务(计算需要占用CPU),应该设置线程数为 N+1。多设置一个线程的目的是,计算密集型的线程恰好在某时因为发生一个页错误或者因其他原因而暂停,有一个“额外”的线程可以确保在这种情况下CPU周期不会中断工作
- 对于I/O密集型任务(I/O不占用CPU),应该设置线程数为 2N
- 对于同时有计算工作和I/O工作的任务,考虑使用两个线程池分别处理两种任务
6 ForkJoinPool
参考链接
- 采用分治方法处理任务
- 每个线程对应一个双端队列 Deque 用于存放任务,并采用工作窃取机制
- 构造函数参数
参数 | 含义 |
---|---|
parallelism | 线程池最大线程数量和该参数值有关系,但不是绝对的关联(有依据但并不全由它决定) |
factory | 线程创建工厂 |
handler | 线程因未知异常而终止的回调处理,执行的任务中出现异常,并从任务中被抛出时,会被 handler 捕获 |
asyncMode | 当设置为 ture 的时候,队列采用先进先出方式;反之则是采用后进先出的方式,默认false(参考上图) |
- 提交任务的两种方式:
RecursiveAction(类比Runnable)
、RecursiveTask<V>(类比Callable<V>)
class ForkJoinPoolDemo {private static final ForkJoinPool forkJoinPool = new ForkJoinPool();public static void addTask() {// 无返回值的任务 类比RunnableRecursiveAction recursiveAction = new RecursiveAction() {@SneakyThrows@Overrideprotected void compute() {Thread.sleep(5000);System.out.println("recursiveAction" + Thread.currentThread());}};// 有返回值的任务 类比CallableRecursiveTask<Integer> recursiveTask = new RecursiveTask<Integer>() {@SneakyThrows@Overrideprotected Integer compute() {Thread.sleep(5000);System.out.println("recursiveTask" + Thread.currentThread());return 1;}};// execute: 异步,无返回值forkJoinPool.execute(recursiveTask);forkJoinPool.execute(recursiveAction);// submit: 异步,返回Future,需要调用get同步结果ForkJoinTask<Integer> taskSubmit = forkJoinPool.submit(recursiveTask);ForkJoinTask<Void> actionSubmit = forkJoinPool.submit(recursiveAction);try {Integer res = taskSubmit.get();actionSubmit.get(); // Void} catch (InterruptedException | ExecutionException e) {// ...}// invoke: 阻塞并直接返回结果Integer taskInvoke = forkJoinPool.invoke(recursiveTask);forkJoinPool.invoke(recursiveAction);}
}
7 按完成顺序获取任务返回结果
- 如果向
Executor
提交一组任务,并在计算完成后获得结果(例如每下载一张图片就执行一次渲染,而非下载完所有图片再渲染)- 使用
ThreadPoolExecutor.submit(task)
并对返回值轮询调用future.get()
,效率低 - 使用
ThreadPoolExecutor.invoke(task)
会阻塞直到返回结果,同样效率低 - 使用
CompletionService.take()
可以在部分任务完成后,按完成的先后顺序获取任务future
- 使用
ExecutorCompletionService
是CompletionService
的实现,将计算任务委托给线程池执行
class CompletionServiceDemo {/*** 线程池*/private final Executor executor = Executors.newFixedThreadPool(4);public void test() {CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);Future<Integer> future5000 = completionService.submit(new CustomTask(5000));Future<Integer> future2000 = completionService.submit(new CustomTask(2000));Future<Integer> future8000 = completionService.submit(new CustomTask(8000)); for (int i = 0; i < 3; i++) {Future<Integer> future = null;try {future = completionService.take(); // 阻塞直到有任务完成,按照任务完成的先后顺序获得结果System.out.println(future.get()); // 输出结果 2000, 5000, 8000} catch (InterruptedException | ExecutionException e) {// ...}}}static class CustomTask implements Callable<Integer> {private final int i;public CustomTask(int i) {this.i = i;}@Overridepublic Integer call() throws Exception {Thread.sleep(i);return i;}}
}
8 线程池异常处理
- 通过自定义线程工厂的方式实现
- 在线程的
run()
方法中主动捕获异常 - 设置未捕获异常的处理器
Thread.UncaughtExceptionHandler
,被动处理Thread.UncaughtExceptionHandler
仅在使用execute(task)
向线程池提交任务时才生效
submit(task)
会在调用其返回值的get()
方法时将异常包装为ExecutionException
后重新抛出
- 在线程的
/**
* 自定义线程工厂
**/
class MyThreadFactory implements ThreadFactory {private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public MyThreadFactory(String namePrefix) {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();this.namePrefix = namePrefix + "-thread-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0) {@Overridepublic void run() {// 1.捕获未受检异常,例如RuntimeException(主动)// 不被捕获的未受检异常将使线程终结Throwable throwable = null;try {super.run();} catch (Throwable t) {throwable = t;} finally {// 处理策略,可以新建线程代替当前线程,也可以在活跃线程数足够多的时候不新建线程}}};// 2.处理未捕捉的异常(被动)// 仅在通过execute方法提交任务时才会用该方式处理异常// 使用submit提交的任务会在其返回值调用future.get()时将异常包装为ExecutionException后重新抛出t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {@Overridepublic void uncaughtException(Thread t, Throwable e) {System.out.println("do something to handle uncaughtException");}});return t;}
}
9 获取执行结果:Future
- 一般用于执行时间较长的任务
Future
可以把计算结果从执行计算的线程传递到获取计算结果的线程,也可以在任务执行中取消- 调用
future.get()
时,行为取决于任务的状态- 已完成:立即返回结果(或抛出异常)
Callable
对象内部的异常会被包装成ExecutionException
并抛出- 可以通过
e.getCause()
获得包装前的初始异常
- 未完成:阻塞直到任务完成(或抛出异常,抛出的异常包含超时)
- 被取消:抛出
CancellationException
- 已完成:立即返回结果(或抛出异常)
- 对执行完成的
Future
调用cancel()
不会产生影响 RunnableFuture
同时拓展了Runnable
(作为执行的任务)和Future
(作为任务执行的结果)
public class Solution {public static void main(String[] args) {// 创建Callable对象Callable<Integer> callable = () -> {if (System.currentTimeMillis() % 2 == 0) {throw new MyException(); // 自定义异常} else {Thread.sleep(2000);return 1;}};// 创建FutureTask对象FutureTask<Integer> futureTask = new FutureTask<>(callable);// 将FutureTask对象交付线程并执行Thread thread = new Thread(futureTask);thread.start();try {futureTask.get(10, TimeUnit.SECONDS);// 保持阻塞直到 1.执行完成 2.抛出异常(包含中断、超时、Callable内部checked异常、RuntimeException、Error)} catch (TimeoutException e) {// 超时...} catch (InterruptedException e) {// 中断...} catch (ExecutionException e) {// Callable内部异常,被包装后重新抛出if (e.getCause() instanceof MyException) {System.out.println("Callable Inner Exception, thrown by futureTask.get()");}} finally {// 1.如果任务已经结束,执行cancel不会产生任何影响// 2.如果任务正在运行,将会被中断futureTask.cancel(true);}}
}
实例 Memorizer
- 如果输入参数不在缓存中则执行计算;否则查询缓存并返回结果
class ComputeCache<K, V> {/*** 因为计算过程较长,Value使用Future*/private final ConcurrentHashMap<K, Future<V>> cache = new ConcurrentHashMap<>();private V compute(K args) {while (true) {Future<V> result = cache.get(args);if (result == null) {// 缓存中无该值,执行计算Callable<V> callable = new Callable<V>() {@Overridepublic V call() throws Exception {// take a long time// Thread.sleep(100000);return null;}};FutureTask<V> futureTask = new FutureTask<>(callable);result = cache.putIfAbsent(args, futureTask); // 保证查询-放入操作是原子的,避免了put可能被多个线程同时执行if (result == null) { // 加入成功,开始执行计算result = futureTask;futureTask.run();}}try {// 阻塞直到拿到计算结果return result.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}
}
十 同步工具类
- 均基于 AQS 实现
1 CountDownLatch
- 参与的线程有不同的角色:有的负责倒计时,有的负责等待计数器变为0,且角色都可以有多个
- 不能重复使用
await()
解除阻塞的情况:- 计数值为0
- 中断
- 超时
/** 使用示例:等待线程池的任务全部执行完成 **/
public final void run(List<RuleDO> rules) {CountDownLatch countDownLatch = new CountDownLatch(taskList.size());for (ScanTask task : taskList) {// 向线程池加入任务threadPoolExecutor.execute(() -> {doScan(task, rules);// 计数值-1countDownLatch.countDown();});}try {// 在此阻塞,直到计数值为0,或发生中断,或超时countDownLatch.await();} catch (InterruptedException e) {// ...}
}
2 循环栅栏 CyclicBarrier
- 和
CountDownLatch
不同的是,参与的线程角色相同,且可以重复使用 - 构造方法参数
- 线程数(必选)
Runnable
类型,所有线程到达后执行的动作,由最后一个到达的线程执行(可选)
static void CyclicBarrierTest() {int subThreadNum = 5;// 所有子线程到达后执行的行为,由最后一个到达的子线程完成Runnable commonAction = new Runnable() {@Overridepublic void run() {System.out.println("all sub threads terminated, print by thread " + Thread.currentThread().getName());}};// 构造CyclicBarrierCyclicBarrier cyclicBarrier = new CyclicBarrier(subThreadNum, commonAction);for (int i = 0; i < subThreadNum; i++) {new Thread(() -> {System.out.println("round1: sub thread " + Thread.currentThread().getName() + " terminated");try {cyclicBarrier.await(); // 线程宣布到达} catch (InterruptedException | BrokenBarrierException e) {// ...}System.out.println("round2: sub thread " + Thread.currentThread().getName() + " terminated");try {cyclicBarrier.await(); // 线程宣布到达} catch (InterruptedException | BrokenBarrierException e) {// ...}}).start();}}执行结果:
round1: sub thread Thread-1 terminated
round1: sub thread Thread-0 terminated
round1: sub thread Thread-3 terminated
round1: sub thread Thread-2 terminated
round1: sub thread Thread-4 terminated
all sub threads terminated, print by thread Thread-4
round2: sub thread Thread-4 terminated
round2: sub thread Thread-3 terminated
round2: sub thread Thread-0 terminated
round2: sub thread Thread-1 terminated
round2: sub thread Thread-2 terminated
all sub threads terminated, print by thread Thread-2
3 信号量 Semaphore
- 用于控制同时访问某个资源的操作数量(例如实现资源池、对容器添加容量限制),用于有增有减的场景(
CountDownLatch
用于仅减的场景) - 设置
permits=1
时,和一般的锁仍有区别:- 不可重入
- 一般锁只能由持有锁的线程释放,
Semaphore
可以由任意线程释放
- 常用方法
acquire()
获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态
acquire(int permits)
获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态acquireUninterruptibly()
获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)tryAcquire()
尝试获得令牌,返回获取令牌成功或失败,不阻塞线程
tryAcquire(long timeout, TimeUnit unit)
尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程
release()
释放一个令牌,唤醒一个获取令牌不成功的阻塞线程
hasQueuedThreads()
等待队列里是否还存在等待线程
getQueueLength()
获取等待队列里阻塞的线程数
drainPermits()
清空令牌把可用令牌数置为0,返回清空令牌的数量
availablePermits()
返回可用的令牌数量。
- 使用示例:使用信号量控制任务向线程池的提交速率
class LimitTaskCommitWithSemaphoreDemo {private ExecutorService executorService; // 线程池拒绝策略 AbortPolicyprivate final Semaphore semaphore = new Semaphore(100); // 信号量上界 = 最大线程数 + 队列最大长度public void addTask(Runnable task) throws InterruptedException {semaphore.acquire(); // 信号量-1try {executorService.execute(() -> {task.run();semaphore.release(); // 执行完毕,信号量+1});} catch (RejectedExecutionException e) {semaphore.release(); // 添加失败,信号量+1}}
}
4 ThreadLocal
参考链接
ThreadLocal
类主要解决的就是让每个线程绑定自己的值,这个值不能被其它线程访问到ThreadLocal
对象一般声明为static
以便引用- 典型用途是提供上下文信息,比如在一个 Web 服务器中,一个线程执行用户请求,会访问一些共同的信息(请求信息、用户身份、数据库连接等),这些是线程执行过程中的全局信息,如果作为参数在不同代码间传递会很繁琐
public class RequestContext {public static class Request {...}private static ThreadLocal<String> userId = new ThreadLocal<>();private static ThreadLocal<Request> req = new ThreadLocal<>();// getter&setter...
}
ThreadLocalMap
- 每个 Thread 中都具备一个容器
ThreadLocalMap
,而ThreadLocalMap
可以存储以ThreadLocal
为 key (ThreadLocal
的弱引用),Object 对象为 value 的键值对 ThrealLocal
类可以通过Thread.currentThread()
获取到当前线程对象后,直接通过getMap(Thread t)
可以访问到该线程的ThreadLocalMap
对象ThreadLocalMap
不使用拉链法解决哈希冲突,而是向后探测:如果先遇到了空位置则直接插入;如果先遇到了 key 过期的数据则进行垃圾回收并替换
- 每个 Thread 中都具备一个容器
十一 线程中断
1 中断方法
interrupt()
仅是一种协作机制,不会强制终止线程- 总结:中断状态 true -> false 的情况
- 调用静态方法
Thread.interrupted()
- 抛出异常
InterruptedException
,例如对WAITING/TIMED_WAITING
状态的线程调用interrupt()
- 线程执行完成
- 调用静态方法
public class Thread {/*** 由*本线程对象*之外的线程调用,作用为:* 1.线程处于状态WAITING/TIMED_WAITING,则抛出InterruptedException* 2.线程处于状态RUNNABLE/BLOCKED,将中断标志位设置为true(仅告知"需要中断"而不采取其它措施)* 3.如果本线程对象处于状态NEW/TERMINATE,则无动作(不修改标志位)**/public void interrupt() {...}/*** 查看*本线程对象*的中断标志位* 如果线程执行完成则返回false,即使之前对它调用了 interrupt()**/public boolean isInterrupted() {...}/*** 静态方法,查看*调用该方法的线程*的中断标志位,并清除状态* 调用形式仅可以为 Thread.interrupted(),同时会将中断状态更新为false**/public static boolean interrupted() {...}
}
2 示例
- Demo1,仅有一个主线程
class ThreadInterruptDemo {protected void test() throws Exception {Thread.currentThread().interrupt(); // 将主线程的中断标志设为trueSystem.out.println("MainStatus after thread.interrupt(): " + Thread.currentThread().isInterrupted());boolean mainThreadInterruptStatus = Thread.interrupted(); // 查看当前线程(主线程)的中断状态,并将其重置为falseSystem.out.println("MainStatus return by Thread.interrupted(): " + mainThreadInterruptStatus);System.out.println("MainStatus after Thread.interrupted(): " + Thread.currentThread().isInterrupted());}
}返回结果:
MainStatus after thread.interrupt(): true
MainStatus return by Thread.interrupted(): true
MainStatus after Thread.interrupted(): false
- Demo2
class ThreadInterruptDemo {protected void test() throws Exception {Thread subThread = new Thread(() -> {System.out.println("Start: " + Thread.currentThread().isInterrupted());try {System.out.println("Before sleep: " + Thread.currentThread().isInterrupted());Thread.sleep(5000);} catch (InterruptedException e) {System.out.println("InterruptedException: " + Thread.currentThread().isInterrupted());// 捕获到InterruptedException后,会将中断状态重置为false,如果需要维持还需要重设为trueThread.currentThread().interrupt();}System.out.println("End: " + Thread.currentThread().isInterrupted());});subThread.start();subThread.interrupt(); // 如果子线程处于阻塞状态,则抛出InterruptedException;否则仅将子线程的中断状态设置为trueThread.sleep(10000); // 等待子线程执行完成System.out.println("SubThread end status: " + subThread.isInterrupted());}
}返回结果1:
Start: false
Before sleep: false
---此时调用subThread.interrupt()---
InterruptedException: false // 发生了InterruptedException,中断状态被重置为false
End: true // 在catch块里又将中断状态设置为true
---子线程执行完成---
SubThread end status: false返回结果2:
---此时调用subThread.interrupt()---
Start: true
Before sleep: true
InterruptedException: false // 证明了线程先中断状态true再sleep也会抛出异常!!!
End: true
---子线程执行完成---
SubThread end status: false
十二 性能与可伸缩性
1 相关概念
- 可伸缩性 定义
当增加计算资源(CPU、内存、存储容量、I/O带宽等)时,程序的吞吐量或处理能力相应地增加 - 首先保证程序正确,然后再提高运行速度,避免不成熟的优化
- 所有并发程序中都会包含串行部分,至少同步代码块是串行的
- Amdahl定律
程序的可伸缩性取决于必须串行执行的代码比例
2 多线程引入的开销
- 上下文切换
- 调度器划分合适的时间片大小,将线程切换的开销分摊给执行时间,提高整体吞吐量
- 上下文切换会使缓存失效
- 内存同步
- 来源于
synchronized
和volatile
保证的内存可见性 - 同步会增加共享内存总线上的通信量,而总线的带宽是有限的,所有处理器都共享总线
- 来源于
- 阻塞
- 包含阻塞I/O、等待获取锁、在条件变量上等待
- 等待获得锁的线程会被阻塞,被阻塞的线程可以自旋等待或被操作系统挂起,直到解除阻塞(具体采用的方式和上下文切换的代价、阻塞时间有关)
- 被挂起的线程会引入两次切换:被阻塞的线程在自己的时间片用完之前被换出,在解除阻塞时由被换回
3 减少锁竞争
- Java程序中的串行操作主要来源于独占方式的资源锁
- 对于一个锁,可以通过以下方式减少锁竞争
- 减少锁的持有时间
- 减少锁的请求频率(降低锁粒度、避免热点域)
- 用非独占锁或非阻塞锁代替独占锁