文章目录
- 前言
- 一、CAS
- 1.1 CAS的概念
- 1.2 原子类
- 1.3 CAS的ABA问题
- 二、JUC中常用类
- 2.1 Callable接口
- 2.2 ReentrantLock(可重入)
- 2.3 Semaphore信号量
- 2.4 CountDownLatch类
- 2.5 CopyOnWriteArrayList类
- 2.6 ConcurrentHashMap
前言
对于多线程进阶的部分,更多总结的就是面试常考,但是工作中开发中不常用到的知识。
一、CAS
1.1 CAS的概念
CAS就是compare and swap的首字母缩写,意味着比较和交换,这样的一条指令即可完成比较和交换这一套操作,也就是说这套操作是原子的。
我们可以将CAS的流程想象成一个方法。
这里的交换其实思想上更偏向于赋值,因为一般更关注于内存地址address中的内容而不关心寄存器reg2中的内容,所以就可以近似说这里的操作就是将reg2的值赋给了address地址。
CAS一般就是cpu中的一条指令,所以操作系统为了使用它完成这样的操作就需要去提供这样的CAS的api。然后JVM又对这样的api进行了封装,使得我们在java中也能够使用CAS操作了。但是实际上这样的CAS操作被封装到了“unsafe”包当中,就是提醒大家容易出错,不鼓励直接使用CAS。
1.2 原子类
Java当中也有一些类对CAS进行了进一步的封装,就比如说原子类。
如上图的AtomicInteger就相当于对int进行了封装,对于它的++或者–操作都是原子的,实例代码如下:
package thread;import java.util.concurrent.atomic.AtomicInteger;public class Demo41 {public static AtomicInteger count = new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {Thread t = new Thread(() -> {for (int i = 0; i < 50000; i++) {//count++ 这里的对count的修改都是原子的count.getAndIncrement();//++count//count.incrementAndGet();//--count//count.decrementAndGet();//count--//count.getAndDecrement();//count+=10;//count.getAndAdd(10);}});Thread t1 = new Thread(() -> {for (int i = 0; i < 50000; i++) {count.getAndIncrement();}});t.start();t1.start();t.join();t1.join();System.out.println(count);}
}
这里的多线程代码就是经典的两个线程两个循环来计算count值,因为这里的count使用到了原子类的方法,所以加一操作是原子性的,自然不存在线程安全的问题,也能够得到正确结果。
那么使用这种原子性操作的意义是什么呢?意义就在于效率,因为锁是一个很重量级的操作,如果操作没有原子性在多线程的情况下就要加锁,但是我们可以使用CAS从而不去使用锁,从而提高代码效率。这一套基于CAS不加锁实现线程安全代码的方式,也被称为“无锁编程”。但是CAS这种方法也就仅仅适用于少数场景。
1.3 CAS的ABA问题
属于CAS的一个重要注意事项,CAS的核心就是“比较-发现相等-交换”->发现相等即数据没发生任何改变,但是相等不等于没改变过。可能值经历了一个从A到B再到A的过程,这种情况在极端环境下会产生问题。
如上图取款操作,假如我们要取500,情急之下,我们多按了两次取款按钮,此时产生了两个线程来进行扣款操作,但是如果在此时别人给你转了500,那么就会出现问题了。
如图左边是t1线程,右边是t2线程,t2线程完成扣款五百之后,此时t3线程给账户又转了500,此时应该不成立的t1线程的判断又成立了,导致又完成一次扣款。上述的过程就是典型的ABA问题所造成的bug,是非常极端的情况。
如何去避免这样的问题呢?可以约定一个版本号,每次进行扣款或存款都更新版本号,如果版本号没有改变数据就一定没变过。
通过版本号约束就可以避免这里的ABA问题,避免多次扣款。即使t3线程仍然给账户汇了500,但是此时版本号已经是2了,所以t1线程的版本号对不上,方法内部的扣款操作无法完成,所以即使有两个线程去扣款,扣的款也只有500。
二、JUC中常用类
JUC是java.util.concurrent这个包的首字母,在这里介绍一下这个包当中的常用类。
2.1 Callable接口
我们都知道Runnable接口用来表示一个待执行的任务,Callable接口和Runnable也是相似的,他也是用来表示一个待执行的任务,但是Callable有返回值,表示这个线程执行结束要得到的结果是啥。
public class Demo42 {private static int count = 0;public static void main(String[] args) throws InterruptedException, ExecutionException {//使用Runnable来求出1~100的和Thread t = new Thread(new Runnable() {@Overridepublic void run() {int result = 0;for (int i = 1; i <= 100; i++) {result += i;}//需要用成员变量来接收值 主线程和t线程的耦合程度高 如果有多个这样的线程就不方便了count = result;}});
以上给出了一段代码,就是使用类变量count来得到线程结果,这样的代码等线程多了之后很不方便,代码不够优雅。Callable就是用来解决上述代码的问题的。接下来给出全部代码用于对比:
package thread;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;public class Demo42 {private static int count = 0;public static void main(String[] args) throws InterruptedException, ExecutionException {//使用Runnable来求出1~100的和Thread t = new Thread(new Runnable() {@Overridepublic void run() {int result = 0;for (int i = 1; i <= 100; i++) {result += i;}//需要用成员变量来接收值 主线程和t线程的耦合程度高 如果有多个这样的线程就不方便了count = result;}});// t.start();// t.join();// System.out.println(count);// Callable和Runnable很相似 但是Runnable可以返回计算的值Callable<Integer> callable = new Callable<Integer>() {@Overridepublic Integer call() throws Exception {int result = 0;for (int i = 1; i <= 100; i++) {result += i;}return result;}};// futuretask这个类用来包装callable这个类 这样callable就可以直接放入线程FutureTask<Integer> futureTask = new FutureTask<>(callable);Thread t2 = new Thread(futureTask);t2.start();//从future获取线程启动通过callable计算得到的值t2.join();System.out.println(futureTask.get());}}
如以上代码,Callable接口需要使用FutureTask来包装,包装之后就将FutureTask对象放入线程,线程执行完成之后就可以通过FutureTask对象来得到线程执行的结果。
2.2 ReentrantLock(可重入)
在以前的JDK中,synchronized还没现在那么好用,那时ReentrantLock还是非常有市场的。但是随着版本的迭代,synchronized越来越强,基本上需要加锁的时候无脑使用synchronized大概率不会出问题。那么ReentrantLock现在还有什么价值?
(1)ReentrantLock实现了公平锁
这里代码中的参数写true就是公平锁,false就是非公平锁。
(2)ReentrantLock提供了tryLock操作,给加锁提供了更多的操作空间。
尝试加锁,如果该锁已经被获取了,那么就直接失败返回,不会继续等待。tryLock还有一个类似版本就是可以指定等待的时间,超时后返回。
(3)synchronized搭配wait以及notify的等待通知机制,ReentrantLock搭配Condition类完成等待通知。
Condition类比wait以及notify强一点。(多个线程wait,notify唤醒随机一个。Condition指定线程唤醒)
2.3 Semaphore信号量
信号量是一个非常简单的概念,就是一个计数器,描述了可用资源的数目。围绕信号量有两个操作,P操作,计数器减一,申请资源,V操作,计数器加一,释放资源。提出信号量的是荷兰人,PV是荷兰语的首字母,在英语中是acquire就是获取,以及release表示释放。代码示例如下:
package thread;import java.util.concurrent.Semaphore;public class Demo44 {public static void main(String[] args) throws InterruptedException {// 四个可用资源 P申请资源 V释放资源Semaphore semaphore = new Semaphore(4);semaphore.acquire(1);System.out.println("P操作");semaphore.acquire(1);System.out.println("P操作");semaphore.acquire(1);System.out.println("P操作");semaphore.acquire(1);System.out.println("P操作");// 此时信号量的四个资源已经被申请完了// 如果继续申请的话就会堵塞 因为要等别的线程释放信号量的资源semaphore.acquire(1);}}
以上代码信号量拥有四个单位的资源,然后通过acquire方法来申请资源,当资源被申请完并且没有资源释放时,再次申请资源就会阻塞。当设置信号量资源为一个单位,则信号量取值只能为1或者0,此时的信号量可以当成锁来使用。代码示例如下:
package thread;import java.util.concurrent.Semaphore;public class Demo45 {public static int count = 0;public static void main(String[] args) throws InterruptedException {//设置 1 0 信号量Semaphore semaphore = new Semaphore(1);Thread t1 = new Thread(() -> {try {for (int i = 0; i < 50000; i++) {semaphore.acquire(1);count++;semaphore.release();}} catch (InterruptedException e) {throw new RuntimeException(e);}});Thread t2 = new Thread(() -> {try {for (int i = 0; i < 50000; i++) {semaphore.acquire(1);count++;semaphore.release();}} catch (InterruptedException e) {throw new RuntimeException(e);}});t1.start();t2.start();t1.join();t2.join();System.out.println(count);}
}
以上代码其实还是多线程代码的经典例子,使用两个线程来计算累加值。当t1进行count加一的操作时,它已经申请了唯一的信号量资源,此时如果t2线程也想进行count加一就必须先执行申请信号量资源的操作,此时就会阻塞,只有当t1线程的count++执行结束之后释放资源,t2线程才能继续执行,这就实现了count++操作的原子性,从而避免线程安全问题。
2.4 CountDownLatch类
相对来说比较实用的工具类,当我们把一个任务分为多个时,就可以通过这个工具类来识别任务是否整体执行完毕了。代码示例如下:
package thread;import java.util.concurrent.CountDownLatch;public class Demo46 {public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(10);for (int i = 0; i < 10; i++) {int temp = i;Thread t = new Thread(() -> {System.out.println("线程启动:" + temp);//当作任务try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("线程结束:" + temp);latch.countDown();});t.start();}//等待所有线程中的任务结束latch.await();System.out.println("所有线程结束。");}}
这段代码中我们给CountDownLatch类对象的参数为10,并且创建10个线程去执行任务,并且在每个线程中使用countDown方法,countDown方法相当于计数,当一个线程结束就会加一,latch.await()方法就会等待所有线程执行结束,当countDown方法累加的数等于初始化CountDownLatch对象的参数时await方法就会停止等待,整个代码就运行结束了。这里代码中CountdownLatch对象的参数和线程数相等,并且每个线程都放了countDown方法,所以所有线程运行结束await方法也就不等了。
2.5 CopyOnWriteArrayList类
ArrayList,LinkedList,Stack,Queue,HashMap…在多线程下使用集合类需要注意线程安全问题。Vector自带了synchronized,Stack继承自Vector所以也有synchronized,HashTable也是自带synchronized。但是需要注意一点,加锁不代表就是线程安全的,不加锁也不能确定线程就是一定不安全的,需要具体代码具体分析。
在我们使用未加锁的类时需要手动进行加锁,这样是比较麻烦的,标准库提供了一些其它的解决方案,如下图。
通过这样的操作,给ArrayList这些集合类套一层壳,就是给一些关键方法加上了synchronized,使得ArrayList达到Vector那样的效果。
CopyOnWriteArrayList类也是一种解决线程安全问题的方法。
如果当前有多个线程读列表上的数据,那么不需要做任何处理。如果某个线程对上面的数据进行修改,此时另一个线程进行读取,那么很可能会读到200 3这样的中间情况。CopyOnWriteArrayList这样的类就是一种写时拷贝,在你对列表进行修改时会开辟新空间在新空间上进行修改,你要读取数据那么就在旧空间进行读取,当修改完成后将新的列表的引用代替旧的引用,旧的空间就可以释放了。这样的过程没有任何的加锁和阻塞,也能保证线程读不到错误的数据。
这种方法的思想应用的很广,例如显卡渲染画面到显示器,显示的动态效果其实就是很多张图片,由于显卡渲染足够快这些图片就能融合在一起,看到动画效果。实际上就是写时拷贝,在显示上一个画面的时候,在背后的额外空间生成下一个画面,生成完毕了用下一个画面代替上一个画面。
2.6 ConcurrentHashMap
我们知道HashMap是线程不安全的,HashTable是带锁的,是否是线程安全的?事实上并不推荐使用这个,标准库提供了更好的替代也就是ConcurrentHashMap。
HashTable加锁就是简单粗暴的给每个方法加了synchronized,就相当于针对this加锁,只要针对HashTable上的元素进行操作,就都会涉及到锁冲突。
ConcurrentHashMap做出了以下优化:
(1)使用锁桶的方式来代替一把全局锁,有效降低冲突概率。
这一点很好理解,如果有两个线程针对两个不同的链表进行操作,那么它们之间是不会产生锁冲突的。本身两个线程修改的是不同的链表,也没涉及到“公共变量”,所以不涉及线程安全问题。这个提升是非常大的,因为一个哈希表上的桶非常多,桶之间发生冲突的概率非常小,并且synchronized我们前面的博客也讲过了,只要不发生冲突synchronized只是加了一个偏向锁,就类似一个标记,消耗非常小。
(2)对于哈希表的size即使你修改的不同链表/桶,但是你在多线程的情况下也会涉及到多个线程修改一个公共变量的问题,在ConcurrentHashMap中对于size的修改就是使用CAS这种具有原子性的语句来完成,这样不仅避免了加锁这种重量级的操作,也解决了线程安全的问题。
(3)针对扩容进行了特殊优化。
如果发现负载因子太大了,那么就需要扩容,然而扩容又是比较低效的操作,普通的HashMap要在一次put的过程中完成整个扩容过程,就会使得put操作非常卡。ConcurrentHashMap就会在扩容的时候整出另外的一份空间,每次进行哈希表的基本操作都会将一部分扩容之前空间的数据搬到新空间,不是一口气搬完而是分多次,在搬的过程中如果是插入操作就将新数据插入到新空间,删除操作,新旧空间都进行删除,查找操作,新旧空间都要查找。
另外值得一提的是,在java8之前ConcurrentHashMap是基于分段锁的形式进行实现的,就是引入多个锁对象,每个锁对象去管理若干个哈希桶。相比于HashTable这个方法是进化,但是还是不如直接锁桶,后面就把这个方法给废弃了。