Disruptor 有哪些典型的使用场景?

大家好,我是君哥。

Disruptor 是一款高性能的内存有界队列,它通过内存预分配、无锁并发、解决伪共享问题、使用 RingBuffer 取代阻塞队列等措施来大幅提升队列性能。

但开发者们往往对它的使用场景不太了解,到底应该在哪些场景使用呢?今天咱们就来聊一聊 Disruptor 的使用场景。

Disruptor 是一个生产-消费模式的队列,这里我们使用官网的示例,生产者发送一个 long 类型的变量,消费者收到消息后把变量打印出来。首先定义消息体:

public class LongEvent {private long value;public void set(long value){this.value = value;}@Overridepublic String toString(){return "LongEvent{" + "value=" + value + '}';}
}

为了让 Disruptor 给消息预先分配内存,定义一个 EventFactory,代码如下:

public class LongEventFactory implements EventFactory<LongEvent>
{@Overridepublic LongEvent newInstance(){return new LongEvent();}
}

下面定义个消费者 LongEventHandler:

public class LongEventHandler implements EventHandler<LongEvent>
{private String consumer;public LongEventHandler(String consumer) {this.consumer = consumer;}@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch){System.out.println("consumer: " + consumer + ",Event: " + event);}
}

1.广播场景

广播场景在我们的开发工作中并不少见,比如系统收到上游系统的一个请求消息,然后把这个消息发送给多个下游系统来处理。Disruptor 支持广播模式。比如消费者生产的消息由三个消费者来消费:

图片

public class Broadcast {public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");disruptor.handleEventsWith(consumer1, consumer2, consumer3);disruptor.start();RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);Thread.sleep(1000);}}
}

2.日志收集

再来看一个日志收集的例子。这里我们假设一个场景,业务系统集群有 3 个节点,每个节点打印的业务日志发送到 Disruptor,Disruptor 下游有 3 个消费者负责日志收集。

图片

这里我们需要重新定义一个日志收集处理类,代码如下:

public class LogCollectHandler implements WorkHandler<LongEvent> {public LogCollectHandler(String consumer) {this.consumer = consumer;}private String consumer;@Overridepublic void onEvent(LongEvent event){System.out.println("consumer: " + consumer + ",Event: " + event);}
}

下面这个代码是绑定消费者的代码:

public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);WorkHandler<LongEvent> consumer1 = new LogCollectHandler("consumer1");WorkHandler<LongEvent> consumer2 = new LogCollectHandler("consumer2");WorkHandler<LongEvent> consumer3 = new LogCollectHandler("consumer3");disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);disruptor.start();
}

需要注意的是,上面使用的是 Disruptor 的 handleEventsWithWorkerPool 方法,使用的消费者不是 EventHandler,而是 WorkHandler。消费者组里面的消费者如果是 WorkHandler,那消费者之间就是有竞争的,比如一个 Event 已经被 consumer1 消费过,那就不再会被其他消费者消费了。消费者组里面的消费者如果是 EventHandler,那消费者之间是没有竞争的,所有消息都会消费。

3.责任链

责任链这种设计模式我们都比较熟悉了,同一个对象的处理有多个不同的逻辑,每个逻辑作为一个节点组成责任链,比如收到一条告警消息,处理节点分为:给开发人员发送邮件、给运维人员发送短信、给业务人员发送 OA 消息。

图片

Disruptor 支持链式处理消息,看下面的示例代码:

public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);disruptor.start();
}

Disruptor 也支持多个并行责任链,下图是 2 条责任链的场景:

图片

这里给出一个示例代码:

public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");EventHandler<LongEvent> consumer5 = new LongEventHandler("consumer5");EventHandler<LongEvent> consumer6 = new LongEventHandler("consumer6");disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);disruptor.handleEventsWith(consumer4).then(consumer5).then(consumer6);disruptor.start();
}

4.多任务协作

一个经典的例子,我们在泡咖啡之前,需要烧水、洗被子、磨咖啡粉,这三个步骤可以并行,但是需要等着三步都完成之后,才可以泡咖啡。

图片

当然,这个例子可以用 Java 中的 CompletableFuture 来实现,代码如下:

public static void main(String[] args){ExecutorService executor = ...;CompletableFuture future1 = CompletableFuture.runAsync(() -> {try {washCup();} catch (InterruptedException e) {e.printStackTrace();}}, executor);CompletableFuture future2 = CompletableFuture.runAsync(() -> {try {hotWater();} catch (InterruptedException e) {e.printStackTrace();}}, executor);CompletableFuture future3 = CompletableFuture.runAsync(() -> {try {grindCoffee();} catch (InterruptedException e) {e.printStackTrace();}}, executor);CompletableFuture.allOf(future1, future2, future3).thenAccept(r -> {System.out.println("泡咖啡");});System.out.println("我是主线程");
}

