Disruptor概览

版本:3.4.2

使用案例

初始化

Disruptor<T> disruptor = new Disruptor<>(T::new, RING_BUFFER_SIZE,(Runnable r) -> new Thread(r, "MY-DISRUPTOR-THREAD"),ProducerType.MULTI,new SleepingWaitStrategy(50, TimeUnit.MICROSECONDS.toNanos(50)));disruptor.handleEventsWith(new MyFirstHandler()).then(new MySecondHandler);
disruptor.setDefaultExceptionHandler(new MyExceptionHandler());
disruptor.start();
ringBuffer = disruptor.getRingBuffer();

生产数据

long seq = ringBuffer.next();
try {T data = ringBuffer.get(seq);fillData(data);
} finally {ringBuffer.publish(seq);
}

生产者

AbstractSequencer

重点属性
属性类型默认值含义
bufferSizeint0缓存队列大小,例如:
消费者已消费的最小消费序列号:12,bufferSzie=8
生产者已生产的序列号移动到20时就需要等待(20 + 1 - 8 > 12),说明此时生产者已经转了一圈与消费者相遇
waitStrategyWaitStrategynull1. BlockingWaitStrategy:阻塞等待
2. BusySpinWaitStrategy:自旋
3. LiteBlockingWaitStrategy /LiteTimeoutBlockingWaitStrategy:原阻塞类策略的变体,尝试在锁无人争用时消除条件唤醒。
4. PhasedBackoffWaitStrategy:吞吐量与低延迟置换cpu资源的策略。自选一定时间后,让出cpu资源(yields),然后转为兜底策略等待唤醒:WaitStrategy
5. SleepingWaitStrategy:自旋+yield+休眠等待
6. TimeoutBlockingWaitStrategy:等待超时会抛出异常:TimeoutException
7. YieldingWaitStrategy:自旋+yield
cursorSequence-1当前生产者已生产的序列号
gatingSequencesSequence[]new Sequence[0]当前消费者已消费的序列号

MultiProducerSequencer

重点属性
属性类型默认值含义
availableBufferint[]new int[bufferSize]
默认每个下标处为-1
生产者已发布的有效序列号
gatingSequenceCacheSequence-1消费者已消费的最小序列号缓存
重点方法
  1. hasAvailableCapacity:是否有空间可以生产消息。
    1. (cursorValue + requiredCapacity) - bufferSize > cachedGatingSequence:生产者追上了消费者
    2. cachedGatingSequence > cursorValue:消费者已经消费完所有消息
    3. 满足条件a或者条件b。判断:(cursorValue + requiredCapacity) - bufferSize > min(gatingSequences)
    4. 是:返回没有剩余空间
    5. 否:返回有剩余空间
  2. next:下一个生产数据可用的序列号
    1. 自旋
    2. 满足hasAvailableCapacity
    3. 更新cachedGatingSequence
    4. CAS设置cursor序列号并退出自旋返回
    5. 不满足hasAvailableCapacity,则休眠LockSupport.parkNanos(1)
  3. remainingCapacity:剩余空间
    1. long consumed = Util.getMinimumSequence(gatingSequences, cursor.get());
    2. long produced = cursor.get();
    3. return getBufferSize() - (produced - consumed);
  4. publish:生产者发布序列号
    1. 将availableBuffer对应位置标志位设置为:有效
    2. 唤醒等待的消费者:com.lmax.disruptor.WaitStrategy#signalAllWhenBlocking

SingleProducerSequencer

重点属性
属性类型默认值含义
nextValuelong-1生产者已发布的有效序列号。
使用了缓存行填充
cachedValuelong-1消费者已消费的序列号缓存。
使用了缓存行填充
重点方法

与Multi算法完全一致,只是单生产者模式不需要考虑并发问题,没有CAS操作,并且使用了缓存行填充提升性能

消费者

BatchEventProcessor

在多生产者多消费者模式下,每个sequence会被所有processor消费。等同于rocketmq的广播消费模式(broadcast)

