600万订单每秒Disruptor +SpringBoot,如何解决消息不丢失?

尼恩说在前面

在40岁老架构师 尼恩的读者交流群(50+)中,最近有小伙伴拿到了一线互联网企业如得物、阿里、滴滴、极兔、有赞、shein 希音、百度、网易的面试资格,遇到很多很重要的面试题:

Disruptor 官方说能达到每秒600w OPS订单处理能力,怎么实现的?

Disruptor 什么情况下发生消费数据丢失? 该怎么解决?

小伙伴 没有回答好,导致面试挂了。

Disruptor 是队列之王,相关面试题也是一个非常常见的面试题,考察的是高性能的基本功。

如何才能回答得很漂亮,才能 让面试官刮目相看、口水直流呢?

这里,尼恩给大家做一下系统化、体系化的梳理,让面试官爱到 “不能自已、口水直流”,然后帮大家 实现 ”offer自由”

当然,这道面试题,以及参考答案,也会收入咱们的 《尼恩Java面试宝典》V175版本PDF集群,供后面的小伙伴参考,提升大家的 3高 架构、设计、开发水平。

注:本文以 PDF 持续更新,最新尼恩 架构笔记、面试题 的PDF文件,请关注本公众号【技术自由圈】获取。

队列之王 Disruptor 简介

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。

基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。

在这里插入图片描述

2011年,企业应用软件专家Martin Fowler专门撰写长文介绍Disruptor。

2011年,Disruptor还获得了Oracle官方的Duke大奖

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

Disruptor通过以下设计来,来实现 单线程能支撑每秒600万订单的问题:

  • 核心架构1:无锁架构
    生产和消费,都是无锁架构。具体来说,生产者位点/消费者位点的操作,都是无锁操作,或者使用轻量级CAS原子操作。

    无锁架构好处是,既没有锁的竞争, 也没有线程的内核态、用户态切换的开销。 关于内核态、用户态的原理请参见尼恩的葵花宝典。

  • 核心架构2:环形数组架构

    数组元素不会被回收,避免频繁的GC,所以,为了避免垃圾回收,采用数组而非链表。

    同时,数组对处理器的缓存机制更加友好。

    数组长度2^n,通过位运算,加快定位的速度。

    下标采取递增的形式。不用担心index溢出的问题。

    index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 核心架构3:cache line padding

    两个维度的CPU 缓存行加速,享受到 CPU Cache 那风驰电掣的速度带来的红利:

    第一维度: 对 位点等核心组件进行 CPU cache line padding,实现高并发访问(修改和取值)。

    第二个维度: ringbuffer 是一个数据,加载的时候一般也会塞满整个 CPU cache line。也就是说 从内存加载数据到 CPU Cache 里面的时候, 如果是加载数组里面的数据(如 Disruptor),那么 CPU 就会加载到数组里面连续的多个数据。
    所以,Disruptor 数组的遍历、还是位点的增长, 很容易享受到 CPU Cache 那风驰电掣的速度带来的红利。

SpringBoot + Disruptor 使用实战

有关 Disruptor的 简单实战,请参见 尼恩的 《Disruptor 学习圣经 V3》, 由于过于简单,这里不做啰嗦。

下面,来看一个SpringBoot + Disruptor的 使用实战

使用 Disruptor 实现一个生产消费模型步骤是:

  • 准备好简单的一个springboot应用

  • 定义事件(Event) : 你可以把 Event 理解为存放在队列中等待消费的消息对象。

  • 创建事件工厂 :事件工厂用于生产事件,我们在初始化 Disruptor 类的时候需要用到。

  • 创建处理事件的 Handler :Event 在对应的 Handler 中被处理,你可以将其理解为生产消费者模型中的消费者。

  • 创建并装配 Disruptor : 事件的生产和消费需要用到Disruptor 对象。

  • 定义生产者,并使用生产者发消息

  • 对简单的SpringBoot + Disruptor 进行扩容,实现 容量监控预警+ 动态扩容

定义一个Event和工厂

首先定义一个Event来包含需要传递的数据:

在这里插入图片描述

由于需要让Disruptor为我们创建事件,我们同时还声明了一个EventFactory来创建Event对象。

public class LongEventFactory implements EventFactory { @Override public Object newInstance() { return new LongEvent(); } 
} 

