Disruptor 有哪些典型的使用场景?

大家好,我是君哥。

Disruptor 是一款高性能的内存有界队列,它通过内存预分配、无锁并发、解决伪共享问题、使用 RingBuffer 取代阻塞队列等措施来大幅提升队列性能。

但开发者们往往对它的使用场景不太了解,到底应该在哪些场景使用呢?今天咱们就来聊一聊 Disruptor 的使用场景。

Disruptor 是一个生产-消费模式的队列,这里我们使用官网的示例,生产者发送一个 long 类型的变量,消费者收到消息后把变量打印出来。首先定义消息体:

public class LongEvent {private long value;public void set(long value){this.value = value;}@Overridepublic String toString(){return "LongEvent{" + "value=" + value + '}';}
}

为了让 Disruptor 给消息预先分配内存,定义一个 EventFactory,代码如下:

public class LongEventFactory implements EventFactory<LongEvent>
{@Overridepublic LongEvent newInstance(){return new LongEvent();}
}

下面定义个消费者 LongEventHandler:

public class LongEventHandler implements EventHandler<LongEvent>
{private String consumer;public LongEventHandler(String consumer) {this.consumer = consumer;}@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch){System.out.println("consumer: " + consumer + ",Event: " + event);}
}

1.广播场景

广播场景在我们的开发工作中并不少见,比如系统收到上游系统的一个请求消息,然后把这个消息发送给多个下游系统来处理。Disruptor 支持广播模式。比如消费者生产的消息由三个消费者来消费:

图片

public class Broadcast {public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");disruptor.handleEventsWith(consumer1, consumer2, consumer3);disruptor.start();RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);Thread.sleep(1000);}}
}

2.日志收集

再来看一个日志收集的例子。这里我们假设一个场景,业务系统集群有 3 个节点,每个节点打印的业务日志发送到 Disruptor,Disruptor 下游有 3 个消费者负责日志收集。

图片

这里我们需要重新定义一个日志收集处理类,代码如下:

public class LogCollectHandler implements WorkHandler<LongEvent> {public LogCollectHandler(String consumer) {this.consumer = consumer;}private String consumer;@Overridepublic void onEvent(LongEvent event){System.out.println("consumer: " + consumer + ",Event: " + event);}
}

下面这个代码是绑定消费者的代码:

public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);WorkHandler<LongEvent> consumer1 = new LogCollectHandler("consumer1");WorkHandler<LongEvent> consumer2 = new LogCollectHandler("consumer2");WorkHandler<LongEvent> consumer3 = new LogCollectHandler("consumer3");disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);disruptor.start();
}

需要注意的是,上面使用的是 Disruptor 的 handleEventsWithWorkerPool 方法,使用的消费者不是 EventHandler,而是 WorkHandler。消费者组里面的消费者如果是 WorkHandler,那消费者之间就是有竞争的,比如一个 Event 已经被 consumer1 消费过,那就不再会被其他消费者消费了。消费者组里面的消费者如果是 EventHandler,那消费者之间是没有竞争的,所有消息都会消费。

3.责任链

责任链这种设计模式我们都比较熟悉了,同一个对象的处理有多个不同的逻辑,每个逻辑作为一个节点组成责任链,比如收到一条告警消息,处理节点分为:给开发人员发送邮件、给运维人员发送短信、给业务人员发送 OA 消息。

图片

Disruptor 支持链式处理消息,看下面的示例代码:

public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);disruptor.start();
}

Disruptor 也支持多个并行责任链,下图是 2 条责任链的场景:

图片

这里给出一个示例代码:

public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");EventHandler<LongEvent> consumer5 = new LongEventHandler("consumer5");EventHandler<LongEvent> consumer6 = new LongEventHandler("consumer6");disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);disruptor.handleEventsWith(consumer4).then(consumer5).then(consumer6);disruptor.start();
}

4.多任务协作

一个经典的例子,我们在泡咖啡之前,需要烧水、洗被子、磨咖啡粉,这三个步骤可以并行,但是需要等着三步都完成之后,才可以泡咖啡。

图片

当然,这个例子可以用 Java 中的 CompletableFuture 来实现,代码如下:

public static void main(String[] args){ExecutorService executor = ...;CompletableFuture future1 = CompletableFuture.runAsync(() -> {try {washCup();} catch (InterruptedException e) {e.printStackTrace();}}, executor);CompletableFuture future2 = CompletableFuture.runAsync(() -> {try {hotWater();} catch (InterruptedException e) {e.printStackTrace();}}, executor);CompletableFuture future3 = CompletableFuture.runAsync(() -> {try {grindCoffee();} catch (InterruptedException e) {e.printStackTrace();}}, executor);CompletableFuture.allOf(future1, future2, future3).thenAccept(r -> {System.out.println("泡咖啡");});System.out.println("我是主线程");
}