同样,使用 Disruptor 也可以实现这个场景,看下面代码:

public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");disruptor.handleEventsWith(consumer1, consumer2, consumer3).then(consumer4);disruptor.start();
}

5.多消费者组

类比主流消息队列的场景,Disruptor 也可以实现多消费者组的场景,组间并行消费互不影响,组内消费者竞争消息,如下图:

图片

示例代码如下:

public static void main(String[] args) throws InterruptedException {int bufferSize = 1024;Disruptor<LongEvent> disruptor =new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);WorkHandler<LongEvent> consumer1 = new LogWorkHandler("consumer1");WorkHandler<LongEvent> consumer2 = new LogWorkHandler("consumer2");WorkHandler<LongEvent> consumer3 = new LogWorkHandler("consumer3");WorkHandler<LongEvent> consumer4 = new LogWorkHandler("consumer4");WorkHandler<LongEvent> consumer5 = new LogWorkHandler("consumer5");WorkHandler<LongEvent> consumer6 = new LogWorkHandler("consumer6");disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);disruptor.handleEventsWithWorkerPool(consumer4, consumer5, consumer6);disruptor.start();
}

6.总结

通过消费者的灵活组合,Disruptor 的使用场景非常丰富。本文介绍了 Disruptor 的 5 个典型使用场景。在选型的时候,除了使用场景,更多地要考虑到 Disruptor 作为高性能内存队列的这个特点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/891431.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[MySQL报错]关于发生net start mysql 服务无法启动,服务没有报告任何错误的五种解决方案。

咋直接进入主题。 我遇到的问题是net start mysql 服务无法启动&#xff0c;服务没有报告任何错误 其问题出在哪里呢 一.ini文件配置问题 在于你没有给你下载好的mysql文件中配置.ini文件。 该如何配置呢。那就是先在文件夹中创建一个文本文件&#xff0c;把下面内容复制进去…

HTML5新特性|01 音频视频

音频 1、Audio (音频) HTML5提供了播放音频文件的标准 2、control(控制器) control 属性供添加播放、暂停和音量控件 3、标签: <audio> 定义声音 <source> 规定多媒体资源,可以是多个<!DOCTYPE html> <html lang"en"> <head><…

goView二开低代码平台1.0

官网文档地址&#xff1a;GoView 说明文档 | 低代码数据可视化开发平台 简介&#xff1a;GoView 是一个拖拽式低代码数据可视化开发平台&#xff0c;通过拖拽创建数据大屏&#xff0c;使用Vue3框架&#xff0c;Ts语言和NaiveUI组件库创建的开源项目。安装步骤和地址文档里都有…

2024年中国新能源汽车用车发展怎么样 PaperGPT(一)

概述 在国家政策的强力扶持下&#xff0c;2024年中国新能源汽车市场迎来了新的发展机遇。本文将基于《中国新能源汽车用车报告&#xff08;2024年&#xff09;》的数据&#xff0c;对新能源汽车的市场发展和用车趋势概述。 新能源汽车市场发展 政策推动&#xff1a;国家和地…

数据表中列的完整性约束概述

文章目录 一、完整性约束概述二、设置表字段的主键约束三、设置表字段的外键约束四、设置表字段的非空约束五、设置表字段唯一约束六、设置表字段值自动增加七、设置表字段的默认值八、调整列的完整性约束 一、完整性约束概述 完整性约束条件是对字段进行限制&#xff0c;要求…

Unity网络通信相关

Socket 通信一张图搞定 谁提供服务谁绑定端口&#xff0c;建立Listener,写Host

ChatGPT 与 AGI:人工智能的当下与未来走向全解析

在人工智能的浩瀚星空中&#xff0c;AGI&#xff08;通用人工智能&#xff09;无疑是那颗最为璀璨且备受瞩目的星辰。OpenAI 对 AGI 的定义为“在最具经济价值的任务中超越人类的高度自治系统”&#xff0c;并勾勒出其发展的五个阶段&#xff0c;当下我们大多处于以 ChatGPT 为…

七次课掌握 Photoshop

mediaTEA 的《七次课掌握 Photoshop》系列文章以循序渐进的教学方式&#xff0c;帮助学员在短时间内高效掌握 Photoshop 的核心功能。 从基础知识到高级技巧&#xff0c;课程涵盖图像编辑、选区与抠图、形状与文字、绘画与修饰、调整与混合、样式与滤镜&#xff0c;以及自动化与…