在这里插入图片描述定义

事件处理器(消费者)

我们还需要一个事件消费者,也就是一个事件处理器。

这个例子中,事件处理器的工作,就是简单地把事件中存储的数据打印到终端:

    /** * 类似于消费者*  disruptor会回调此处理器的方法*/static class LongEventHandler implements EventHandler<LongEvent> {@Overridepublic void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {System.out.println(longEvent.getValue());}}

disruptor会回调此处理器的方法

定义事件源(生产者)

事件都会有一个生成事件的源,类似于 生产者的角色.

注意,这是一个 600wqps 能力的 异步生产者。 这里定义两个版本:

在这里插入图片描述

生产者的角色的接口定义如下

在这里插入图片描述

入门级:一个简单 DisruptorProducer 生产者的定义和使用

定义一个简单 DisruptorProducer 生产者

大致的代码如下

package com.crazymaker.cloud.disruptor.demo.business.impl;@Slf4j
public class DisruptorProducer implements AsyncProducer {//一个translator可以看做一个事件初始化器,publicEvent方法会调用它//填充Eventprivate static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =new EventTranslatorOneArg<LongEvent, Long>() {public void translateTo(LongEvent event, long sequence, Long data) {event.setValue(data);}};private final RingBuffer<LongEvent> ringBuffer;public DisruptorProducer() {this.ringBuffer = disruptor().getRingBuffer();}public void publishData(Long data) {log.info("生产一个数据:" + data + " | ringBuffer.remainingCapacity()= " + ringBuffer.remainingCapacity());ringBuffer.publishEvent(TRANSLATOR, data);}private Disruptor<LongEvent> disruptor() {ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();LongEventFactory eventFactory = new LongEventFactory();int bufferSize = 1024;Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, bufferSize, namedThreadFactory,ProducerType.MULTI, new BlockingWaitStrategy());// 连接 消费者 处理器 ,两个消费者LongEventWorkHandler1 handler1 = new LongEventWorkHandler1();LongEventWorkHandler2 handler2 = new LongEventWorkHandler2();disruptor.handleEventsWith(handler1, handler2);//为消费者配置异常处理器disruptor.handleExceptionsFor(handler1).with(exceptionHandler);disruptor.handleExceptionsFor(handler2).with(exceptionHandler);// 开启 分裂者(事件分发)disruptor.start();return disruptor;}ExceptionHandler exceptionHandler =...//省略非核心代码  异常处理器实现
}

上面的代码,通过 disruptor() 方法创建和装配 一个Disruptor对象 ,Disruptor 里边有一个环形队列。然后 disruptor() 方法给 Disruptor对象设置消费者,并且为消费者设置异常处理器。

使用这一个简单 DisruptorProducer 生产者

定义一个配置类,用于实例化 生产者

在这里插入图片描述

定义controller, 注入这个 生产者,就可以异步发布数据 给消费者了

在这里插入图片描述

springboot应用启动之后, 可以通过 httpclient 工具,测试一下:

在这里插入图片描述

看一下测试数据

在这里插入图片描述

具体的代码和,演示过程,后面参考尼恩录制和发布《尼恩Java面试宝典》配套视频。

Disruptor:消费数据丢失问题的分析与解决

在处理高并发、大数据量等场景时,Disruptor虽然其高性能、低延迟,然而,在使用过程中,一些用户可能会遇到消费数据丢失问题。

为了解决这些问题,我们需要深入了解Disruptor的工作原理,并采取相应的解决方案。

消费数据丢失问题的根因

消费线程丢失问题通常发生在消费者处理速度跟不上生产者的时候。

由于Disruptor采用环形队列来存储数据,当队列满时,新的数据会覆盖旧的数据。

Disruptor 中,生产和消费的index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

生产和消费的index 下标采取递增的形式。不用担心index溢出的问题。

生产和消费的index 是通过 取模, 映射到 ring 环形数据的。

在这里插入图片描述

如果消费者速度慢, 生产者快,消费跟不上,生产的index(/Sequence)就会越来越大,取模的时候,又会从0开始,去覆盖ring前面的数据,从而导致没有消费的数据被丢失。

在这里插入图片描述

从上图可以看到,只要生产者 的Sequence 大于消费者 一个ring 的数量, 就开始 覆盖旧的数据,也就是开始丢失数据。

