前言:
最近看PriorityBlockingQueue这个类的过程中,对扩容方法产生了一些困惑,特此记录下自己思索的过程。
PriorityBlockingQueue:
PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。其内部是使用平衡二叉树堆实现的,所以直接遍历队列元素不保证有序。默认使用对象的 ompareTo 方法提供比较规则,如果你需要自定义比较规则则可以自定义 comparators。
构造方法:
构造方法有3个分别是:
无参:默认容量11
public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);}
有参有三个:分别是指定初始容量或追加一个比较器
public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null);}
public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) {if (initialCapacity < 1)throw new IllegalArgumentException();this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity];}
有参还有一个是接收一个集合
public PriorityBlockingQueue(Collection<? extends E> c) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();boolean heapify = true; // true if not known to be in heap orderboolean screen = true; // true if must screen for nullsif (c instanceof SortedSet<?>) {SortedSet<? extends E> ss = (SortedSet<? extends E>) c;this.comparator = (Comparator<? super E>) ss.comparator();heapify = false;}else if (c instanceof PriorityBlockingQueue<?>) {PriorityBlockingQueue<? extends E> pq =(PriorityBlockingQueue<? extends E>) c;this.comparator = (Comparator<? super E>) pq.comparator();screen = false;if (pq.getClass() == PriorityBlockingQueue.class) // exact matchheapify = false;}Object[] a = c.toArray();int n = a.length;// If c.toArray incorrectly doesn't return Object[], copy it.if (a.getClass() != Object[].class)a = Arrays.copyOf(a, n, Object[].class);if (screen && (n == 1 || this.comparator != null)) {for (int i = 0; i < n; ++i)if (a[i] == null)throw new NullPointerException();}this.queue = a;this.size = n;if (heapify)heapify();}
常用方法:
1.offer 操作
在队列中插入一个元素,由于是无界队列,因此一直返回 true 。会返回最小的元素。
public boolean offer(E e) {//元素喂null时,会抛空指针异常if (e == null)throw new NullPointerException();//获取锁对象final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;//size大于等于队列的长度while ((n = size) >= (cap = (array = queue).length))//尝试扩容tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;//上浮排序if (cmp == null)//默认比较器siftUpComparable(n, e, array); else//自定义比较器siftUpUsingComparator(n, e, array, cmp);//size+1size = n + 1;//唤醒因take的阻塞线程notEmpty.signal();} finally {lock.unlock();}return true;}
扩容方法:
private void tryGrow(Object[] array, int oldCap) {//扩容的时候先释放锁 保证在此过程其他线程可以入队出队,增加并发性能lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;//先判断是否处于扩容状态中,并CAS更新扩容状态 更新成功的线程进行扩容if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {//判断旧的容量如果小于64,那么新的容量就等于oldCap*2+2,否则为oldCap的1.5倍int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));/*此处判断是否超过最大容量MAX_ARRAY_SIZE,MAX_ARRAY_SIZE为Integer.MAX_VALUE-8, 疑问一? 当oldCap过大时,newCap会发生数据溢出成为负数时小于0 为什么不判断该情况?*/if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow//此处说明 新的容量超过了最大值,进行最小扩容 int minCap = oldCap + 1;/*此处判断 minCap的合法性 大于0 且必须小于最大值 否则抛出异常疑问二? 此处为什么要判断minCap<0? min小于0的情况只有 oldCap=Integer.MAX_VALUE才可能发生,但是oldCap大于MAX_ARRAY_SIZE(Integer.MAX_VALUE-8)时就会抛异常,那么它如何来的?*/if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();//走此处说明最小扩容值合法,直接扩容为最大容量newCap = MAX_ARRAY_SIZE;}//如果新的容量>旧的容量 并且数组没有发生改变 进行扩容 //疑问三:此处为何要判断引用一致问题if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {//无论新容量成功还是失败都要修改扩容状态allocationSpinLock = 0;}}
//CAS不成功的线程,会直接到此处,让出CPU时间片,尽量让扩容线程优先(但这得不到保证)if (newArray == null) // back off if another thread is allocatingThread.yield();//获取锁,进行扩容 理论上可以是上面CAS失败的线程获取到的lock.lock();/*如果没有newArray=null 说明扩容还未完成 会重新调用扩容方法,重走流程(外出while循环扩容),并再次调用Thread.yield(),给扩容线程让出cpu片 疑问四?为什么要判断 queue == array呢,按理说已经抢到锁了,且走到这里肯定也是完成了newArray,为什么不直接扩容而要再次判断引用一致?
*/if (newArray != null && queue == array) {//复制当前数组元素到新数组queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}
疑问一:
当oldCap过大时,newCap会发生数据溢出成为负数时小于0 为什么不判断该情况?
当oldCap越接近Inter.MAX_VALUE时,扩容时newCap必定是负数,但是JDK团队很巧妙的用newCap - MAX_ARRAY_SIZE > 0去判断。MAX_ARRAY_SIZE为Inter.MAX_VALUE-8。当一个负数减去一个更大的正数时,会成为一个更大的负数,负数同样会发生数据溢出,溢出后会变为正数最大数。只要这个负数<-8,那么再减去MAX_ARRAY_SIZE就一定会溢出变为正数。此处判断肯定会大于0.说明到达newCap已经超过最大界限了。当newCap为正数时,若判断通过,同样说明超过了MAX_ARRAY_SIZE。如果newCap - MAX_ARRAY_SIZE=0时,意味着newCap已经是MAX_ARRAY_SIZE,可以直接进行最大扩容。而newCap - MAX_ARRAY_SIZE<0的情况说明还远没到最大界限的判断,也可以直接扩容。
疑问二:
此处为什么要判断minCap<0? min小于0的情况只有oldCap=Integer.MAX_VALUE才可能发生,但是oldCap大于MAX_ARRAY_SIZE(Integer.MAX_VALUE-8)时就会抛异常,那么它如何来的?
这个问题虽然很简单,但其实困扰了我好几天。后来偶然间想到会不会是构造方法,然后去验证一下,结果答案就是构造方法,构造方法并未对initialCapacity有限制,只判断了不能为负数的情况。也就是说可以初始容量可以为MAX_VALUE,所以这就是此处有这个判断存在的理由。这个事情也侧面说明解决问题不应只局限于当前的眼光,有时要从整体方面去看待一个事物,不能一叶障目。
疑问三:
此处为何要判断引用一致问题?
这块其实比较抽象,计算newCap时,由于没有锁可能有多个线程都在进行扩容,若有线程已经计算完,并获取锁成功完成了引用替换,说明已经在扩容了,那么后来的线程到此处就不用再进行扩容了。
疑问四:
为什么要判断 queue == array呢,按理说已经抢到锁了,且走到这里肯定也是完成了newArray,为什么不直接扩容而要再次判断引用一致?
这块其实也想了挺长时间,也走了一些弯路。答案其实和疑问三一样。简而言之,多个线程都在进行扩容,若当前线程计算完newCap创建数组的时候,已有其他线程完成并抢到锁,进行引用替换,数组复制。释放锁后,被当前线程抢到,那么当前线程其实就没要了多此一举了。一句话总结就是保证多线程扩容时,只有一个线程能扩容成功。
2.put 操作
内部调用的是 offer 操作 ,由于是无界队列,所以不需要阻塞。查阅资料发现有人对这个方法的阻塞有错误的理解,明明上锁了,为什么说说不阻塞。和其他阻塞队列相比,其他阻塞队列大都是有界队列,满了的话,需要先等其他线程调用take或poll方法,等队列有空位了,再去入队。这个由于是无界队列,直接扩容,且扩容时也不上锁。所以JDK团队才会在这个方法的源码加一句never need to block注释吧。
3.poll和take
两者都差不多,都是调用出列方法,所以只贴一个。
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {//此处判断对列中是否还有元素,若没有会阻塞while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;}
private E dequeue() {//判断队列是否为空int n = size - 1;if (n < 0)return null;else {Object[] array = queue;//获取第一个元素E result = (E) array[0];//获取最后一个元素E x = (E) array[n];array[n] = null;//下沉算法重排序Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);//修改sizesize = n;return result;}}
4.remove
public boolean remove(Object o) {final ReentrantLock lock = this.lock;
//上锁lock.lock();try {//获取该元素下标int i = indexOf(o);if (i == -1)//找不到返回falsereturn false;//删除对应下标元素removeAt(i);return true;} finally {lock.unlock();}}
private void removeAt(int i) {Object[] array = queue;//判断队列元素是否1个int n = size - 1;if (n == i) // removed last elementarray[i] = null;else {//获取该位置元素E moved = (E) array[n];//将下标置为nullarray[n] = null;//下沉算法 保持堆一致Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(i, moved, array, n);elsesiftDownUsingComparator(i, moved, array, n, cmp);//这种情况说明移动过去后,根本没有下沉(如果有下沉,i处肯定会变成一个比moved小的数)if (array[i] == moved) {if (cmp == null)siftUpComparable(i, moved, array); //上移elsesiftUpUsingComparator(i, moved, array, cmp);}}size = n;}
5.iterator
和JUC的大部分一样,为了并发性能,都是弱一致性迭代。
其他
这些都是自己的一些思考,也有参考过书或是网上其他的文章,纯属个人理解,如果有其他的思路欢迎一起讨论。一些其他的方法如上浮和下沉有点抽象,本人数据结构也不太好,只看了个大概,由于自己都没参透,就不再贴出误人子弟了。