Spring Boot3.x集成Disruptor4.0

Disruptor介绍

Disruptor是一个高性能内存队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

Disruptor 是一个 Java 的并发编程框架,大大的简化了并发程序开发的难度,在性能上也比 Java 本身提供的一些并发包要好。它源于LMAX对并发性 、性能和非阻塞算法的研究,如今构成了其Exchange基础架构的核心部分。

Disruptor的队列功能和传统的MQ队列服务不同(比如:kafka\rabbitMq等等),Disruptor而是一个基于JDK的高性能内存队列,例如与Java的BlockingQueue进行对比。与队列一样,Disruptor的目的是在同一进程内的线程之间传递数据(例如消息或事件)。

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。在美团技术团队它也有不少应用,有的项目架构借鉴了它的设计机制。

Disruptor功能

  • 高性能消息传递:Disruptor 能够通过避免锁和减少线程间的数据交换来提高性能。
  • 支持多生产者和多消费者:可以由多个生产者向队列中添加事件,同时多个消费者处理这些事件。
  • 事件处理模型:Disruptor 使用预分配事件的环形数组结构,每个事件槽可以被重复使用,减少了对象创建的开销。
  • 内存屏障优化:利用内存屏障来减少不必要的CPU缓存刷新,提高效率。

Disruptor优点

  • 极高的吞吐量和低延迟:通过减少锁的使用和优化内存操作,Disruptor 能够实现极高的数据处理速率和低延迟。
  • 避免了线程阻塞:使用无锁的设计,避免了传统队列中的线程阻塞问题。
  • 资源利用率高:通过重复使用事件对象,减少了垃圾回收的压力。

Disruptor缺点

  • 复杂性:Disruptor 的使用和理解比标准的队列或者其他并发模型要复杂,需要更多的学习和调试。
  • 适用场景有限:主要适用于需要极高性能和低延迟的系统,对于一般的应用场景可能是过度设计。
  • 调试困难:由于其无锁的设计和复杂的内部结构,当出现问题时,调试可能比较困难。
  • 总的来说,Disruptor 是一个专为高性能计算设计的工具,适用于那些对性能有极端要求的场景。对于普通应用或者数据量不大的情况,使用传统的并发模型可能更为合适。

Disruptor特征

Disruptor的目标之一是在低延迟环境中使用,在低延迟系统中,必须减少或移除内存分配;

在基于Java的系统中,目的是减少由于垃圾收集导致的系统停顿;为了支持这一点,用户可以预先分配Disruptor中事件所需的存储空间(也就是声明RingBuffer的大小)。

在构造RingBuffer期间,EventFactory由用户提供,并将在Disruptor的Ring Buffer中每个事件元素创建时候被调用。将新数据发布到Disruptor时,API将允许用户获取构造的对象,以便他们可以调用方法或更新该存储对象上的字段,Disruptor保证这些操作只要正确实现就是并发安全的。

官方文档

github开源地址

GitHub - LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library

github介绍文档

LMAX Disruptor

开发示例

我们以一个项目来演示,开发一个订单业务消息处理服务,来模拟采用Disruptor队列来对订单进行管理;

采用一个生产者,多个消费者模式,并且多个消费者按不同的顺序进行链路排例,对生产者消息进行消费;

如:

某电商平台存在以下服务功能

  • 订单管理服务:生成订单后,过行订单管理与跟踪
  • 用户等级服务:用户购买商品后,重新评估用户星级等级,提供对标服务
  • 电商客服服务:负责售前售后服务,有3组,分别为A组、B组、C组,每个订单只分配到其中一组提供对接客服服务
  • 仓储管理服务:管理平台所有商品仓库,并提供已购买商品
  • 物流投递服务:负责从仓储中获取商品,投送到用户手中

