支撑每秒 600 万订单无压力,SpringBoot + Disruptor 太猛了!

一、背景

工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka,也不是rabbitmq;Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录.

二、Disruptor介绍

  1. Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。
  2. Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。
  3. 从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。
  4. Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。
  5. 其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案。
  6. Disruptor的github主页:https://github.com/LMAX-Exchange/disruptor

三、Disruptor 的核心概念

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

1. Ring Buffer

如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

2. Sequence Disruptor

通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
(注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。

3. Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

4. Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

5. Wait Strategy

定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)

6. Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

7. EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

8. EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

9. Producer

即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
在这里插入图片描述

四、案例-demo

  • 通过下面8个步骤,你就能将Disruptor Get回家啦:
    • 1、添加pom.xml依赖
    <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.4</version>
    </dependency>
    
    • 2、消息体Model
    /*** 消息体*/
    @Data
    public class MessageModel {private String message;
    }
    
    • 3、构造EventFactory
    public class HelloEventFactory implements EventFactory<MessageModel> {@Overridepublic MessageModel newInstance() {return new MessageModel();}
    }
    
    • 4、构造EventHandler-消费者
    @Slf4j
    public class HelloEventHandler implements EventHandler<MessageModel> {@Overridepublic void onEvent(MessageModel event, long sequence, boolean endOfBatch) {try {//这里停止1000ms是为了确定消费消息是异步的Thread.sleep(1000);log.info("消费者处理消息开始");if (event != null) {log.info("消费者消费的信息是:{}",event);}} catch (Exception e) {log.info("消费者处理消息失败");}log.info("消费者处理消息结束");}
    }
    
    • 5、构造BeanManager
    /*** 获取实例化对象*/
    @Component
    public class BeanManager implements ApplicationContextAware {private static ApplicationContext applicationContext = null;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}public static ApplicationContext getApplicationContext() { return applicationContext; }public static Object getBean(String name) {return applicationContext.getBean(name);}public static <T> T getBean(Class<T> clazz) {return applicationContext.getBean(clazz);}
    }
    
    • 6、构造MQManager
    @Configuration
    public class MQManager {@Bean("messageModel")public RingBuffer<MessageModel> messageModelRingBuffer() {//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理ExecutorService executor = Executors.newFixedThreadPool(2);//指定事件工厂HelloEventFactory factory = new HelloEventFactory();//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率int bufferSize = 1024 * 256;//单线程模式,获取额外的性能Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,ProducerType.SINGLE, new BlockingWaitStrategy());//设置事件业务处理器---消费者disruptor.handleEventsWith(new HelloEventHandler());// 启动disruptor线程disruptor.start();//获取ringbuffer环,用于接取生产者生产的事件RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}
    
    • 7、构造Mqservice和实现类-生产者
    public interface DisruptorMqService {/*** 消息* @param message*/void sayHelloMq(String message);
    }@Slf4j
    @Component
    @Service
    public class DisruptorMqServiceImpl implements DisruptorMqService {@Autowiredprivate RingBuffer<MessageModel> messageModelRingBuffer;@Overridepublic void sayHelloMq(String message) {log.info("record the message: {}",message);//获取下一个Event槽的下标long sequence = messageModelRingBuffer.next();try {//给Event填充数据MessageModel event = messageModelRingBuffer.get(sequence);event.setMessage(message);log.info("往消息队列中添加消息:{}", event);} catch (Exception e) {log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());} finally {//发布Event,激活观察者去消费,将sequence传递给改消费者//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producermessageModelRingBuffer.publish(sequence);}}
    }
    
    • 8、构造测试类及方法
    @Slf4j
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = DemoApplication.class)
    public class DemoApplicationTests {@Autowiredprivate DisruptorMqService disruptorMqService;/*** 项目内部使用Disruptor做消息队列* @throws Exception*/@Testpublic void sayHelloMqTest() throws Exception{disruptorMqService.sayHelloMq("消息到了,Hello world!");log.info("消息队列已发送完毕");//这里停止2000ms是为了确定是处理消息是异步的Thread.sleep(2000);}
    }
    
    测试运行结果
    2020-04-05 14:31:18.543  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : record the message: 消息到了,Hello world!
    2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : 往消息队列中添加消息:MessageModel(message=消息到了,Hello world!)
    2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.utils.demo.DemoApplicationTests      : 消息队列已发送完毕
    2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息开始
    2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者消费的信息是:MessageModel(message=消息到了,Hello world!)
    2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : 消费者处理消息结束

五、总结

其实 生成者 -> 消费者 模式是很常见的,通过一些消息队列也可以轻松做到上述的效果。不同的地方在于,Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。


---------------------
作者:不二天次
来源:CSDN
原文:https://blog.csdn.net/buertianci/article/details/105327031
版权声明:本文为作者原创文章,转载请附上博文链接!
内容解析By:CSDN,CNBLOG博客文章一键转载插件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/29260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【docker入门】

在软件开发过程中&#xff0c;环境配置是一个至关重要的步骤&#xff0c;它不仅影响开发效率&#xff0c;也直接关联到软件的最终质量。正确的环境配置可以极大地减少开发中的潜在问题&#xff0c;提升软件发布的流畅度和稳定性。以下是几个关键方面&#xff0c;以及如何优化环…

【机器学习】第6章 支持向量机(SVM)

一、概念 1.支持向量机&#xff08;support vector machine&#xff0c;SVM&#xff09;&#xff1a; &#xff08;1&#xff09;基于统计学理论的监督学习方法&#xff0c;但不属于生成式模型&#xff0c;而是判别式模型。 &#xff08;2&#xff09;支持向量机在各个领域内的…

如何在不丢失数据的情况下解锁安卓手机密码

手机是我们生活中必不可少的工具&#xff0c;可以帮助我们与朋友和家人保持联系&#xff0c;了解最新消息&#xff0c;甚至经营我们的业务。然而&#xff0c;当我们在 Android 手机或 iPhone 上设置密码时&#xff0c;我们经常会忘记密码&#xff0c;或者根本没有设置密码。当这…

IntelliJ IDEA 使用 Maven 时不加载本地私服的最新版本快照(snapshot)JAR 包

IntelliJ IDEA 使用 Maven 时不加载本地私服的最新版本快照&#xff08;snapshot&#xff09;JAR 包 目录 IntelliJ IDEA 使用 Maven 时不加载本地私服的最新版本快照&#xff08;snapshot&#xff09;JAR 包1. 检查 settings.xml2. IDEA Maven 配置3. 强制更新 Snapshot4. 使用…

学习笔记——路由网络基础——路由度量值

3、路由度量值 (1)基本概念 路由度量值表示到达这条路由所指目的地址的代价。度量值数值越小越优先&#xff0c;度量值最小路由将会被添加到路由表中。度量值很多时候被称为开销(Cost)。 路由度量(路由开销 cost)对于同一个路由协议&#xff0c;当到达某目标网段有多条路由供…

SQL Server入门-安装和测试(2008R2版)

环境&#xff1a;win10&#xff0c;SQL Server 2008 R2 因为工作需要用到SQL Server&#xff08;而且要用2008R2版&#xff09;&#xff0c;完全不熟&#xff0c;所以来学习学习。 SQL Server是微软开发的关系型数据库&#xff0c;支持SQL。同时还有微软还开发了自己的T-SQL&am…

小鹏汽车2025冲刺类L4智驾,挑战与机遇并存

随着科技的飞速发展&#xff0c;智能驾驶已成为汽车行业的前沿领域。近日&#xff0c;小鹏汽车在AI DAY上宣布国内首个量产上车的端到端大模型&#xff0c;这一创新举措无疑为智能驾驶的发展注入了新的活力。然而&#xff0c;在迈向2025年实现类L4级智能驾驶的道路上&#xff0…

大前端 业务架构 插件库 设计模式 属性 线程

大前端 业务架构 插件库 适配模式之(多态)协议1对多 抽象工厂模式 观察者模式 外观模式 装饰模式之参考catagory 策略模式 属性

服务器新硬盘分区、格式化和挂载

文章目录 参考文献查看了一下起点现状分区(base) ~ sudo parted /dev/sdcmklabel gpt&#xff08;设置分区类型&#xff09;增加分区 格式化需要先退出quit&#xff08;可以&#xff09;(base) / sudo mkfs.xfs /dev/sdc/sdc1&#xff08;失败&#xff09;sudo mkfs.xfs /dev/s…

通过nginx转发后应用偶发502bad gateway

序言 学习了一些东西&#xff0c;如何才是真正自己能用的呢&#xff1f;好像就是看自己的潜意识的反应&#xff0c;例如解决了一个问题&#xff0c;那么下次再碰到类似的问题&#xff0c;能直接下意识的去找到对应的信息&#xff0c;从而解决&#xff0c;而不是和第一次碰到一样…

CRC循环冗余校验

CRC循环冗余校验 循环冗余校验码是一种用在数字网络和存储设备上的差错校验码&#xff0c;可以校验原始数据的偶然差 错。 CRC 计算单元使用固定多项式计算 32 位 CRC 校验码。 1. 硬件CRC 在单片机中&#xff0c;芯片具有专用的CRC计算单元&#xff0c;它是按照32位数据长…

LeetCode 48.旋转图像

1.做题要求: 2.从此题我们可以看出规律为第几行要变为倒数第几列&#xff0c;所以我们最好先把二维数组存入一维数组中&#xff0c;然后先从最后一列遍历&#xff0c;把一维数组里的元素&#xff0c;依次等于遍历的元素即可: void rotate(int** matrix, int matrixSize, int*…

Scala函数

文章目录 一、第1关&#xff1a;方法S 三角形 ​实验代码&#xff1a; 二、第2关&#xff1a;Scala函数以及函数调用实验代码&#xff1a; 一、第1关&#xff1a;方法 任务描述 本关任务&#xff1a;根据三角形的三边长 a、b、c&#xff0c;返回三角形的面积。 任意三角形面积…

外网怎么访问内网?

当我们需要在外网环境下访问内网资源时&#xff0c;常常会面临一些困扰。通过使用一些相关的技术与工具&#xff0c;我们可以轻松地实现这一目标。本文将介绍如何通过【天联】组网产品&#xff0c;解决外网访问内网的问题。 【天联】组网是一款由北京金万维科技有限公司自主研…

JAVAFX打包部署真正能用的办法(jdk21,javafx23)IntelliJ IDEA

我之前创建了javafx项目&#xff0c;想打包试试。一试&#xff0c;全是坑&#xff0c;所以记录下来&#xff0c;为有缘人节约时间。直接构建工件是错误的&#xff0c;别尝试了&#xff0c;找不在JDK的。我也花了一天多的时间尝试了网上各种大神的办法&#xff0c;就没找到一个是…

stm32学习-软件I2C读取MPU6050

接线 SDAPB11SCLPB10 I2C 对操作端口的库函数进行封装 void MyI2C_W_SCL(uint8_t BitValue)//写 {GPIO_WriteBit(GPIOB, GPIO_Pin_10, (BitAction)BitValue);Delay_us(10); }void MyI2C_W_SDA(uint8_t BitValue)//写 {GPIO_WriteBit(GPIOB, GPIO_Pin_11, (BitAction)BitValu…

Redis 7.x 系列【4】命令手册

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Redis 版本 7.2.5 源码地址&#xff1a;https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 说明2. 命令手册2.1 Generic2.2 数据类型2.2.1 String2.2.2 Hash2.2.3 List2.2.4 S…

如何在纯内网环境下,将EasyCVR视频汇聚网关通过4G与第三方公网云平台级联?

EasyCVR视频汇聚网关是TSINGSEE青犀软硬一体的一款产品&#xff0c;可提供多协议的接入、音视频采集、处理&#xff0c;能实现海量前端设备的轻量化接入/转码/分发、视频直播、云端录像、云存储、检索回看、智能告警、平台级联等&#xff0c;兼容多种操作系统&#xff0c;轻松扩…

HTTP/2 协议学习

HTTP/2 协议介绍 ​ HTTP/2 &#xff08;原名HTTP/2.0&#xff09;即超文本传输协议 2.0&#xff0c;是下一代HTTP协议。是由互联网工程任务组&#xff08;IETF&#xff09;的Hypertext Transfer Protocol Bis (httpbis)工作小组进行开发。是自1999年http1.1发布后的首个更新。…

第3讲:关于Pixi的Text、Container、Sprite、Graphics组件功能作用

首先这里提供一个公用代码&#xff1a; 下部分各种组件基于这个公用代码直接往下添加代码即可。 import {Application, Text, Container, Sprite, BaseTexture, Texture, Graphics} from pixi.js import ./style.css import testImageUrl from ./images/test.jpg // 指明Appli…