文章目录
- 1 SynchronousQueue是什么
- 2 核心属性详解
- 3 核心方法详解
- 3.1 transfer(E e, boolean timed, long nanos)
- 3.1.1 TransferStack实现
- 3.1.2 TransferQueue实现
- 3.2 外部方法
- 3.2.1 put(E e)
- 3.2.2 offer(E e)
- 3.2.3 take()
- 3.2.4 poll()
- 4 总结
java 并发编程系列文章目录
1 SynchronousQueue是什么
在java的注释上写着:一种阻塞队列,其中每个插入操作都必须等待另一个线程执行相应的移除操作,反之亦然。元素的size()方法返回一定是0,就是内部元素不可见,你要么阻塞添加元素,要么阻塞获取元素,获取到元素,返回,提供元素的线程也返回。如果非阻塞,那么只有当前队列里有等待transfer的线程和你的mode不同才会成功。
2 核心属性详解
通过内部类实现数据传输功能,有两个实现,即公平和非公平,对应的数据结构是queue 和 stack
abstract static class Transferer<E> {//对于生成者 E是提交的元素,不为null,消费者是null, 剩下两个参数表示是否是有阻塞,和超时时间abstract E transfer(E e, boolean timed, long nanos);}//...省略其他
3 核心方法详解
3.1 transfer(E e, boolean timed, long nanos)
3.1.1 TransferStack实现
基于stack栈的数据结构,每次都是设置head,然后按照特性,如果不带时间的情况,transfer数据如果head没有,说明没有其他线程在等待transfer,所以直接失败
如果带有时间的,会park住,等待其他transfer的来匹配唤醒。整体流程就是这样
E transfer(E e, boolean timed, long nanos) {SNode s = null;//数据为null就表示来拿数据的,否则就是put数据的int mode = (e == null) ? REQUEST : DATA;for (;;) {SNode h = head;//如果此时是空,或者当前头结点的mode和自己相同if (h == null || h.mode == mode) {//1. 需要timed 但是nanos 是非正数,那就不需要等待,直接返回,其实就是必定不成功,因为h.mode == mode,不一样的时候h == null 没法匹配其他线程的数据//此时会清理match == this的head,就是匹配失败的nodeif (timed && nanos <= 0) {if (h != null && h.isCancelled())casHead(h, h.next); // pop cancelled nodeelsereturn null;} //此时是非timed 或者 timed 且 nanos > 0的 因为此时是stack结构,所以设置head为自己else if (casHead(h, s = snode(s, e, h, mode))) {//设置成功 获取Node 如果返回的是自己,说明获取失败 下面会描述该方法SNode m = awaitFulfill(s, timed, nanos);if (m == s) {//这个方法就是重新设置headclean(s);return null;}//获取数据成功了,重新设置headif ((h = head) != null && h.next == s)casHead(h, s.next);//返回对应的数据return (E) ((mode == REQUEST) ? m.item : s.item);}} //此时h != null 或者 mode不相同//此时head 没有被占用else if (!isFulfilling(h.mode)) { //对match == this去除掉if (h.isCancelled()) casHead(h, h.next); //设置head else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//设置成功for (;;) {SNode m = s.next;if (m == null) {casHead(s, null);s = null;break;}SNode mn = m.next;//尝试匹配 把m.match设置成s 此时会唤醒对应的阻塞线程if (m.tryMatch(s)) {casHead(s, mn); return (E) ((mode == REQUEST) ? m.item : s.item);} else s.casNext(m, mn);}}} else {//此时head是FULFILLING状态 那就拿head.next去尝试匹配,相同的原理 SNode m = h.next; if (m == null) casHead(h, null); else {SNode mn = m.next;if (m.tryMatch(h)) casHead(h, mn); else h.casNext(m, mn); }}}}//这个方法就是线程到这匹配数据 匹配不到就阻塞。整体逻辑是这个SNode awaitFulfill(SNode s, boolean timed, long nanos) {final long deadline = timed ? System.nanoTime() + nanos : 0L;Thread w = Thread.currentThread();//int spins = (shouldSpin(s) ?(timed ? maxTimedSpins : maxUntimedSpins) : 0);for (;;) {if (w.isInterrupted())s.tryCancel();SNode m = s.match;if (m != null)return m;if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {s.tryCancel();continue;}}if (spins > 0)spins = shouldSpin(s) ? (spins-1) : 0;//此处以上代码就是判断超时 循环次数 节点是否匹配过,在一个cas + for循环每次都要验证数据的//设置等待的线程,之后会被其他线程来匹配的时候唤醒。唤醒是在tryMatchelse if (s.waiter == null)s.waiter = w; // establish waiter so can park next iterelse if (!timed)//阻塞当前线程LockSupport.park(this);else if (nanos > spinForTimeoutThreshold)//阻塞带有超时时间的LockSupport.parkNanos(this, nanos);}}
3.1.2 TransferQueue实现
只是数据结构改变而已,实现逻辑几乎不变
3.2 外部方法
3.2.1 put(E e)
放入一个元素,如果返回的是Null,按照transfer原理,是走到LockSupport.park(this); 即没匹配带数据就会阻塞,如果返回Null就是中断的情况所以会重置中断位且抛出中断异常
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();if (transferer.transfer(e, false, 0) == null) {Thread.interrupted();throw new InterruptedException();}}
3.2.2 offer(E e)
因为timed == true 且nanos = 0 所以如果此时没有head 会立马返回
public boolean offer(E e) {if (e == null) throw new NullPointerException();//只是返回成功或者失败return transferer.transfer(e, true, 0) != null;}
3.2.3 take()
timed == false 其实和put一样会进入LockSupport.park(this) 阻塞自己,此时会出现中断情况,所以会判断返回结果,是null重置中断位,抛出异常。
public E take() throws InterruptedException {E e = transferer.transfer(null, false, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException();}
3.2.4 poll()
poll方法 timed == true nanos == 0 所以如果head == null 会直接返回,这个方法就是直接获取数据,返回结果 可能为null
public E poll() {return transferer.transfer(null, true, 0);}
4 总结
利用transfer方法提供了两种实现,让多个线程之间可以去交换数据,它与其他队列的区别在于,不是用一个数据结构去永久的存储数据,这里你想把数据一定能给别人使用,只有阻塞等待别人来匹配,用offer的方式,此时head == null 就会立马失败返回null