Spring Boot集成disruptor快速入门demo

1.disruptor介绍

什么是 Disruptor?

  • Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能的并发框架。可以认为是线程间通信的高效低延时的内存消息组件,它最大的特点是高性能。与 Kafka、RabbitMQ 用于服务间的消息队列不同,disruptor 一般用于一个 JVM 中多个线程间消息的传递。

  • 从功能上来看,Disruptor 实现了“队列”的功能,而且是一个有界队列(事实上它是一个无锁的线程间通信库)。作用与 ArrayBlockingQueue 有相似之处,但是 disruptor 从功能、性能上又都远好于 ArrayBlockingQueue。

Disruptor 的优势

  • Disruptor 最直接的应用场景自然就是“生产者-消费者”模型的应用场合了,虽然这些我们使用 JDK 的 BlockingQueue 也能做到,但 Disruptor 的性能比 BlockingQueue 提高了 5~10 倍左右:

  • 也就是说 BlockingQueue 能做的,Disruptor 都能做到且做的更好。同时 Disruptor 还能做得更多:

2.代码工程

实验目标:利用disruptor发送和接收消息

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>disruptor</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
</project>

application.yaml

server:port: 8088

model

package com.et.disruptor.model;import lombok.Data;@Data
public class MessageModel {private String message;
}

consumer

package com.et.disruptor.event;import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {@Overridepublic void onEvent(MessageModel event, long sequence, boolean endOfBatch) {try {Thread.sleep(1000);log.info("consume message start");if (event != null) {log.info("the message is:{}",event);}} catch (Exception e) {log.info("consume message fail");}log.info("consume message ending");}
}

producer

package com.et.disruptor.event;import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;/*** @author liuhaihua* @version 1.0* @ClassName HelloEventProducer* @Description todo* @date 2024年03月29日 13:26*/
@Component
public class HelloEventProducer {@AutowiredRingBuffer<MessageModel> ringBuffer;public synchronized void sayHelloMq(String message){EventTranslator<MessageModel> et = new EventTranslator<MessageModel>() {@Overridepublic void translateTo(MessageModel messageModel, long l) {messageModel.setMessage(message);}};ringBuffer.publishEvent(et);}}

HelloEventFactory

package com.et.disruptor.event;import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventFactory;public class HelloEventFactory implements EventFactory<MessageModel> {@Overridepublic MessageModel newInstance() {return new MessageModel();}
}

config类

package com.et.disruptor.config;import com.et.disruptor.event.HelloEventFactory;
import com.et.disruptor.event.HelloEventHandler;
import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Configuration
public class MqManager {@Bean("ringBuffer")public RingBuffer<MessageModel> messageModelRingBuffer() {//define the thread pool for consumer message hander, Disruptor touch the consumer event to process by java.util.concurrent.ExecutorSerivceExecutorService executor = Executors.newFixedThreadPool(2);//define Event FactoryHelloEventFactory factory = new HelloEventFactory();//ringbuffer byte sizeint bufferSize = 1024 * 256;//单线程模式,获取额外的性能Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());//set consumer eventdisruptor.handleEventsWith(new HelloEventHandler());//start disruptor threaddisruptor.start();//gain ringbuffer ring,to product eventRingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}
}

controller

package com.et.disruptor.controller;import com.et.disruptor.event.HelloEventProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;import java.util.HashMap;
import java.util.Map;@Controller
public class HelloWorldController {@AutowiredHelloEventProducer helloEventProducer;@RequestMapping("/hello")@ResponseBodypublic Map<String, Object> showHelloWorld(){Map<String, Object> map = new HashMap<>();map.put("msg", "HelloWorld");return map;}@RequestMapping("/send")@ResponseBodypublic String  add(String message){helloEventProducer.sayHelloMq(message);return "success";}}

启动类

package com.et.disruptor;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class DemoApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springboot-demo

3.测试

  1. 启动Spring Boot应用服务

  2. 浏览器输入http://127.0.0.1:8088/send?message=hello%20world

  3. 控制台输出日志

2024-03-29 14:00:54.828 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : consume message start
2024-03-29 14:00:54.828 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : the message is:MessageModel(message=hello world)
2024-03-29 14:00:54.831 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : consume message ending

4.引用