【Goland】怎么执行 go mod download

1、终端的执行 go mod tidy 2、终端执行不行的话&#xff0c;就可以通过右击go.mod文件来执行&#xff1b; 3、也可以按住Ctrl点击这个包安装&#xff1b;

玩转OCR | 腾讯云智能结构化OCR初次体验

目录 一、什么是OCR&#xff08;需要了解&#xff09; 二、产品概述与核心优势 产品概述 智能结构化能做什么 举例说明&#xff08;选看&#xff09; 1、物流单据识别 2、常见证件识别 3、票据单据识别 4、行业材料识别 三、产品特性 高精度 泛化性 易用性 四、…

基于BiLSTM和随机森林回归模型的序列数据预测

本文以新冠疫情相关数据集为案例,进行新冠数量预测。(源码请留言或评论) 首先介绍相关理论概念: 序列数据特点 序列数据是人工智能和机器学习领域的重要研究对象,在多个应用领域展现出独特的特征。这种数据类型的核心特点是 元素之间的顺序至关重要 ,反映了数据内在的时…

安装、快速入门

安装 sudo docker run \-e RABBITMQ_DEFAULT_USERroot \-e RABBITMQ_DEFAULT_PASS123456 \-v rabbitmq-plugins:/plugins \--name rabbitmq \--hostname rabbitmq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq 1、防火墙开放两个端口 2、RabbitMQ 安装 Web 插件&#xff1a; …

JVM学习:CMS和G1收集器浅析

总框架 一、Java自动内存管理基础 1、运行时数据区 运行时数据区可分为线程隔离和线程共享两个维度&#xff0c;垃圾回收主要是针对堆内存进行回收 &#xff08;1&#xff09;线程隔离 程序计数器 虚拟机多线程是通过线程轮流切换、分配处理器执行时间来实现的。为了线程切换…

用uniapp写一个播放视频首页页面代码

效果如下图所示 首页有导航栏&#xff0c;搜索框&#xff0c;和视频列表&#xff0c; 导航栏如下图 搜索框如下图 视频列表如下图 文件目录 视频首页页面代码如下 <template> <view class"video-home"> <!-- 搜索栏 --> <view class…

uniapp 判断多选、选中取消选中的逻辑处理

一、效果展示 二、代码 1.父组件: :id=“this.id” : 给子组件传递参数【id】 @callParentMethod=“takeIndexFun” :给子组件传递方法,这样可以在子组件直接调用父组件的方法 <view @click="$refs.member.open()"

影刀进阶指令 | Kimi (对标ChatGPT)

文章目录 影刀进阶指令 | Kimi &#xff08;对标ChatGPT&#xff09;一. 需求二. 流程三. 实现3.1 流程概览3.2 流程步骤讲解1\. 确定问题2\. 填写问题并发送3\. 检测答案是否出完 四. 运维 影刀进阶指令 | Kimi &#xff08;对标ChatGPT&#xff09; 简单讲讲RPA调用kimi实现…

【面试系列】深入浅出 Spring Boot

熟悉SpringBoot&#xff0c;对常用注解、自动装配原理、Jar启动流程、自定义Starter有一定的理解&#xff1b; 面试题 Spring Boot 的核心注解是哪个&#xff1f;它主要由哪几个注解组成的&#xff1f;Spring Boot的自动配置原理是什么&#xff1f;你如何理解 Spring Boot 配置…

MySQL root用户密码忘记怎么办(Reset root account password)

在使用MySQL数据库的的过程中&#xff0c;不可避免的会出现忘记密码的现象。普通用户的密码如果忘记&#xff0c;可以用更高权限的用户&#xff08;例如root&#xff09;进行重置。但是如果root用户的密码忘记了&#xff0c;由于root用户本身就是最高权限&#xff0c;那这个方法…

Java之内部类*

将一个类定义在另一个类或者一个方法的内部&#xff0c;前者称为内部类&#xff0c;后者称为外部类 实例内部类&#xff1a;实力内部类所处的位置与外部类成员位置相同&#xff0c;因此也受public private等访问限定符的约束静态内部类&#xff08;static&#xff09;匿名内部…

黑马Java面试教程_P3_框架

系列博客目录 文章目录 系列博客目录前言1.Spring1.1 Spring框架中的单例bean是线程安全的吗?面试文稿 1.2 什么是AOP&#xff0c;你们项目中有没有使用到AOP&#xff1f;Spring中的事务是如何实现的&#xff1f;总结面试文稿 1.3 Spring中事务失效的场景有哪些总结面试文稿 1…