一文快速掌握高性能内存队列Disruptor

写在文章开头

Disruptor是英国外汇公司LMAX开源的一款高性能内存消息队列,理想情况下单线程可支撑600w的订单。所以本文会从使用以及设计的角度来探讨一下这款神级java消息队列。

在这里插入图片描述

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

基础使用示例

前置步骤

我们会基于该框架实现一个简单的生产者消费者模型,在此之前我们需要引入一下依赖:

	<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version></dependency>

约定消息模型

生产者消费者沟通的媒介就是消息,所以我们首先需要创建发送消息的消息模型,这里仅仅简单创建一个对象用户记录发送的字符串消息:

@Data
public class MessageModel {/*** 消息内容*/private String message;
}

基于消息模型初始化消息队列空间

Disruptor对于事件的存储进行更新操作都是基于RingBuffer,项目启动前它会基于我们的消息也就是上文的MessageModel 进行空间预初始化,所以我们需要继承EventFactory编写创建实例方法,告知Disruptor如何创建什么样消息对象空间,这一点我们可以在源码RingBufferFields的构造方法中得以印证:

RingBufferFields(EventFactory<E> eventFactory,Sequencer sequencer){//基于sequencer完成队列数组entries 数组初始化this.sequencer = sequencer;this.bufferSize = sequencer.getBufferSize();//......this.indexMask = bufferSize - 1;this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];//基于我们给定的MsgEventFactory完成数组内部元素空间预初始化fill(eventFactory);}private void fill(EventFactory<E> eventFactory){for (int i = 0; i < bufferSize; i++){//调用我们的工厂方法完成元素内部元素空间初始化entries[BUFFER_PAD + i] = eventFactory.newInstance();}}

所以我们直接继承EventFactory给出消息模型创建的工厂方法:

public class MsgEventFactory implements EventFactory<MessageModel> {@Overridepublic MessageModel newInstance() {return new MessageModel();}
}

定制消息处理器

通过继承EventHandler并指定泛型即可接手并处理MessageModel消息,逻辑比较简单,读者可自行参阅:

@Slf4j
public class MsgEventHandler implements EventHandler<MessageModel> {@Overridepublic void onEvent(MessageModel messageModel, long sequence, boolean endOfBatch) {//休眠2s,模拟异步消费ThreadUtil.sleep(2000);log.info("消费者处理消息开始");if (messageModel != null) {log.info("收费者收到消息,序列号:{},消息内容:{}", sequence, messageModel.getMessage());}}
}

配置Disruptor核心参数

上述步骤完成之后,我们就可以配置环形队列RingBuffer了:

@Configuration
public class RingBufferConfig {@Beanpublic RingBuffer<MessageModel> messageModelRingBuffer() {//定义事件处理线程池,即消费者线程池ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNamePrefix("thread-").build();//指定事件工厂MsgEventFactory msgEventFactory = new MsgEventFactory();//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率int bufferSize = 1024 * 256;//单线程模式,获取额外的性能Disruptor<MessageModel> disruptor = new Disruptor<>(msgEventFactory, bufferSize, threadFactory,ProducerType.SINGLE, new BlockingWaitStrategy());//设置事件业务处理器---消费者disruptor.handleEventsWith(new MsgEventHandler());// 启动disruptor线程disruptor.start();//获取RingBuffer,用于接取生产者生产的事件RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}
}

实现生产者

上述步骤已经完成的Disruptor的所有创建和配置工作,注入环形队列,我们的服务就可以投递的消息了,这里我们给出对应的DisruptorMqServiceImpl 的实现代码:

@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {@Autowiredprivate RingBuffer<MessageModel> messageModelRingBuffer;@Overridepublic void sendMsg(String message) {log.info("投递消息,消息内容: {}", message);//获取下一个Event槽的下标long sequence = messageModelRingBuffer.next();try {//给Event填充数据MessageModel event = messageModelRingBuffer.get(sequence);event.setMessage(message);log.info("往消息队列中添加事件:{}", JSONUtil.toJsonStr(event));} catch (Exception e) {log.error("消息发送失败,失败原因 {}", e.getMessage(), e);} finally {//发布Event,激活观察者去消费,将sequence传递给改消费者//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producermessageModelRingBuffer.publish(sequence);}}
}

效果展示

我们通过外部接口直接调用,可以看到MsgEventHandler接收并准确的处理了我们投递的消息:

2024-02-16 00:17:14.223  INFO 9924 --- [io-18080-exec-1] c.s.service.impl.DisruptorMqServiceImpl  : 投递消息,消息内容: demoData
2024-02-16 00:17:14.254  INFO 9924 --- [io-18080-exec-1] c.s.service.impl.DisruptorMqServiceImpl  : 往消息队列中添加事件:{"message":"demoData"}
2024-02-16 00:17:14.255  INFO 9924 --- [io-18080-exec-1] c.sharkChili.controller.TestController   : 消息队列已发送完毕
2024-02-16 00:17:16.268  INFO 9924 --- [       thread-0] com.sharkChili.handler.MsgEventHandler   : 消费者处理消息开始
2024-02-16 00:17:16.268  INFO 9924 --- [       thread-0] com.sharkChili.handler.MsgEventHandler   : 收费者收到消息,序列号:0,消息内容:demoData

Disruptor工作流程详解

Disruptor官网文档详尽给出所有核心的组件概念,详情可参考:LMAX Disruptor User Guide

这里我们以流程化的方式给出几个比较核心的概念,如下图所示,首先是生产者Producer也就是我们上文中的DisruptorMqServiceImpl通过RingBuffer获取对应序列号的消息对象MessageModel的引用将消息设置进去。此时基于等待策略等待就绪事件的对应的SequenceBarrier就会拿到这个消息的序列号并传递给消费者即EventHandlerEventHandler会基于当前收到的序列号到RingBuffer中获取对应的消息并处理。

在这里插入图片描述

这一点我们可以在BatchEventProcessor的run方法中得以印证:

@Overridepublic void run(){//......T event = null;long nextSequence = sequence.get() + 1L;try{while (true){try{//sequenceBarrier基于等待策略获取就绪的消息序列号final long availableSequence = sequenceBarrier.waitFor(nextSequence);//拿到序列号之后从ringbuffer(也就是下文的dataProvider)获取对应的消息事件,并通过我们重写的eventHandler处理掉while (nextSequence <= availableSequence){event = dataProvider.get(nextSequence);eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);nextSequence++;}sequence.set(availableSequence);}catch (final TimeoutException e){//......}//......}}finally{//......}}

详解Disruptor高效的原因

缓存填充

JDK自带的队列ArrayBlockingQueue通过上锁并阻塞线程的方式却保证生产者和消费者之间安全通信,我们的入队(在我们的场景可直接理解为消息投递)为例,可以看到put方法会先上锁如果得不到锁线程会直接进入WAIT状态,然后判断队列是否达到上限,同样的若达到上限当前线程也会被阻塞,等待队列不满时被唤醒再次进行添加操作:

public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;//上锁,若得不到锁则进入等待状态lock.lockInterruptibly();try {//队列已满则等待未满再进行操作while (count == items.length)notFull.await();//入队   enqueue(e);} finally {//释放锁lock.unlock();}}

除此之外JDK自带的ArrayBlockingQueue也没有考虑到并发场景下的伪共享问题,例如每个线程对应的CPU核心都将内存中的ArrayBlockingQueue加载到缓存行中,为了保证双方的缓存一致性,一旦一端修改了ArrayBlockingQueue,那么另一端的ArrayBlockingQueue就会被视为脏数据,这就意味着另一端的CPU需要操作ArrayBlockingQueue就需要重新从内存加载一份全新的ArrayBlockingQueue才能进行更新操作,在并发激烈的场景下,这种情况操作效率大大降低:

在这里插入图片描述

DisruptorRingBuffer中的字段RingBufferFields涉及RingBuffer中核心变量信息的记录,为了避免伪共享问题RingBufferFields继承RingBufferPad保证RingBufferFields每次被加载时前方都有7个8字节的数据填充。同理RingBuffer继承RingBufferFields在其后方填充7个8字节数据,由此保证了每一个RingBufferFields的任意字段被加载时,都有7个不可变的字段填充再任意CPU左右,避免RingBufferFields某个字段更新后,其他CPU缓存行的数据变为脏数据的缓存一致性问题:

在这里插入图片描述

对应的我们给出缓存填充的代码示例:

abstract class RingBufferPad
{protected long p1, p2, p3, p4, p5, p6, p7;
}abstract class RingBufferFields<E> extends RingBufferPad
{
//......
}public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;protected long p1, p2, p3, p4, p5, p6, p7;//....
}

分支预测

Disruptor通过数组构成一个循环队列,它在初始化时就固定了存储空间,按照局部性原理,一次即可加载批量的元素到缓存行中,结合CPU的分支预测机制,因为数组的顺序加载规律分支预测器可以非常高效的预测并缓存下一条指令从而快速获取到数组中的下一个元素,这就是Disruptor第2个高效的原因:

Disruptor

无锁操作

进行消息批量投递和消费时,Disruptor都会按照如下步骤:

  1. 计算要获取的序列范围。
  2. CAS设置获取并更新序列号进度。
  3. CAS原子更新成功则获取并进行生产或者消费,反之循环重试CAS直至成功。

在这里插入图片描述

为了印证这一点,我们将生产模式模式改为多线程生产ProducerType.MULTI

 Disruptor<MessageModel> disruptor = new Disruptor<>(msgEventFactory, bufferSize, threadFactory,ProducerType.MULTI, new BlockingWaitStrategy());

我们以上文中DisruptorMqServiceImpl投递消息前获取序列号这一步作为入口查看这一过程:

 long sequence = messageModelRingBuffer.next();

可以看到底层用sequencer进行序列号自增:

@Overridepublic long next(){return sequencer.next();}

对应MultiProducerSequencer的next方法可以看到,它会基于我们传入的n进行CAS累加操作,若成功则说明这批序列号获取成功,我们的生产者可以操作序列号对应的数组空间,从而进行消息投递,而这就是Disruptor高效的第3个原因——无锁。

@Overridepublic long next(int n){//......long current;long next;do{//获取当前序列号current = cursor.get();//获取自增范围next = current + n;//......if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){//......}//基于CAS原子更新cursor的数值,若成功则说明可以操作从current 到next这个范围的序列号的对应的数组元素空间,后续可以基于这个范围进行消息投递操作else if (cursor.compareAndSet(current, next)){break;}}//循环CAS操作直至成功while (true);return next;}

小结

以上便是笔者对于Disruptor剖析的全部内容,希望对你有帮助。

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

参考

SpringBoot + Disruptor 实现特快高并发处理,支撑每秒 600 万订单无压力:https://mp.weixin.qq.com/s/k-WiWvIQcNft_fX7uroE0A

官网文档:https://lmax-exchange.github.io/disruptor/user-guide/index.html#user-guide-models

高性能队列——Disruptor:https://tech.meituan.com/2016/11/18/disruptor.html

【计组】理解Disruptor–《计算机组成原理》(十五):https://blog.csdn.net/weixin_56814032/article/details/128999761

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/7463.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java基础】JVM内存简单介绍

JVM把内存分为五块&#xff1a;栈、堆、方法区、本地方法区、寄存器当函数被调用时&#xff0c;函数内部的局部变量在栈中开辟内存&#xff0c;当局部变量的作用域结束时&#xff0c;立刻释放栈中所占据的内存。 栈 栈的特点&#xff1a;先进后出当函数被调用时&#xff0c;为…

unity制作app(5)--发送数据给数据库

这个之前做过&#xff0c;先不做照片的。下一节再做带照片的。 第一步 收集数据 1.先做一个AppModel结构体&#xff0c;这个结构体需要单做的。 using System; using System.Collections.Generic; using System.Linq; using System.Text; //using Assets.Model; public clas…

【Linux】System V 共享内存

文章目录 1. 共享内存示意图2. 共享内存数据结构3. 共享内存函数shmgetshmatshmdtshmctl 4. 实例代码测试共享内存5. 共享内存相关命令6. System V 消息队列&#xff08;了解&#xff09;7. System V 信号量&#xff08;了解&#xff09; 共享内存区是最快的 IPC 形式。一旦这样…

力扣每日一题109:有序链表转换二叉搜索树

题目 中等 给定一个单链表的头节点 head &#xff0c;其中的元素 按升序排序 &#xff0c;将其转换为 平衡 二叉搜索树。 示例 1: 输入: head [-10,-3,0,5,9] 输出: [0,-3,9,-10,null,5] 解释: 一个可能的答案是[0&#xff0c;-3,9&#xff0c;-10,null,5]&#xff0c;它…

Linux进程间通信方式

每个进程的用户空间都是独立的&#xff0c;不能相互访问。 所有进程的内核空间(32位系统3G-4G)都是共享的 应用场景 作为缓冲区&#xff0c;处理速度不同的进程之间的数据传输资源共享&#xff1a;多个进程之间共享同样的资源&#xff0c;一个进程对共享数据的修改&#xff0c…

【目录】500 行或更少(500 Lines or Less)

AOSA 500 行或更少&#xff08;500 Lines or Less&#xff09;是《开源应用程序体系结构》(Architecture of Open Source Applications, AOSA)系列的第四卷。该系列的前三卷是关于大型程序必须解决的大问题&#xff0c;而本书专注于程序员在构建新事物时在小规模中做出的设计决…

亚马逊自养号测评环境搭建需要准备哪些?

在当下电商领域竞争白热化的背景下&#xff0c;亚马逊平台的卖家们对流量之于店铺转化率的重要性有着深刻的认识。随着对平台内部流量的依赖逐渐减弱&#xff0c;他们纷纷寻求更多元化的途径来提升销售业绩和品牌的市场影响力。在此过程中&#xff0c;自养号测评成为了一种备受…

Golang日志实战教程:掌握log与syslog库的高效使用

Golang日志实战教程&#xff1a;掌握log与syslog库的高效使用 简介理解 Golang 的 log 库基本概念创建日志记录器自定义日志记录器日志级别 深入 syslogsyslog 的基础配置和使用 syslog高级应用 日志格式化与管理日志格式化日志文件管理 日志的高级应用集成第三方日志框架使用 …

盲盒小程序怎么做?盲盒创业

盲盒作为当下的新兴行业&#xff0c;从出现就备受年轻消费者的追捧&#xff0c;成为了我国发展前景巨大的行业之一。盲盒市场不仅吸引了众多消费者&#xff0c;同时也吸引了更多的创业者&#xff0c;成为了一大创业新模式。 盲盒小程序是一种线上盲盒销售模式&#xff0c;以社…

气膜滑雪馆:滑雪新宠的全面介绍—轻空间

气膜滑雪馆&#xff0c;作为一种创新型的滑雪运动设施&#xff0c;正以其独特的建筑特点和功能优势&#xff0c;成为滑雪运动领域的引领者。这些场馆凭借其轻盈的结构、优良的保温性能和环保节能的特性&#xff0c;为滑雪场馆的建设提供了全新的解决方案。相较于传统建筑&#…

【ElasticSearch】IK分词器中停用词问题

问题描述 在ES中进行部分关键词搜索时&#xff0c;搜索无结果&#xff0c;如搜索 【IT】 环境描述 中文分词插件 这里使用的是 analysis-ik 分词调试 POST test_index/_analyze {"text":"IT Manager","analyzer": "ik_max_word"…

【Django学习笔记(八)】MySQL的数据管理

MySQL的数据管理 前言正文1、新增数据2、删除数据3、修改数据4、查询数据5、案例&#xff1a;员工管理5.1 创建表结构5.1.1 创建数据库5.1.2 创建数据表 5.2 Python操作MySQL5.2.1 pymysql 的基本操作步骤5.2.2 优化 pymysql 的基本操作步骤5.2.3 查询数据5.2.4 修改数据5.2.5 …

使用 Gitea 进行私有 Git 仓库管理

在本文中&#xff0c;我们将介绍如何使用 Gitea 搭建并管理私有 Git 仓库。Gitea 是一个轻量级的 Git 服务&#xff0c;提供了类似于 GitHub 的功能&#xff0c;适合个人和小团队使用。我们将通过以下步骤来完成搭建和配置 Gitea 服务器。 步骤一&#xff1a;安装 Gitea 首先…

spss 导入数据的时候 用于确定数据类型的值所在的百分比95%是什么意思,数据分析,医学数据分析

在SPSS中&#xff0c;当提及“数据类型的值所在的百分比95%”时&#xff0c;这通常与数据的统计分布或置信区间有关&#xff0c;而不是直接关于数据类型的定义。 导入数据的时候需要定义数据类型&#xff0c;那么根据提供的数据&#xff0c;来定义&#xff0c;有时候&#xff…

【每天一个linux小知识】如何使用 oh-my-zsh 让使用zsh更高效

往期文章 tailf 和 tail -f nslookup 目录 往期文章对比演示zshoh-my-zsh安装自动提示、补全、语法高亮等插件参考 对比演示 使用 oh-my-zsh 之前&#xff1a; 使用 oh-my-zsh 之后&#xff1a; zsh 要使用oh-my-zsh前提是使用zsh。所以第一步安装zsh 可以看一下你的系统…

跟TED演讲学英文:What moral decisions should driverless cars make by Iyad Rahwan

What moral decisions should driverless cars make? Link: https://www.ted.com/talks/iyad_rahwan_what_moral_decisions_should_driverless_cars_make Speaker: Iyad Rahwan Date: September 2016 文章目录 What moral decisions should driverless cars make?Introduct…

字节人都用的婚恋交友相亲平台有哪些?聊聊互联网大厂的人是怎么脱单的!

虽然在字节这样的公司上班&#xff0c;也算是人中之人了。但是也耐不住29岁了&#xff0c;快成大龄剩女了。迫于长辈的催婚压力&#xff0c;所以带着任务体验了一遍各大相亲交友平台&#xff0c;以下是我的使用感受。 1、青藤之恋&#xff1a;偏相亲定位&#xff0c;曾经高学历…

Flask gevent启动报错UnicodeDecodeError

文章目录 环境代码报错Track解决思路 环境 acondana 24.1.2python 3.7.13 32bitflask 2.2.3gevent 21.8.0 代码 port 7236 logging.basicConfig(levellogging.INFO, # 控制台打印的日志级别filename./logs/app.log, # 将日志写入log_new.log文件中filemodea, # 模式&…

移动硬盘无法被识别怎么办?恢复移动硬盘3个正确做法

移动硬盘已成为我们日常生活和工作中不可或缺的数据存储设备。然而当移动硬盘突然无法被电脑识别时&#xff0c;往往会让人倍感焦虑。面对这种情况我们不必过于慌张&#xff0c;下面一起来看看指南解决。 解决方法一&#xff1a;检查硬件连接与供电 检查接口连接&#xff1a…

手机短信删除了还能恢复吗?该怎么恢复呢?

在我们的日常生活中&#xff0c;手机短信已经成为我们与他人沟通的重要方式之一。然而&#xff0c;有时候我们会不小心删除了一些重要的短信&#xff0c;这时候就非常希望能够恢复它们。那么&#xff0c;手机短信删除了还能恢复吗&#xff1f;该怎么恢复呢&#xff1f;本文将告…