高性能内存队列Disruptor入门和实战

目录

Disruptor简介

Disruptor的设计方案

RingBuffer数据结构

一个生产者单线程写数据的流程

多个生产者写数据的流程

消费者读数据

多个生产者写数据

Disruptor核心概念

Disruptor的使用

单生产者单消费者模式

单生产者多消费者模式

多生产者多消费者模式

消费者优先级模式


Disruptor简介

        Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。注意,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列。

Github:GitHub - LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library

Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型。

juc下队列存在的问题

队列

描述

ArrayBlockingQueue

基于数组结构实现的一个有界阻塞队列

LinkedBlockingQueue

基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列

PriorityBlockingQueue

支持按优先级排序的无界阻塞队列

DelayQueue

基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列

SynchronousQueue

不存储元素的阻塞队列

LinkedTransferQueue

基于链表结构实现的一个无界阻塞队列

LinkedBlockingDeque

基于链表结构实现的一个双端阻塞队列

1. juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。

2. 加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患。

3. 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)。

Disruptor的设计方案

Disruptor通过以下设计来解决队列速度慢的问题:

  • 环形数组结构

为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)。

  • 元素位置定位

数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

  • 利用缓存行填充解决了伪共享的问题
  • 实现了基于事件驱动的生产者消费者模型(观察者模式)

消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费

RingBuffer数据结构

使用RingBuffer来作为队列的数据结构RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence),用以指向下一个可用的元素,供生产者与消费者使用。原理图如下所示:

0

  • Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)
  • 当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉

思考:能覆盖数据是否会导致数据丢失呢?

当需要覆盖数据时,会执行一个策略,Disruptor给提供多种策略,比较常用的:

  • BlockingWaitStrategy策略,常见且默认的等待策略,当这个队列里满了,不执行覆盖,而是阻塞等待。使用ReentrantLock+Condition实现阻塞,最节省cpu,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景
  • SleepingWaitStrategy策略,会在循环中不断等待数据。先进行自旋等待如果不成功,则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会产生比较高的平均延时。典型的应用场景就是异步日志。
  • YieldingWaitStrategy策略,这个策略用于低延时的场合。消费者线程会不断循环监控缓冲区变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延时比较有严格的要求,可以考虑这种策略。
  • BusySpinWaitStrategy策略: 采用死循环,消费者线程会尽最大努力监控缓冲区的变化。对延时非常苛刻的场景使用,cpu核数必须大于消费者线程数量。推荐在线程绑定到固定的CPU的场景下使用

一个生产者单线程写数据的流程

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入,则返回最大的序列号。这里主要判断是否会覆盖未读的元素;
  3. 若是返回的正确,则生产者开始写入元素。

0

多个生产者写数据的流程

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

消费者读数据

生产者多线程写入的情况下读数据会复杂很多:

  1. 申请读取到序号n;
  2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
  3. 消费者读取元素。

如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。然后,消费者读取下标从3到6共计4个元素。

0

多个生产者写数据

多个生产者写入的时候:

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
  3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。

如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。

0

Disruptor核心概念

  • RingBuffer(环形缓冲区):基于数组的内存级别缓存,是创建sequencer(序号)与定义WaitStrategy(拒绝策略)的入口。
  • Disruptor(总体执行入口):对RingBuffer的封装,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用。
  • Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享。
  • Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法
  • SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。
  • WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略
  • Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义。
  • EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者的接口。
  • EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence

0

Disruptor构造器

public Disruptor(final EventFactory<T> eventFactory,final int ringBufferSize,final ThreadFactory threadFactory,final ProducerType producerType,...)
  • EventFactory:创建事件(任务)的工厂类。
  • ringBufferSize:容器的长度。
  • ThreadFactory :用于创建执行任务的线程。
  • ProductType:生产者类型:单生产者、多生产者。
  • WaitStrategy:等待策略。

Disruptor的使用

引入依赖

<!-- disruptor -->
<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version>
</dependency>

单生产者单消费者模式

1.创建Event(消息载体/事件)和EventFactory(事件工厂)

创建 OrderEvent 类,这个类将会被放入环形队列中作为消息内容。创建OrderEventFactory类,用于创建OrderEvent事件

@Data
public class OrderEvent {private long value;private String name; 
}public class OrderEventFactory implements EventFactory<OrderEvent> {@Overridepublic OrderEvent newInstance() {return new OrderEvent();}
}

2. 创建消息(事件)生产者

创建 OrderEventProducer 类,它将作为生产者使用

