单机无锁线程安全队列-Disruptor

Disruptor

1、基本介绍

说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。
而Disruptor是一个线程安全、低延迟、吞吐量高的队列,并且解决BlockingQueue加锁带来的性能下降问题,十分适合单机使用。
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。

2、与BlockingQueue对比

  1. 使用CAS代替锁
  2. 多播模式,同一事件可以交给多个消费者处理
  3. 基于环形数组RingBuffer,创建时就固定长度,不出现空间新分配情况,减少垃圾回收

这是官网与BlockingQueue对比的延迟直方图,可以看出,BlockingQueue出现延迟的机率比Disruptor高得多。

img.png

3、生产者消费者模式

在Disruptor中,生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现:

引入最新包

        <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>4.0.0</version></dependency>

定义一个商品

@Data
public class Goods {private String name;}

定义生产者

public class Producer {private final RingBuffer<Goods> ringBuffer;public Producer(RingBuffer<Goods> ringBuffer) {this.ringBuffer = ringBuffer;}/*** 生产货品* @param goodsName*/public void onData(String goodsName) {long sequence = ringBuffer.next();try {Goods goods = ringBuffer.get(sequence);goods.setName(goodsName);} finally {ringBuffer.publish(sequence);}}
}

定义消费者

@Data
public class Consumer implements EventHandler<Goods>{private String name;public Consumer(String name){this.name = name;}@Overridepublic void onEvent(Goods goods, long l, boolean b)  {//消费者接收到货品System.out.println(name+"消费了"+goods.getName());}@Overridepublic void onBatchStart(long batchSize, long queueDepth) {EventHandler.super.onBatchStart(batchSize, queueDepth);}@Overridepublic void onStart() {EventHandler.super.onStart();}@Overridepublic void onShutdown() {EventHandler.super.onShutdown();}@Overridepublic void onTimeout(long sequence) throws Exception {EventHandler.super.onTimeout(sequence);}@Overridepublic void setSequenceCallback(Sequence sequenceCallback) {EventHandler.super.setSequenceCallback(sequenceCallback);}
}

一个生产者对一个消费者

img_1.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.SINGLE,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//单生产者,单消费者disruptor.handleEventsWith(new Consumer("Consumer1"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

一个生产者对多个消费者

消费者按顺序消费:

img_2.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//多个消费者按顺序消费disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

多播模式,同一事件可以交给多个消费者处理

img_4.png
只需要将上述代码修改一下即可

