异步编程 - 13 高性能线程间消息传递库 Disruptor

文章目录

  • Disruptor概述
    • Disruptor中的核心术语
    • Disruptor 流程图
  • Disruptor的特性详解
  • 基于Disruptor实现异步编程

在这里插入图片描述


Disruptor概述

Disruptor是一个高性能的线程间消息传递库,它源于LMAX对并发性、性能和非阻塞算法的研究,如今构成了其Exchange基础架构的核心部分。

要理解Disruptor是什么,最好的方法是将它与目前你已经很好地理解且与之非常相似的东西进行比较,例如与Java的BlockingQueue进行对比。

与队列一样,Disruptor的目的也是在同一进程内的线程之间传递数据(例如消息或事件);

而与传统JDK中的队列不同的是,Disruptor提供了以下关键功能:

  • Disruptor中的同一个消息会向所有消费者发送,即多播能力(Multicast Event)。

  • 为事件预先分配内存(Event Preallocation),避免运行时因频繁地进行垃圾回收与内存分配而增加开销。

  • 可选择无锁(Optionally Lock-free),使用两阶段协议,让多个线程可同时修改不同元素。

  • 缓存行填充,避免伪共享(prevent false sharing)。

Disruptor中的核心术语

在理解Disruptor如何工作前,先了解一下Disruptor中的核心术语,即Disruptor中的DDD(Domain-Driven Design)域对象:

  • Ring Buffer:环形缓冲区,通常被认为是Disruptor的核心,但是从3.0版本开始,Ring Buffer仅负责存储和更新Disruptor中的数据(事件)。

  • Sequence:Disruptor使用Sequence作为识别特定组件所在位置的方法。每个消费者(EventProcessor)都像Disruptor本身一样维护一个Sequence。大多数并发代码依赖于这些Sequence值的移动,因此Sequence支持AtomicLong的许多当前功能。事实上,3.0版本与2.0版本之间唯一真正的区别是防止了Sequence和其他变量之间出现伪共享。

  • Sequencer:Sequencer是Disruptor的真正核心。该接口的2个实现(单生产者和多生产者)实现了所有并发算法,用于在生产者和消费者之间快速、正确地传递数据。

  • Sequence Barrier:序列屏障,由Sequencer产生,包含对Sequencer中主要发布者的序列Sequence和任何依赖的消费者的序列Sequence的引用。它包含了确定是否有可供消费者处理的事件的逻辑。

  • Wait Strategy:等待策略,确定消费者如何等待生产者将事件放入Disruptor。

  • Event:从生产者传递给消费者的数据单位。事件没有特定的代码表示,因为它完全由用户定义。

  • EventProcessor:用于处理来自Disruptor的事件的主事件循环,并拥有消费者序列的所有权。其有一个名为BatchEventProcessor的实现,它包含事件循环的有效实现,并将回调使用者提供的EventHandler接口实现(在线程池内运行BatchEventProcessor的run方法)。

  • EventHandler:由用户实现并代表Disruptor的消费者的接口。

  • Producer:调用Disruptor以将事件放入队列的用户代码。这个概念在代码中也没有具体表示。


Disruptor 流程图

介绍完Disruptor中的核心概念,我们将这些元素组合在一起,下所示为LMAX在其高性能核心服务中使用Disruptor的示例。图中有3个消费者,即日志记录JournalConsumer(将输入数据写入持久性日志文件)、复制ReplicationConsumer(将输入数据发送到另一台机器以确保存在数据的远程副本)和业务逻辑ApplicationConsumer(真正的处理工作),其中JournalConsumer和ReplicationConsumer是可以并行执行的。

在这里插入图片描述

【Disruptor示例流程图】

Producer向Disruptor的Ring Buffer中写入事件,消费者JournalConsumer和Replication Consumer(EventHandler)使用多播方式同时消费Ring Buffer中的每一个元素,两者都有各自的SequenceBarrier用来控制当前可消费Ring Buffer中的哪一个事件,并且当不存在可用事件时如何处理。消费者ApplicationConsumer则是等JournalConsumer和ReplicationConsumer对同一个元素处理完毕后,再处理该元素,这可以使用下图来概括。

在这里插入图片描述

【Disruptor示例流程简化图】

每个消费者持有自己的当前消费序号,由于是环形buffer,因而生产者写入事件时要看序号最小的消费者序号,以避免覆盖还没有被消费的事件,另外Consumer3只能消费已经被Consumer1、Consumer2都处理过的事件。


