2024.2.29 模拟实现 RabbitMQ —— 项目展示

目录

项目介绍

核心功能

核心技术

演示直接交换机

演示扇出交换机

演示主题交换机


项目介绍

  • 此处我们模拟 RabbitMQ 实现了一个消息队列服务器

核心功能

  • 提供了 虚拟主机交换机队列绑定消息 概念的管理
  • 九大核心 API 创建队列销毁队列创建交换机销毁交换机创建绑定解除绑定发布消息订阅消息确认消息
  • 实现了三种典型的消息转换方式 直接交换机(Direct)扇出交换机(Fanout)主题交换机(Topic)
  • 交换机队列绑定 使用 SQLite 数据库持久化,消息 使用文件持久化
  • 基于 TCP + 自定义应用层协议 实现生产者/消费者和 Broker Server 之间的交互工作

核心技术

  • Spring Boot / MyBatis / Lombok
  • SQLite
  • TCP

  • 关于该项目的需求分析,可点击下方链接跳转

模拟实现 RabbitMQ —— 需求分析


  • 关于该项目的核心类,可点击下方链接跳转

模拟实现 RabbitMQ —— 实现核心类


  • 关于该项目的数据库操作,可点击下方链接跳转

模拟实现 RabbitMQ —— 数据库操作


  • 关于该项目的消息持久化,可点击下方链接跳转

模拟实现 RabbitMQ —— 消息持久化


  • 关于该项目的内存数据管理,可点击下方链接跳转

模拟实现 RabbitMQ —— 内存数据管理


  • 关于该项目的虚拟机设计,可点击下方链接跳转

模拟实现 RabbitMQ —— 虚拟主机设计


  • 关于该项目的交换机转发规则,可点击下方链接跳转

模拟实现 RabbitMQ —— 实现转发规则


  • 关于该项目的消费逻辑,可点击下方链接跳转

模拟实现 RabbitMQ —— 实现消费消息逻辑


  • 关于该项目网络通信设计,可点击下方链接跳转

模拟实现 RabbitMQ —— 网络通信设计(服务器)

模拟实现 RabbitMQ —— 网络通信设计(客户端)

演示直接交换机

  • 简单写一个 demo 模拟 跨主机的生产者消费者模型
  • 此处为了方便,就在本机演示

  • 此处我们创建的交换机类型为 直接交换机

1、在 Spring Boot 项目的启动类中创建 Broker Server,绑定端口并启动!

@SpringBootApplication
public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(DemoApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}
}

2、编写生产者代码

/*
* 这个类用来表示一个生产着
* 通常这是一个单独的服务器程序
* */
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//        创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);//        创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","testQueue",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();}
}

3、编写消费者代码

/*
* 这个类表示一个消费者
* 通常这个类也应该是在一个独立的服务器中被执行
* */
public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}});//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}}
}

4、启动 Spring Boot 项目(启动 Broker Server)


5、运行消费者代码


6、运行生产者代码


7、继续观察消费者的控制台

演示扇出交换机

  • 此处我们创建的交换机类型为 扇出交换机

 1、编写生产者代码

/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();
//        创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null);
//        创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();}
}

3、编写消费者A 代码

/** 这个类表示一个消费者A* 通常这个类也应该是在一个独立的服务器中被执行* */
public class DemoConsumerA {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();
//        创建交换机channel.exchangeDeclare("testExchange",ExchangeType.FANOUT,true,false,null);
//        创建队列channel.queueDeclare("testQueue1",true,false,false,null);
//        设置绑定channel.queueBind("testQueue1","testExchange","");
//        订阅消息channel.basicConsume("testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}}
}

4、编写消费者B 代码

/** 这个类表示一个消费者B* 通常这个类也应该是在一个独立的服务器中被执行* */
public class DemoConsumerB {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();
//        创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null);
//        创建队列channel.queueDeclare("testQueue2",true,false,false,null);
//        设置绑定channel.queueBind("testQueue2","testExchange","");
//        订阅消息channel.basicConsume("testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}}
}

