归档
使用示例
https://github.com/zengxf/small-frame-demo/blob/master/multi-thread/reactive-test/reactor-demo/src/main/java/cn/zxf/reactor_demo/jdk/PubSubTest.java
JDK 版本
openjdk version "17" 2021-09-14
OpenJDK Runtime Environment (build 17+35-2724)
OpenJDK 64-Bit Server VM (build 17+35-2724, mixed mode, sharing)
原理
关键类
java.util.concurrent.SubmissionPublisher
/**
 * Excerpt of the publisher class: only the fields and constructors that the
 * sections below rely on are shown ({@code . . .} marks omitted source).
 */
public class SubmissionPublisher < T > implements Publisher < T > , AutoCloseable {
    // Head of the subscription chain; nodes are linked through BufferedSubscription.next.
    BufferedSubscription < T > clients;
    // Lock taken by subscribe() and doOffer() around their work on the chain.
    final ReentrantLock lock;
    // When true (and no closedException), subscribe() completes a new subscription immediately.
    volatile boolean closed;
    // Set by the first subscribe() call, together with owner.
    boolean subscribed;
    // Thread that made the first subscribe() call; doOffer() clears it when another thread offers.
    Thread owner;
    // When non-null, subscribe() reports it to a new subscription through onError.
    volatile Throwable closedException;
    // Passed to every BufferedSubscription created by subscribe().
    final Executor executor;
    // Passed to every BufferedSubscription created by subscribe(); null for the no-arg constructor.
    final BiConsumer < ? super Subscriber < ? super T > , ? super Throwable > onNextHandler;
    // Result of roundCapacity(...) on the constructor argument; passed to each subscription as its maximum.
    final int maxBufferCapacity;

    /** Delegates with ASYNC_POOL, Flow.defaultBufferSize() and no onNext handler. */
    public SubmissionPublisher ( ) { this ( ASYNC_POOL , Flow . defaultBufferSize ( ) , null ) ; }

    /**
     * Stores the executor and handler, creates the lock and rounds the buffer capacity.
     *
     * @param executor          executor handed to each subscription
     * @param maxBufferCapacity requested per-subscriber capacity, stored after roundCapacity(...)
     * @param handler           stored as onNextHandler
     */
    public SubmissionPublisher ( Executor executor, int maxBufferCapacity, BiConsumer < ? super Subscriber < ? super T > , ? super Throwable > handler) {
        // Omitted source (presumably argument validation; confirm against the JDK source).
        . . .
        this . lock = new ReentrantLock ( ) ;
        this . executor = executor;
        this . onNextHandler = handler;
        this . maxBufferCapacity = roundCapacity ( maxBufferCapacity) ;
    }
}
订阅
java.util.concurrent.SubmissionPublisher
/**
 * Wraps {@code subscriber} in a new BufferedSubscription and, under the publisher
 * lock, appends it to the end of the {@code clients} chain. Closed subscriptions
 * met during the walk are unlinked. If an equal subscriber is already in the
 * chain, the existing subscription receives an IllegalStateException through
 * onError and the new one is not linked.
 *
 * @param subscriber the subscriber to register
 * @throws NullPointerException if {@code subscriber} is null
 */
public void subscribe ( Subscriber < ? super T > subscriber) {
    if ( subscriber == null ) throw new NullPointerException ( ) ;
    ReentrantLock lock = this . lock;
    int max = maxBufferCapacity;
    // The initial buffer length is the smaller of max and INITIAL_CAPACITY.
    Object [ ] array = new Object [ max < INITIAL_CAPACITY ? max : INITIAL_CAPACITY ] ;
    BufferedSubscription < T > subscription = new BufferedSubscription < T > ( subscriber, executor, onNextHandler, array, max) ;
    lock. lock ( ) ;
    try {
        // The very first subscribe records the calling thread as owner.
        if ( ! subscribed) { subscribed = true ; owner = Thread . currentThread ( ) ; }
        // b is the node being inspected; pred is the last node that stayed in the chain.
        for ( BufferedSubscription < T > b = clients, pred = null ; ; ) {
            if ( b == null ) {
                // End of chain: signal onSubscribe, then either terminate the new
                // subscription (publisher already closed) or link it at the tail.
                Throwable ex;
                subscription. onSubscribe ( ) ;
                if ( ( ex = closedException) != null ) subscription. onError ( ex) ;
                else if ( closed) subscription. onComplete ( ) ;
                else if ( pred == null ) clients = subscription;
                else pred. next = subscription;
                break ;
            }
            BufferedSubscription < T > next = b. next;
            if ( b. isClosed ( ) ) {
                // Unlink a closed subscription; pred is deliberately left unchanged.
                b. next = null ;
                if ( pred == null ) clients = next;
                else pred. next = next;
            } else if ( subscriber. equals ( b. subscriber) ) {
                // Duplicate: the error goes to the EXISTING subscription b.
                b. onError ( new IllegalStateException ( "Duplicate subscribe" ) ) ;
                break ;
            } else pred = b;
            b = next;
        }
    } finally { lock. unlock ( ) ; }
}
java.util.concurrent.SubmissionPublisher.BufferedSubscription
/**
 * Excerpt of the per-subscriber subscription: its fields and constructor.
 * One instance is created by SubmissionPublisher.subscribe() for each subscriber.
 */