电商平台每完成一笔支付订单,将消息发送到此Disruptor示例服务。

  • 首先分别经过订单管理服务和用户等级服务,注意:两个服务均为独立消费消息
  • 完成前置两个服务消费后,才能执行电商客服A组、电商客服B组、电商客服C组其中任意一个服务独立消费消息,注意:三选一,不能全部执行
  • 完成前置电商客服服务消费后,执行仓储管理服务消费消息
  • 完成前置仓储管理服务消费后,最后执行物流投递服务

消费链路流程如下:

工程环境

  • JDK:17
  • SpringBoot:3.1.3-SNAPSHOT

注:假设你已创建基础工程,并完成SpringBoot组件引入

引入Disruptor依赖

<dependencies><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>4.0.0</version></dependency>
</dependencies>

项目Disruptor配置

package com.example.disruptor;import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicate;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;/*** @Description 服务配置类*/
@Slf4j
@Configuration(proxyBeanMethods = false)
public class WebConfig {private static final RequestPredicate JSON_ACCEPT = accept(new MediaType(MediaType.APPLICATION_JSON, StandardCharsets.UTF_8));//缓冲区大小,必需是2的N次方final static int BUFFER_SIZE = 1024 * 1024;/*** 创建基于环的可重用队列存储,实现消息数据存储与推送到处理器执行* @return*/@Bean("orderRingBuffer")public RingBuffer<OrderEvent> createRingBuffer(){// 创建消息处理器,注:正常业务模式下,由不同的业务类实现;此处为了简化演示,从createEventHandler()方法中获取模拟实现类;EventHandler<OrderEvent> orderHandler = createEventHandler("消息序例:{}, 发送到《订单管理服务》, 订单详情:{}");EventHandler<OrderEvent> userLevelHandler = createEventHandler("消息序例:{}, 发送到《用户等级服务》, 订单详情:{}");EventHandler<OrderEvent> customerService0Handler = createEventHandler(0, 3, "消息序例:{}, 发送到《电商客服A组》, 订单详情:{}");EventHandler<OrderEvent> customerService1Handler = createEventHandler(1, 3, "消息序例:{}, 发送到《电商客服B组》, 订单详情:{}");EventHandler<OrderEvent> customerService2Handler = createEventHandler(2, 3, "消息序例:{}, 发送到《电商客服C组》, 订单详情:{}");EventHandler<OrderEvent> storageHandler = createEventHandler("消息序例:{}, 发送到《仓储管理服务》, 订单详情:{}");EventHandler<OrderEvent> deliveryHandler = createEventHandler("消息序例:{}, 发送到《物流投递服务》, 订单详情:{}");//创建环形缓冲区处理器事件生成器Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(OrderEvent::new,//缓冲区大小BUFFER_SIZE,//默认线程工厂Executors.defaultThreadFactory(),//ProducerType.SINGLE(表示生产者只有一个)和ProducerType.MULTY(表示有多个生产者)ProducerType.SINGLE,/**可用事件策略:BlockingWaitStrategy:用了ReentrantLock的等待&&唤醒机制实现等待逻辑,是默认策略,比较节省CPUBusySpinWaitStrategy:持续自旋,JDK9之下慎用(最好别用)DummyWaitStrategy:返回的Sequence值为0,正常环境是用不上的LiteBlockingWaitStrategy:基于BlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作,但是作者说测试不充分,不建议使用TimeoutBlockingWaitStrategy:带超时的等待,超时后会执行业务指定的处理逻辑LiteTimeoutBlockingWaitStrategy:基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作SleepingWaitStrategy:三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的的睡眠YieldingWaitStrategy:二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPUPhasedBackoffWaitStrategy:四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个注意:BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性*/new YieldingWaitStrategy());//消息消费配置处理器执行链路:orderHandler, userLevelHandler》customerService[0~2]Handler》storageHandler》deliveryHandler//分别独立执行:orderHandler(订单管理服务), userLevelHandler(用户等级服务)disruptor.handleEventsWith(orderHandler, userLevelHandler)//前置消费后,再执行其中任意一个处理器:customerService[0~2]Handler(电商客服A组\电商客服B组\电商客服C组,三选一).then(customerService0Handler, customerService1Handler, customerService2Handler)//前置消费后,再执行处理器:storageHandler(仓储管理服务).then(storageHandler)//前置消费后,再执行处理器:deliveryHandler(物流投递服务).then(deliveryHandler);//启动disruptor服务disruptor.start();return disruptor.getRingBuffer();}/*** 创建消息处理器* @param msg* @return*/private EventHandler<OrderEvent> createEventHandler(final String msg){return (event, sequence, endOfBatch)->{log.info(msg, sequence,event);//业务逻辑代码...};}/*** 创建消息处理器,支持相同处理器多选一(取模计算)* @param index* @param handlerCount* @param msg* @return*/private EventHandler<OrderEvent> createEventHandler(final int index, final int handlerCount, final String msg){return (event, sequence, endOfBatch)->{if (sequence % handlerCount == index) {log.info(msg, sequence, event);//业务逻辑代码...}};}/*** 后台服务请求router* @param orderRingBuffer* @return*/@Beanpublic RouterFunction<ServerResponse> webRouterFunction(RingBuffer<OrderEvent> orderRingBuffer) {return route().POST("/order/push", JSON_ACCEPT, (request)->{String orderId = request.queryParam("orderId").orElse("");String name = request.queryParam("name").orElse("");String price = request.queryParam("price").orElse("");orderRingBuffer.publishEvent((orderEvent, sequence) -> {orderEvent.setOrderId(orderId);orderEvent.setName(name);orderEvent.setPrice(Double.parseDouble(price));});return ServerResponse.status(HttpStatus.OK).bodyValue("ok,200");}).build();}
}