5、启动 Spring Boot 项目(启动 Broker Server)


6、运行消费者A 代码


7、运行消费者B 代码


8、运行生产者代码


9、继续观察消费者A 的控制台


10、继续观察消费者B 的控制台

演示主题交换机

  • 此处我们创建的交换机为 主题交换机

 1、编写生产者代码

/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//        创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);//        创建消息A 并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","ccc.aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);//        创建消息B 并发送body = "hi".getBytes();ok = channel.basicPublish("testExchange","aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();}
}

3、编写消费者代码

/** 这个类表示一个消费者* 通常这个类也应该是在一个独立的服务器中被执行* */
public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();
//        创建交换机channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);
//        创建队列channel.queueDeclare("testQueue",true,false,false,null);
//        设置绑定channel.queueBind("testQueue","testExchange","*.aaa.bbb");
//        订阅消息channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}});
//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}}
}

4、启动 Spring Boot 项目(启动 Broker Server)


5、运行消费者代码


6、运行生产者代码


7、继续观察消费者的控制台

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/703999.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于JSP的毕业设计选题系统的设计与实现

基于JSP的毕业设计选题系统的设计与实现 (源代码论文) A. 项目简介 毕业设计选题系统就是能够使学生通过互联网完成毕业设计课题的选定,它采用Web方式,同时适用于局域网和Internet,它要实现审核,权限管理,邮件通知…

Python中的atexit模块:优雅地处理程序退出

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站AI学习网站。 目录 前言 atexit模块概述 atexit模块的基本用法 示例代码:文件操作时的应用场景 典型应用场景 1 资源释放…

云里物里轻薄系列电子价签,如何革新零售?

云里物里的DS轻薄系列电子价签,凭借轻巧外观和强劲性能,为零售行业提供了更便捷的商品改价方案。这不仅是对纸质价标的替代,更以其安全性和可持续发展性,实现对零售行业的效率升级,让商家们轻松迎接数字化时代的挑战&a…

【Vue3】学习watch监视:深入了解Vue3响应式系统的核心功能(下)

💗💗💗欢迎来到我的博客,你将找到有关如何使用技术解决问题的文章,也会找到某个技术的学习路线。无论你是何种职业,我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章,也欢…

OD(9)之Mermaid序列图(Sequence diagrams)使用详解

OD(8)之Mermaid序列图(Sequence diagrams)使用详解 Author: Once Day Date: 2024年2月21日 漫漫长路才刚刚开始… 全系列文章可参考专栏: Mermiad使用指南_Once_day的博客-CSDN博客 参考文章: 关于 Mermaid | Mermaid 中文网 (nodejs.cn)Mermaid | Diagramming and charti…

4.4 MySQL存储

目录 1、使用前提 2、使用连接数据库最初步骤 2.1 最初步骤 2.2 connect()方法中参数简单传递 3、创建数据库(创建架构)和创建表 3.1 创建数据库(创建架构) 3.2 创建表 3.2.1 基本创建 3.2.2 创建自增主键 4、Pycharm 可视化连接 MySQL 图形界面 5、插入、更新、查询…

【蓝桥杯】青蛙跳杯子(BFS)

一.题目描述 二.输入描述 输入为 2 行,2 个串,表示初始局面和目标局面。我们约定,输入的串的长度不超过 15。 三.输出描述 输出要求为一个整数,表示至少需要多少步的青蛙跳。 四.问题分析 注意:空杯子只有一个 …

3种SQL语句优化方法,测试人必知必会!

关于SQL语句的优化,本质上就是尽量降低SQL语句的执行时间,对于如何降低SQL语句的执行时间,可以从以下几个方面入手。 一、降低SQL语句执行时的资源消耗 这是我们在数据库性能调优中常用的方法,该方法以分析SQL语句的执行计划为切…

Python实用技巧:处理JSON文件写入换行问题