static final class BufferedSubscription < T > implements Subscription , ForkJoinPool. ManagedBlocker {
    // Written by awaitSpace(nanos) just before it blocks.
    long timeout;
    // Consumer-side index; consume() advances it by the number of items taken.
    int head;
    // Producer-side index; offer() advances it by one per item added.
    int tail;
    // Upper bound on the buffer length; offer() only grows the buffer while length < maxCapacity.
    final int maxCapacity;
    // Control/status bits; tested against REQS, ACTIVE, RUN, CLOSED, ERROR and COMPLETE elsewhere.
    volatile int ctl;
    // Item buffer; offer() and takeItems() index it with (index & (length - 1)).
    Object [ ] array;
    final Subscriber < ? super T > subscriber;
    final BiConsumer < ? super Subscriber < ? super T > , ? super Throwable > onNextHandler;
    // Executor that tryStart() uses to run the ConsumerTask.
    Executor executor;
    // Next subscription in the publisher's clients chain.
    BufferedSubscription < T > next;

    /** Stores the arguments; maxBufferCapacity becomes maxCapacity. */
    BufferedSubscription ( Subscriber < ? super T > subscriber, Executor executor, BiConsumer < ? super Subscriber < ? super T > , ? super Throwable > onNextHandler, Object [ ] array, int maxBufferCapacity ) {
        this . subscriber = subscriber;
        this . executor = executor;
        this . onNextHandler = onNextHandler;
        this . array = array;
        this . maxCapacity = maxBufferCapacity;
    }
}
提交数据
java.util.concurrent.SubmissionPublisher
/**
 * Publishes {@code item} by delegating to doOffer with nanos = Long.MAX_VALUE
 * and no onDrop predicate.
 *
 * @param item the item to publish
 * @return the value returned by doOffer
 */
public int submit ( T item) { return doOffer ( item, Long . MAX_VALUE , null ) ; }

/**
 * Excerpt: under the publisher lock, offers {@code item} to every subscription
 * in the clients chain ({@code . . .} marks omitted source).
 *
 * @param item   the item to publish
 * @param nanos  wait budget in nanoseconds, forwarded to retryOffer in the back-pressure excerpt
 * @param onDrop drop predicate, or null; its use is in the omitted source
 * @throws NullPointerException if {@code item} is null
 */
private int doOffer ( T item, long nanos, BiPredicate < Subscriber < ? super T > , ? super T > onDrop) {
    if ( item == null ) throw new NullPointerException ( ) ;
    int lag = 0 ;
    boolean complete, unowned;
    ReentrantLock lock = this . lock;
    lock. lock ( ) ;
    try {
        Thread t = Thread . currentThread ( ) , o;
        BufferedSubscription < T > b = clients;
        // unowned is true when the caller is not the recorded owner thread;
        // in that case a non-null owner is cleared.
        if ( ( unowned = ( ( o = owner) != t) ) && o != null ) owner = null ;
        // With no subscribers, complete simply mirrors the closed flag.
        if ( b == null ) complete = closed;
        else {
            complete = false ;
            boolean cleanMe = false ;
            BufferedSubscription < T > retries = null , rtail = null , next;
            // Offer the item to each subscription in chain order.
            do {
                next = b. next;
                int stat = b. offer ( item, unowned) ;
                // Omitted: handling of stat (not shown in this excerpt).
                . . .
            } while ( ( b = next) != null ) ;
            . . .
        }
    } finally { lock. unlock ( ) ; }
    . . .
}
java.util.concurrent.SubmissionPublisher.BufferedSubscription
/**
 * Tries to place {@code item} at the tail of this subscription's buffer, then
 * calls startOnOffer.
 *
 * @param item    the item to add
 * @param unowned true when the offering thread is not the publisher's owner thread
 * @return the value of startOnOffer(stat), where stat is the item count after
 *         the add (n) if the item was added and 0 otherwise
 */