消费数据丢失问题解决方案:

  1. 增加消费者数量:增加消费者线程的数量,可以并行处理更多的数据,提高消费速度。

    同时,合理配置消费者与生产者的数量比例,确保队列生产者 的Sequence 不会大于消费者 一个ring 的数量。

  2. 增加ring环形数组的大小:通过增加数组的大小,从而保证一个环可以存放足够多的数据,但这个可能会导致OOM。

  3. 剩余容量监控与告警:
    通过Prometheus 对 remainingCapacity剩余容量 进行实时监控,当remainingCapacity 超过80%(阈值)及时发出告警通知相关人员处理,进行微服务实例的 HPA 横向扩容,或者进行 Disruptor 队列进行动态扩容。

  4. Disruptor 动态扩容
    对 Disruptor 框架进行合理封装,从单个Disruptor 队列模式,变成 ringbuffer 数组的形式,并且可以结合nacos 或者 Zookeeper 这种发布订阅组件, 对 ringbuffer 数组 进行在线扩容。

总之,通过增加消费者数量、增加ring环形数组的大小、剩余容量监控与告警, Disruptor 动态扩容等方式,可以有效解决 消费数据丢失问题。

高级版:Spring Boot + Prometheus 监控剩余容量 大小

我们的微服务项目中使用了 spring boot,集成 prometheus。

在这里插入图片描述

我们可以通过将remainingCapacity 作为指标暴露到 prometheus 中,通过如下代码:

在这里插入图片描述

增加这个代码之后,请求 /actuator/prometheus 之后,可以看到对应的返回:

在这里插入图片描述

这样,当这个值低于20%,我们就认为这个剩余空间不够,可以扩容了。

Disruptor 如何 动态扩容?

关于 Disruptor 动态扩容的方案,可以实现一个可以扩容的子类

在这里插入图片描述

定义一个环形队列的数据

    private Disruptor<LongEvent>[] executors;

在构造函数中,初始化为1:

    public ResizableDisruptorProducer() {executors = new Disruptor[1];executors[0] = disruptor();this.ringBuffer = executors[0].getRingBuffer();}

发布事件的时候,通过取模的方式,确定使用executors 数组 其中的一个RingBuffer

在这里插入图片描述

在next方法,执行indx 取模和 获取 ringbuffer 的操作

在这里插入图片描述

这里参考了netty源码里边 PowerOfTwoEventExecutorChooser 反应器选择的方式,使用位运算取模,从而实现高性能取模。

什么时候扩容呢? 当监控发现超过80%的阈值后,运维会收到告警,然后可以通过naocos、Zookeeper的发布订阅, 通知微服务进行扩容。

微服务 扩容要回调下面的risize方法

在这里插入图片描述

具体的代码和,演示过程,后面参考尼恩录制和发布《尼恩Java面试宝典》配套视频。

在使用Disruptor框架时,需要根据实际情况选择合适的监控和扩容解决方案,并不断优化和调整系统配置,以满足日益增长的业务需求。

说在最后

以上的内容,如果大家能对答如流,如数家珍,基本上 面试官会被你 震惊到、吸引到。最终,让面试官爱到 “不能自已、口水直流”。offer, 也就来了。

在面试之前,建议大家系统化的刷一波 5000页《尼恩Java面试宝典》V174,在刷题过程中,如果有啥问题,大家可以来 找 40岁老架构师尼恩交流。

另外,如果没有面试机会,可以找尼恩来帮扶、领路。

尼恩已经指导了大量的就业困难的小伙伴上岸,前段时间,帮助一个40岁+就业困难小伙伴拿到了一个年薪100W的offer,小伙伴实现了 逆天改命 。

尼恩技术圣经系列PDF

  • 《NIO圣经:一次穿透NIO、Selector、Epoll底层原理》
  • 《Docker圣经:大白话说Docker底层原理,6W字实现Docker自由》
  • 《K8S学习圣经:大白话说K8S底层原理,14W字实现K8S自由》
  • 《SpringCloud Alibaba 学习圣经,10万字实现SpringCloud 自由》
  • 《大数据HBase学习圣经:一本书实现HBase学习自由》
  • 《大数据Flink学习圣经:一本书实现大数据Flink自由》
  • 《响应式圣经:10W字,实现Spring响应式编程自由》
  • 《Go学习圣经:Go语言实现高并发CRUD业务开发》