同样,使用 Disruptor 也可以实现这个场景,看下面代码:

public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");disruptor.handleEventsWith(consumer1, consumer2, consumer3).then(consumer4);disruptor.start();
}

5.多消费者组

类比主流消息队列的场景,Disruptor 也可以实现多消费者组的场景,组间并行消费互不影响,组内消费者竞争消息,如下图:

图片

示例代码如下:

public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);WorkHandler<LongEvent> consumer1 = new LogWorkHandler("consumer1");WorkHandler<LongEvent> consumer2 = new LogWorkHandler("consumer2");WorkHandler<LongEvent> consumer3 = new LogWorkHandler("consumer3");WorkHandler<LongEvent> consumer4 = new LogWorkHandler("consumer4");WorkHandler<LongEvent> consumer5 = new LogWorkHandler("consumer5");WorkHandler<LongEvent> consumer6 = new LogWorkHandler("consumer6");disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);disruptor.handleEventsWithWorkerPool(consumer4, consumer5, consumer6);disruptor.start();
}

6.总结

通过消费者的灵活组合,Disruptor 的使用场景非常丰富。本文介绍了 Disruptor 的 5 个典型使用场景。在选型的时候,除了使用场景,更多地要考虑到 Disruptor 作为高性能内存队列的这个特点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/891431.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[MySQL报错]关于发生net start mysql 服务无法启动,服务没有报告任何错误的五种解决方案。

咋直接进入主题。 我遇到的问题是net start mysql 服务无法启动&#xff0c;服务没有报告任何错误 其问题出在哪里呢 一.ini文件配置问题 在于你没有给你下载好的mysql文件中配置.ini文件。 该如何配置呢。那就是先在文件夹中创建一个文本文件&#xff0c;把下面内容复制进去…

HTML5新特性|01 音频视频

音频 1、Audio (音频) HTML5提供了播放音频文件的标准 2、control(控制器) control 属性供添加播放、暂停和音量控件 3、标签: <audio> 定义声音 <source> 规定多媒体资源,可以是多个<!DOCTYPE html> <html lang"en"> <head><…

goView二开低代码平台1.0

官网文档地址&#xff1a;GoView 说明文档 | 低代码数据可视化开发平台 简介&#xff1a;GoView 是一个拖拽式低代码数据可视化开发平台&#xff0c;通过拖拽创建数据大屏&#xff0c;使用Vue3框架&#xff0c;Ts语言和NaiveUI组件库创建的开源项目。安装步骤和地址文档里都有…

2024年中国新能源汽车用车发展怎么样 PaperGPT(一)

概述 在国家政策的强力扶持下&#xff0c;2024年中国新能源汽车市场迎来了新的发展机遇。本文将基于《中国新能源汽车用车报告&#xff08;2024年&#xff09;》的数据&#xff0c;对新能源汽车的市场发展和用车趋势概述。 新能源汽车市场发展 政策推动&#xff1a;国家和地…

数据表中列的完整性约束概述

文章目录 一、完整性约束概述二、设置表字段的主键约束三、设置表字段的外键约束四、设置表字段的非空约束五、设置表字段唯一约束六、设置表字段值自动增加七、设置表字段的默认值八、调整列的完整性约束 一、完整性约束概述 完整性约束条件是对字段进行限制&#xff0c;要求…

Unity网络通信相关

Socket 通信一张图搞定 谁提供服务谁绑定端口&#xff0c;建立Listener,写Host

ChatGPT 与 AGI:人工智能的当下与未来走向全解析

在人工智能的浩瀚星空中&#xff0c;AGI&#xff08;通用人工智能&#xff09;无疑是那颗最为璀璨且备受瞩目的星辰。OpenAI 对 AGI 的定义为“在最具经济价值的任务中超越人类的高度自治系统”&#xff0c;并勾勒出其发展的五个阶段&#xff0c;当下我们大多处于以 ChatGPT 为…

Microsoft Visual Studio中的/MT, /MTd,/MD,/MDd分别是什么意思?

1. /MT&#xff0c;/MTd&#xff0c;/MD&#xff0c;/MDd的含义 /MT&#xff0c;/MTd&#xff0c;/MD&#xff0c;/MDd是 Microsoft Visual C 编译器的运行时库链接选项。它们决定了程序如何链接 C 运行时库&#xff08;CRT&#xff09;。具体含义如下&#xff1a; /MT&#x…