final int offer ( T item, boolean unowned) {
    Object [ ] a;
    int stat = 0 , cap = ( ( a = array) == null ) ? 0 : a. length;
    // i is the slot for the current tail; n is the item count this add would produce.
    int t = tail, i = t & ( cap - 1 ) , n = t + 1 - head;
    if ( cap > 0 ) {
        boolean added;
        // Count has reached the buffer length and growth is still allowed: grow, then add.
        if ( n >= cap && cap < maxCapacity) added = growAndOffer ( item, a, t) ;
        // Count has reached the buffer length, or the caller is not the owner: conditional
        // store that expects slot i to be null (QA is presumably an array-element
        // VarHandle; confirm against the JDK source).
        else if ( n >= cap || unowned) added = QA . compareAndSet ( a, i, null , item) ;
        // Otherwise an unconditional release store.
        else { QA . setRelease ( a, i, item) ; added = true ; }
        if ( added) { tail = t + 1 ; stat = n; }
    }
    return startOnOffer ( stat) ;
}

/**
 * Excerpt: starts a consumer task when ctl has REQS set and ACTIVE clear, and
 * the ctl value before OR-ing in RUN | ACTIVE had neither RUN nor CLOSED set.
 *
 * @param stat status computed by offer
 * @return stat (the omitted source may alter it; confirm against the JDK source)
 */
final int startOnOffer ( int stat) {
    int c;
    if ( ( ( c = ctl) & ( REQS | ACTIVE ) ) == REQS && ( ( c = getAndBitwiseOrCtl ( RUN | ACTIVE ) ) & ( RUN | CLOSED ) ) == 0 ) tryStart ( ) ;
    . . .
    return stat;
}

/** Excerpt: wraps this subscription in a ConsumerTask and hands it to the executor, if one is set. */
final void tryStart ( ) {
    try {
        Executor e;
        ConsumerTask < T > task = new ConsumerTask < T > ( this ) ;
        if ( ( e = executor) != null ) e. execute ( task) ;
    }
    // Omitted: the rest of the try statement.
    . . .
}

/**
 * Excerpt of the consumer loop run by ConsumerTask.run(): repeatedly takes
 * items from the buffer and delivers them until an exit condition is met.
 */
final void consume ( ) {
    Subscriber < ? super T > s;
    if ( ( s = subscriber) != null ) {
        subscribeOnOpen ( s) ;
        long d = demand;
        for ( int h = head, t = tail; ; ) {
            int c, taken;
            boolean empty;
            // ERROR bit set: close with error and stop.
            if ( ( ( c = ctl) & ERROR ) != 0 ) { closeOnError ( s, null ) ; break ; }
            // Items were taken: advance head and refresh the local demand.
            else if ( ( taken = takeItems ( s, d, h) ) > 0 ) { head = h += taken; d = subtractDemand ( taken) ; }
            . . .
            // tail did not move since the last read.
            else if ( t == ( t = tail) ) {
                // Buffer empty and COMPLETE set: close normally and stop.
                if ( ( empty = ( t == h) ) && ( c & COMPLETE ) != 0 ) { closeOnComplete ( s) ; break ; }
                . . .
            }
        }
    }
}

/**
 * Excerpt: takes up to a bounded batch of items starting at index h, clearing
 * each slot as it goes and passing the item to consumeNext.
 *
 * @param s the subscriber to deliver to
 * @param d current demand
 * @param h index of the first slot to take
 * @return the number of loop iterations completed (k)
 */
final int takeItems ( Subscriber < ? super T > s, long d, int h) {
    Object [ ] a;
    int k = 0 , cap;
    if ( ( a = array) != null && ( cap = a. length) > 0 ) {
        // m is the index mask; b caps one batch at (m >>> 3) + 1 items.
        int m = cap - 1 , b = ( m >>> 3 ) + 1 ;
        // Take at most min(d, b) items in this call.
        int n = ( d < ( long ) b) ? ( int ) d : b;
        for ( ; k < n; ++ h, ++ k) {
            // Atomically read the slot and reset it to null.
            Object x = QA . getAndSet ( a, h & m, null ) ;
            . . .
            // Stop the batch when consumeNext returns false.
            else if ( ! consumeNext ( s, x) ) break ;
        }
    }
    return k;
}
java.util.concurrent.SubmissionPublisher.ConsumerTask
/**
 * Excerpt: the task that BufferedSubscription.tryStart() submits to the
 * executor. Running it calls consume() on the wrapped subscription.
 */
static final class ConsumerTask < T > extends ForkJoinTask < Void > implements Runnable , CompletableFuture. AsynchronousCompletionTask {
    // The subscription whose buffer this task drains.
    final BufferedSubscription < T > consumer;

    ConsumerTask ( BufferedSubscription < T > consumer) { this . consumer = consumer; }

    // Omitted: remaining members (not shown in this excerpt).
    . . .

