引言
本博客基于常用的并发容器,简单概括其基本特性和简单使用,并不涉及较深层次的原理分析和全面的场景用法。
适合对不了解并发容器的同学,工作中遇到类似的场景,能够对文中提到的并发容器留有简单印象就好。
一、ConcurrentHashMap
下面的程序中,切换任意Map的实现方式,如TreeMap、HashTable、ConcurrentHashMap等,运行程序,观察执行结果:
public class T01_ConcurrentMap {public static void main(String[] args) {
// Map<String, String> map = new TreeMap<>();
// Map<String, String> map = new Hashtable<>();Map<String, String> map = new ConcurrentHashMap<>();
// Map<String, String> map = new ConcurrentSkipListMap<>();
// Map<String, String> map = new HashMap<>();Random rdm = new Random();Thread[] ths = new Thread[100];CountDownLatch latch = new CountDownLatch(ths.length);long start = System.currentTimeMillis();for (int i = 0; i < ths.length; i++) {ths[i] = new Thread(() -> {for (int j = 0; j < 10000; j++)map.put("a" + rdm.nextInt(100000), "a" + rdm.nextInt(100000));latch.countDown();});}Arrays.asList(ths).forEach(t -> t.start());try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}long end = System.currentTimeMillis();System.out.println(end - start);}
}
ConcurrentHashMap和HashTable,前者在执行效率方面要比后者高,因为HashTable在put每个对象的时候,都需要锁定整个Map对象;而ConcurrentHashMap的实现方式是通过“分段锁”,即将锁的作用范围细化,在put的时候,只锁定对应的一段,从而提高效率。
ConcurrentSkipListMap跳表,不仅支持并发,而且是有序的,因为提供了排序功能,在性能方面可能不及Concurrent-HashMap。
二、CopyOnWriteList写时复制容器
切换下面程序中List的实现方式(ArrayList、Vector、CopyOnWriteArrayList)观察执行结果。
public class T02_CopyOnWriteList {public static void main(String[] args) {// List<String> lists = new ArrayList<>();// 会产生并发问题// List<String> lists = new Vector<>();List<String> lists = new CopyOnWriteArrayList<>();Random rdm = new Random();Thread[] ths = new Thread[100];for (int i = 0; i < ths.length; i++) {Runnable task = () -> {for (int j = 0; j < 1000; j++) {lists.add("a" + rdm.nextInt(10000));}};ths[i] = new Thread(task);}runAndComputeTime(ths);System.out.println(lists.size());}private static void runAndComputeTime(Thread[] ths) {long t1 = System.currentTimeMillis();Arrays.asList(ths).forEach(t -> t.start());Arrays.asList(ths).forEach(t -> {try {t.join();} catch (Exception e) {e.printStackTrace();}});long t2 = System.currentTimeMillis();System.out.println(t2 - t1);}
}
CopyOnWriteList写的效率非常低,读的效率非常高。这是因为在向写时复制容器中添加一个元素的时候会将整个容器复制一份,再对复制后的容器进行插入;而读的时候不需要加锁。因此,这种容器一般用于极少修改,而读取频繁的应用场景。
三、Collections.synchronizedXx
Collections是一个容器工具类,通过类似synchronizedList(List<T> list)、synchronizedMap(Map<K,V> m)等命名形式的静态方法,可以返回一个加了锁的对应容器。实际上,通过这种方式获得的同步容器仅仅是将普通的非线程安全的容器的方法进行synchronized封装:
使用方法非常简单,即传入的非同步容器,返回一个同步容器:
List<String> strs = new ArrayList<>();
List<String> syncStrs = Collections.synchronizedList(strs);
四、ConcurrentLinkedQueue
线程安全的并发队列,就可以使用ConcurrentLinkedQueue,是“尾进头出”的并发队列;另外还有一个ConcurrentLinkedDeque 是并发的双端队列,在实际开发中用处也非常大。
执行下面的程序,观察输出结果,重点理解个别方法的含义:
public static void main(String[] args) {Queue<String> strs = new ConcurrentLinkedQueue<>();Deque<String> names = new ConcurrentLinkedDeque<>();for (int i = 0; i < 10; i++) {strs.offer("S" + i); // addnames.offer("N" + i);}System.out.println("output#1 : " + strs);System.out.println("output#2 : " + strs.size());System.out.println("output#3 : " + strs.poll()); // FIFO,容器中的删除System.out.println("output#4 : " + strs.size());System.out.println("output#5 : " + strs.peek()); // 取出,容器中的不删System.out.println("output#6 : " + strs.size());System.out.println("output#7 : " + names);System.out.println("output#8 : " + names.pollLast()); // LIFOSystem.out.println("output#9 : " + names);System.out.println("output#10 : " + names.peekFirst()); // FIFOSystem.out.println("output#11 : " + names);}
执行结果:
五、LinkedBlockingQueue
使用LinkedBlockingQueue非常适合解决生产者-消费者模式的问题。
public class T05_LinkedBlockingQueue {static BlockingQueue<String> strs = new LinkedBlockingQueue<>();static Random rdm = new Random();public static void main(String[] args) {new Thread(() -> {for (int i = 0; i < 100; i++) {try {strs.put("a" + i);TimeUnit.MILLISECONDS.sleep(rdm.nextInt(1000));} catch (Exception e) {e.printStackTrace();}}}, "p1").start();for (int i = 0; i < 5; i++) {new Thread(() -> {for (;;) {try {System.out.println(Thread.currentThread().getName()+ " take " + strs.take());// 如果空了,就会等待} catch (Exception e) {e.printStackTrace();}}}, "c" + i).start();}}
}
Queue在高并发的情况下,通常可以使用两种队列:ConcurrentLinkedQueue 和 BlockingQueue(接口)。
BlockingQueue是阻塞式队列,它是一个接口,可以根据有界或无界等实际需要考虑使用它的相关实现类,如ArrayBlockingQueue或上面代码中用到的LinkedBlockingQueue。
六、ArrayBlockingQueue
ArrayBlockingQueue是一个有界队列,在构造时可以指定元素个数,队列会自行维护元素的个数:
public class T06_ArrayBlockingQueue {static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);static Random rdm = new Random();public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10; i++) {strs.put("a" + i);}// strs.put("aaa");// 线程阻塞,知道有空闲位置
// strs.add("aaa"); // output:java.lang.IllegalStateException: Queue full
// strs.offer("aaa"); // output:[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]strs.offer("aaa", 3, TimeUnit.SECONDS);// 阻塞,但可以指定超时时间System.out.println(strs);}
}
注意观察此容器为我们提供的不同场景下可以选择的添加元素的方式,执行结果见代码注释部分。
七、DelayQueue
DelayQueue也是一种无界队列,容器中的每个元素会记录一个倒计时,等待时间结束后才可以被消费者取出。可以用于执行定时任务。
public class T07_DelayQueue {static BlockingQueue<MyTask> tasks = new java.util.concurrent.DelayQueue<>();static class MyTask implements Delayed {long runningTime;MyTask(long rt) {runningTime = rt;}@Overridepublic int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {return -1;} else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;} else {return 0;}}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return String.valueOf(runningTime);}}public static void main(String[] args) throws InterruptedException {long now = System.currentTimeMillis();MyTask t1 = new MyTask(now + 1000);MyTask t2 = new MyTask(now + 2000);MyTask t3 = new MyTask(now + 1500);MyTask t4 = new MyTask(now + 2500);MyTask t5 = new MyTask(now + 500);tasks.put(t1);tasks.put(t2);tasks.put(t3);tasks.put(t4);tasks.put(t5);System.out.println(tasks);for (int i = 0; i < 5; i++) {System.out.println(tasks.take());}}
}
八、TransferQueue
TransferQueue提供了一种特殊的方法叫做transfer(), 这个方法在有消费者等待消费的时候,不会将元素放入队列中,而是直接传递给消费者。因此这种方法非常适用于高并发的情况下。 但是,当没有消费者的时候,那么transfer就会阻塞,而其他类似的方法如put(),add()都不会阻塞。在实时消息处理中用到的比较多, 例如Netty。
public class T08_TransferQueue {public static void main(String[] args) throws InterruptedException {LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (Exception e) {e.printStackTrace();}}).start();strs.transfer("aaa");new Thread(() -> {try {System.out.println(strs.take());} catch (Exception e) {e.printStackTrace();}}).start();}
}
九、SynchronusQueue
同步队列 SynchronousQueue是一种特殊的TransferQueue。 它的容量为0,是没有容量的队列,在消费者正在等待的时候,必须使用put(实际上内部使用的是transfer),放入的任何元素 都必须直接交给消费者,而不能放入容器中。
public class T09_SynchronousQueue { // 容量为0public static void main(String[] args) throws InterruptedException {BlockingQueue<String> strs = new SynchronousQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (Exception e) {e.printStackTrace();}}).start();// strs.put("aaa"); // 阻塞等待消费者消费strs.add("aaa");System.out.println(strs.size());}}
总结
综上,是对一些常用容器的介绍和案例展示,重点要理解它们的应用区别和使用场景,最起码要有个大概印象,在遇到类似问题的时候考虑使用它们。
ConcurrentHashMap
CopyOnWriteList
Collections.synchronizedXx
ConcurrentLinkedQueue
BlockingQueue
LinkedBQ
ArrayBQ
TransferQueue
SynchronusQueue
DelayQueue执行定时任务
鸣谢
《马士兵老师高并发编程系列--第三部分》