单机无锁线程安全队列-Disruptor

Disruptor

1、基本介绍

说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。
而Disruptor是一个线程安全、低延迟、吞吐量高的队列,并且解决BlockingQueue加锁带来的性能下降问题,十分适合单机使用。
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。

2、与BlockingQueue对比

  1. 使用CAS代替锁
  2. 多播模式,同一事件可以交给多个消费者处理
  3. 基于环形数组RingBuffer,创建时就固定长度,不出现空间新分配情况,减少垃圾回收

这是官网与BlockingQueue对比的延迟直方图,可以看出,BlockingQueue出现延迟的机率比Disruptor高得多。

img.png

3、生产者消费者模式

在Disruptor中,生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现:

引入最新包

        <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>4.0.0</version></dependency>

定义一个商品

@Data
public class Goods {private String name;}

定义生产者

public class Producer {private final RingBuffer<Goods> ringBuffer;public Producer(RingBuffer<Goods> ringBuffer) {this.ringBuffer = ringBuffer;}/*** 生产货品* @param goodsName*/public void onData(String goodsName) {long sequence = ringBuffer.next();try {Goods goods = ringBuffer.get(sequence);goods.setName(goodsName);} finally {ringBuffer.publish(sequence);}}
}

定义消费者

@Data
public class Consumer implements EventHandler<Goods>{private String name;public Consumer(String name){this.name = name;}@Overridepublic void onEvent(Goods goods, long l, boolean b)  {//消费者接收到货品System.out.println(name+"消费了"+goods.getName());}@Overridepublic void onBatchStart(long batchSize, long queueDepth) {EventHandler.super.onBatchStart(batchSize, queueDepth);}@Overridepublic void onStart() {EventHandler.super.onStart();}@Overridepublic void onShutdown() {EventHandler.super.onShutdown();}@Overridepublic void onTimeout(long sequence) throws Exception {EventHandler.super.onTimeout(sequence);}@Overridepublic void setSequenceCallback(Sequence sequenceCallback) {EventHandler.super.setSequenceCallback(sequenceCallback);}
}

一个生产者对一个消费者

img_1.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.SINGLE,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//单生产者,单消费者disruptor.handleEventsWith(new Consumer("Consumer1"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

一个生产者对多个消费者

消费者按顺序消费:

img_2.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//多个消费者按顺序消费disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

多播模式,同一事件可以交给多个消费者处理

img_4.png
只需要将上述代码修改一下即可