重点属性
属性类型默认值含义
sequenceBarrierSequenceBarrierProcessingSequenceBarrier消费者消费序列号屏障,如果存在依赖,保障依赖顺序消费
sequenceSequence-1当前消费者已消费的序列号
重点方法
  1. run
    1. 调用processEvents,自旋消费消息
    2. nextSequence = sequence.get() + 1L
    3. 阻塞等待有效消息发布的序列号availableSequence:sequenceBarrier.waitFor(nextSequence)
    4. 如果nextSequence <= availableSequence,自旋消费消息
    5. 更新本地sequence

WorkProcessor

在多生产者多消费者模式下,确保每个sequence在同一个group中只被一个processor消费,在同一个WorkPool(与EventHandlerGroup1:1关系,对应rocketmq中的消费者group)。等同于rocketmq的集群消费模式(group)

重点属性
属性类型默认值含义
sequenceBarrierSequenceBarrierProcessingSequenceBarrier消费者消费序列号屏障,如果存在依赖,保障依赖顺序消费
workSequenceSequenceringBuffer.getCursor()同group下所有handler共享的已消费序列号
CAS操作为sequence+1,当前正在消费的序列号
sequenceSequence-1当前消费者已消费的序列号
重点方法

消费强依赖生产者下标:com.lmax.disruptor.RingBuffer#getCursor,因为同一个group下的所有消费者只有一个handler可以消费本地sequence与生产者workSequence之间的消息

  1. run
    1. 自旋消费
    2. 如果本地sequence已经落后生产者workSequence,更新sequence
    3. 获取消息消费,阻塞等待有效消息发布:sequenceBarrier.waitFor(nextSequence)

等待消息策略

WaitStrategy

  1. waitFor:等待被唤醒消费指定序列号消息
  2. signalAllWhenBlocking:唤醒所有阻塞等待消费的线程

waitFor参数

  1. sequence:long类型,等待消费的序列号
  2. cursor:Sequence类型,ringbuffer主序列号(生产者发布的序列号),即更新时候会发送通知的序列号
  3. dependentSequence:Sequence类型(例如:FixedSequenceGroup,SequenceGroup)。依赖的序列号,要等到依赖的消费者消费到这些序列号之后,本地才能开始消费
  4. barrier:ProcessingSequenceBarrier

BlockingWaitStrategy

  1. 如果cursorSequence < sequence:阻塞等待:ReentrantLock.Condition.await()。因为生产者还没有发布该序列号的消息
  2. 如果存在依赖dependentSequence(默认为cursor),等待依赖消费者消费至依赖的序列号之后再开始消费:while ((availableSequence = dependentSequence.get()) < sequence)
  3. 返回availableSequence

BusySpinWaitStrategy

  1. 如果存在依赖dependentSequence(默认为cursor),等待依赖消费者消费至依赖的序列号之后再开始消费:while ((availableSequence = dependentSequence.get()) < sequence)
  2. 返回availableSequence

PhasedBackoffWaitStrategy

  1. 自旋
  2. 如果if ((availableSequence = dependentSequence.get()) >= sequence),即依赖消费者已经消费者依赖的序列号,直接返回availableSequence(默认为0)
  3. counter(常量10000)计算器递减后为0,startTime如果为0则更新为当前时间
  4. timeDelta=System.nanoTime() - startTime
  5. 如果timeDelta>yieldTimeoutNanos,委派兜底策略waitFor返回的序列号:fallbackStrategy.waitFor
  6. 如果timeDelta>spinTimeoutNanos,让出线程资源:Thread.yield()。
  7. 重置counter=常量10000
  8. 继续自旋

SleepingWaitStrategy

  1. 如果存在依赖dependentSequence(默认为cursor),等待依赖消费者消费至依赖的序列号之后再开始消费:while ((availableSequence = dependentSequence.get()) < sequence)
    1. 自旋
    2. 调用applyWaitMethod等待
      1. 如果counter>100,递减后继续自旋
      2. 如果counter>0,递减后让出线程资源,继续自旋
      3. 否则阻塞等待唤醒,超时后自动唤醒:LockSupport.parkNanos(sleepTimeNs)
  2. 返回availableSequence

