Java队列-Disruptor 的使用

一、什么是 Disruptor 

从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。

可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。

我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者(Consumer)能获得通知;如果没有事件时,消费者被堵塞,直到生产者发布了新的事件。

这些都是 Disruptor 能做到的,与之不同的是,Disruptor 能做更多:

  • 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图);
  • 预分配用于存储事件内容的内存空间;
  • 针对极高的性能目标而实现的极度优化和无锁的设计;

以上的描述虽然简单地指出了 Disruptor 是什么,但对于它“能做什么”还不是那么直截了当。一般性地来说,当你需要在两个独立的处理过程(两个线程)之间交换数据时,就可以使用 Disruptor 。当然使用队列(如上面提到的 BlockingQueue)也可以,只不过 Disruptor 做得更好。

拿队列来作比较的做法弱化了对 Disruptor 有多强大的认识,如果想要对此有更多的了解,可以仔细看看 Disruptor 在其东家 LMAX 交易平台(也是实现者) 是如何作为核心架构来使用的,这方面就不做详述了,问度娘或谷哥都能找到。

二、Disruptor 的核心概念

当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择:

BusySpinWaitStrategy : 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。

BlockingWaitStrategy : 使用锁和条件变量。CPU资源的占用少,延迟大。

SleepingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。

YieldingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。

PhasedBackoffWaitStrategy : 上面多种策略的综合,CPU资源的占用少,延迟大。

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

  • Ring Buffer
    如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
  • Sequence  Disruptor
    通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
    (注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。
  • Sequencer 
    Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
  • Sequence Barrier
    用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
  • Wait Strategy
    定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
  • Event
    在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
  • EventProcessor
    EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
  • EventHandler
    Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
  • Producer
    即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

 

三、如何使用 Disruptor 

Disruptor 的 API 十分简单,主要有以下几个步骤:

  1. 定义事件
    事件(Event)就是通过 Disruptor 进行交换的数据类型。
public class LongEvent
{private long value;public void set(long value){this.value = value;}
}

 定义事件工厂
事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口 com.lmax.disruptor.EventFactory<T>。
Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。
一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。

import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactory<LongEvent>
{public LongEvent newInstance(){return new LongEvent();}
}

定义事件处理的具体实现
通过实现接口 com.lmax.disruptor.EventHandler<T> 定义事件处理的具体实现。

import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler<LongEvent>
{public void onEvent(LongEvent event, long sequence, boolean endOfBatch){System.out.println("Event: " + event);}
}

 

定义用于事件处理的线程池
Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。例如:

ExecutorService executor = Executors.newCachedThreadPool();

指定等待策略
Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。
Disruptor 提供了多个 WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。
例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,其中,
BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;
SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;
YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();

启动 Disruptor

EventFactory<LongEvent> eventFactory = new LongEventFactory();
ExecutorService executor = Executors.newSingleThreadExecutor();
int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方;Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,ringBufferSize, executor, ProducerType.SINGLE,new YieldingWaitStrategy());EventHandler<LongEvent> eventHandler = new LongEventHandler();
disruptor.handleEventsWith(eventHandler);disruptor.start();

发布事件
Disruptor 的事件发布过程是一个两阶段提交的过程:
  第一步:先从 RingBuffer 获取下一个可以写入的事件的序号;
  第二步:获取对应的事件对象,将数据写入事件对象;
  第三部:将事件提交到 RingBuffer;
事件只有在提交之后才会通知 EventProcessor 进行处理;

// 发布事件;
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();//请求下一个事件序号;try {LongEvent event = ringBuffer.get(sequence);//获取该序号对应的事件对象;long data = getEventData();//获取要通过事件传递的业务数据;event.set(data);
} finally{ringBuffer.publish(sequence);//发布事件;
}

注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。

Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。

tatic class Translator implements EventTranslatorOneArg<LongEvent, Long>{@Overridepublic void translateTo(LongEvent event, long sequence, Long data) {event.set(data);}    
}public static Translator TRANSLATOR = new Translator();public static void publishEvent2(Disruptor<LongEvent> disruptor) {// 发布事件;RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();long data = getEventData();//获取要通过事件传递的业务数据;ringBuffer.publishEvent(TRANSLATOR, data);
}
  1. 此外,Disruptor 要求 RingBuffer.publish 必须得到调用的潜台词就是,如果发生异常也一样要调用 publish ,那么,很显然这个时候需要调用者在事件处理的实现上来判断事件携带的数据是否是正确的或者完整的,这是实现者应该要注意的事情。

