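1.Introduction to Disruptor
What is Disruptor?
Disruptor is a high-performance concurrency framework developed by LMAX, a UK foreign-exchange trading company. It can be thought of as an efficient, low-latency in-memory messaging component for communication between threads, and its defining feature is high performance. Unlike Kafka and RabbitMQ, which serve as message queues between services, Disruptor is typically used to pass messages between multiple threads inside a single JVM.
Functionally, Disruptor implements a "queue", and a bounded one at that (strictly speaking, it is a lock-free inter-thread communication library). Its role is similar to that of ArrayBlockingQueue, but Disruptor offers more features and considerably better performance than ArrayBlockingQueue.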
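To make the "bounded, lock-free queue" idea concrete, here is a minimal sketch of a single-producer/single-consumer ring buffer built only on the JDK. It is not Disruptor's implementation: the class `SimpleRingBuffer` and its `offer`/`poll` methods are hypothetical names chosen for illustration, and the sketch omits what the real library adds, such as wait strategies and multi-producer support. It assumes exactly one producer thread and one consumer thread.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Simplified single-producer / single-consumer ring buffer (illustration only). */
final class SimpleRingBuffer {
    private final String[] slots;   // preallocated, fixed-size storage
    private final int mask;         // capacity - 1; valid because capacity is a power of two
    private final AtomicLong published = new AtomicLong(-1); // last sequence written by the producer
    private final AtomicLong consumed = new AtomicLong(-1);  // last sequence read by the consumer

    SimpleRingBuffer(int capacity) {
        if (capacity < 1 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = new String[capacity];
        mask = capacity - 1;
    }

    /** Producer side: returns false instead of blocking when the buffer is full. */
    boolean offer(String value) {
        long next = published.get() + 1;
        if (next - consumed.get() > slots.length) {
            return false; // would overwrite a slot the consumer has not read yet
        }
        slots[(int) (next & mask)] = value;
        published.set(next); // volatile write makes the slot visible to the consumer
        return true;
    }

    /** Consumer side: returns null when nothing new has been published. */
    String poll() {
        long next = consumed.get() + 1;
        if (next > published.get()) {
            return null;
        }
        String value = slots[(int) (next & mask)];
        consumed.set(next); // frees the slot for reuse by the producer
        return value;
    }

    public static void main(String[] args) {
        SimpleRingBuffer buffer = new SimpleRingBuffer(4);
        buffer.offer("hello");
        buffer.offer("world");
        System.out.println(buffer.poll());
        System.out.println(buffer.poll());
    }
}
```

The two sequence counters replace the lock: the producer only advances `published`, the consumer only advances `consumed`, and each side reads the other's counter to decide whether it may proceed.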
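Advantages of Disruptor
The most direct application scenario for Disruptor is the "producer-consumer" model. The JDK's BlockingQueue can handle this as well, but Disruptor's performance is about 5 to 10 times that of BlockingQueue.
In other words, whatever BlockingQueue can do, Disruptor can do too, and do it better. Disruptor can also do more than that.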
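For comparison, the producer-consumer pattern mentioned above can be written with the JDK's own ArrayBlockingQueue. The following is a minimal baseline sketch, not a benchmark: the class name `BlockingQueueBaseline`, the method `roundTrip`, and the `POISON` end-of-stream marker are all hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BlockingQueueBaseline {
    // Hypothetical end-of-stream marker; a real message must not equal it.
    private static final String POISON = "__stop__";

    /** Sends the messages through a bounded queue and returns what the consumer received. */
    static List<String> roundTrip(List<String> messages, int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String m = queue.take();   // blocks while the queue is empty
                    if (POISON.equals(m)) {
                        return;
                    }
                    received.add(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (String m : messages) {
                queue.put(m);                  // blocks while the queue is full
            }
            queue.put(POISON);
            consumer.join();                   // after join(), 'received' is safely visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("a", "b", "c"), 2));
    }
}
```

Both `put` and `take` go through the queue's internal lock, which is the contention point that Disruptor's design aims to avoid.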
2.Code Project
Goal: send and receive messages with Disruptor
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>disruptor</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>
application.yaml
server:
  port: 8088
model
package com.et.disruptor.model;

import lombok.Data;

@Data
public class MessageModel {
    private String message;
}
consumer
package com.et.disruptor.event;

import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {

    @Override
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
        try {
            // Pause for one second before handling the event
            Thread.sleep(1000);
            log.info("consume message start");
            if (event != null) {
                log.info("the message is:{}", event);
            }
        } catch (Exception e) {
            // Log failures at error level and keep the stack trace
            log.error("consume message fail", e);
        }
        log.info("consume message ending");
    }
}
producer
package com.et.disruptor.event;

import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName HelloEventProducer
 * @Description todo
 * @date 2024-03-29 13:26
 */
@Component
public class HelloEventProducer {

    @Autowired
    RingBuffer<MessageModel> ringBuffer;

    // synchronized: the ring buffer is configured with ProducerType.SINGLE,
    // so only one thread may publish at a time
    public synchronized void sayHelloMq(String message) {
        EventTranslator<MessageModel> et = new EventTranslator<MessageModel>() {
            @Override
            public void translateTo(MessageModel messageModel, long sequence) {
                // Fill the preallocated event in the claimed slot
                messageModel.setMessage(message);
            }
        };
        ringBuffer.publishEvent(et);
    }
}
HelloEventFactory
package com.et.disruptor.event;

import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.EventFactory;

public class HelloEventFactory implements EventFactory<MessageModel> {

    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}
config class
package com.et.disruptor.config;

import com.et.disruptor.event.HelloEventFactory;
import com.et.disruptor.event.HelloEventHandler;
import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
public class MqManager {

    @Bean("ringBuffer")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        // Thread pool for the consumer event handlers; Disruptor hands events
        // to the consumers through a java.util.concurrent.ExecutorService
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // Factory that preallocates the events in the ring buffer
        HelloEventFactory factory = new HelloEventFactory();

        // Ring buffer size (number of slots, not bytes); must be a power of 2
        int bufferSize = 1024 * 256;

        // Single-producer mode, for extra performance
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                ProducerType.SINGLE, new BlockingWaitStrategy());

        // Register the consumer event handler
        disruptor.handleEventsWith(new HelloEventHandler());

        // Start the Disruptor threads
        disruptor.start();

        // Get the ring buffer, which is used to publish events
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
        return ringBuffer;
    }
}
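The buffer size in the configuration above is 1024 * 256, which is a power of two. Disruptor requires this so that an ever-increasing sequence number can be mapped to a slot with a bit mask instead of a modulo operation. The sketch below illustrates that arithmetic only; `RingIndex`, `isPowerOfTwo`, and `slotFor` are hypothetical names and are not part of the Disruptor API.

```java
final class RingIndex {

    /** True when n has exactly one bit set, i.e. n is 1, 2, 4, 8, ... */
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    /** Maps a sequence number to a slot index using a mask instead of %. */
    static int slotFor(long sequence, int bufferSize) {
        if (!isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(String[] args) {
        int bufferSize = 1024 * 256;
        System.out.println(isPowerOfTwo(bufferSize));
        // The sequence wraps back to slot 0 once it reaches bufferSize
        System.out.println(slotFor(bufferSize, bufferSize));
        System.out.println(slotFor(bufferSize + 1L, bufferSize));
    }
}
```

Because the sequence never resets, the mask is what turns the flat array into a ring: slot indices repeat every `bufferSize` events.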
controller
package com.et.disruptor.controller;

import com.et.disruptor.event.HelloEventProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.HashMap;
import java.util.Map;

@Controller
public class HelloWorldController {

    @Autowired
    HelloEventProducer helloEventProducer;

    @RequestMapping("/hello")
    @ResponseBody
    public Map<String, Object> showHelloWorld() {
        Map<String, Object> map = new HashMap<>();
        map.put("msg", "HelloWorld");
        return map;
    }

    @RequestMapping("/send")
    @ResponseBody
    public String add(String message) {
        helloEventProducer.sayHelloMq(message);
        return "success";
    }
}
Application class
package com.et.disruptor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
The above is only the key code; for the complete code, see the repository below.
Code repository
https://github.com/Harries/springboot-demo
3.Testing
Start the Spring Boot application.
Enter http://127.0.0.1:8088/send?message=hello%20world in a browser.
The console prints the following log:
2024-03-29 14:00:54.828 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : consume message start
2024-03-29 14:00:54.828 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : the message is:MessageModel(message=hello world)
2024-03-29 14:00:54.831 INFO 22784 --- [pool-1-thread-1] c.et.disruptor.event.HelloEventHandler : consume message ending
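The request from the steps above can also be sent from code instead of a browser. The following sketch uses only the JDK's HttpURLConnection; the class `SendClient` and its methods are hypothetical helpers written for this illustration. It assumes the application is running locally on port 8088 as configured in application.yaml. Note that URLEncoder encodes the space as `+` rather than `%20`; in a query string the server should decode both to a space.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class SendClient {

    /** Builds the /send URL with the message encoded as a query parameter. */
    static String buildUrl(String baseUrl, String message) {
        try {
            return baseUrl + "/send?message=" + URLEncoder.encode(message, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    /** Performs a GET request and returns the response body as a string. */
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(5000);
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        String url = buildUrl("http://127.0.0.1:8088", "hello world");
        try {
            System.out.println(get(url));
        } catch (IOException e) {
            System.err.println("request failed (is the application running?): " + e.getMessage());
        }
    }
}
```

If the application is up, the call should return the string "success" from the controller right away, while the handler's log lines appear about a second later because of the sleep in HelloEventHandler.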
4.References
https://github.com/LMAX-Exchange/disruptor
http://www.liuhaihua.cn/archives/710370.html
https://www.jb51.net/article/274145.htm