Python实用技巧:处理JSON文件写入换行问题 🌈 个人主页:高斯小哥 🔥 高质量专栏:Matplotlib之旅:零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程 👈 希望得到您的订阅…

linux中将普通用户添加sudo权限

1.登录root权限账号,编辑/etc/sudoers文件 2.找到"root ALL(ALL) ALL",并在下面添加普通用户 格式:username ALL(ALL) ALL vim /etc/sudoers ## Next comes the main part: which users can run what software …

CMake和VsCode调试的使用

目录 CMake使用 CMake下载 创建系统文件目录 MakeList编写规范 VsCode启动调试 添加配置文件 添加断点,启动调试 CMake使用 CMake下载 输入指令 sudo apt install cmake 安装cmake,使用 cmake -version可查看cmake的版本信息 创建系统文件目…

土耳其商务团一行莅临优积科技考察交流

7月31日土耳其商务代表Emre Arif Parlak等一行三人莅临优积科技考察交流,公司CEO刘其东携团队成员热情接待并深入交流。 商务团首先参观了我司产品生产基地,详细了解了钢结构模块的生产加工工艺流程和质量控制体系。随后参观了我司模块化学校样板房、模块…

Git+py+ipynb Usage

0.default config ssh-keygen -t rsa #之后一路回车,当前目录.ssh/下产生公私钥 cat ~/.ssh/id_rsa.pub #复制公钥到账号 git config --global user.email account_email git config --global user.name account_namebug of ipynb TqdmWarning: IProgress not found. Please …

一文1400字使用Jmeter进行http接口测试【建议收藏】

前言: 本文主要针对http接口进行测试,使用Jmeter工具实现。Jmter工具设计之初是用于做性能测试的,它在实现对各种接口的调用方面已经做的比较成熟,因此,本次直接使用Jmeter工具来完成对Http接口的测试。 一、开发接口…

【JavaSE】集合框架

目录 程序场景分析 Java集合框架包含的内容List接口ArrayListLinkedListList接口的常用方法ArrayList案例背景分析代码示例扩展以下功能代码示例 LinkedList案例背景分析代码示例LinkedList的特殊方法 ArrayList与LinkedList对比 Set接口HashSet 集合的特点常用方法案例背景分析…

[c 语言] 大端,小端;网络序,主机序

在网络编程中,特别是底层网卡驱动开发时,常常遇到字节序问题。字节序指的是多字节数据类型在内存中存放的顺序,高位保存在低地址还是高地址,以此来划分大端还是小端。 1 大端和小端 大端和小端指的是 cpu 的属性,常见…

Vulhub 靶场训练 DC-9解析

一、搭建环境 kali的IP地址是:192.168.200.14 DC-9的IP地址暂时未知 二、信息收集 1、探索同网段下存活的主机 arp-scan -l #2、探索开放的端口 开启端口有:80和22端口 3、目录扫描 访问80 端口显示的主页面 分别点击其他几个页面 可以看到是用户…

SpringBoot源码解读与原理分析(三十四)SpringBoot整合JDBC(三)声明式事务的传播行为控制

文章目录 前言10.5 声明式事务的传播行为控制10.5.1 修改测试代码(1)新建一个Service类,并引用UserService(2)修改主启动类 10.5.2 PROPAGATION_REQUIRED10.5.2.1 tm.getTransaction(1)获取事务…

用于自监督视觉预训练的屏蔽特征预测

Masked Feature Prediction for Self-Supervised Visual Pre-Training 一、摘要 提出了用于视频模型自监督预训练的掩模特征预测(MaskFeat)。首先随机屏蔽输入序列的一部分,然后预测屏蔽区域的特征。研究了五种不同类型的特征,发…

遥感、航拍、影像等用于深度学习的数据集集合

遥感图像的纹理特征异常繁杂,地貌类型多变,人工提取往往存在特征提取困难和特征提取不准确的问题,同时,在这个过程中还会耗费海量的人力物力。随着计算力的突破、数据洪流的暴发和算法的不断创新,在具有鲜明“大数据”…