Disruptor的特性详解

Disruptor具有多播能力(Multicast),这是Java中队列和Disruptor之间最大的行为差异。

当有多个消费者在同一个Disruptor上监听事件时,所有事件都会发布给所有消费者,而Java队列中的每个事件只会发送给某一个消费者。Disruptor的行为旨在用于需要对同一数据进行独立的多个并行操作的情况。

Disruptor的目标之一是在低延迟环境中使用。在低延迟系统中,必须减少或移除运行时内存分配;在基于Java的系统中,目的是减少由于垃圾收集导致的系统停顿。为了支持这一点,用户可以预先为Disruptor中的事件分配其所需的存储空间(也就是声明Ring Buffer的大小)。在构造Ring Buffer期间,EventFactory由用户提供,并将在Disruptor的Ring Buffer中每个事件元素创建时被调用。将新数据发布到Disruptor时,API将允许用户获取构造的对象,以便调用方法或更新该存储对象上的字段,Disruptor保证这些操作只要正确实现就是并发安全的。

低延迟期望推动的另一个关键实现细节是使用无锁算法来实现Disruptor,所有内存可见性和正确性保证都是使用内存屏障(体现为volatile关键字)或CAS操作实现的。在Disruptor的实现中只有一种情况需要实际锁定——当使用BlockingWaitStrategy策略时,这仅仅是为了使用条件变量,以便在等待新事件到达时parked消费线程。许多低延迟系统将使用忙等待(busy-wait)来避免使用条件可能引起的抖动,但是大量在系统繁忙等待的操作可能导致性能显著下降,尤其是在CPU资源严重受限的情况下。

在JDK的BlockingQueue中添加或取出元素时是需要加独占锁的,通过锁来保证多线程对底层共享的数据结构进行并发读写的线程安全性,使用锁会导致同时只有一个线程可以向队列添加或删除元素。Disruptor则使用两阶段协议,让多个线程可同时修改不同元素,需要注意,消费元素时只能读取到已经提交的元素。在Disruptor中某个线程要访问Ring Buffer中某个序列号下对应的元素时,要先通过CAS操作获取对应元素的所有权(第一阶段),然后通过序列号获取对应的元素对象并对其中的属性进行修改,最后再发布元素(第二阶段),只有发布后的元素才可以被消费者读取。当多个线程写入元素时,它们都会先执行CAS操作,获取到Ring buffer中的某一个元素的所有权,然后可以并发对自己的元素进行修改。注意,只有序列号小的元素发布后,后面的元素才可以发布。可知相比使用锁,使用CAS大大减少了开销,提高了并发度。

其实在单生产者的情况下Disruptor甚至可以省去CAS的开销,因为在这种情况下,只有一个线程来请求修改Ring Buffer中的数据,生产者的序列号使用普通的long型变量即可。在创建Disruptor时是可以指定是单生产者还是多生产者的,如果你的业务就是单生产者模型,那么创建Disruptor时指定生产者模式为ProducerType.SINGLE效果会更好。

计算机系统中为了解决主内存与CPU运行速度的差距,在CPU与主内存之间添加了一级或多级高速缓冲存储器(Cache),这个Cache一般是集成到CPU内部的,所以也叫CPU Cache,下图所示为两级Cache结构。

在这里插入图片描述

【Cache结构图】

Cache内部是按行存储的,其中每一行称为一个Cache行(见下图)。Cache行是Cache与主内存进行数据交换的单位,大小一般为2的幂次数字节。

CPU访问某一个变量时,首先会去看CPU Cache内是否有该变量,如果有则直接从中获取,否则就去主内存里获取该变量,然后把该变量所在内存区域的一个Cache行大小的内存复制到Cache。由于存放到Cache行的是内存块而不是单个变量,所以可能会把多个变量存放到了一个Cache行。当多个线程同时修改一个Cache行里的多个变量时,由于同时只能有一个线程操作缓存行,因而相比每个变量放到一个Cache行性能会有所下降,这就是伪共享。

在这里插入图片描述

【Cache行】

如下图所示,变量x,y同时被放到了CPU的一级和二级缓存,当线程1使用CPU1对变量x进行更新时,首先会修改CPU1的一级缓存变量x所在缓存行,这时候缓存一致性协议会导致CPU2中变量x对应的缓存行失效,那么线程2写入变量x的时候就只能去二级缓存查找,这就破坏了一级缓存,而一级缓存比二级缓存更快,这里也说明了多个线程不可能同时去修改自己所使用的CPU中缓存行中相同缓存行里面的变量。