七次课掌握 Photoshop

mediaTEA 的《七次课掌握 Photoshop》系列文章以循序渐进的教学方式&#xff0c;帮助学员在短时间内高效掌握 Photoshop 的核心功能。 从基础知识到高级技巧&#xff0c;课程涵盖图像编辑、选区与抠图、形状与文字、绘画与修饰、调整与混合、样式与滤镜&#xff0c;以及自动化与…

【Goland】怎么执行 go mod download

1、终端的执行 go mod tidy 2、终端执行不行的话&#xff0c;就可以通过右击go.mod文件来执行&#xff1b; 3、也可以按住Ctrl点击这个包安装&#xff1b;

数据分析与应用:如何分析7日动销率和滞销率?

目录 0 需求描述 1 数据准备 1.1 订单明细表 1.2 商品信息表 2 SQL实现 3 问题分析与总结

深度学习模型概论

深度学习模型是机器学习领域中的一个重要分支&#xff0c;它通过使用多层神经网络来模拟人脑处理信息的方式&#xff0c;从而解决复杂的学习任务。以下是一些主要的深度学习模型&#xff1a; 深度前馈神经网络&#xff08;Deep Feedforward Networks&#xff09;&#xff1a; …

玩转OCR | 腾讯云智能结构化OCR初次体验

目录 一、什么是OCR&#xff08;需要了解&#xff09; 二、产品概述与核心优势 产品概述 智能结构化能做什么 举例说明&#xff08;选看&#xff09; 1、物流单据识别 2、常见证件识别 3、票据单据识别 4、行业材料识别 三、产品特性 高精度 泛化性 易用性 四、…

十个Scala的小知识

# 1. 与Java的互操作性 Scala与Java有很好的互操作性。可以在Scala项目中直接使用Java类库&#xff0c;也可以将Scala代码编译后供Java项目使用。例如&#xff0c;一个Java框架可以轻松地集成Scala编写的代码模块。 # 2. 强大的集合库 Scala拥有功能丰富的集合库。像List、Se…

基于BiLSTM和随机森林回归模型的序列数据预测

本文以新冠疫情相关数据集为案例,进行新冠数量预测。(源码请留言或评论) 首先介绍相关理论概念: 序列数据特点 序列数据是人工智能和机器学习领域的重要研究对象,在多个应用领域展现出独特的特征。这种数据类型的核心特点是 元素之间的顺序至关重要 ,反映了数据内在的时…

c# Record关键字

在 C# 9.0 中引入了 record 关键字&#xff0c;用于定义记录类型&#xff08;Record Types&#xff09;。记录类型是一种轻量级的数据载体&#xff0c;专注于表示数据&#xff0c;它提供了内置的相等性比较、生成属性和方法等功能&#xff0c;使得编写数据类更加简洁和高效。 …

开源模型应用落地-Qwen2.5-7B-Instruct与vllm实现离线推理-降本增效(一)

一、前言 离线推理能够在模型训练完成后,特别是在处理大规模数据时,利用预先准备好的输入数据进行批量推理,从而显著提高计算效率和响应速度。通过离线推理,可以在不依赖实时计算的情况下,快速生成预测结果,从而优化决策流程和提升用户体验。此外,离线推理还可以降低云计…

安装、快速入门

安装 sudo docker run \-e RABBITMQ_DEFAULT_USERroot \-e RABBITMQ_DEFAULT_PASS123456 \-v rabbitmq-plugins:/plugins \--name rabbitmq \--hostname rabbitmq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq 1、防火墙开放两个端口 2、RabbitMQ 安装 Web 插件&#xff1a; …

JVM学习:CMS和G1收集器浅析

总框架 一、Java自动内存管理基础 1、运行时数据区 运行时数据区可分为线程隔离和线程共享两个维度&#xff0c;垃圾回收主要是针对堆内存进行回收 &#xff08;1&#xff09;线程隔离 程序计数器 虚拟机多线程是通过线程轮流切换、分配处理器执行时间来实现的。为了线程切换…

用uniapp写一个播放视频首页页面代码

效果如下图所示 首页有导航栏&#xff0c;搜索框&#xff0c;和视频列表&#xff0c; 导航栏如下图 搜索框如下图 视频列表如下图 文件目录 视频首页页面代码如下 <template> <view class"video-home"> <!-- 搜索栏 --> <view class…