YieldingWaitStrategy

与SleepingWaitStrategy策略类似,只是等待逻辑只有在counter==0的时候会让出线程资源一次,否则就递减自旋等待

LiteBlockingWaitStrategy(试验期)

BlockingWaitStrategy 的变体,尝试在锁无人争用时消除条件唤醒。增加本地变量signalNeeded(默认为false),如果为false的时候,说明没有锁竞争,此时不需要获取锁之后尝试唤醒等待的线程
signalNeeded只有在waitFor锁持有的时候才被更新为true

    @Overridepublic void signalAllWhenBlocking(){// false说明无锁竞争,不需要唤醒,直接退出if (signalNeeded.getAndSet(false)){lock.lock();try{processorNotifyCondition.signalAll();}finally{lock.unlock();}}}

LiteTimeoutBlockingWaitStrategy(试验期)

TimeoutBlockingWaitStrategy 的变体,尝试在锁无人争用时消除条件唤醒

TimeoutBlockingWaitStrategy

超时版本的BlockingWaitStrategy,即等待的时候有超时自动唤醒

模型

EventProcessorInfo

负责启动/暂停事件处理器并跟踪各阶段处理的序列号
EventProcessor:BatchEventProcessor,每一个EventHandler被封装为一个事件处理器
SequenceBarrier:ProcessingSequenceBarrier,等待序列号(Sequence)消息发布后处理消息

BatchEventProcessor

实际的事件消费者
SequenceBarrier:ProcessingSequenceBarrier
Sequence:默认-1

ProcessingSequenceBarrier

序列号处理通用逻辑
cursorSequence

  1. 如果不存在dependentSequences:则是cursor(即生产者已生产序列号)
  2. 否则:new FixedSequenceGroup(dependentSequences)

方法:waitFor

  1. 调用waitStrategy.waitFor返回有效序列号:availableSequence
  2. 如果availableSequence < sequence,返回availableSequence
  3. 否则返回(即availableSequence>=sequence):sequencer.getHighestPublishedSequence(sequence, availableSequence)
    1. SingleProducerSequencer:直接返回availableSequence
    2. MultiProducerSequencer:返回sequence与availableSequence之间第一个无效的序列号-1。否则返回availableSequence,即全都有效,那么返回最大值,即availableSequence。因为sequence与availableSequence之间都应该是有效的序列号,如果有一个无效的序列号,那么它的前一个位置一定是最大的有效序列号。官方注释为了确保该方法工作正常,传入的sequence应该是比上一个处理成功的序列号大1

EventHandlerGroup

then方法将父EventHandlerGroup所有processorSequences作为依赖传递给当前Handler列表的SequenceBarrier作为屏障实现按照依赖顺序消费
Disruptor.after方法:将指定handler列表对应的Sequence列表封装为EventHandlerGroup
image.png

ConsumerInfo

  1. WorkerPoolInfo:执行的handler被封装为WorkProcessor消费消息
  2. EventProcessorInfo:执行的handler被封装为EventProcessorInfo,自旋消费消息

Disruptor

ConsumerRepository

消费者仓库,存储所有消费者信息,即所有的event handler
consumerInfos:EventProcessorInfo

Executor

BasicExecutor,调用线程工厂启动线程执行命令

start

遍历consumerInfos启动:consumerInfo.start(executor)。每一个handler起一个线程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/764965.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

只看到真人版《武庚纪》的顶级特效?那你亏大了!

“一不留神就看6集”&#xff0c;一看一个不吱声&#xff0c;相信看过《烈焰》&#xff08;原名&#xff1a;武庚纪&#xff09;的观众或多或少都有和笔者一样的感受。 与其他国产剧不同的是&#xff0c;《烈焰》改编自动画《武庚纪》&#xff0c;“漫改”让他的人物装造更贴近…

基于python+vue超市在线销售系统的设计与实现flask-django-php-nodejs

根据此问题&#xff0c;研发一套超市在线销售系统&#xff0c;既能够大大提高信息的检索、变更与维护的工作效率&#xff0c;也能够方便信息系统的管理运用&#xff0c;从而减少信息管理成本&#xff0c;提高效率。 该超市在线销售系统采用B/S架构、并采用python语言以及django…

【Python 滑块不同的操作】对滑块进行处理,列如切割、还原、去除、无脑识别距离等等

文章日期&#xff1a;2024.03.23 使用工具&#xff1a;Python 类型&#xff1a;图片滑块验证的处理&#xff08;不限于识别距离&#xff09; 使用场景&#xff1a;&#xff1f; 文章全程已做去敏处理&#xff01;&#xff01;&#xff01; 【需要做的可联系我】 AES解密处理&a…

Python计算机二级选择易错题(三)

选择题第02&#xff0c;03&#xff0c;04套 题目来源&#xff1a;python计算机二级真题&#xff08;选择题&#xff09; - 知乎 选择题第02套 选择题第03套 选择题第04套 time()获取当前时间&#xff0c;即计算机内部时间&#xff0c;浮点数&#xff1b;import time库&#x…

用户多部门切换部门,MySQL根据多个部门id递归获取所有上级(祖级)、获取部门的全路径(全结构名称)

背景 之前做过的项目&#xff0c;都是一个用户就一个部门的&#xff0c;现在碰到个一个用户在多个部门的需求&#xff0c;而且需要可以切换不同部门查看不同数据。 就比如说一个大公司下面有多个子公司&#xff0c;每个子公司有好多部门、子部门等等&#xff0c;然后有部分用…

【赠书第21期】游戏力:竞技游戏设计实战教程

文章目录 前言 1 竞技游戏设计的核心要素 1.1 游戏机制 1.2 角色与技能 1.3 地图与环境 2 竞技游戏设计的策略与方法 2.1 以玩家为中心 2.2 不断迭代与优化 2.3 营造竞技氛围与社区文化 3 实战案例分析 4 结语 5 推荐图书 6 粉丝福利 前言 在数字化时代的浪潮中&…

Rust之构建命令行程序(五):环境变量

开发环境 Windows 11Rust 1.77.0 VS Code 1.87.2 项目工程 这次创建了新的工程minigrep. 使用环境变量 我们将通过添加一个额外的功能来改进minigrep:一个不区分大小写的搜索选项&#xff0c;用户可以通过环境变量打开该选项。我们可以将此功能设置为命令行选项&#xff0c;…

uniapp(vue3) H5页面连接打印机并打印

一、找到对应厂商打印机的驱动并在windows上面安装。查看是否安装完成可以在&#xff1a;控制面板->查看设备和打印机&#xff0c;找到对应打印机驱动是否安装完成 二、打印机USB连接电脑 三、运行代码调用浏览器打印&#xff0c;主要使用的是window.print()功能。下面使用…

前端学习笔记 | Node.js

一、Node.js入门 1、什么是Node.js 定义&#xff1a;是跨平台JS运行环境&#xff08;可以独立执行JS的环境&#xff09;作用&#xff1a; 编写数据接口&#xff0c;提供网页资源功能等等前端工程化&#xff1a;为后续学Vue和React等框架做铺垫 2、Node.js为何能执行JS&#xff…

python分类信息服务平台移动端的设计与实现flask-django-php-nodejs

分类信息服务平台设计的目的是为用户提供活动信息、活动记录等方面的平台。 与PC端应用程序相比&#xff0c;分类信息服务平台的设计主要面向于移动端&#xff0c;旨在为管理员和用户、商铺提供一个分类信息服务平台。用户可以通过Android及时查看活动信息等。 分类信息服务平台…

IDEA调优-四大基础配置-编码纵享丝滑

文章目录 1.JVM虚拟机选项配置2.多线程编译速度3.构建共享堆内存大小4.关闭不必要的插件 1.JVM虚拟机选项配置 -Xms128m -Xmx8192m -XX:ReservedCodeCacheSize1024m -XX:UseG1GC -XX:SoftRefLRUPolicyMSPerMB50 -XX:CICompilerCount2 -XX:HeapDumpOnOutOfMemoryError -XX:-Omi…

pytest之fixture结合conftest.py文件使用+断言实战

pytest之fixture结合conftest.py文件使用 conftest.py--存放固件固件的优先级pytest执行流程pytest之断言实战pytest结合allure-pytest插件生成美观的报告 conftest.py–存放固件 在一个项目的测试中&#xff0c;大多数情况下会有多个类、模块、或者包要使用相同的测试夹具。这…

kafka2.x版本配置SSL进行加密和身份验证

背景&#xff1a;找了一圈资料&#xff0c;都是东讲讲西讲讲&#xff0c;最后我还没搞好&#xff0c;最终决定参考官网说明。 官网指导手册地址&#xff1a;Apache Kafka 需要预备的知识&#xff0c;keytool和openssl 关于keytool的参考&#xff1a;keytool的使用-CSDN博客 …

Pytest测试框架+allure+jenkins自动化持续集成

Pytest是python的一种单元测试框架&#xff0c;可通过pytest 目录路径来运行测试用例 可以通过断言assert来测试是否通过 1.pytest测试用例命名规范 需严格遵循此规范&#xff0c;不然使用 pytest 目录 来运行会找不到该条测试用例。 可通过这样定义main函数&#xf…

Redis入门到实战-第二弹

Redis入门到实战 Redis安装官网地址Redis概述Redis-server安装Redis-stack-server使用(可选)Redisinsight安装(可选)更新计划 Redis安装 官网地址 声明: 由于操作系统, 版本更新等原因, 文章所列内容不一定100%复现, 还要以官方信息为准 https://redis.io/Redis概述 Redis是…

LabVIEW焓差试验室流量计现场自动校准系统

LabVIEW焓差试验室流量计现场自动校准系统 在现代工业和科研领域&#xff0c;流量计的准确性对于保证生产过程的质量和效率非常重要。开发了一种基于LabVIEW的焓差试验室流量计现场自动校准系统&#xff0c;通过提高流量计校准的准确性和效率。 在空调器空气焓值法能效测量装…

java网络原理(二)------TCP确认应答和超时重传

一Tcp协议 TCP&#xff0c;即Transmission Control Protocol&#xff0c;传输控制协议。人如其名&#xff0c;要对数据的传输进行一个详细的控制。 二.TCP协议段格式 知道了端口号才能进一步确认这个数据报交给了哪一个程序。16为端口号是2字节&#xff0c;范围是0到65535.如…

redis功能点

一、redis简介 概述 Redis 是速度非常快的非关系型&#xff08;NoSQL&#xff09;内存键值数据库&#xff0c;可以存储键和五种不同类型的值之间的映射。键的类型只能为字符串&#xff0c;值支持五种数据类型&#xff1a;字符串、列表、集合、散列表、有序集合。 Redis 支持很…

windows端给python重命名,快速将默认的python修改为 python3

问题点 在windows上&#xff0c;我们实际已经安装了python&#xff0c;但默认的是 python, 可能有的程序执行需要用到 python3&#xff0c;下面的方法可以快速将默认的python修改为 python3 解决方法 此方法需要保证windows上已经安装了python 1&#xff1a;首先找到系统的…

LED显示屏视频播放器的8大功能

随着中国LED显示屏企业的规模发展和产品技术的不断创新&#xff0c;LED显示屏在各个领域中的应用得到了广泛推广。然而&#xff0c;LED显示屏的出色表现离不开LED视频播放器这一关键设备的支持。下面将介绍LED视频播放器的8大功能&#xff0c;以及它们如何提升LED显示屏的显像效…