更坏的情况下CPU只有一级缓存,会导致频繁地直接访问主内存 。

在这里插入图片描述

【伪共享展示图】

Disruptor中的Ring Buffer底层是一个地址连续的数组,数组内相邻的元素很容易会被放入同一个Cache行里,从而导致伪共享的出现。Disruptor则通过缓存行填充,让数组中的每个元素独占一个缓存行从而解决了伪共享问题的出现。

另外为了避免Ring Buffer中序列号(定位元素的游标)与其他元素共享缓存行,对其也进行了缓存行填充,以提高访问序列号时缓存的命中率。


基于Disruptor实现异步编程

摘录官方的一个例子并稍加改造,例子的功能其实就是对上节介绍的例子的实现,这个例子中将含有一个生产者来生成元素,然后有两个消费者JournalConsumer和ReplicationConsumer并行消费同一个元素,等同一个元素都被消费后,消费者ApplicationConsumer再执行具体业务逻辑。

首先引入依赖包:

<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version>
</dependency>

其次定义Ring Buffer中存放的事件对象,其定义如下:

public class LongEvent {private long value;public void set(long value) {this.value = value;}public long get() {return value;}
}

以上代码定义了事件类型LongEvent,其中包含业务参数value。为了让Disruptor框架预分配Ring Buffer中的事件对象,需要实现EventFactory接口提供一个构造事件对象的方法,代码如下:

public class LongEventFactory implements EventFactory<LongEvent> {public LongEvent newInstance() {return new LongEvent();}
}

再次创建具体的消费者用来消费生产的元素,这需要实现EventHandler接口,实现3个消费者:

//将输入数据写入持久性日志文件的消费者
public class JournalConsumer implements EventHandler<LongEvent> {public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {System.out.println(Thread.currentThread().getName() + "Persist Event: " + event.get());}
}//将输入数据发送到另一台机器以确保存在数据的远程副本的消费者
public class ReplicationConsumer implements EventHandler<LongEvent> {public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {System.out.println(Thread.currentThread().getName() + "Replication Event: " + event.get());}
}//真正处理业务逻辑的消费者
public class ApplicationConsumer implements EventHandler<LongEvent> {public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {System.out.println(Thread.currentThread().getName() + "Application Event: " + event.get());}
}

接着需要一个source来发布事件,source可以是来自于IO设备、网络、文件等的数据。下面使用原生API方式发布数据,发布者代码如下:

public class LongEventProducer {private final RingBuffer<LongEvent> ringBuffer;public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void onData(long bb) {long sequence = ringBuffer.next(); // 8.1 第一阶段,获取序列号try {LongEvent event = ringBuffer.get(sequence); // 8.2 获取序列号对应的实体元素event.set(bb); // 8.3 修改元素的值} finally {ringBuffer.publish(sequence);// 8.4 发布元素}}
}

显然,事件发布变得比使用JDK中简单队列更复杂,这是由于对事件预分配的需求。它需要实现消息发布的两阶段,即第一阶段获取Ring Buffer的槽中对象并修改,第二阶段发布可用数据;还必须将发布包装在try/finally块中。如果在Ring Buffer中声明一个槽(调用RingBuffer.next()),那么必须发布这个序列,否则可能会导致序列状态被污染。

最后一步是将上面所有组件连接在一起。可以手动连接所有组件,但可能有点复杂,因此提供DSL以简化构造,组装代码如下:

public class LongEventMain {public static void main(String[] args) throws Exception {// 1.创建Ring Buffer中事件元素的工厂对象LongEventFactory factory = new LongEventFactory();// 2.指定Ring Buffer的大小,必须为2的幂次方int bufferSize = 1024;// 3.构造DisruptorDisruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE,new BlockingWaitStrategy());// 4.注册消费者disruptor.handleEventsWith(new JournalConsumer(), new ReplicationConsumer()).then(new ApplicationConsumer());// 5.启动Disruptor, 启动线程运行disruptor.start();// 6.从Disruptor中获取Ring BufferRingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();// 7. 创建生产者LongEventProducer producer = new LongEventProducer(ringBuffer);// 8. 生产元素,并放入Ring Bufferfor (long l = 0; l < 100; l++) {producer.onData(l);Thread.sleep(1000);}// 9.挂起当前线程Thread.currentThread().join();}
}

在上述代码中,代码1创建了一个事件对象生成的工厂对象;代码2指定Ring Buffer的大小;代码3构造Disruptor对象,其构造函数内会根据bufferSize的大小调用LongEventFactory创建对应个数的事件对象(事件预分配),并且这里使用DaemonThreadFactory.INSTANCE作为其背后异步任务调用的线程池(当然也可以传递自己的线程池)。另外,因为只有一个生产者,所以生产者模式设置为了ProducerType.SINGLE以便遵循Single Writer原则;最后设置Ring Buffer的等待策略为Blocking-WaitStrategy。

代码4注册消费者,注册了JournalConsumer、ReplicationConsumer和Application Consumer三个消费者,旨在等同一个元素被JournalConsumer和ReplicationConsumer消费后,ApplicationConsumer才可以消费对应的元素。

代码4执行完毕后框架还没启动,代码5的作用是启动框架内的线程;代码6从Disruptor中获取Ring Buffer,以便在后面向里面写入事件;代码7创建了一个生产者LongEventProducer实例并且把ringbuffer作为构造函数;代码8则循环创建100个数据,然后调用LongEventProducer的onData方法把事件发送出去,这个发送操作是异步的,会马上返回。

LongEventProducer的onData方法内代码8.1首先执行两阶段的第一阶段,也就是获取当前Ring Buffer中的序列号;代码8.2获取对应序列号对应的事件对象;代码8.3修改对象的属性;代码8.4则发布事件,发布后,其他消费者就对该元素可见了。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72926.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探究IP路由的工作原理与路由表查找规则

文章目录 一、定义二、IP连通的前提三、路由表1. 作用2. 路由表字段内容3. 路由表查表规则4. 路由信息的来源5. 路由表写表规则6. 路由优先级 四、常用命令 首先可以看下思维导图&#xff0c;以便更好的理解接下来的内容。 一、定义 路由器是网络中负责将数据报文在不同IP网段…

git在linux情况下设置git 命令高亮

只需要执行下面这个命令&#xff0c;这样就可以在查看git status明亮的时候高亮显示。 git config --global color.status auto未设置前 谁知之后

【Unity3D赛车游戏优化篇】【八】汽车实现镜头的流畅跟随,以及不同角度的切换

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;Uni…

Redis功能实战篇之附近商户

在互联网的app当中&#xff0c;特别是像美团&#xff0c;饿了么等app。经常会看到附件美食或者商家&#xff0c; 当我们点击美食之后&#xff0c;会出现一系列的商家&#xff0c;商家中可以按照多种排序方式&#xff0c;我们此时关注的是距离&#xff0c;这个地方就需要使用到我…

高等数学笔记

|sinx|连续不可导 只要在x0处存在极限且极限等于f(x0)则函数在此处连续 如果某点可导则左右导数应该相等&#xff08;可导一定连续&#xff0c;连续不一定可导&#xff09; 双曲函数的由来 塞入dx 莱布尼茨公式 sin(nx)的k次导n^k*sin(nxkΠ/2) 注意符号&#xff01; 二阶导公…

树的基本概念和存储结构

一、树的基本概念 1、树的定义 树是n&#xff08;n>0&#xff09;个结点的有限集。当n 0时&#xff0c;称为空树。在任意一棵非空树中应满足&#xff1a; 1、有且仅有一个特定的称为根的结点。 2、当n>1时&#xff0c;其余节点可分为m&#xff08;m>0&#xff09…

Zookeeper简述

数新网络-让每个人享受数据的价值 官网现已全新升级—欢迎访问&#xff01; 前 言 ZooKeeper是一个开源的、高可用的、分布式的协调服务&#xff0c;由Apache软件基金会维护。它旨在帮助管理和协调分布式系统和应用程序&#xff0c;提供了一个可靠的平台&#xff0c;用于处理…

Qt配置使用MSVC编译器

Qt配置使用MSVC编译器_qt msvc-CSDN博客注意:Qt支持的MSVC就是2017和2015&#xff0c;所以vs也要下载2017&#xff0c;不要直接用最新的&#xff0c;安装路径都用默认的。程序运行失败时可以尝试windeployqt拷贝库文件到本地&#xff0c;然后有可能就能运行了。VS官网下载Visua…

一个帮各位填秋招表格省一点事的浏览器插件

最近应该很多和我一样的双非鼠鼠在秋招等面试&#xff0c;而且处于海投阶段&#xff0c;为了不忘记投了哪些公司&#xff0c;可以用这样一个表格来记录&#xff1a; 其中有些字段&#xff0c;比如状态、投递时间、查看进度的网址其实可以不手动输入&#xff0c;所以搞个插件来…

CESM2代码下载

这半年忙着毕业写论文&#xff0c;好久好久好久不更新了∠( ω)&#xff0f; &#xff0c;今天准备开个新坑 ๑乛◡乛๑&#xff0c;学习一下CESM&#xff08;Community Earth System Model&#xff09;&#xff0c;它是一个完全耦合的全球气候模型&#xff0c;可用于地球过去、…

智能机器人:打造自动化未来的关键技术

文章目录 1. 智能机器人的基本概念2. 智能机器人的关键技术2.1 机器视觉2.2 机器学习与深度学习2.3 传感器技术 3. 智能机器人的应用领域3.1 制造业3.2 医疗保健3.3 农业3.4 服务业 4. 智能机器人的未来趋势4.1 自主决策能力的提升4.2 协作与互操作性4.3 个性化定制4.4 环境感知…

基于uwb和IMU融合的三维空间定位算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 ..........................................................................kkk 0; for E…

Mac系统,webots和pycharm联合仿真,配置问题解决方案!

项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a; 问题描述&#xff1a;mac系统下&#xff0c;webots和pycharm 联合仿真&#xff0c;适配问题 问题描述 提示&#xff1a;这里描述项目中遇到的问题&#xff1a; 换mac电脑了&#xff0c;需要用到web…

2023数模国赛C 题 蔬菜类商品的自动定价与补货决策-完整版创新多思路详解(含代码)

题目简评&#xff1a;看下来C题是三道题目里简单一些的&#xff0c;考察的点比较综合&#xff0c;偏数据分析。涉及预测模型和运筹优化(线性规划)&#xff0c;还设了一问开放型问题&#xff0c;适合新手入门&#xff0c;发挥空间大。 题目分析与思路&#xff1a; 背景&#x…

【JMeter】 二次开发插件开发 Dubbo 接口测试插件浅析

概述 在一些企业中&#xff0c;各类业务系统非常丰富&#xff0c;相互之间或对外提供很多的服务或接口这些服务或接口中&#xff0c;有很多是需要强契约约束的&#xff0c;服务的提供方、服务的使用方必须遵守相同契约这类服务最典型的就是RPC&#xff0c;其中应用广泛的有Dub…

即拼七人拼团系统开发模式是怎么盈利赚钱的?

即拼七人拼团是市场上最近比较火爆的一款商业模式&#xff0c;它结合了二二复制和拼团两种模式玩法&#xff0c;不仅能让消费者从中获利&#xff0c;还能让平台快速获流裂变&#xff0c;对平台起盘初期和发展中期具有很强的推广能力。那么这个模式是怎么盈利赚钱的呢&#xff1…

Tomcat 日志乱码问题解决

我就是三井&#xff0c;一个永不放弃希望的男人。——《灌篮高手》 Tomcat 日志乱码问题解决 乱码原因&#xff1a;字符编码不一致 如&#xff1a;国内电脑一般都是GBK编码&#xff0c;而Tomcat日志使用的是UTF-8编码 解决方法&#xff1a;将对应字符编码由 UTF-8 改为 GBK 即…

案例实战-Spring boot Web

准备工作 需求&环境搭建 需求&#xff1a; 部门管理&#xff1a; 查询部门列表 删除部门 新增部门 修改部门 员工管理 查询员工列表&#xff08;分页、条件&#xff09; 删除员工 新增员工 修改员工 环境搭建 准备数据库表&#xff08;dept、emp&#xff09; -- 部门管理…

linux设置登录超时自动退出

问题背景 最近登录某台linux服务器&#xff0c;经常遇到超时自动退出现象&#xff0c;如下图&#xff1a; 是因为服务器设置了超时时间&#xff0c;如果某个超时时间段内服务器没有任何操作&#xff0c;则会自动注销 解决方法 查看服务器设置的超时时间(TMOUT 变量的值)&am…

深浅拷贝与赋值

数据类型 数据类型 在JavaScript中&#xff0c;数据类型有两大类。一类是基本数据类型&#xff0c;一类是引用数据类型。 基本数据类型有六种&#xff1a;number、string、boolean、null、undefined、symbol。 基本数据类型存放在栈中。存放在栈中的数据具有数据大小确定&a…