订单对象

package com.example.disruptor;import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
public class OrderEvent {private String orderId;private String name;private Double price;
}

启动类

package com.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class DisruptorSamplesApplication {public static void main(String[] args) {SpringApplication.run(DisruptorSamplesApplication.class, args);}
}

YML配置文件

# 本地服务访问
server:# 服务端口port: 8080# 服务IPaddress: 0.0.0.0
# 配置日志
logging:level:org.springframework: info
# 开启debug模式
debug: false

工程测试

Postman发送POST请求,将模拟表单数据提交到服务端;

服务打印日志

2024-04-30T18:08:05.227+08:00  INFO 39828 --- [pool-4-thread-1] com.example.disruptor.WebConfig: 消息序例:0, 发送到《订单管理服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.227+08:00  INFO 39828 --- [pool-4-thread-2] com.example.disruptor.WebConfig: 消息序例:0, 发送到《用户等级服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.230+08:00  INFO 39828 --- [pool-4-thread-3] com.example.disruptor.WebConfig: 消息序例:0, 发送到《电商客服A组》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.230+08:00  INFO 39828 --- [pool-4-thread-6] com.example.disruptor.WebConfig: 消息序例:0, 发送到《仓储管理服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)
2024-04-30T18:08:05.230+08:00  INFO 39828 --- [pool-4-thread-7] com.example.disruptor.WebConfig: 消息序例:0, 发送到《物流投递服务》, 订单详情:OrderEvent(orderId=VL-20240495001, name=苹果1斤, price=7.65)

通过日志清楚的展示了,消费者消息处理器按程序执行配置的链路顺序正确打印;

结束

以上介绍了disruptor的基本信息与特点,并通过代码工程演示了dirsruptor在项目中如何开发,以及使用场景等;本文内容介绍有限,实际应用过程中disruptor还有很多其它用法,并未通过本文全整的展述,比如:如何使用多个消费者在线程池下完成消费,以及多生产者模式;可以通过官方文档与源码了解更多dirsruptor用法与功能;本文如有不足之处,欢迎指正与交流;

参考:

高性能队列——Disruptor-CSDN博客

LMAX Disruptor

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/833097.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

geojson文件规格

geojson文件示例&#xff0c; {"type": "FeatureCollection","features": [{"type": "Feature","geometry": {"type": "Point","coordinates": [102.0, 0.5]},"properties&q…