……完整版尼恩技术圣经PDF集群,请找尼恩领取

《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》PDF,请到下面公号【技术自由圈】取↓↓↓

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/712272.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】指针初阶2.0版本

这篇博文我们来继续学习指针的其他内容 指针2.0 传值调用与传址调用传值调用传址调用 一维数组与指针理解数组名使用指针深入理解一维数组 二级指针指针数组二维数组与指针 传值调用与传址调用 在开始之前&#xff0c;我们需要先了解这个概念&#xff0c;后面才能够正常的学习…

利用 Python 抓取数据探索汽车市场趋势

一、引言 随着全球对环境保护意识的增强和技术的进步&#xff0c;新能源汽车作为一种环保、高效的交通工具&#xff0c;正逐渐受到人们的关注和青睐。在这个背景下&#xff0c;对汽车市场的数据进行分析和研究显得尤为重要。 本文将介绍如何利用 Python 编程语言&#xff0c;结…

VSCode上搭建C/C++开发环境(vscode配置c/c++环境)Windows系统---保姆级教程

引言劝退 VSCode&#xff0c;全称为Visual Studio Code&#xff0c;是由微软开发的一款轻量级&#xff0c;跨平台的代码编辑器。大家能来搜用VSCode配置c/c&#xff0c;想必也知道VSCode的强大&#xff0c;可以手握一个VSCode同时编写如C&#xff0c;C&#xff0c;C#&#xff…

微服务day02-Ribbon负载均衡与Nacos安装与入门

一.Ribbon负载均衡 在上一节中&#xff0c;我们通过在RestTemplte实例中加上了注解 LoadBalanced,表示将来由RestTemplate发起的请求会被Ribbon拦截和处理&#xff0c;实现了访问服务时的负载均衡&#xff0c;那么他是如何实现的呢&#xff1f; 1.1 Ribbon负载均衡的原理 Rib…

LabVIEW非接触式电阻抗层析成像系统

LabVIEW非接触式电阻抗层析成像系统 非接触式电阻抗层析成像&#xff08;NEIT&#xff09;技术以其无辐射、非接触、响应速度快的特点&#xff0c;为实时监测提供了新的解决方案。基于LabVIEW的电阻抗层析成像系统&#xff0c;实现了数据的在线采集及实时成像&#xff0c;提高…

Java SE:多线程(Thread)

1. 线程两个基本概念 并发&#xff1a;即线程交替运行多个指令并行&#xff1a;即多个线程同时运行指令 并发并行不矛盾&#xff0c;两者可同时发生&#xff0c;即多个线程交替运行指令 2. 多线程3种实现方式 2.1 直接创建线程对象 /*** 方式1&#xff1a;* 1. 创建thread类的…

【Linux系统化学习】信号的保存

目录 阻塞信号 信号处理常见方式概览 信号的其他相关概念 在内核中的表示 sigset_t 信号集操作函数 sigprocmask函数 sigpending函数 信号的捕捉 内核如何实现信号的捕捉 sigaction函数 可重入函数 volatile 阻塞信号 信号处理常见方式概览 当信号来临时&#x…

GEE:使用Sigmoid激活函数对单波段图像进行变换(以NDVI为例)

作者:CSDN @ _养乐多_ 本文将介绍在 Google Earth Engine (GEE)平台上,对任意单波段影像进行 Sigmoid 变换的代码。并以对 NDVI 影像像素值的变换为例。 文章目录 一、Sigmoid激活函数1.1 什么是 Sigmoid 激活函数1.2 用到遥感图像上有什么用?二、代码链接三、完整代码一…

MYSQL02高级_目录结构、默认数据库、表文件、系统独立表空间

文章目录 ①. MySQL目录结构②. 查看默认数据库③. MYSQL5.7和8表文件③. 系统、独立表空间 ①. MySQL目录结构 ①. 如何查看关联mysql目录 [rootmysql8 ~]# find / -name mysql /var/lib/mysql /var/lib/mysql/mysql /etc/selinux/targeted/tmp/modules/100/mysql /etc/seli…

前端src中图片img标签资源的几种写法?