    /** Runs the subscription's consumer loop. */
    public final void run ( ) { consumer. consume ( ) ; }
}
关闭
背压
阅读代码或调试时,很难直接看出 publisher 是在哪里暂停(阻塞)的;
可用 JConsole 查看线程栈来定位:
...
java.base@17/jdk.internal.misc.Unsafe.park(Native Method)
... .locks.LockSupport.park(LockSupport.java:211)
... .SubmissionPublisher$BufferedSubscription.block(SubmissionPublisher.java:1495)
... .ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3463)
... .ForkJoinPool.managedBlock(ForkJoinPool.java:3434)
... .SubmissionPublisher$BufferedSubscription.awaitSpace(SubmissionPublisher.java:1462)
...
java.util.concurrent.SubmissionPublisher
/**
 * Back-pressure excerpt of doOffer: when some subscriptions need a retry
 * (retries != null) or cleanup (cleanMe), the work continues in retryOffer
 * ({@code . . .} marks omitted source).
 */
private int doOffer ( T item, long nanos, BiPredicate < Subscriber < ? super T > , ? super T > onDrop) {
    int lag = 0 ;
    . . .
    try {
        Thread t = Thread . currentThread ( ) , o;
        if ( ( unowned = ( ( o = owner) != t) ) && o != null )
        . . .
        // Hand the subscriptions collected for retry to retryOffer; its result becomes lag.
        if ( retries != null || cleanMe) lag = retryOffer ( item, nanos, onDrop, retries, lag, cleanMe) ;
    }
    . . . . . .
}

/**
 * Excerpt: walks the retry chain (linked through nextRetry) and, when a
 * positive wait budget was given, blocks in awaitSpace(nanos) on each entry.
 * submit() passes Long.MAX_VALUE as nanos, so this path is taken for it.
 *
 * @param nanos   wait budget in nanoseconds; awaitSpace is only called when it is positive
 * @param retries head of the chain of subscriptions to retry
 * @return lag (as possibly updated by the omitted source)
 */
private int retryOffer ( T item, long nanos, BiPredicate < Subscriber < ? super T > , ? super T > onDrop, BufferedSubscription < T > retries, int lag, boolean cleanMe) {
    for ( BufferedSubscription < T > r = retries; r != null ; ) {
        // Detach r from the retry chain before processing it.
        BufferedSubscription < T > nextRetry = r. nextRetry;
        r. nextRetry = null ;
        // This is the call that appears as awaitSpace in the thread dump above.
        if ( nanos > 0L ) r. awaitSpace ( nanos) ;
        . . .
    }
    . . .
    return lag;
}
java.util.concurrent.SubmissionPublisher.BufferedSubscription
/**
 * Back-pressure excerpt of BufferedSubscription: how the offering thread is
 * parked and later woken ({@code . . .} marks omitted source).
 */
static final class BufferedSubscription < T > implements Subscription , ForkJoinPool. ManagedBlocker {

    /**
     * Blocks the calling thread through ForkJoinPool.managedBlock unless
     * isReleasable() is already true.
     *
     * @param nanos wait budget, stored in the timeout field before blocking
     */
    final void awaitSpace ( long nanos) {
        if ( ! isReleasable ( ) ) {
            // NOTE(review): presumably lets the caller help run pending tasks
            // before it blocks; confirm against the ForkJoinPool source.
            ForkJoinPool . helpAsyncBlocker ( executor, this ) ;
            // Re-check: blocking is only needed if the condition still does not hold.
            if ( ! isReleasable ( ) ) {
                timeout = nanos;
                try { ForkJoinPool . managedBlock ( this ) ; }
                . . .
            }
        }
    }

    /**
     * Excerpt of the ManagedBlocker callback: loops until isReleasable(),
     * first recording the current thread as waiter and then parking it.
     */
    @Override public final boolean block ( ) {
        . . .
        while ( ! isReleasable ( ) ) {
            . . .
            // Publish the current thread so signalWaiter() can unpark it.
            else if ( waiter == null ) waiter = Thread . currentThread ( ) ;
            . . .
            // This is the LockSupport.park frame in the thread dump above.
            else LockSupport . park ( this ) ;
        }
        . . .
    }

    /** Excerpt: on the consumer side, a non-zero waiting flag triggers signalWaiter(). */
    final int takeItems ( Subscriber < ? super T > s, long d, int h) {
        . . .
        if ( waiting != 0 ) signalWaiter ( ) ;
        . . .
    }

    /** Clears the waiting flag and unparks the recorded waiter thread, if any. */
    final void signalWaiter ( ) {
        Thread w;
        waiting = 0 ;
        if ( ( w = waiter) != null ) LockSupport . unpark ( w) ;
    }
}