关闭 Disruptor

disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/629641.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第二百六十九回

文章目录 概念介绍设置方法示例代码内容总结 我们在上一章回中介绍了Card Widget相关的内容&#xff0c;本章回中将介绍国际化设置.闲话休提&#xff0c;让我们一起Talk Flutter吧。 概念介绍 我们在这里说的国际化设置是指在App设置相关操作&#xff0c;这样可以让不同国家的…

书生·浦语大模型--第二节课作业

书生浦语大模型--第二节课作业 基础部分生成300字小故事hugging face 下载功能 进阶部分浦语灵笔的图文理解及创作部署Lagent 工具调用 Demo 创作部署 基础部分 生成300字小故事 hugging face 下载功能 hugging face被墙了&#xff0c;在本地电脑无论是不是科学上网&#xff…

STM32 定时器输入捕获2——捕获高电平时长

由上图我们可以知道&#xff0c;高电平时间t2-t1。在代码中&#xff0c;可以记录此时t1的时间然后再记录t2的时间&#xff0c;t2-t1&#xff0c;就是我们所想要的答案。 但是&#xff0c;还有更简单一点点的&#xff0c;当到达t1的时候&#xff0c;我们把定时器清零&#xff0c…

现代工程科技杂志现代工程科技杂志社现代工程科技编辑部2023年第21期目录

能源科技 配网故障停电原因及改进对策研究 上官安琪 110kV变电站电气自动化技术及应用策略 陈祥 变电运维误操作事故预控措施分析 高翔;韦婉 智能变电站变电运维安全与设备维护探究 温亮亮;覃万全 110kV变电站电气设计及其防雷保护案例研析 谢旭平 变电运维…

解决哈希冲突的几种方法

什么是hash冲突 哈希函数是一个映像&#xff0c;把任意长度的输入&#xff0c;通过Hash算法变换成固定长度的输出&#xff0c;这个输出就是Hash值&#xff1b; 当两个不同的输入&#xff0c;产生了同一个输出值即为哈希冲突 解决方式 开放定址法 开放寻址法的核心思想是&am…

微信小程序---如何创建分包

1.在项目根目录中&#xff0c;创建分包的根目录&#xff0c;名为subpkg&#xff0c;这个名字可以自己定义 2.在 pages.json 中&#xff0c;和 pages 节点平级的位置声明 subPackages 节点&#xff0c;用来定义分包相关的结构&#xff1a; 3.在分包目录&#xff0c;点击右键新建…

Python UI框架库之kivy使用详解

概要 Python是一种广泛使用的编程语言&#xff0c;而Kivy是一个用于创建跨平台移动应用和多点触控应用的开源Python框架。Kivy的设计目标是提供一种简单而强大的方式来构建富有创意的用户界面和交互体验。本文将详细介绍Kivy的基本概念、核心特性、布局系统、用户界面设计和实…

[zabbix] zabbix监控其他

一、温习zabbix自定义监控 二、zabbix 自动发现与自动注册 2.1 zabbix 自动发现 //zabbix 自动发现&#xff08;对于 agent2 是被动模式&#xff09; zabbix server 主动的去发现所有的客户端&#xff0c;然后将客户端的信息登记在服务端上。 缺点是如果定义的网段中的主机数…

Android系统开发之浅谈广播接收器回调

广播接器BroadcastReceiver 广播Intent和广播接收器BroadcastReceiver&#xff0c;是大家android开发用的特别多的二个控件。 那如何从系统角度看待广播和广播接收器呢&#xff1f; 对于静态注册BroadcastReceiver和动态注册的BroadcastReceiver是如何回调其onReceive方法呢…

安全帽/反光衣检测AI边缘计算智能分析网关V4如何修改IP地址?