   //Consumer1、Consumer2、Consumer3先消费,Consumer4后消费disruptor.handleEventsWith(new Consumer("Consumer1"),new Consumer("Consumer2"),new Consumer("Consumer3")).then(new Consumer("Consumer4"));

多个生产者对多个消费者

img_5.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer1 = new Producer(ringBuffer);Producer producer2 = new Producer(ringBuffer);Producer producer3 = new Producer(ringBuffer);while (true){producer1.onData("goods"+UUID.randomUUID());producer2.onData("goods"+UUID.randomUUID());producer3.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

除了上述多播模式中多个消费者各自处理事件(一个event事件会同时被多个消费者处理),其实还有Disruptor另一种模式:多个消费者合作处理一批事件(一个event事件会被其中一个消费者处理),由Disruptor 的 WorkPool 支持,不过在4.0中已经被去除了

img_8.png
看了github的issue,作者大概意思说难以维护,并且在LMAX公司也不会用到WorkPool,所以就去除了。

img_9.png

img_10.png

4、RingBuffer原理

Disruptor内部由环形数组Ring Buffer(数组必须为2的n次方)。

image.png
1、Ring Buffer使用环形数组,有效避免线性数组index越界问题,而且数组内元素的内存地址是连续的,对CPU缓存友好,在硬件级别,数组中的元素是会被预加载的,所以RingBuffer中,CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动,可以实现数据在数组中的环形存取。
2、在多生产者场景下,多个生产者会进行竞争,防止读到还未写的元素。引入了一个与Ring Buffer大小相同的buffer:available Buffer,用来判断Ring Buffer某个元素是否已经就绪。
3、为什么available Buffer也做成圈呢?这样做是防止把上一轮的数据当成这一轮的数据,错误判断Ring Buffer元素可用。
4、为什么Ring Buffer要2的n次方,因为会涉及到二进制&运算,来算出元素位置,在源码中可以找到。

img_11.png
5、具体RingBuffer写数据和读数据流程,可以参考美团技术博客:https://tech.meituan.com/2016/11/18/disruptor.html

5、等待策略

生产者和消费者都可能出现速度过快的情况,比如队列满了,生产者需要等待消费者消费后才能生产,或者消费者消费过快导致队列为空,进而需要等待生产者生产。
Disruptor目前一共内置了8种等待策略。

img_7.png

  1. BlockingWaitStrategy:用了ReentrantLock的等待唤醒机制实现等待逻辑,是默认策略,对CPU的消耗最小
  2. BusySpinWaitStrategy: 持续自旋,会消耗大量CPU资源
  3. LiteBlockingWaitStrategy: 基于BlockingWaitStrategy,非重入锁的阻塞等待策略,在没有锁竞争的时候会省去唤醒操作
  4. TimeoutBlockingWaitStrategy: 超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出
  5. LiteTimeoutBlockingWaitStrategy: 基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
  6. SleepingWaitStrategy: 三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的睡眠
  7. YieldingWaitStrategy: 二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  8. PhasedBackoffWaitStrategy: 四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

6、结束

Disruptor简单的介绍已经结束了,点个赞再走啦!~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/199455.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpectralGPT: Spectral Foundation Model 论文翻译3

遥感领域的通用大模型 2023.11.13在CVPR发表 原文地址&#xff1a;[2311.07113] SpectralGPT: Spectral Foundation Model (arxiv.org) E.消融研究 在预训练阶段&#xff0c;我们对可能影响下游任务表现的各种因素进行了全面研究。这些因素包括掩蔽比、ViT patch大小、数据规…

多线程--11--ConcurrentHashMap

ConcurrentHashMap与HashMap等的区别 HashMap线程不安全 我们知道HashMap是线程不安全的&#xff0c;在多线程环境下&#xff0c;使用Hashmap进行put操作会引起死循环&#xff0c;导致CPU利用率接近100%&#xff0c;所以在并发情况下不能使用HashMap。 ConcurrentHashMap 主…

Linux信息收集

Linux信息收集 本机基本信息 #管理员 $普通用户 之前表示登录的用户名称&#xff0c;之后表示主机名&#xff0c;再之后表示当前所在目录 / 表示根目录 ~表示当前用户家目录1、内核&#xff0c;操作系统和设备信息 uname -a 打印所有可用的系统信息 uname -r 内核版本 u…

01_阿里云_Xshell连接服务器

PC使用Xshell连接阿里云服务器 问题引出 之前使用Xshell连接阿里云服务器连接的好好的&#xff0c;今天准备上去服务器学习Linux发现连不上了&#xff0c;后来发现是防火墙的问题&#xff0c;还有阿里云的安全组也需要设置 解决方案 方法一&#xff1a;&#xff08;简单粗暴…

3D模型材质编辑

在线工具推荐&#xff1a; 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数据生成器 - 3D模型在线转换 - 3D模型预览图生成服务 如今&#xff0c;3D 纹理、打印和建模都非常流行。使用可用的高级工具&#xff0c;创建 3D 模型…

基于Git的代码工程管理——学习记录一

一、Git简概[1] Git是一个分布式版本控制系统&#xff0c;它跟踪任何一组计算机文件的更改&#xff0c;通常用于在软件开发过程中协调协作开发源代码的程序员之间的工作。其为实现快速、数据完整性以及分布式非线性工作流程&#xff08;在不同计算机上运行数千个并行分支&#…

钉钉提交审批意见,并上传附件接口集成

一&#xff1a;适配器 DingtalkApprovalFilesExecute 参考方案链接&#xff1a;轻易云数据集成平台 二&#xff1a;请求接口。配置参数 接口文档&#xff1a;使用了新旧接口 服务端API发起带有附件的审批流并下载附件 - 钉钉开放平台 接口&#xff1a;topapi/processinsta…

Python编程技巧:多层for循环的高级应用

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com Python的for循环结构是编程中最基础也是最常用的控制结构之一。通过for循环&#xff0c;可以轻松遍历数据集合和执行重复的操作。然而&#xff0c;当我们面对多层for循环时&#xff0c;性能和可读性可能会成为挑…

Redis的三种消息队列实现方式

目录 前言 List实现消息队列 PubSub消息队列 Stream消息队列 三种实现方式对比 前言 为什么要使用Redis的消息队列&#xff1f; 成本低&#xff0c;对于RabbitMQ或是Kafka来说&#xff0c;已经是重量级的消息队列。 Redis的三种实现方式&#xff1a; List结构&#xff1…

【安卓12源码】WMS系列:addWindow 和 removeWindow流程

一、Window 的属性 Window的属性定义在WindowManager的内部类LayoutParams中&#xff0c;了解Window的属性能够更好的理解WMS的内部原理。Window的属性有很多种&#xff0c;与应用开发最密切的有三种&#xff0c;它们分别是Type(Window的类型)、Flag(Window的标志)和SoftInputM…

SMART PLC温度采集模块温度转换FC(梯形图+SCL代码)

对于模拟量输入采集,温度变送器等我们可以利用线性转换功能块完成温度采集,西门子PLC有温度采集模块,利用温度采集模块采集温度我们的转换关系无需进行线性变换,下面我们具体介绍。温度采集线性转换功能块请参考下面的文章链接: https://rxxw-control.blog.csdn.net/arti…

Hadoop学习笔记(HDP)-Part.06 安装OracleJDK

目录 Part.01 关于HDP Part.02 核心组件原理 Part.03 资源规划 Part.04 基础环境配置 Part.05 Yum源配置 Part.06 安装OracleJDK Part.07 安装MySQL Part.08 部署Ambari集群 Part.09 安装OpenLDAP Part.10 创建集群 Part.11 安装Kerberos Part.12 安装HDFS Part.13 安装Ranger …

外包干了8个月,技术退步明显.......

先说一下自己的情况&#xff0c;大专生&#xff0c;18年通过校招进入武汉某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落! 而我已经在一个企业干了四年的功能测…

HarmonyOS带大家创建自己的第一个Page页面并实现路由跳转

我们 在开发过程中 经常会看到 被 艾特修饰的代码 有限像 java中的注解 在 harmonyOS 中 这叫 装饰器 被关键字装饰取来的代码 会具备某某功能 我们这里先来创建一个新的界面 在pages 目录下 右键 如下图 选择page创建 这里 我们取名叫 AppView 然后点击右下角 Finish 这样…

P1006 [NOIP2008 提高组] 传纸条

洛谷的题 网址&#xff1a;P1006 [NOIP2008 提高组] 传纸条 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 还是动态规划&#xff0c;这题和我上一篇博客写的题差不多 区别在于&#xff0c;这个地图不再是方阵&#xff0c;路线不能交叉&#xff0c;而且地图的大小可能大得多…

IDEA中,光标移动快捷键(Shift + 滚轮前后滚动:当前文件的横向滚动轴滚动。)

除此之外&#xff0c;其他常用的光标移动快捷键包括&#xff1a; Shift 滚轮前后滚动&#xff1a;当前文件的横向滚动轴滚动。Shiftenter&#xff1a;快速将鼠标移动到下一行。Ctrl ]&#xff1a;移动光标到当前所在代码的花括号结束位置。Ctrl 左方向键&#xff1a;光标跳转…

内衣迷你洗衣机什么牌子好?好用不贵的内衣洗衣机推荐

由于内衣洗衣机在目前的市场上越来越受欢迎&#xff0c;使得不少的小伙伴都在犹豫要不要为自己入手一台专用的内衣洗衣机&#xff0c;专门来清洗一些内衣裤等等贴身衣物&#xff0c;这个问题的答案是很有必要的&#xff0c;因为目前市场上的家用大型洗衣机对衣物只能够起到清洁…

SpringBoot_02

Web后端开发_07 SpringBoot_02 SpringBoot原理 1.配置优先级 1.1配置 SpringBoot中支持三种格式的配置文件&#xff1a; application.propertiesapplication.ymlapplication.yaml properties、yaml、yml三种配置文件&#xff0c;优先级最高的是properties 配置文件优先级…

前端又出新轮子Nue.js,但还是低代码更香!

前言 别TM卷了&#xff01;&#xff01;&#xff01; 自从前后端分离以来&#xff0c;前端前端的车轮滚滚向前&#xff0c;轮子造的越来越圆。每个人都在适应这个轮子的节奏&#xff0c;稍微不注意就会被甩出车轮之外。 调侃归调侃&#xff0c;既然口子已经开了&#xff0c;…

一键抠图2:C/C++实现人像抠图 (Portrait Matting)

一键抠图2&#xff1a;C/C实现人像抠图 (Portrait Matting) 目录 一键抠图2&#xff1a;C/C实现人像抠图 (Portrait Matting) 1. 前言 2. 抠图算法 3. 人像抠图算法MODNet &#xff08;1&#xff09;模型训练 &#xff08;2&#xff09;将Pytorch模型转换ONNX模型 &…