同步类容器
1,这些复合操作在多线程并发地修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题 增强for循环和iterator的形式不容许遍历的时候修改元素
- 出现java.util.ConcurrentModificationException
package com.example.core.collection;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;public class UseSyncCollection {// 出现java.util.ConcurrentModificationExceptionpublic Collection<String> m1(Vector<String> list) {for (String temp : list) {if ("3".equals(temp)) {list.remove(temp);}}return list;}public static void main(String[] args) {Vector v = new Vector<>();v.add("1");v.add("2");v.add("3");UseSyncCollection test = new UseSyncCollection();Collection<String> ret1 = test.m1(v);System.err.println(ret1.toString());}
}
- 出现java.util.ConcurrentModificationException
package com.example.core.collection;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;public class UseSyncCollection {// 出现java.util.ConcurrentModificationExceptionpublic Collection<String> m2(Vector<String> list) {Iterator<String> iterator = list.iterator();while (iterator.hasNext()) {String temp = iterator.next();if ("3".equals(temp)) {list.remove(temp);}}return list;}public static void main(String[] args) {Vector v = new Vector<>();v.add("1");v.add("2");v.add("3");UseSyncCollection test = new UseSyncCollection();Collection<String> ret2 = test.m2(v);System.err.println(ret2.toString());}
}
- success
package com.example.core.collection;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;public class UseSyncCollection {//successful!普通for循环,单线程,先删除,再返回public Collection<String> m3(Vector<String> list) {for (int i = 0; i < list.size(); i++) {if ("3".equals(list.get(i))) {list.remove(i);}}return list;}public static void main(String[] args) {Vector v = new Vector<>();v.add("1");v.add("2");v.add("3");UseSyncCollection test = new UseSyncCollection();Collection<String> ret3 = test.m3(v);System.err.println(ret3.toString());}
}
2,同步类容器: 如Vector、HashTable。 这些容器的同步功能其实都是有JDK的Collections.synchronized***等工厂方法去创建实现的。 其底层的机制无非就是用synchronized关键字对每个公用的方法都进行同步,或者使用Object mutex对象锁的机制使得每次只能有一个线程访问容器的状态。 不满足如今既要保证线程安全,又要追求高并发的目的
List<String> list = new ArrayList<>();
Collections.synchronizedCollection(list);
并发类容器的概念
- jdk5.0以后提供了多种并发类容器来替代同步类容器从而改善性能。
- 同步类容器的状态都是串行化的。 (锁竞争)
- 他们虽然实现了线程安全,但是严重降低了并发性,造成了cpu的使用率激增,在多线程环境时,严重降低了应用程序的吞吐量
ConcurrentMap
- 接口下有俩个重要的实现: ConcurrentHashMap ConcurrentSkipListMap(支持并发排序功能)
- ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的HashTable,它们有自己的锁。
- 只要多个修改操作发生在不同的段上,它们就可以并发进行。把一个整体分成了16个段(Segment),也就是最高支持16个线程的并发修改操作。
- 这也是在多线程场景时减小锁的粒度从而降低锁竞争的一种方案。并且代码中大多共享变量使用volatile关键字声明,目的是第一时间获取修改的内容,性能非常好。
package com.example.core.collection;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class UseConcurrentMap {public static void main(String[] args) {ConcurrentHashMap<String, Object>map = new ConcurrentHashMap<>();map.put("k1","v1");map.put("k2","v1");map.put("k1","vv1");//如果输入key已经存在,就会覆盖掉原值map.putIfAbsent("k1","vvv1");//如果输入key已经存在,不会进行任何操作for(Map.Entry<String,Object>me : map.entrySet()){System.err.println("key: "+ me.getKey()+",value:"+me.getValue());}}
}
Copy-On-Write
- Copy-On-Write简称COW,是一种用于程序设计中的优化策略。
- JDK里的COW容器有两种: CopyOnWriteArrayList CopyOnWriteArraySet
- COW容器非常有用,可以在非常多的并发场景中使用到。
- CopyOnWrite容器即写时复制的容器。 通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。
- 这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。 所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器
- A线程执行写操作,会基于原本容器的副本(黄色的)进行操作,如果C线程此时也执行写操作,会等待A线程的锁释放,再进行写入操作。B线程执行读操作,直接在原本的容器上面操作即可。等写入完成之后,OrderList会将指针从原本的容器(黄色的)指向容器的副本(蓝色的),而原本的容器(黄色的)会被gcc删除
- 如果原容器容量很大的话,就不要使用copy-on-write,因为要执行对于容器的复制,会占据内存很大的空间
- 如果频繁的写入操纵,也不适合
- 适用于读多写少的情况,且容器的容量也不要很大
并发Queue
- 在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue接口
ConcurrentLinkedQueue
- ConcurrentLinkedQueue:是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。
- 它是一个基于链接节点的无界线程安全队列。 该队列的元素遵循先进先出的原则,头是最先加入的,尾是最近加入的,该队列不允许null元素。
- ConcurrentLinkedQueue重要方法: add() 和 offer() 都是加入元素的方法 (在ConcurrentLinkedQueue中,这俩个方法没有任何区别) poll() 和 peek() 都是取头元素节点,区别在于前者会删除元素,后者不会
BlockingQueue
- offer(anObject): 表示如果可能的话, 将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳, 则返回true, 否则返回false.(本方法不阻塞当前执行方法的线程) 不等待
- offer(E o, long timeout, TimeUnit unit), 可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。 设置等待超时时间
- put(anObject): 把anObject加到BlockingQueue里, 如果BlockQueue没有空间, 则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。堵塞等待
- poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败,设置等待时间
- take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入,阻塞等待
- drainTo(): 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁
阻塞队列的模拟
- 拥有固定长度承装元素的容器
- 计数器统计容器的容量大小
- 当队列里面没有元素的时候需执行线程要等待
- 当队列元素已满的时候执行线程也需要等待
package com.example.core.collection;import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;public class MyQueue {// 创建队列容器private final LinkedList<Object> list = new LinkedList<>();// 创建计数器countprivate final AtomicInteger count = new AtomicInteger(0);//单个服务的原子性,保证数据的一致性private final int maxSize;//最大容量的限制private final int minSize = 0;//最小容量的限制private final Object lock = new Object();//锁public MyQueue(int maxSize){this.maxSize = maxSize;}public void put(Object obj){synchronized (lock){while(count.get() == maxSize){try{lock.wait();}catch(InterruptedException e){e.printStackTrace();}}// 添加新的元素进入容器中list.add(obj);count.getAndIncrement();//i++System.err.println("元素"+obj+"已经添加到容器中");//进行唤醒可能正在等待的take方法操作中的线程,当take来取数值时,容器为空,take进行等待,当数据放入,通知take取数lock.notify();}}public Object take(){Object temp = null;synchronized (lock){while(count.get() == minSize){try{lock.wait();}catch (InterruptedException e){e.printStackTrace();}}temp = list.removeFirst();//移除第一个元素count.getAndDecrement();//i--System.err.println("元素"+temp+"已经从容器中取走");//进行唤醒可能正在等待的put方法操作线程,当put方法往容器里面放数值,但是容器已满,put进入等待,当take取走数据,唤醒put来存入数据lock.notify();}return temp;}public int size(){return count.get();}public List<Object> getQueueList(){return list;}
}
编写MyQueueTest.java的测试代码
package com.example.core.collection;public class MyQueueTest {public static void main(String[] args) throws Exception{MyQueue mq = new MyQueue(5);mq.put("a");mq.put("b");mq.put("c");mq.put("d");mq.put("e");System.out.println("当前元素的个数:"+mq.size());Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {mq.put("f");mq.put("g");}},"t1");Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {try{Thread.sleep(1000);Object o1 = mq.take();Thread.sleep(1000);Object o2 = mq.take();}catch(InterruptedException e){e.printStackTrace();}}},"t2");t1.start();Thread.sleep(1000);t2.start();Thread.sleep(5000);System.out.println(mq.getQueueList().toString());}
}
/*
输出结果如下
元素a已经添加到容器中
元素b已经添加到容器中
元素c已经添加到容器中
元素d已经添加到容器中
元素e已经添加到容器中
当前元素的个数:5
元素a已经从容器中取走
元素f已经添加到容器中
元素b已经从容器中取走
元素g已经添加到容器中
[c, d, e, f, g]
*/
ArrayBlockingQueue
- ArrayBlockingQueue:基于数组的阻塞队列实现
- 在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列,在很多场合非常适合使用。
高性能无阻塞无界队列
package com.example.core.collection;import java.util.concurrent.ConcurrentLinkedQueue;public class UseBlockingQueue {public static void main(String[] args) throws Exception{//高性能的无阻塞的无界限队列ConcurrentLinkedQueue<String> clq = new ConcurrentLinkedQueue<>();clq.offer("a");clq.add("b");clq.add("c");clq.add("d");System.out.println("从容器头部取出元素"+clq.poll());//从头部取出元素,并且从容器本身移除System.out.println("容器的长度"+clq.size());System.out.println("从容器的头部取出元素"+clq.peek());//从头部取出元素,并且不会从容器本身移除System.out.println("容器的长度"+clq.size());}
}
基于阻塞-有界队列
package com.example.core.collection;import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;public class UseBlockingQueue {public static void main(String[] args) throws Exception{//基于阻塞的有界队列ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<>(5);abq.put("a");abq.add("b");abq.add("c");abq.add("d");abq.add("e");System.out.println(abq.offer("f",2, TimeUnit.SECONDS));ArrayBlockingQueue<String> abq2 = new ArrayBlockingQueue<>(5);abq.drainTo(abq2,3);for(Iterator iterator = abq2.iterator();iterator.hasNext();){String string = (String)iterator.next();System.out.println("元素"+string);}}
}
LinkedBlockingQueue
- 基于链表的阻塞队列 同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成)
- LinkedBlockingQueue之所以能够高效的处理并发数据,是因为其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。他是一个无界队列
LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<>();
SynchronousQueue
- 一种没有缓冲的队列
- 生产者产生的数据直接会被消费者获取并消费
- A线程一直等待B线程的输入,B产生的数据被A消费,SynchronousQueue只是做一个中转,不负责存储
package com.example.core.collection;import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.*;public class UseBlockingQueue {public static void main(String[] args) throws Exception{//不能存放任何元素的 阻塞队列SynchronousQueue<String>sq = new SynchronousQueue<>();new Thread(new Runnable() {@Overridepublic void run() {try{System.out.println("元素内容:"+sq.take());}catch (InterruptedException e){e.printStackTrace();}}},"t1").start();new Thread(new Runnable() {@Overridepublic void run() {sq.add("a");}},"t2").start();}
}
PriorityBlockingQueue
- 基于优先级的阻塞队列
- 优先级的判断通过构造函数传入的Compator对象来决定,也就是说传入队列的对象必须实现Comparable接口,在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列
package com.example.core.collection;import java.util.concurrent.PriorityBlockingQueue;public class UsePriorityBlockingQueue {public static void main(String[] args) throws InterruptedException{PriorityBlockingQueue<Node> pdq = new PriorityBlockingQueue<Node>();Node n3 = new Node(3,"node3");Node n4 = new Node(4,"node4");Node n2 = new Node(2,"node2");Node n1 = new Node(1,"node1");pdq.add(n4);pdq.add(n3);pdq.add(n1);pdq.add(n2);System.out.println("0 容器为:"+pdq);System.out.println("1 获取元素:"+pdq.take().getId());System.out.println("1 容器为 :"+pdq);System.out.println("2 获取元素:"+pdq.take().getId());System.out.println("2 容器为 :"+pdq);System.out.println("3 获取元素:"+pdq.take().getId());System.out.println("3 容器为 :"+pdq);System.out.println("4 获取元素为:"+pdq.take().getId());}
}
package com.example.core.collection;public class Node implements Comparable<Node>{private int id;private String name;public Node(){}public Node(int id,String name){super();this.id = id;this.name = name;}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic int compareTo(Node node){return this.id > node.id ? 1 : (this.id < node.id ? -1 : 0);}public String toString(){return this.id + ":" + this.name;}
}
DelayQueue
- 带有延迟时间的Queue
- 其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、 任务超时处理、空闲连接的关闭等等
package com.example.core.collection;import java.util.concurrent.DelayQueue;public class Wangba implements Runnable{private DelayQueue<WangMin> delayQueue = new DelayQueue<WangMin>();public boolean start = true;//表示网吧营业public void startMachine(String id,String name,int money){WangMin wm = new WangMin(id,name,System.currentTimeMillis()+money * 1000);System.out.println("网名:"+ name +",身份证: "+id+",缴费:"+money+"元,开始上网");delayQueue.add(wm);}public void overMachine(WangMin wm){System.out.println("网名:"+wm.getName()+",身份证:"+wm.getId()+",已经到了下机时间了");}@Overridepublic void run(){while (start){try{WangMin wm = delayQueue.take();overMachine(wm);}catch(InterruptedException e){e.printStackTrace();}}}public static void main(String[] args) {Wangba wangba = new Wangba();System.out.println("网吧正常营业");Thread yingye = new Thread(wangba);yingye.start();wangba.startMachine("001","张三",2);wangba.startMachine("001","李四",4);wangba.startMachine("001","王五",7);}
}
package com.example.core.collection;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;public class WangMin implements Delayed {private String id;private String name;private long endTime;//上网截止日期private final TimeUnit timeUnit = TimeUnit.SECONDS;@Overridepublic String toString() {return "WangMin{" +"id='" + id + '\'' +", name='" + name + '\'' +", endTime=" + endTime +'}';}public WangMin(){}public WangMin(String id,String name,long endTime){this.id = id;this.name = name;this.endTime = endTime;}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public long getEndTime() {return endTime;}public void setEndTime(long endTime) {this.endTime = endTime;}public TimeUnit getTimeUnit() {return timeUnit;}//用来判断是否到达下机时间@Overridepublic long getDelay(TimeUnit unit){return endTime - System.currentTimeMillis();}@Overridepublic int compareTo(Delayed delayed){WangMin w = (WangMin)delayed;return this.getDelay(timeUnit) - w.getDelay(timeUnit) > 0 ? 1 : -1;}
}