版本:3.4.2
使用案例
初始化
Disruptor<T> disruptor = new Disruptor<>(T::new, RING_BUFFER_SIZE,(Runnable r) -> new Thread(r, "MY-DISRUPTOR-THREAD"),ProducerType.MULTI,new SleepingWaitStrategy(50, TimeUnit.MICROSECONDS.toNanos(50)));disruptor.handleEventsWith(new MyFirstHandler()).then(new MySecondHandler);
disruptor.setDefaultExceptionHandler(new MyExceptionHandler());
disruptor.start();
ringBuffer = disruptor.getRingBuffer();
生产数据
long seq = ringBuffer.next();
try {T data = ringBuffer.get(seq);fillData(data);
} finally {ringBuffer.publish(seq);
}
生产者
AbstractSequencer
重点属性
属性 | 类型 | 默认值 | 含义 |
---|---|---|---|
bufferSize | int | 0 | 缓存队列大小,例如: 消费者已消费的最小消费序列号:12,bufferSzie=8 生产者已生产的序列号移动到20时就需要等待(20 + 1 - 8 > 12),说明此时生产者已经转了一圈与消费者相遇 |
waitStrategy | WaitStrategy | null | 1. BlockingWaitStrategy:阻塞等待 2. BusySpinWaitStrategy:自旋 3. LiteBlockingWaitStrategy /LiteTimeoutBlockingWaitStrategy:原阻塞类策略的变体,尝试在锁无人争用时消除条件唤醒。 4. PhasedBackoffWaitStrategy:吞吐量与低延迟置换cpu资源的策略。自选一定时间后,让出cpu资源(yields),然后转为兜底策略等待唤醒:WaitStrategy 5. SleepingWaitStrategy:自旋+yield+休眠等待 6. TimeoutBlockingWaitStrategy:等待超时会抛出异常:TimeoutException 7. YieldingWaitStrategy:自旋+yield |
cursor | Sequence | -1 | 当前生产者已生产的序列号 |
gatingSequences | Sequence[] | new Sequence[0] | 当前消费者已消费的序列号 |
MultiProducerSequencer
重点属性
属性 | 类型 | 默认值 | 含义 |
---|---|---|---|
availableBuffer | int[] | new int[bufferSize] 默认每个下标处为-1 | 生产者已发布的有效序列号 |
gatingSequenceCache | Sequence | -1 | 消费者已消费的最小序列号缓存 |
重点方法
- hasAvailableCapacity:是否有空间可以生产消息。
- (cursorValue + requiredCapacity) - bufferSize > cachedGatingSequence:生产者追上了消费者
- cachedGatingSequence > cursorValue:消费者已经消费完所有消息
- 满足条件a或者条件b。判断:(cursorValue + requiredCapacity) - bufferSize > min(gatingSequences)
- 是:返回没有剩余空间
- 否:返回有剩余空间
- next:下一个生产数据可用的序列号
- 自旋
- 满足hasAvailableCapacity
- 更新cachedGatingSequence
- CAS设置cursor序列号并退出自旋返回
- 不满足hasAvailableCapacity,则休眠LockSupport.parkNanos(1)
- remainingCapacity:剩余空间
- long consumed = Util.getMinimumSequence(gatingSequences, cursor.get());
- long produced = cursor.get();
- return getBufferSize() - (produced - consumed);
- publish:生产者发布序列号
- 将availableBuffer对应位置标志位设置为:有效
- 唤醒等待的消费者:com.lmax.disruptor.WaitStrategy#signalAllWhenBlocking
SingleProducerSequencer
重点属性
属性 | 类型 | 默认值 | 含义 |
---|---|---|---|
nextValue | long | -1 | 生产者已发布的有效序列号。 使用了缓存行填充 |
cachedValue | long | -1 | 消费者已消费的序列号缓存。 使用了缓存行填充 |
重点方法
与Multi算法完全一致,只是单生产者模式不需要考虑并发问题,没有CAS操作,并且使用了缓存行填充提升性能
消费者
BatchEventProcessor
在多生产者多消费者模式下,每个sequence会被所有processor消费。等同于rocketmq的广播消费模式(broadcast)
重点属性
属性 | 类型 | 默认值 | 含义 |
---|---|---|---|
sequenceBarrier | SequenceBarrier | ProcessingSequenceBarrier | 消费者消费序列号屏障,如果存在依赖,保障依赖顺序消费 |
sequence | Sequence | -1 | 当前消费者已消费的序列号 |
重点方法
- run
- 调用processEvents,自旋消费消息
- nextSequence = sequence.get() + 1L
- 阻塞等待有效消息发布的序列号availableSequence:sequenceBarrier.waitFor(nextSequence)
- 如果nextSequence <= availableSequence,自旋消费消息
- 更新本地sequence
WorkProcessor
在多生产者多消费者模式下,确保每个sequence在同一个group中只被一个processor消费,在同一个WorkPool(与EventHandlerGroup1:1关系,对应rocketmq中的消费者group)。等同于rocketmq的集群消费模式(group)
重点属性
属性 | 类型 | 默认值 | 含义 |
---|---|---|---|
sequenceBarrier | SequenceBarrier | ProcessingSequenceBarrier | 消费者消费序列号屏障,如果存在依赖,保障依赖顺序消费 |
workSequence | Sequence | ringBuffer.getCursor() | 同group下所有handler共享的已消费序列号 CAS操作为sequence+1,当前正在消费的序列号 |
sequence | Sequence | -1 | 当前消费者已消费的序列号 |
重点方法
消费强依赖生产者下标:com.lmax.disruptor.RingBuffer#getCursor,因为同一个group下的所有消费者只有一个handler可以消费本地sequence与生产者workSequence之间的消息
- run
- 自旋消费
- 如果本地sequence已经落后生产者workSequence,更新sequence
- 获取消息消费,阻塞等待有效消息发布:sequenceBarrier.waitFor(nextSequence)
等待消息策略
WaitStrategy
- waitFor:等待被唤醒消费指定序列号消息
- signalAllWhenBlocking:唤醒所有阻塞等待消费的线程
waitFor参数
- sequence:long类型,等待消费的序列号
- cursor:Sequence类型,ringbuffer主序列号(生产者发布的序列号),即更新时候会发送通知的序列号
- dependentSequence:Sequence类型(例如:FixedSequenceGroup,SequenceGroup)。依赖的序列号,要等到依赖的消费者消费到这些序列号之后,本地才能开始消费
- barrier:ProcessingSequenceBarrier
BlockingWaitStrategy
- 如果cursorSequence < sequence:阻塞等待:ReentrantLock.Condition.await()。因为生产者还没有发布该序列号的消息
- 如果存在依赖dependentSequence(默认为cursor),等待依赖消费者消费至依赖的序列号之后再开始消费:while ((availableSequence = dependentSequence.get()) < sequence)
- 返回availableSequence
BusySpinWaitStrategy
- 如果存在依赖dependentSequence(默认为cursor),等待依赖消费者消费至依赖的序列号之后再开始消费:while ((availableSequence = dependentSequence.get()) < sequence)
- 返回availableSequence
PhasedBackoffWaitStrategy
- 自旋
- 如果if ((availableSequence = dependentSequence.get()) >= sequence),即依赖消费者已经消费者依赖的序列号,直接返回availableSequence(默认为0)
- counter(常量10000)计算器递减后为0,startTime如果为0则更新为当前时间
- timeDelta=System.nanoTime() - startTime
- 如果timeDelta>yieldTimeoutNanos,委派兜底策略waitFor返回的序列号:fallbackStrategy.waitFor
- 如果timeDelta>spinTimeoutNanos,让出线程资源:Thread.yield()。
- 重置counter=常量10000
- 继续自旋
SleepingWaitStrategy
- 如果存在依赖dependentSequence(默认为cursor),等待依赖消费者消费至依赖的序列号之后再开始消费:while ((availableSequence = dependentSequence.get()) < sequence)
- 自旋
- 调用applyWaitMethod等待
- 如果counter>100,递减后继续自旋
- 如果counter>0,递减后让出线程资源,继续自旋
- 否则阻塞等待唤醒,超时后自动唤醒:LockSupport.parkNanos(sleepTimeNs)
- 返回availableSequence
YieldingWaitStrategy
与SleepingWaitStrategy策略类似,只是等待逻辑只有在counter==0的时候会让出线程资源一次,否则就递减自旋等待
LiteBlockingWaitStrategy(试验期)
BlockingWaitStrategy 的变体,尝试在锁无人争用时消除条件唤醒。增加本地变量signalNeeded(默认为false),如果为false的时候,说明没有锁竞争,此时不需要获取锁之后尝试唤醒等待的线程
signalNeeded只有在waitFor锁持有的时候才被更新为true
@Overridepublic void signalAllWhenBlocking(){// false说明无锁竞争,不需要唤醒,直接退出if (signalNeeded.getAndSet(false)){lock.lock();try{processorNotifyCondition.signalAll();}finally{lock.unlock();}}}
LiteTimeoutBlockingWaitStrategy(试验期)
TimeoutBlockingWaitStrategy 的变体,尝试在锁无人争用时消除条件唤醒
TimeoutBlockingWaitStrategy
超时版本的BlockingWaitStrategy,即等待的时候有超时自动唤醒
模型
EventProcessorInfo
负责启动/暂停事件处理器并跟踪各阶段处理的序列号
EventProcessor:BatchEventProcessor,每一个EventHandler被封装为一个事件处理器
SequenceBarrier:ProcessingSequenceBarrier,等待序列号(Sequence)消息发布后处理消息
BatchEventProcessor
实际的事件消费者
SequenceBarrier:ProcessingSequenceBarrier
Sequence:默认-1
ProcessingSequenceBarrier
序列号处理通用逻辑
cursorSequence
- 如果不存在dependentSequences:则是cursor(即生产者已生产序列号)
- 否则:new FixedSequenceGroup(dependentSequences)
方法:waitFor
- 调用waitStrategy.waitFor返回有效序列号:availableSequence
- 如果availableSequence < sequence,返回availableSequence
- 否则返回(即availableSequence>=sequence):sequencer.getHighestPublishedSequence(sequence, availableSequence)
- SingleProducerSequencer:直接返回availableSequence
- MultiProducerSequencer:返回sequence与availableSequence之间第一个无效的序列号-1。否则返回availableSequence,即全都有效,那么返回最大值,即availableSequence。因为sequence与availableSequence之间都应该是有效的序列号,如果有一个无效的序列号,那么它的前一个位置一定是最大的有效序列号。官方注释为了确保该方法工作正常,传入的sequence应该是比上一个处理成功的序列号大1
EventHandlerGroup
then方法将父EventHandlerGroup所有processorSequences作为依赖传递给当前Handler列表的SequenceBarrier作为屏障实现按照依赖顺序消费
Disruptor.after方法:将指定handler列表对应的Sequence列表封装为EventHandlerGroup
ConsumerInfo
- WorkerPoolInfo:执行的handler被封装为WorkProcessor消费消息
- EventProcessorInfo:执行的handler被封装为EventProcessorInfo,自旋消费消息
Disruptor
ConsumerRepository
消费者仓库,存储所有消费者信息,即所有的event handler
consumerInfos:EventProcessorInfo
Executor
BasicExecutor,调用线程工厂启动线程执行命令
start
遍历consumerInfos启动:consumerInfo.start(executor)。每一个handler起一个线程