阴影渲染在AI去衣技术中的关键作用

引言&#xff1a; 随着人工智能技术的飞速发展&#xff0c;深度学习在图像处理领域取得了突破性的进展。其中&#xff0c;AI去衣技术作为一种高度复杂的图像到图像的转换过程&#xff0c;不仅要求算法能够精确地识别并处理衣物纹理和结构&#xff0c;还要求生成的结果具有高度的…

SaaS应用加速解决方案

随着企业业务的迅速扩展&#xff0c;SaaS应用成为企业提升办公效率的关键。然而&#xff0c;在SaaS应用广泛使用的同时&#xff0c;访问速度受限、网络拥堵等问题也逐渐浮现。为了解决这些挑战&#xff0c;SaaS应用加速方案应运而生&#xff0c;旨在助力企业高效运转&#xff0…

基于51单片机ESP8266wifi控制机器人—送餐、快递

基于51单片机wifi控制机器人 &#xff08;程序&#xff0b;原理图&#xff0b;PCB&#xff0b;设计报告&#xff09; ​功能介绍 具体功能&#xff1a; 1.L298N驱动电机&#xff0c;机器人行走&#xff1b; 2.装备红外线感应检测到周围环境&#xff0c;进行行程判断&#xf…

代码随想录第52天|300.最长递增子序列 718. 最长重复子数组

300.最长递增子序列 300. 最长递增子序列 - 力扣&#xff08;LeetCode&#xff09; 代码随想录 (programmercarl.com) 动态规划之子序列问题&#xff0c;元素不连续&#xff01;| LeetCode&#xff1a;300.最长递增子序列_哔哩哔哩_bilibili 给你一个整数数组 nums &#xff0…

multipass和multipassd命令的区别

multipassd通常是multipass服务的后台守护进程&#xff0c;它负责管理和控制虚拟机实例。 命令区别 例&#xff1a; multipass restart my-vm 这个命令用于重启Multipass中的虚拟机实例。例如有一个名为my-vm的虚拟机实例。 multipassd restart 这会重新启动Multipass后台…

Scroll生态项目Penpad,再获Presto Labs的投资

Penpad是Scroll生态的LaunchPad平台&#xff0c;其整计划像收益聚合器以及RWA等功能于一体的综合性Web3平台拓展&#xff0c;该平台在近期频获资本市场关注&#xff0c;并获得了多个知名投资者/投资机构的支持。 截止到本文发布前&#xff0c;Penpad已经获得了包括Scroll联合创…

了解 Postman:这个 API 工具的功能和用途是什么?

在软件开发中&#xff0c;经常听到 Postman 这个软件名。但其实很多新手开发者只知道这是软件开发常用的软件&#xff0c;并不知道实际是一个什么样工具&#xff0c;不知道具体的作用是什么。那今天就跟大家好好唠唠 Postman 这个软件。想要学习更多关于 Postman 的知识&#x…

Codigger:Web应用赋能的分布式操作系统让用户卓越体验

Codigger&#xff0c;作为一个分布式操作系统&#xff0c;其独特之处在于其采用的浏览器/服务器&#xff08;Browser/Server&#xff0c;简称B/S&#xff09;架构。这种架构的核心思想是&#xff0c;通过浏览器来进入工作界面&#xff0c;页面交互部分事务逻辑在前端&#xff0…

【Linux网络】PXE批量网络装机

目录 一、系统装机 1.1 三种引导方式 1.2 系统安装过程 1.3 四大重要文件 二、PXE 2.1 PXE实现原理 2.2 PXE手动搭建过程 2.3 kickstart配合pxe完成批量自动安装 一、系统装机 1.1 三种引导方式 硬盘光驱(U盘)网络启动 1.2 系统安装过程 加载boot loader加载启动安…

Autosar PNC网络管理配置-UserData的使用

