并发容器
- 什么是并发容器?
- 同步容器:
- 并发容器:
- ConcurrentHashMap
- 结构图
- JDK1.7结构图
- JDK1.8结构图
- CopyOnWriteArrayList
- 实现原理
- 并发队列
- 阻塞队列
- ArrayBlockingQueue
转自极客时间
什么是并发容器?
在JUC包中,有一大部分是关于并发容器的,如ConcurrentHashMap,ConcurrentSkipListMap, CopyOnWriteArrayList及阻塞队列。这里将介绍使用频率、面试中出现频繁的最高的 ConcurrentHashMap和阻塞队列。
注意:这里说到的容器概念,相当于我们理解中的集合的概念。
同步容器:
Java中的集合主要分为四大类:List、Map、Set和Queue,但是并不是所有集合都是线程安全的。比 如,我们经常使用的ArrayList,HashMap,HashSet就不是线程安全的。
早期的JDK1.0中的就提供了线程安全的集合,包括Vector,Stack和Hashtable。此外还有在JDK1.2中增 加的Collections中内部SynchronizedXxx类,它们也是线程安全的集合,可以由对应 Collections.synchronizedXxx工厂方法创建。这些类实现线程安全的方式都是一样的:都是基于 synchronized这个同步关键字实现的,对每个公有方法都进行了同步,保证每次只有一个线程能访问集 合,所以它们被称为线程安全的集合(同步容器)。
并发容器:
在JDK1.5之前,JDK提供的线程安全的类都是同步集合容器。同步容器都是线程安全的,但是所有线程 对容器只能串行访问,性能很差。在JDK1.5之后引入的JUC并发包,提供的更多类型的并发容器,在性 能上做了很多改进优化,可以用来替代同步容器。它们都是针对多线程并发访问来进行设计的,我们称 它们为并发容器。
并发容器依然可以归属到我们提到的四大类:List、Map、Set 和 Queue。
这里我总结了一下它们特性和使用场景:
- List容器:
Vector:使用synchronized同步锁,数据具有强一致性。适合于对数据有强一致性要求的场 景,但性能较差。 CopyOnWriteArrayList:底层使用数组存储数据,使用复制副本实现有锁写操作,不能保 证强一致性。适合于读多写少,允许读写数据短暂不一致的高并发场景。 - Map容器
Hashtable:使用synchronized同步锁,数据具有强一致性。适合于对数据有强一致性要求的 场景,但性能较差。 ConcurrentHashMap:基于数组+链表+红黑树实现,写操作时通过synchronized同步锁将 HashEntry作为锁的粒度支持一定程度的并发写,具有弱一致性。适合于存储数据量较小,读 多写少且不要求强一致性的高并发场景。 ConcurrentSkipListMap:基于跳表实现的有序Map,使用CAS实现无锁化读写,具有弱一致 性。适合于存储数据量大,读写都比较频繁,对数据不要求强一致性的高并发场景。 - Set容器
CopyOnWriteArraySet:底层使用数组存储数据,使用复制副本实现有锁写操作,不能保证 强一致性。适合于读多写少,允许读写数据短暂不一致的场景。 ConcurrentSkipListSet:基于跳表实现的有序Set,使用CAS实现无锁化读写,具有弱一致 性。适合于存储数据量大,读写都比较频繁,对数据不要求强一致性的高并发场景。
ConcurrentHashMap
结构图
JDK1.7结构图
Java7中的ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类 似,仍然是数组和链表组成。
每个segment独立上ReentrantLock锁,每个segment之间互不影响,提高并发效率。
默认有16个segment,最多可以同时支持16个线程并发写(操作分别分布在不同的Segment上)。这个 默认值可以在初始化时设置,但一旦初始化以后,就不可以再扩容了。
JDK1.8结构图
ConcurrentHashMap是一个存储 key/value 对的容器,并且是线程安全的。
改进一: 取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用 table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
改进二: 将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。查询 更快
CopyOnWriteArrayList
实现原理
CopyOnWrite 思想:是平时查询的时候,都不需要加锁,随便访问,只有在更新的时候,才会从原来的 数据复制一个副本出来,然后修改这个副本,最后把原数据替换成当前的副本。修改操作的同时,读操 作不会被阻塞,而是继续读取旧的数据。这点要跟读写锁区分一下。
public class Demo15CopyOnWriteArrayList {public static void main(String[] args) {//1、初始化CopyOnWriteArrayListList<Integer> tempList = Arrays.asList(new Integer [] {1,2});CopyOnWriteArrayList<Integer> copyList = new CopyOnWriteArrayList<>(tempList);ThreadLocal tl = new ThreadLocal();//2、模拟多线程对list进行读和写ExecutorService executorService = Executors.newFixedThreadPool(10);executorService.execute(new ReadThread(copyList));executorService.execute(new WriteThread(copyList));executorService.execute(new WriteThread(copyList));executorService.execute(new WriteThread(copyList));executorService.execute(new ReadThread(copyList));executorService.execute(new WriteThread(copyList));executorService.execute(new ReadThread(copyList));executorService.execute(new WriteThread(copyList));try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println("copyList size:"+copyList.size());executorService.shutdown();}
}class ReadThread implements Runnable {private List<Integer> list;public ReadThread(List<Integer> list) {this.list = list;}@Overridepublic void run() {System.out.print("size:="+list.size()+",::");for (Integer ele : list) {System.out.print(ele + ",");}System.out.println();}
}class WriteThread implements Runnable {private List<Integer> list;public WriteThread(List<Integer> list) {this.list = list;}@Overridepublic void run() {this.list.add(9);}
}
并发队列
阻塞队列
ArrayBlockingQueue
有界,可以指定容量
公平:可以指定是否需要保证公平,如果想要保证公平,则等待最长时间的线程会被优先处理,不过会带来一定的性能损耗。
场景:有10个面试者,只有1个面试官,大厅有3个位子让面试者休息,每个人面试时间10秒,模拟所有 人面试的场景。
/*** 案例:有10个面试者,只有1个面试官,大厅有3个位子让面试者休息,每个人面试时间10秒,模拟所有人面试的场景。*/
public class Demo16ArrayBlockingQueue {static ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);public static void main(String[] args) {Interviewer r1 = new Interviewer(queue);//面试官Engineers e2 = new Engineers(queue);//程序员们new Thread(r1).start();new Thread(e2).start();}
}class Interviewer implements Runnable {BlockingQueue<String> queue;public Interviewer(BlockingQueue queue) {this.queue = queue;}@Overridepublic void run() {System.out.println("面试官:我准备好了,可以开始面试");String msg;try {while(!(msg = queue.take()).equals("stop")){System.out.println(msg + " 面试+开始...");TimeUnit.SECONDS.sleep(10);//面试10sSystem.out.println(msg + " 面试-结束...");}System.out.println("所有候选人都结束了");} catch (InterruptedException e) {e.printStackTrace();}}
}class Engineers implements Runnable {BlockingQueue<String> queue;public Engineers(BlockingQueue queue) {this.queue = queue;}@Overridepublic void run() {for (int i = 1; i <= 10; i++) {String candidate = "程序员" + i;try {queue.put(candidate);System.out.println(candidate+" 就坐=等待面试~");} catch (InterruptedException e) {e.printStackTrace();}}try {queue.put("stop");} catch (InterruptedException e) {e.printStackTrace();}}
}