   //Consumer1、Consumer2、Consumer3先消费,Consumer4后消费disruptor.handleEventsWith(new Consumer("Consumer1"),new Consumer("Consumer2"),new Consumer("Consumer3")).then(new Consumer("Consumer4"));

多个生产者对多个消费者

img_5.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer1 = new Producer(ringBuffer);Producer producer2 = new Producer(ringBuffer);Producer producer3 = new Producer(ringBuffer);while (true){producer1.onData("goods"+UUID.randomUUID());producer2.onData("goods"+UUID.randomUUID());producer3.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

除了上述多播模式中多个消费者各自处理事件(一个event事件会同时被多个消费者处理),其实还有Disruptor另一种模式:多个消费者合作处理一批事件(一个event事件会被其中一个消费者处理),由Disruptor 的 WorkPool 支持,不过在4.0中已经被去除了

img_8.png
看了github的issue,作者大概意思说难以维护,并且在LMAX公司也不会用到WorkPool,所以就去除了。

img_9.png

img_10.png

4、RingBuffer原理

Disruptor内部由环形数组Ring Buffer(数组必须为2的n次方)。

image.png
1、Ring Buffer使用环形数组,有效避免线性数组index越界问题,而且数组内元素的内存地址是连续的,对CPU缓存友好,在硬件级别,数组中的元素是会被预加载的,所以RingBuffer中,CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动,可以实现数据在数组中的环形存取。
2、在多生产者场景下,多个生产者会进行竞争,防止读到还未写的元素。引入了一个与Ring Buffer大小相同的buffer:available Buffer,用来判断Ring Buffer某个元素是否已经就绪。
3、为什么available Buffer也做成圈呢?这样做是防止把上一轮的数据当成这一轮的数据,错误判断Ring Buffer元素可用。
4、为什么Ring Buffer要2的n次方,因为会涉及到二进制&运算,来算出元素位置,在源码中可以找到。

img_11.png
5、具体RingBuffer写数据和读数据流程,可以参考美团技术博客:https://tech.meituan.com/2016/11/18/disruptor.html

5、等待策略

生产者和消费者都可能出现速度过快的情况,比如队列满了,生产者需要等待消费者消费后才能生产,或者消费者消费过快导致队列为空,进而需要等待生产者生产。
Disruptor目前一共内置了8种等待策略。

img_7.png

  1. BlockingWaitStrategy:用了ReentrantLock的等待唤醒机制实现等待逻辑,是默认策略,对CPU的消耗最小
  2. BusySpinWaitStrategy: 持续自旋,会消耗大量CPU资源
  3. LiteBlockingWaitStrategy: 基于BlockingWaitStrategy,非重入锁的阻塞等待策略,在没有锁竞争的时候会省去唤醒操作
  4. TimeoutBlockingWaitStrategy: 超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出
  5. LiteTimeoutBlockingWaitStrategy: 基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
  6. SleepingWaitStrategy: 三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的睡眠
  7. YieldingWaitStrategy: 二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  8. PhasedBackoffWaitStrategy: 四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

6、结束

Disruptor简单的介绍已经结束了,点个赞再走啦!~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/199455.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

redis整理

1. 数据类型 string , hash, 链表&#xff0c;Set, ZSet. string 底层是sds, sds与普通字符串的区别: a. sds存储了字符串长度&#xff0c;获取长度的时间复杂度为O(1); b. sds操作字符串会预先判断长度是否满足要求, 不会有字符串溢出的情况出现; c. 提前预分配, 惰性回收…

Vue学习计划--Vue2(四)watch、class、style、set

Vue 监听(watch): 监听一个属性的变化 监事属性watch: 当监视的属性变化时&#xff0c;回调函数自动调用&#xff0c;进行相关操作监视的属性必须存在&#xff0c;才能进入监视监视的两种写法&#xff1a; new Vue 时传入watch配置通过 vm.$watch()监视 immediate初始化时让han…

SpectralGPT: Spectral Foundation Model 论文翻译3

遥感领域的通用大模型 2023.11.13在CVPR发表 原文地址&#xff1a;[2311.07113] SpectralGPT: Spectral Foundation Model (arxiv.org) E.消融研究 在预训练阶段&#xff0c;我们对可能影响下游任务表现的各种因素进行了全面研究。这些因素包括掩蔽比、ViT patch大小、数据规…

多线程--11--ConcurrentHashMap

ConcurrentHashMap与HashMap等的区别 HashMap线程不安全 我们知道HashMap是线程不安全的&#xff0c;在多线程环境下&#xff0c;使用Hashmap进行put操作会引起死循环&#xff0c;导致CPU利用率接近100%&#xff0c;所以在并发情况下不能使用HashMap。 ConcurrentHashMap 主…

Linux信息收集

Linux信息收集 本机基本信息 #管理员 $普通用户 之前表示登录的用户名称&#xff0c;之后表示主机名&#xff0c;再之后表示当前所在目录 / 表示根目录 ~表示当前用户家目录1、内核&#xff0c;操作系统和设备信息 uname -a 打印所有可用的系统信息 uname -r 内核版本 u…

01_阿里云_Xshell连接服务器

PC使用Xshell连接阿里云服务器 问题引出 之前使用Xshell连接阿里云服务器连接的好好的&#xff0c;今天准备上去服务器学习Linux发现连不上了&#xff0c;后来发现是防火墙的问题&#xff0c;还有阿里云的安全组也需要设置 解决方案 方法一&#xff1a;&#xff08;简单粗暴…

3D模型材质编辑

在线工具推荐&#xff1a; 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数据生成器 - 3D模型在线转换 - 3D模型预览图生成服务 如今&#xff0c;3D 纹理、打印和建模都非常流行。使用可用的高级工具&#xff0c;创建 3D 模型…

基于Git的代码工程管理——学习记录一

一、Git简概[1] Git是一个分布式版本控制系统&#xff0c;它跟踪任何一组计算机文件的更改&#xff0c;通常用于在软件开发过程中协调协作开发源代码的程序员之间的工作。其为实现快速、数据完整性以及分布式非线性工作流程&#xff08;在不同计算机上运行数千个并行分支&#…

钉钉提交审批意见,并上传附件接口集成

一&#xff1a;适配器 DingtalkApprovalFilesExecute 参考方案链接&#xff1a;轻易云数据集成平台 二&#xff1a;请求接口。配置参数 接口文档&#xff1a;使用了新旧接口 服务端API发起带有附件的审批流并下载附件 - 钉钉开放平台 接口&#xff1a;topapi/processinsta…

搜索与回溯算法②

求0-9的数字可以组成的所有k 位数。 def backtrack(start, path, k, n, results):"""核心函数。:param start: 下一个添加的数字的起始位置:param path: 当前构建的路径&#xff0c;代表一个组合:param k: 组合中所需的数字个数:param n: 可选数字的最大值:par…

Python编程技巧:多层for循环的高级应用

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com Python的for循环结构是编程中最基础也是最常用的控制结构之一。通过for循环&#xff0c;可以轻松遍历数据集合和执行重复的操作。然而&#xff0c;当我们面对多层for循环时&#xff0c;性能和可读性可能会成为挑…

linux使用逻辑券lvm进行磁盘管理

lvm的安装 在线安装 yum install lvm2 离线安装&#xff0c;下载后执行 rpm -ivh * --nodeps --force 在如下网站中挨个下载http://mirrors.163.com/centos/7/os/x86_64/Packages/ device-mapper-1.02.170-6.el7_9.5.x86_64.rpm device-mapper-event-1.02.170-6.el…

A-B 数对

A-B 数对 题目背景 出题是一件痛苦的事情&#xff01; 相同的题目看多了也会有审美疲劳&#xff0c;于是我舍弃了大家所熟悉的 AB Problem&#xff0c;改用 A-B 了哈哈&#xff01; 题目描述 给出一串正整数数列以及一个正整数 C C C&#xff0c;要求计算出所有满足 A −…

Redis的三种消息队列实现方式

目录 前言 List实现消息队列 PubSub消息队列 Stream消息队列 三种实现方式对比 前言 为什么要使用Redis的消息队列&#xff1f; 成本低&#xff0c;对于RabbitMQ或是Kafka来说&#xff0c;已经是重量级的消息队列。 Redis的三种实现方式&#xff1a; List结构&#xff1…

形态学操作—形态学梯度

形态学梯度&#xff08;Morphological Gradient&#xff09;是图像形态学处理中的一种操作&#xff0c;它通过对图像的膨胀和腐蚀操作之间的差异来突出图像中的边缘信息。这种操作有助于增强图像中物体的边界&#xff0c;使它们更加突出。   在数学上&#xff0c;形态学梯度的…

【安卓12源码】WMS系列:addWindow 和 removeWindow流程

一、Window 的属性 Window的属性定义在WindowManager的内部类LayoutParams中&#xff0c;了解Window的属性能够更好的理解WMS的内部原理。Window的属性有很多种&#xff0c;与应用开发最密切的有三种&#xff0c;它们分别是Type(Window的类型)、Flag(Window的标志)和SoftInputM…

SMART PLC温度采集模块温度转换FC(梯形图+SCL代码)

对于模拟量输入采集,温度变送器等我们可以利用线性转换功能块完成温度采集,西门子PLC有温度采集模块,利用温度采集模块采集温度我们的转换关系无需进行线性变换,下面我们具体介绍。温度采集线性转换功能块请参考下面的文章链接: https://rxxw-control.blog.csdn.net/arti…

Hadoop学习笔记(HDP)-Part.06 安装OracleJDK

目录 Part.01 关于HDP Part.02 核心组件原理 Part.03 资源规划 Part.04 基础环境配置 Part.05 Yum源配置 Part.06 安装OracleJDK Part.07 安装MySQL Part.08 部署Ambari集群 Part.09 安装OpenLDAP Part.10 创建集群 Part.11 安装Kerberos Part.12 安装HDFS Part.13 安装Ranger …

代码随想录二刷 | 栈与队列 |用栈实现队列

代码随想录二刷 &#xff5c; 栈与队列 &#xff5c;用栈实现队列 题目描述解题思路 & 代码实现 题目描述 232.用栈实现队列 请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作&#xff08;push、pop、peek、empty&#xff09;&#xff1a; 实现 M…

外包干了8个月,技术退步明显.......

先说一下自己的情况&#xff0c;大专生&#xff0c;18年通过校招进入武汉某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落! 而我已经在一个企业干了四年的功能测…