目录
项目介绍
核心功能
核心技术
演示直接交换机
演示扇出交换机
演示主题交换机
项目介绍
- 此处我们模拟 RabbitMQ 实现了一个消息队列服务器
核心功能
- 提供了 虚拟主机、交换机、队列、绑定、消息 概念的管理
- 九大核心 API 创建队列、销毁队列、创建交换机、销毁交换机、创建绑定、解除绑定、发布消息、订阅消息、确认消息
- 实现了三种典型的消息转换方式 直接交换机(Direct)、扇出交换机(Fanout)、主题交换机(Topic)
- 交换机、队列、绑定 使用 SQLite 数据库持久化,消息 使用文件持久化
- 基于 TCP + 自定义应用层协议 实现生产者/消费者和 Broker Server 之间的交互工作
核心技术
- Spring Boot / MyBatis / Lombok
- SQLite
- TCP
- 关于该项目的需求分析,可点击下方链接跳转
模拟实现 RabbitMQ —— 需求分析
- 关于该项目的核心类,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现核心类
- 关于该项目的数据库操作,可点击下方链接跳转
模拟实现 RabbitMQ —— 数据库操作
- 关于该项目的消息持久化,可点击下方链接跳转
模拟实现 RabbitMQ —— 消息持久化
- 关于该项目的内存数据管理,可点击下方链接跳转
模拟实现 RabbitMQ —— 内存数据管理
- 关于该项目的虚拟机设计,可点击下方链接跳转
模拟实现 RabbitMQ —— 虚拟主机设计
- 关于该项目的交换机转发规则,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现转发规则
- 关于该项目的消费逻辑,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现消费消息逻辑
- 关于该项目网络通信设计,可点击下方链接跳转
模拟实现 RabbitMQ —— 网络通信设计(服务器)
模拟实现 RabbitMQ —— 网络通信设计(客户端)
演示直接交换机
- 简单写一个 demo 模拟 跨主机的生产者消费者模型
- 此处为了方便,就在本机演示
- 此处我们创建的交换机类型为 直接交换机
1、在 Spring Boot 项目的启动类中创建 Broker Server,绑定端口并启动!
@SpringBootApplication public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(DemoApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();} }
2、编写生产者代码
/* * 这个类用来表示一个生产着 * 通常这是一个单独的服务器程序 * */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);// 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","testQueue",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者代码
/* * 这个类表示一个消费者 * 通常这个类也应该是在一个独立的服务器中被执行 * */ public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、启动 Spring Boot 项目(启动 Broker Server)
5、运行消费者代码
6、运行生产者代码
7、继续观察消费者的控制台
演示扇出交换机
- 此处我们创建的交换机类型为 扇出交换机
1、编写生产者代码
/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null); // 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者A 代码
/** 这个类表示一个消费者A* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumerA {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange",ExchangeType.FANOUT,true,false,null); // 创建队列channel.queueDeclare("testQueue1",true,false,false,null); // 设置绑定channel.queueBind("testQueue1","testExchange",""); // 订阅消息channel.basicConsume("testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、编写消费者B 代码
/** 这个类表示一个消费者B* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumerB {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null); // 创建队列channel.queueDeclare("testQueue2",true,false,false,null); // 设置绑定channel.queueBind("testQueue2","testExchange",""); // 订阅消息channel.basicConsume("testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
5、启动 Spring Boot 项目(启动 Broker Server)
6、运行消费者A 代码
7、运行消费者B 代码
8、运行生产者代码
9、继续观察消费者A 的控制台
10、继续观察消费者B 的控制台
演示主题交换机
- 此处我们创建的交换机为 主题交换机
1、编写生产者代码
/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);// 创建消息A 并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","ccc.aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);// 创建消息B 并发送body = "hi".getBytes();ok = channel.basicPublish("testExchange","aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者代码
/** 这个类表示一个消费者* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null); // 创建队列channel.queueDeclare("testQueue",true,false,false,null); // 设置绑定channel.queueBind("testQueue","testExchange","*.aaa.bbb"); // 订阅消息channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}}); // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、启动 Spring Boot 项目(启动 Broker Server)
5、运行消费者代码
6、运行生产者代码
7、继续观察消费者的控制台