支撑每秒 600 万订单无压力,SpringBoot + Disruptor 太猛了!

一、背景

工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka,也不是rabbitmq;Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录.

二、Disruptor介绍

  1. Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。
  2. Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。
  3. 从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。
  4. Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。
  5. 其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案。
  6. Disruptor的github主页:https://github.com/LMAX-Exchange/disruptor

三、Disruptor 的核心概念

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

1. Ring Buffer

如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

2. Sequence Disruptor

通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
(注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。

3. Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

4. Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

5. Wait Strategy

定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)

6. Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

7. EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

8. EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

9. Producer

即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
在这里插入图片描述

四、案例-demo

  • 通过下面8个步骤,你就能将Disruptor Get回家啦:
    • 1、添加pom.xml依赖
    <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.4</version>
    </dependency>
    
    • 2、消息体Model
    /*** 消息体*/
    @Data
    public class MessageModel {private String message;
    }
    
    • 3、构造EventFactory
    public class HelloEventFactory implements EventFactory<MessageModel> {@Overridepublic MessageModel newInstance() {return new MessageModel();}
    }
    
    • 4、构造EventHandler-消费者
    @Slf4j
    public class HelloEventHandler implements EventHandler<MessageModel> {@Overridepublic void onEvent(MessageModel event, long sequence, boolean endOfBatch) {try {//这里停止1000ms是为了确定消费消息是异步的Thread.sleep(1000);log.info("消费者处理消息开始");if (event != null) {log.info("消费者消费的信息是:{}",event);}} catch (Exception e) {log.info("消费者处理消息失败");}log.info("消费者处理消息结束");}
    }
    
    • 5、构造BeanManager
    /*** 获取实例化对象*/
    @Component
    public class BeanManager implements ApplicationContextAware {private static ApplicationContext applicationContext = null;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}public static ApplicationContext getApplicationContext() { return applicationContext; }public static Object getBean(String name) {return applicationContext.getBean(name);}public static <T> T getBean(Class<T> clazz) {return applicationContext.getBean(clazz);}
    }
    
    • 6、构造MQManager
    @Configuration
    public class MQManager {@Bean("messageModel")public RingBuffer<MessageModel> messageModelRingBuffer() {//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理ExecutorService executor = Executors.newFixedThreadPool(2);//指定事件工厂HelloEventFactory factory = new HelloEventFactory();//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率int bufferSize = 1024 * 256;//单线程模式,获取额外的性能Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,ProducerType.SINGLE, new BlockingWaitStrategy());//设置事件业务处理器---消费者disruptor.handleEventsWith(new HelloEventHandler());// 启动disruptor线程disruptor.start();//获取ringbuffer环,用于接取生产者生产的事件RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}
    
    • 7、构造Mqservice和实现类-生产者
    public interface DisruptorMqService {/*** 消息* @param message*/void sayHelloMq(String message);
    }@Slf4j
    @Component
    @Service
    public class DisruptorMqServiceImpl implements DisruptorMqService {@Autowiredprivate RingBuffer<MessageModel> messageModelRingBuffer;@Overridepublic void sayHelloMq(String message) {log.info("record the message: {}",message);//获取下一个Event槽的下标long sequence = messageModelRingBuffer.next();try {//给Event填充数据MessageModel event = messageModelRingBuffer.get(sequence);event.setMessage(message);log.info("往消息队列中添加消息:{}", event);} catch (Exception e) {log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());} finally {//发布Event,激活观察者去消费,将sequence传递给改消费者//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producermessageModelRingBuffer.publish(sequence);}}
    }
    
    • 8、构造测试类及方法
    @Slf4j
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = DemoApplication.class)
    public class DemoApplicationTests {@Autowiredprivate DisruptorMqService disruptorMqService;/*** 项目内部使用Disruptor做消息队列* @throws Exception*/@Testpublic void sayHelloMqTest() throws Exception{disruptorMqService.sayHelloMq("消息到了,Hello world!");log.info("消息队列已发送完毕");//这里停止2000ms是为了确定是处理消息是异步的Thread.sleep(2000);}
    }
    
    测试运行结果
    2020-04-05 14:31:18.543  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : record the message: 消息到了,Hello world!
    2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : 往消息队列中添加消息:MessageModel(message=消息到了,Hello world!)
    2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.utils.demo.DemoApplicationTests      : 消息队列已发送完毕
    2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息开始
    2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者消费的信息是:MessageModel(message=消息到了,Hello world!)
    2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息结束

五、总结

其实 生成者 -> 消费者 模式是很常见的,通过一些消息队列也可以轻松做到上述的效果。不同的地方在于,Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。


---------------------
作者:不二天次
来源:CSDN
原文:https://blog.csdn.net/buertianci/article/details/105327031
版权声明:本文为作者原创文章,转载请附上博文链接!
内容解析By:CSDN,CNBLOG博客文章一键转载插件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/29260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【postgresql初级使用】条件表达式触发器,兼顾DML执行性能,又能执行复杂逻辑,只在结帐时计算总帐

条件触发器 ​专栏内容&#xff1a; postgresql使用入门基础手写数据库toadb并发编程 个人主页&#xff1a;我的主页 管理社区&#xff1a;开源数据库 座右铭&#xff1a;天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物. 文章目录 条件触发器概…

【docker入门】

在软件开发过程中&#xff0c;环境配置是一个至关重要的步骤&#xff0c;它不仅影响开发效率&#xff0c;也直接关联到软件的最终质量。正确的环境配置可以极大地减少开发中的潜在问题&#xff0c;提升软件发布的流畅度和稳定性。以下是几个关键方面&#xff0c;以及如何优化环…

【机器学习】第6章 支持向量机(SVM)

一、概念 1.支持向量机&#xff08;support vector machine&#xff0c;SVM&#xff09;&#xff1a; &#xff08;1&#xff09;基于统计学理论的监督学习方法&#xff0c;但不属于生成式模型&#xff0c;而是判别式模型。 &#xff08;2&#xff09;支持向量机在各个领域内的…

如何在不丢失数据的情况下解锁安卓手机密码

手机是我们生活中必不可少的工具&#xff0c;可以帮助我们与朋友和家人保持联系&#xff0c;了解最新消息&#xff0c;甚至经营我们的业务。然而&#xff0c;当我们在 Android 手机或 iPhone 上设置密码时&#xff0c;我们经常会忘记密码&#xff0c;或者根本没有设置密码。当这…

IntelliJ IDEA 使用 Maven 时不加载本地私服的最新版本快照(snapshot)JAR 包

IntelliJ IDEA 使用 Maven 时不加载本地私服的最新版本快照&#xff08;snapshot&#xff09;JAR 包 目录 IntelliJ IDEA 使用 Maven 时不加载本地私服的最新版本快照&#xff08;snapshot&#xff09;JAR 包1. 检查 settings.xml2. IDEA Maven 配置3. 强制更新 Snapshot4. 使用…

学习笔记——路由网络基础——路由度量值

3、路由度量值 (1)基本概念 路由度量值表示到达这条路由所指目的地址的代价。度量值数值越小越优先&#xff0c;度量值最小路由将会被添加到路由表中。度量值很多时候被称为开销(Cost)。 路由度量(路由开销 cost)对于同一个路由协议&#xff0c;当到达某目标网段有多条路由供…

SQL Server入门-安装和测试(2008R2版)

环境&#xff1a;win10&#xff0c;SQL Server 2008 R2 因为工作需要用到SQL Server&#xff08;而且要用2008R2版&#xff09;&#xff0c;完全不熟&#xff0c;所以来学习学习。 SQL Server是微软开发的关系型数据库&#xff0c;支持SQL。同时还有微软还开发了自己的T-SQL&am…

Fontconfig head is null, check your fonts or fonts configuration问题解决

报错信息&#xff1a; Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [R equest processing failed: com.alibaba.excel.exception.ExcelGenerateException: java.lang.InternalError: java.lang.reflect.InvocationTargetExcep…

11 类型泛化

11 类型泛化 1、函数模版1.1 前言1.2 函数模版1.3 隐式推断类型实参1.4 函数模板重载1.5 函数模板类型形参的默认类型&#xff08;C11标准&#xff09; 2、类模版2.1 类模板的成员函数延迟实例化2.2 类模板的静态成员2.3 类模板的递归实例化2.4 类模板类型形参缺省值 3、类模板…

小鹏汽车2025冲刺类L4智驾,挑战与机遇并存

随着科技的飞速发展&#xff0c;智能驾驶已成为汽车行业的前沿领域。近日&#xff0c;小鹏汽车在AI DAY上宣布国内首个量产上车的端到端大模型&#xff0c;这一创新举措无疑为智能驾驶的发展注入了新的活力。然而&#xff0c;在迈向2025年实现类L4级智能驾驶的道路上&#xff0…

大前端 业务架构 插件库 设计模式 属性 线程

大前端 业务架构 插件库 适配模式之(多态)协议1对多 抽象工厂模式 观察者模式 外观模式 装饰模式之参考catagory 策略模式 属性

橡胶油封的用途是什么?

橡胶油封的用途是什么? 在机械工程和设备维护领域&#xff0c;橡胶油封发挥着至关重要的作用&#xff0c;确保各部件的耐用性和效率。那么&#xff0c;橡胶油封的具体用途是什么呢?本文将从多角度探讨橡胶油封的应用和优势&#xff0c;突出其在各个工业和汽车领域中的重要性…

QT 中文乱码 以及 tr 的使用

一、关于显示中文 1、网上常规的做法 - 第一步&#xff1a;代码文件选择用utf8编码带bom。QT Creator 文本编辑 行为配置里可以配置 - 第二步&#xff1a;在有中文汉字的代码文件顶部加一行&#xff08;一般是cpp文件&#xff09; #pragma execution_character_set("utf-…

服务器新硬盘分区、格式化和挂载

文章目录 参考文献查看了一下起点现状分区(base) ~ sudo parted /dev/sdcmklabel gpt&#xff08;设置分区类型&#xff09;增加分区 格式化需要先退出quit&#xff08;可以&#xff09;(base) / sudo mkfs.xfs /dev/sdc/sdc1&#xff08;失败&#xff09;sudo mkfs.xfs /dev/s…

通过nginx转发后应用偶发502bad gateway

序言 学习了一些东西&#xff0c;如何才是真正自己能用的呢&#xff1f;好像就是看自己的潜意识的反应&#xff0c;例如解决了一个问题&#xff0c;那么下次再碰到类似的问题&#xff0c;能直接下意识的去找到对应的信息&#xff0c;从而解决&#xff0c;而不是和第一次碰到一样…

softmax的数值溢出问题

softmax是deep learning常用的一个操作&#xff0c;虽然有很多现成的包可以调&#xff0c;但在某些场景下需要自己实现。本文简单探讨一下softmax可能会出现的数值稳定性问题 解决上溢出问题 Softmax ( x i ) exp ⁡ ( x i ) ∑ j 1 N exp ⁡ ( x j ) exp ⁡ ( x i ) / exp…

CRC循环冗余校验

CRC循环冗余校验 循环冗余校验码是一种用在数字网络和存储设备上的差错校验码&#xff0c;可以校验原始数据的偶然差 错。 CRC 计算单元使用固定多项式计算 32 位 CRC 校验码。 1. 硬件CRC 在单片机中&#xff0c;芯片具有专用的CRC计算单元&#xff0c;它是按照32位数据长…

LeetCode 48.旋转图像

1.做题要求: 2.从此题我们可以看出规律为第几行要变为倒数第几列&#xff0c;所以我们最好先把二维数组存入一维数组中&#xff0c;然后先从最后一列遍历&#xff0c;把一维数组里的元素&#xff0c;依次等于遍历的元素即可: void rotate(int** matrix, int matrixSize, int*…

RunMe_Aobut TC103848_UEFIShellFactoryDiagnostics.nsh

:: ***************************************************************************************************************************************************************** :: 20240617 :: 该脚本可以用于BIOS Case TC103848测试,功能包括&#xff1a;在EFIShell环境下运行…

Scala函数

文章目录 一、第1关&#xff1a;方法S 三角形 ​实验代码&#xff1a; 二、第2关&#xff1a;Scala函数以及函数调用实验代码&#xff1a; 一、第1关&#xff1a;方法 任务描述 本关任务&#xff1a;根据三角形的三边长 a、b、c&#xff0c;返回三角形的面积。 任意三角形面积…