在 Vue 项目中引用图片路径有几种不同的方法&#xff0c;具体取决于你的项目结构和配置。以下是几种常见的方式&#xff1a; 1. 静态资源目录 (Public) 如果你的图片放在了项目的 public 目录下&#xff08;例如&#xff0c;Vite 和 Create Vue App 脚手架工具通常使用这个目…

05 OpenCV图像混合技术

文章目录 理论算子示例 理论 其中 的取值范围为0~1之间 算子 addWeighted CV_EXPORTS_W void addWeighted(InputArray src1, double alpha, InputArray src2, double beta,double gamma, OutputArray dst, int dtype -1 ); 参数1&#xff1a;输入图像Mat …

2024年【广东省安全员A证第四批(主要负责人)】考试试卷及广东省安全员A证第四批(主要负责人)作业模拟考试

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 广东省安全员A证第四批&#xff08;主要负责人&#xff09;考试试卷根据新广东省安全员A证第四批&#xff08;主要负责人&#xff09;考试大纲要求&#xff0c;安全生产模拟考试一点通将广东省安全员A证第四批&#x…

钉钉机器人发送折线图卡片 工具类代码

钉钉机器人 “创建并投放卡片 接口 ” 可以 发送折线图、柱状图 官方文档&#xff1a;创建并投放卡片 - 钉钉开放平台 0依赖、1模板、2机器人放到内部应用、3放开这个权限 、4工具类、5调用工具类 拼接入参 卡片模板 自己看文档创建&#xff0c;卡片模板的id 有用 0、依赖…

Springboot项目中定时任务的四种实现方式

文章目录 1. 使用Scheduled注解1.1 时间间隔执行1.2 固定时间点执行 2. 使用EnableScheduling注解启用定时任务3. 实现SchedulingConfigurer接口4. 使用Quartz框架4.1 配置QuartzScheduler4.2 定义Job类和Trigger类 5. 总结 在开发现代应用时&#xff0c;定时任务是一个非常常见…

地图可视化绘制 | R-ggplot2 NC地图文件可视化

在推出两期数据分享之后&#xff0c;获取数据的小伙伴们也知道&#xff0c;数据格式都是NetCDF(nc) 格式网格数据&#xff0c;虽然我在推文分享中说明使用Python、R或者GIS类软件都是可以进行 处理和可视化绘制的&#xff0c;但是&#xff0c;还是有小伙伴咨询使用编程软件Pyth…

牛客周赛 Round 34(A,B,C,D,E,F,G)

把这场忘了。。官方也迟迟不发题解 比赛链接 出题人题解 A 小红的字符串生成 思路&#xff1a; 枚举四种字符串打印出来即可&#xff0c;为了防止重复可以用set先去一下重。 code&#xff1a; #include <iostream> #include <cstdio> #include <cstring&g…

day48 ● 198.打家劫舍 ● 213.打家劫舍II ● 337.打家劫舍III

一遍过。 当前房屋偷与不偷取决于 前一个房屋和前两个房屋是否被偷了。所以这里就更感觉到&#xff0c;当前状态和前面状态会有一种依赖关系&#xff0c;那么这种依赖关系都是动规的递推公式。 class Solution { public:int rob(vector<int>& nums) {vector<vec…

门店纵深不足、入口有遮挡影响客流准确率?近景客流帮你搞定!

为了优化运营策略、提升门店营收&#xff0c;很多店铺和商场都会安装客流摄像机。但是在实际应用中&#xff0c;由于门店纵深受限等原因&#xff0c;导致无法使用之前的常规客流产品。 针对这种情况&#xff0c;悠络客最新研发了近景客流产品&#xff0c;即使存在入口被遮挡或门…

内网信息搜集

目录 内网基础知识 基本流程图 怎么判断是否在域内 常规信息类收集-应用&服务&权限等 cs信息搜集 bloodhound安装及使用 内网基础知识 工作组&#xff1a;将不同的计算机按照功能分别列入不同的组&#xff0c;想要访问某个部门的资源&#xff0c;只要在【网络】里…

pyqt教程

一、组件安装配置 1.安装组件 在Anaconda Prompt下进入自己的python环境 pip install PyQt5 pip install PyQt5-tools 2.vscode安装插件 3.配置路径 配置Pyuic:Cmd与Qtdesigner:Path路径 1.Pyuic:Cmd路径 一般是在你安装的python环境下的 \Scripts\pyuic5.exe 2.Qtdesigner:P…