public class OrderEventProducer {//事件队列private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void onData(long value,String name) {// 获取事件队列 的下一个槽long sequence = ringBuffer.next();try {//获取消息(事件)OrderEvent orderEvent = ringBuffer.get(sequence);// 写入消息数据orderEvent.setValue(value);orderEvent.setName(name);} catch (Exception e) {// TODO  异常处理e.printStackTrace();} finally {System.out.println("生产者发送数据value:"+value+",name:"+name);//发布事件ringBuffer.publish(sequence);}}

3.创建消费者

创建 OrderEventHandler 类,并实现 EventHandler ,作为消费者。

public class OrderEventHandler implements EventHandler<OrderEvent> {@Overridepublic void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO 消费逻辑System.out.println("消费者获取数据value:"+ event.getValue()+",name:"+event.getName());}

4. 测试

public class DisruptorDemo {public static void main(String[] args) throws Exception {//创建disruptorDisruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.SINGLE, //单生产者new YieldingWaitStrategy()  //等待策略);//设置消费者用于处理RingBuffer的事件disruptor.handleEventsWith(new OrderEventHandler());disruptor.start();//创建ringbuffer容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();//创建生产者OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);//发送消息for(int i=0;i<100;i++){eventProducer.onData(i,"Fox"+i);}disruptor.shutdown();}

单生产者多消费者模式

如果消费者是多个,只需要在调用 handleEventsWith 方法时将多个消费者传递进去。

- disruptor.handleEventsWith(new OrderEventHandler());

上面传入的两个消费者会重复消费每一条消息,如果想实现一条消息在有多个消费者的情况下,只会被一个消费者消费,那么需要调用 handleEventsWithWorkerPool 方法。

- disruptor.handleEventsWith(new OrderEventHandler());

注意:消费者要实现WorkHandler接口

public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {@Overridepublic void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO 消费逻辑System.out.println("消费者"+ Thread.currentThread().getName()+"获取数据value:"+ event.getValue()+",name:"+event.getName());}@Overridepublic void onEvent(OrderEvent event) throws Exception {// TODO 消费逻辑System.out.println("消费者"+ Thread.currentThread().getName()+"获取数据value:"+ event.getValue()+",name:"+event.getName());}

多生产者多消费者模式

在实际开发中,多个生产者发送消息,多个消费者处理消息才是常态。

public class DisruptorDemo2 {public static void main(String[] args) throws Exception {//创建disruptorDisruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.MULTI, //多生产者new YieldingWaitStrategy()  //等待策略);//设置消费者用于处理RingBuffer的事件//disruptor.handleEventsWith(new OrderEventHandler());//设置多消费者,消息会被重复消费//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());//启动disruptordisruptor.start();//创建ringbuffer容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();new Thread(()->{//创建生产者OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);// 发送消息for(int i=0;i<100;i++){eventProducer.onData(i,"Fox"+i);}},"producer1").start();new Thread(()->{//创建生产者OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);// 发送消息for(int i=0;i<100;i++){eventProducer.onData(i,"monkey"+i);}},"producer2").start();//disruptor.shutdown();}

消费者优先级模式

在实际场景中,我们通常会因为业务逻辑而形成一条消费链。比如一个消息必须由 消费者A -> 消费者B -> 消费者C 的顺序依次进行消费。在配置消费者时,可以通过 .then 方法去实现顺序消费。

disruptor.handleEventsWith(new OrderEventHandler()).then(new OrderEventHandler())

handleEventsWith 与 handleEventsWithWorkerPool 都是支持 .then 的,它们可以结合使用。比如可以按照 消费者A -> (消费者B 消费者C) -> 消费者D 的消费顺序

disruptor.handleEventsWith(new OrderEventHandler()).thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler()).then(new OrderEventHandler());

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/578450.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

5+双硫死亡+分型+实验,双硫死亡又上大分。干湿结合拿下5+

今天给同学们分享一篇生信文章“The role of molecular subtypes and immune infiltration characteristics based on disulfidptosis-associated genes in lung adenocarcinoma”&#xff0c;这篇文章发表在Aging (Albany NY)期刊上&#xff0c;影响因子为5.2。 结果解读&…

.Net7.0 或更高版本 System.Drawing.Common 上传图片跨平台方案

项目升级.Net7.0以后&#xff0c;System.Drawing.Common开关已经被删除&#xff0c;且System.Drawing.Common仅在 Windows 上支持 &#xff0c;于是想办法将原来上传图片验证文件名和获取图片扩展名方法替换一下&#xff0c;便开始搜索相关解决方案。 .Net6.0文档&#xff1a;…

Nature | Baker团队用AI设计出史上最高互作强度的蛋白质

蛋白质是生命的基础&#xff0c;是生命功能的主要执行者&#xff0c;其结构与功能由氨基酸序列所决定。蛋白质设计是指对新蛋白质分子进行人为的合理设计&#xff0c;旨在设计新的活性&#xff0c;行为或目的&#xff0c;并增进对蛋白质功能的基本了解。可以从头开始设计蛋白质…

有没有好用的视频提取文案工具推荐?

在如今这个快节奏的时代中&#xff0c;视频已成为人们记录和分享生活的重要媒介。当然有很多优秀的人&#xff0c;它们创作的视频文案或是演讲的台词、字幕等都非常精彩&#xff1b;难免有时候我们也会借鉴他人的优质内容供自己参考、修改等。那么怎么把这些内容自动提取出来呢…

高手写的CAN总线入门总结

关注菲益科公众号—>对话窗口发送 “CANoe ”或“INCA”&#xff0c;即可获得canoe入门到精通电子书和INCA软件安装包&#xff08;不带授权码&#xff09;下载地址。 1. 简介 CAN总线由德国BOSCH公司开发&#xff0c;最高速率可达到1Mbps。CAN的容错能力特别强&#xff0c;…

亚马逊鲲鹏AI智能养号系统助您快速养成买家号

在如今竞争激烈的电商市场&#xff0c;拥有一个优质的亚马逊买家号显得尤为重要。然而&#xff0c;要想提高账号的质量&#xff0c;不仅需要精心呵护&#xff0c;还需要借助先进的技术手段。亚马逊鲲鹏系统引入了AI智能养号功能&#xff0c;为买家们提供了更便捷、更智能的账号…

idea自动注释

前言 保存一下自己的自动注释代码 idea自动注释 前言1 创建类时&#xff0c;自动生成注释2 在方法上使用快捷键生成注释3 使用方法4 效果图 1 创建类时&#xff0c;自动生成注释 如下&#xff1a; #if (${PACKAGE_NAME} && ${PACKAGE_NAME} ! "")package …

基于机器学习算法的数据分析师薪资预测模型优化研究(文末送书)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

W5100S-EVB-Pico评估版介绍

文章目录 1 简介2 硬件资源2.1 硬件规格2.2 引脚定义2.3 工作条件 3 参考资料3.1 Datasheet3.2 原理图3.3 尺寸图&#xff08;单位&#xff1a;mm&#xff09;3.4 参考例程 4 硬件协议栈优势 1 简介 W5100S-EVB-Pico是一款基于树莓派RP2040和全硬件TCP/IP协议栈以太网芯片W5100…

Java基于TCP网络编程的群聊功能

服务端 import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.List;public class Server2 {public static List<Socket> onlineList new ArrayList<>();public static void main(String[] args) throws Except…

java流浪动物保护系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java Web 流浪动物保护系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql…

kubernetes(k8s) Yaml 文件详解

YAML格式&#xff1a;用于配置和管理&#xff0c;YAML是一种简洁的非标记性语言&#xff0c;内容格式人性化&#xff0c;较易读。 1、查看API 资源版本标签 kubectl api-versions 2、编写资源配置清单 kubectl create -f nginx-test.yaml --validatefalse 2.3 查看创建的po…

氢燃料电池商用车系统架构开发与集成技术

一、国家及不同地区对氢能发展支持政策 近三年国家对氢能及燃料电池产业的支持政策 近年来22个省市的发展规划中提到了大力支持氢能源产业发展 二、燃料电池客车架构分解及国内外已有车型 未来燃料电池客车发展方向 未来燃料电池客车新增加的燃料电池堆产业链及供应商 国内外差…

Java毕业设计——vue+springboot音乐网站音乐播放器,歌曲管理系统

1&#xff0c;项目背景 随着计算机技术的发展&#xff0c;网络技术对我们生活和工作显得越来越重要&#xff0c;特别是现在信息高度发达的今天&#xff0c;人们对最新信息的需求和发布迫切的需要及时性。为了满足不同人们对网络需求&#xff0c;各种特色&#xff0c;各种主题的…

spring初始化bean之后执行某个方法

这个问题可以分两种解释&#xff1a; 1. 某个bean初始化执行? 2. 所有bean初始化后执行? 第一个问题可以在spring bean的生命周期中找到答案&#xff1a; bean定义-实例化-初始化-销毁。注意&#xff1a; 这里的bean定义是指所有的bean定义完成&#xff0c;然后才继续执…

猫头虎博主第六期赠书活动:《手机摄影短视频和后期从小白到高手》

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

MySQL中varchar和int隐式转换的注意事项!

一、前言 在一个阳光明媚的下午&#xff0c;我们的测试在运行SQL是发现了一个灵异事件。 别着急&#xff0c;等我慢慢说来&#xff0c;是一个查询库存的SQL&#xff0c;控制台打印了&#xff0c;查询为0条记录。 想着不太信&#xff0c;自己把SQL粘出来执行一下&#xff0c;刚…

【美团大数据面试】Java面试题附答案

目录 1.多线程代码示例 2.单例代码示例 3.LinkedBlockingQueue原理解析 4.模板设计模式讲解 5.生产者-消费者队列设计方法 6.堆内存和栈内存的区别 7.ThreadLocal底层机制 8.synchronized原理&#xff0c;存在的问题&#xff0c;解决方案 9.volatile使用场景和原理&am…

解析动态规划

本文由 简悦 SimpRead 转码&#xff0c; 原文地址 juejin.cn 前言 我们刷 leetcode 的时候&#xff0c;经常会遇到动态规划类型题目。动态规划问题非常非常经典&#xff0c;也很有技巧性&#xff0c;一般大厂都非常喜欢问。今天跟大家一起来学习动态规划的套路&#xff0c;文章…

突破PHP disable_functions方法

1. 利用 LD_PRELOAD 环境变量 知识扫盲 LD_PRELOAD&#xff1a;是Linux系统的一个环境变量&#xff0c;它指定的*.so文件会在程序本身的*.so文件之前被加载。putenv()&#xff1a;PHP函数&#xff0c;可以设置环境变量mail()&#xff0c;error_log()&#xff1a;PHP函数&…