文章目录 前言ComComSignalComIPdu CanNmSignal Mapping总结 前言 之前配置的网络管理报文中的data都由ComM管理&#xff0c;后面客户新增了需求&#xff0c;最后两个byte需要发送Wakeup Reason&#xff0c;本文记录一下相关配置的修改 Com ComSignal 之前配置的PN_TX&…

Java 线程池 ( Thread Pool )的简单介绍

想象一下&#xff0c;你正指挥着一支超级英雄团队&#xff0c;面对蜂拥而至的敌人&#xff08;任务&#xff09;&#xff0c;不是每次都召唤新英雄&#xff08;创建线程&#xff09;&#xff0c;而是精心调配现有成员&#xff0c;高效应对。这就是Java线程池的魔力&#xff0c;…

Codeforces Round 942 (Div. 2) A-D1

题目&#xff1a; Codeforces Round 942 (Div. 2) D2有缘再补吧… A. Contest Proposal 题意 两个升序&#xff08;不降&#xff09;的序列a和b&#xff0c;可以在a的任意位置插入任意数&#xff08;要保持升序&#xff09;&#xff0c;使对任意i&#xff0c;有a[i] < b[…

28.leetcode---前K个高频单词(Java版)

题目链接: https://leetcode.cn/problems/top-k-frequent-words/description/ 题解: 代码: 测试:

纯血鸿蒙APP实战开发——底部面板嵌套列表滑动案例

介绍 本示例主要介绍了利用panel实现底部面板内嵌套列表&#xff0c;分阶段滑动效果场景。 效果图预览 使用说明 点击底部“展开”&#xff0c;弹出panel面板。在panel半展开时&#xff0c;手指向上滑动panel高度充满页面&#xff0c;手指向下滑动panel隐藏。在panel完全展开…

自动引流获客软件的魅力

在数字化时代&#xff0c;企业之间的竞争愈发激烈。对于任何一家企业来说&#xff0c;如何有效地吸引潜在客户、提升品牌知名度和销售业绩成为了至关重要的课题。传统的营销手段虽然依旧有效&#xff0c;但随着互联网技术的发展&#xff0c;自动引流获客软件以其独特的魅力&…

文心一言 VS 讯飞星火 VS chatgpt (253)-- 算法导论18.2 6题

六、假设 B-TREE-SEARCH 的实现是在每个结点内采用二分查找&#xff0c;而不是线性查找。证明&#xff1a;无论怎样选择 t ( t 为 n 的函数)&#xff0c;这种实现所需的 CPU 时间都为 O(lgn)。如果要写代码&#xff0c;请用go语言。 文心一言&#xff1a; 首先&#xff0c;我…

端口被其他进程占用:OSError: [Errno 98] Address already in use

一、问题描述 错误提示端口号正在被使用 二、解决办法 1.使用 lsof 命令&#xff0c;列出所有正在监听&#xff08;即被绑定&#xff09;的网络连接&#xff0c;包括它们所使用的端口号 sudo lsof -i -P -n | grep LISTEN 2.解绑被绑定的端口号 根据 netstat 或 lsof 命令…

java入门详细教程——day01

目录 1. Java入门 1.1 Java是什么&#xff1f; 1.2 Java语言的历史 1.3 Java语言的分类 1.4 Java语言的特点 1.4.1 先编译再解释运行 1.4.2 跨平台 1.5 JRE和JDK&#xff08;记忆&#xff09; 1.6 JDK的下载和安装&#xff08;应用&#xff09; 1.6.1 下载 1.6.2 安…

04-22 周日 阿里云-瑶光上部署FastBuild过程(配置TLS、自定义辅助命令)

04-22 周日 阿里云-瑶光上部署FastBuild过程 时间版本修改人描述2024年4月22日14:18:59V0.1宋全恒新建文档2024年4月23日20:41:26V1.0宋全恒完成了基本流程的添加 简介 前提 准备两台服务&#xff0c;一台部署Docker&#xff0c;一台部署FastBuild的镜像容器服务所述的Docke…