智能分析网关V4是TSINGSEE青犀推出的一款AI边缘计算智能硬件&#xff0c;硬件采用BM1684芯片&#xff0c;集成高性能8核ARM A53&#xff0c;主频高达2.3GHz&#xff0c;INT8峰值算力高达17.6Tops&#xff0c;FB32高精度算力达到2.2T&#xff0c;硬件内置了近40种AI算法模型&…

MySQL索引和视图基础练习题

一、创建表的要求 学生表&#xff1a;Student (Sno, Sname, Ssex , Sage, Sdept) 学号&#xff0c;姓名&#xff0c;性别&#xff0c;年龄&#xff0c;所在系Sno为主键 课程表&#xff1a;Course (Cno, Cname,) 课程号&#xff0c;课程名Cno为主键 学生选课表&#xff1a;SC (S…

C++(1) —— 基础语法入门

目录 一、C初识 1.1 第一个C程序 1.2 注释 1.3 变量 1.4 常量 1.5 关键字 1.6 标识符命名规则 二、数据类型 2.1 整型 2.2 sizeof 关键字 2.3 实型&#xff08;浮点型&#xff09; 2.4 字符型 2.5 转义字符 2.6 字符串型 2.7 布尔类型 bool 2.8 数据的输入 三…

uniapp微信小程序投票系统实战 (SpringBoot2+vue3.2+element plus ) -投票帖子排行实现

锋哥原创的uniapp微信小程序投票系统实战&#xff1a; uniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )_哔哩哔哩_bilibiliuniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )共计21条视频…

Redis实现全局唯一Id

一、全局唯一ID 每个店铺都可以发布优惠券&#xff1a; 当用户抢购时&#xff0c;就会生成订单并保存到tb_voucher_order这张表中&#xff0c;而订单表如果使用数据库自增ID就存在一些问题&#xff1a; id的规律性太明显 受单表数据量的限制 场景分析&#xff1a;如果我们的…

两整数之和

题目链接 两整数之和 题目描述 注意点 不使用 运算符 和 - ​​​​​​​&#xff0c;计算并返回两整数之和-1000 < a, b < 1000 解答思路 需要用位运算来模拟加法&#xff0c;关键是要找到相加的和以及进位1的部分。如果不考虑进位的话&#xff0c;相加可以运用异…

Redis内部数据结构Dict结构详解

目录 dict的数据结构定义 dict的创建&#xff08;dictCreate&#xff09; dict的查找&#xff08;dictFind&#xff09; dict的插入&#xff08;dictAdd和dictReplace&#xff09; dict的删除&#xff08;dictDelete&#xff09; 如果你使用过Redis&#xff0c;一定会像我一样对…

STM32F103标准外设库——GPIO 输入、输出 (五)

个人名片&#xff1a; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的在校大学生 &#x1f42f;个人主页&#xff1a;妄北y &#x1f427;个人QQ&#xff1a;2061314755 &#x1f43b;个人邮箱&#xff1a;2061314755qq.com &#x1f989;个人WeChat&#xff1a;V…

高级 Python 面试问题与解答

文章目录 专栏导读1.什么是PIP&#xff1f;2.什么是 zip 函数&#xff1f;3.Python 中的 __init __ () 是什么&#xff1f;4.Python 中的访问说明符是什么&#xff1f;5.Python 中的单元测试是什么&#xff1f;6.Python全局解释器锁&#xff08;GIL&#xff09;&#xff1f;7.P…

浪花 - 搜索标签前后端联调

前传&#xff1a;浪花 - 根据标签搜索用户-CSDN博客 目录 一、完善后端搜索标签接口 二、前后端搜索标签接口的对接 1. 使用 Axios 发送请求 2. 解决跨域问题 3. Axios 请求传参序列化 4. 接收后端响应数据 5. 处理后端响应数据格式 6. 搜索结果为空的页面展示 附&am…

区域入侵/区域人数统计AI边缘计算智能分析网关V4如何修改IP地址?

智能分析网关V4是TSINGSEE青犀推出的一款AI边缘计算智能硬件&#xff0c;硬件采用BM1684芯片&#xff0c;集成高性能8核ARM A53&#xff0c;主频高达2.3GHz&#xff0c;INT8峰值算力高达17.6Tops&#xff0c;FB32高精度算力达到2.2T&#xff0c;硬件内置了近40种AI算法模型&…