  • https://github.com/LMAX-Exchange/disruptor

  • http://www.liuhaihua.cn/archives/710370.html

  • https://www.jb51.net/article/274145.htm

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/789814.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++——异常机制

目录 一&#xff0c;背景 1.1 C语言处理错误的方式 1.2 C异常概念 二&#xff0c;异常的使用 2.1 异常的简单使用 2.2 异常的匹配原则 2.3 异常抛对象 2.4 异常的重新抛出 2.5 异常安全 三&#xff0c;自定义异常体系 四&#xff0c;异常优缺点 4.1 优点 4.2 缺点 …

【爬虫框架Scrapy】02 Scrapy入门案例

接下来介绍一个简单的项目&#xff0c;完成一遍 Scrapy 抓取流程。通过这个过程&#xff0c;我们可以对 Scrapy 的基本用法和原理有大体了解。 1. 本节目标 本节要完成的任务如下。 创建一个 Scrapy 项目。 创建一个 Spider 来抓取站点和处理数据。 通过命令行将抓取的内容…

从零开始为香橙派orangepi zero 3移植主线linux——2.linux kernel

从零开始为香橙派orangepi zero 3移植主线linux——2.linux kernel 0.环境搭建补档NFS服务TFTP服务 一、linux kernel编译二、运行 0.环境搭建补档 linux kernel验证时&#xff0c;使用tftp服务从ubuntu主机下载启动更加方便&#xff0c;等到验证无误后再一次性烧写到tf卡。所以…

选择哪些即时通讯产品,提高企业内部沟通与协作?

随着移动互联网的飞速发展&#xff0c;即时通讯工具已经成为了企业内部沟通和协作的首选。无论是即时聊天、文件共享、在线会议、日程安排还是群组沟通&#xff0c;都能帮助企业提高工作效率和协同能力。本文将推荐一些国内较为常用的即时通讯产品&#xff0c;并以 Slcak 为例&…

基于java+SpringBoot+Vue的校园交友网站设计与实现

基于javaSpringBootVue的校园交友网站设计与实现 开发语言: Java 数据库: MySQL技术: SpringBoot MyBatis工具: IDEA/Eclipse、Navicat、Maven 系统展示 前台展示 后台展示 系统简介 整体功能包含&#xff1a; 校园交友网站是一个为在校师生提供一个交流互动、寻找朋友的…

vue3+ts 调用接口,数据显示

数据展示 &#xff08;例&#xff1a;展示医院等级数据&#xff0c;展示医院区域数据同理。&#xff09; 接口文档中&#xff0c;输入参数 测试一下接口&#xff0c;发请求 看是否能够拿到信息 获取接口&#xff0c;api/index.ts 中 /home/index.ts // 统一管理首页模块接口 i…

ansible-自动化工具

一、ansible概述 不是C/S架构&#xff0c;就是一种工具 1&#xff1a;linux自动化运维 编写程序实现运维自动化&#xff1a;shell python 工具模式自动化&#xff1a; ①OS Provisioning&#xff1a; RedHat satellite&#xff1b;PXE&#xff08;可实现dhcp和tftp&#…

PFC交流电压跌落测试实现方法与仿真

目录 前言 交流电压跌落测试 仿真验证 总结 前言 之前双向交错的图腾柱仿真到代码生成系统验证已经实现&#xff0c;最近在学习如何解决交流电压跌落的问题&#xff0c;目前根据需要通过仿真实现了该工况的模拟&#xff0c;这里简单记录一下。 双向交错CCM图腾柱无桥单相PF…

Python PyQt5——QThread使用方法与代码实践

QThread 在 GUI 程序中&#xff0c;如果想要让程序执行一项耗时的工作&#xff0c;例如下载文件、I/O 存取等,深度学习模型推理&#xff0c;直接在 UI 线程中进行这些操作会导致整个界面卡住&#xff0c;出现无响应的状态。为了避免这种情况&#xff0c;可以将这些耗时任务放在…

Linux提权!!!

上一篇文章讲了Windows的提权&#xff0c;那么这篇文章就来讲一下Linux的提权 1.SUID提权 suid权限 作用&#xff1a;让普通用户临时拥有该文件的属主的执行权限&#xff0c;suid权限只能应用在二进制可执行文件&#xff08;命令&#xff09;上&#xff0c;而且suid权限只能设置…

HR是怎么看待PMP证书的呢?

考PMP证书为什么值得&#xff1f;对管理人员有用么&#xff1f; 首先&#xff0c;在行业内部&#xff0c;一名项目经理&#xff0c;需要有PMP证书已经是行业内的共识了。而且面试的时候&#xff0c;如果是同样的年龄段&#xff0c;同样的背景&#xff0c;那有证书在手的人&…

瑞吉外卖实战学习--13、完善删除中的逻辑

完善删除中的逻辑 前言效果图逻辑介绍表结构根据mybatisPlus接口规范创建实体类和service和mapper文件1、实体类Dish和Setmeal2、Mapper接口DishMapper和SetealMapper3、Service接口DishService和setmealService4、Service实现类DishServiceImpl和setmealServicelmpl 编写删除函…

【御控物联】JSON结构数据转换在物流调度系统中的应用(场景案例三)

文章目录 一、前言二、场景概述三、解决方案四、在线转换工具五、技术资料 一、前言 物流调度是每个生产厂区必不可少的一个环节&#xff0c;主要包括线边物流和智能仓储。线边物流是指将物料定时、定点、定量配送到生产作业一线的环节&#xff0c;其包括从集中仓库到线边仓、…

【详解】Windows系统安装Nginx及简单使用

【详解】Windows系统安装Nginx及简单使用 一、Nginx是什么&#xff1f; “Nginx 是一款轻量级的 HTTP 服务器&#xff0c;采用事件驱动的异步非阻塞处理方式框架&#xff0c;这让其具有极好的 IO 性能&#xff0c;时常用于服务端的反向代理和负载均衡。”Nginx 是一款 http 服…

鸿蒙OS开发实例:【组件化模式】

组件化一直是移动端比较流行的开发方式&#xff0c;有着编译运行快&#xff0c;业务逻辑分明&#xff0c;任务划分清晰等优点&#xff0c;针对Android端的组件化&#xff1b;与Android端的组件化相比&#xff0c;HarmonyOS的组件化可以说实现起来就颇费一番周折&#xff0c;因为…

IoT数采平台4:测试

IoT数采平台1&#xff1a;开篇IoT数采平台2&#xff1a;文档IoT数采平台3&#xff1a;功能IoT数采平台4&#xff1a;测试 Modbus RTU串口测试 OPC测试 HTTP测试 MQTT透传测试 MQTT网关测试及数据上报 TCP / UDP 监听&#xff0c;客户端连上后发送信息&#xff0c;客户端上报数据…

pygame--坦克大战(一)

项目搭建 本游戏主要分为两个对象,分别是我方坦克和敌方坦克。用户可以通过控制我方的坦克来摧毁敌方的坦克保护自己的“家”,把所有的敌方坦克消灭完达到胜利。敌方的坦克在初始的时候是默认5个的(这可以自己设置),当然,如果我方坦克被敌方坦克的子弹打中,游戏结束。从…

Lambda表达式,Stream流

文章目录 Lambda表达式作用前提函数式接口特点 语法省略模式和匿名对象类的区别 Stream流思想作用三类方法获取方法单列集合(Collection[List,Set双列集合Map(不能直接获取)数组同一类型元素(Stream中的静态方法) 常见的中间方法终结方法收集方法 Optional类 Lambda表达式 作用…

每日三个JAVA经典面试题(三十四)

1.Mybatis的一级、二级缓存 MyBatis提供了两种缓存机制来提高查询效率&#xff1a;一级缓存和二级缓存。 一级缓存&#xff08;Session级别&#xff09; 作用范围&#xff1a;一级缓存是基于SqlSession的。这意味着&#xff0c;如果你在同一个SqlSession中执行两次相同的查询…

数据结构(六)——图的存储及基本操作

6.2 图的存储及基本操作 6.2.1 邻接矩阵法 邻接矩阵存储无向图、有向图 #define MaxVertexNum 100 //顶点数目的最大值typedef struct{char Vex[MaxVertexNum]; //顶点表int Edge[MaxVertexNum][MaxVertexNum]; //邻接矩阵&#xff0c;边表int vexnum,arcnum; //图的当前…