RabbitMQ - 以 MQ 为例,手写一个 RPC 框架 demo

 

目录

前言 

一、再谈自定义应用层协议

二、再谈 BrokerServer

三、再谈 Connection、Channel

四、Demo

a)启动服务器

b)客户端连接


前言 


本篇文章来自于笔者之前写过的一个系列 —— “根据源码,模拟实现 RabbitMQ” 系列,不妨可以去看看~

一、再谈自定义应用层协议

a)这个自定义应用层协议实际上就是在描述将来 客户端 和 服务器 之间通讯的消息格式长啥样

b)首先是一个 Int 类型的 type,描述了这个消息到底是用来干什么的(要调用服务器这边的哪一个服务).

c)然后就是 payload 的数据载荷,承载着将来调用 VirtualHost 中的具体的服务所需要的参数(例如创建交换机所需要的参数就有:交换机名字、交换机类型、是否自动删除、是否持久化、扩展参数).

因为 TCP 是面向字节流的(IO 流中主要提供的就是二进制数据的读写),因此这里不太适合使用 JSON 格式数据进行网络传输(可读性不好,效率不高),因此这里 payload 是一个 字节数组,将具体的数据序列化成 byte 数组放进来.

d)这里要注意的一点是,TCP 是面向字节流的,因此会出现粘包问题,那么为了解决这个问题,由两种办法,第一种就是约定分割符(读到指定分隔符就截止),第二种就是描述好 payload 的长度.

这里我采用的就是第二种办法,只需要在协议里面在添加一个 length 字段,用来描述 payload 的长度.

import java.io.Serializable//Socket 自定义应用层协议(请求)
data class Request(val type: Int,val length: Int,val payload: ByteArray,
): Serializable//Socket 自定义应用层协议(响应)
data class Response(val type: Int,val length: Int,val payload: ByteArray,
): Serializable//基本参数(每个请求都会携带的参数,这里进行了一个封住)
open class ReqBaseArguments(open val rid: String = "",open val channelId: String = "",
): Serializable//基本响应参数(每个响应都会携带的参数),主要是为了应对 mq 回调响应处理
open class RespBaseArguments(open val rid: String,open val channelId: String,open val ok: Boolean,
): Serializable//主要的请求: 创建交换机、删除交换机、创建队列
data class ExchangeDeclareReq(val name: String,val type: ExchangeType,val durable: Boolean,val autoDelete: Boolean,val arguments: MutableMap<String, Any>,override val rid: String,override val channelId: String,
): ReqBaseArguments(), Serializabledata class ExchangeDeleteReq(val name: String,override val rid: String,override val channelId: String,
): ReqBaseArguments(), Serializabledata class QueueDeclareReq(val name: String,val durable: Boolean,val exclusive: Boolean,val autoDelete: Boolean,val arguments: MutableMap<String, Any>,override val rid: String,override val channelId: String,
): ReqBaseArguments(), Serializable

 

二、再谈 BrokerServer

a)BrokerServer 就是一个中间服务,也可以简单理解为 VirtualHost 的代理(BrokerServer 接收客户端请求,调用 VirtualHost 中具体的服务).

b)BrokerServer 启动的时候,就会通过 accept 阻塞等待客户端这边的 TCP 连接,连接成功之后只需要为该客户端其分配一个线程,处理之后的任务.

c)此时这个线程就会处于一个死循环循环,通过 IO 流读取到 客户端 请求中的 type、length、payload ,并按照约定的格式进行解析 payload,得到具体数据(这里不仅包含了 VirtualHost 服务中所需要的具体的参数,还携带了 channelId 和 rid)

d)此时,只需要根据 IO 流中读取出的 type,调用对应 VirtualHost 中的服务即可.

e)最后再将 VirtualHost 处理后得到的响应封装成 我们约定的应用层协议格式,通过 IO 写入到流中,让客户端去读取.

class BrokerServer(port: Int
) {private val socket = ServerSocket(port)private val clientPool = Executors.newFixedThreadPool(5)//key: channelId ,value: Socket//注意:这里的 Channel 只表示一个 "逻辑" 上的连接(创建,销毁 channel),这个 Map 是为了后台信息统计private val channelSession = ConcurrentHashMap<String, Socket>()private val virtualHost = VirtualHost()fun start() {println("[BrokerServer] 启动!")while (true) {val client = socket.accept()clientPool.submit {clientProcess(client)}}}private fun clientProcess(client: Socket) {println("[BrokerServer] 客户端上线!ip: ${client.inetAddress}, port: ${client.port}")try {client.getInputStream().use { inputStream ->client.getOutputStream().use { outputStream ->DataInputStream(inputStream).use { dataInputStream ->DataOutputStream(outputStream).use { dataOutputStream ->while (true) {val request = readRequest(dataInputStream)val response = process(request, client)writeResponse(response, dataOutputStream)}}}}}} catch (e: EOFException) {println("[BrokerServer] 客户端正常下线!ip: ${client.inetAddress}, port: ${client.port}")} catch (e: Exception) {println("[BrokerServer] 客户端连接异常!ip: ${client.inetAddress}, port: ${client.port}")} finally {client.close()removeChannelSession(client)}}private fun process(request: Request, client: Socket) = with(request) {//1.解析请求val req = BinaryTool.bytesToAny(payload)//2.获取请求中的 channelId,记录和 Socket 的关系(让每个 channel 都对应自己的 Socket,类似于 Session)val reqBase = req as ReqBaseArguments//3.根据 type 类型执行不同的服务(创建 Channel、销毁 Channel、创建交换机、删除交换机...)val ok = when(type) {1 -> {channelSession[reqBase.channelId] = clientprintln("[BrokerServer] channel 创建成功!channelId: ${reqBase.channelId}")true}2 -> {channelSession.remove(reqBase.channelId)println("[BrokerServer] channel 销毁成功!channelId: ${reqBase.channelId}")true}3 -> virtualHost.exchangeDeclare(req as ExchangeDeclareReq)4 -> virtualHost.exchangeDelete(req as ExchangeDeleteReq)5 -> virtualHost.queueDeclare(req as QueueDeclareReq)//...else -> throw RuntimeException("[BrokerServer] 客户端请求 type 非法!type: $type")}//4.返回响应val respBase = RespBaseArguments(reqBase.rid, reqBase.channelId, ok)val payload = BinaryTool.anyToBytes(respBase)Response(type, payload.size, payload)}/*** 读取客户端请求* 使用 DataInputStream 的主要原因就是有多种读取方式,例如 readInt()、readLong(),这些都是原生 InputStream 没有的*/private fun readRequest(dataInputStream: DataInputStream) = with(dataInputStream) {val type = readInt()val length = readInt()val payload = ByteArray(length)val n = read(payload)if (n != length) throw RuntimeException("[BrokerServer] 读取客户端请求异常!")Request(type, length, payload)}/*** 将响应写回给客户端*/private fun writeResponse(response: Response, outputStream: DataOutputStream) = with(outputStream) {writeInt(response.type)writeInt(response.length)write(response.payload)flush()}//删除所有和这个 clientSocket 有关的 Channelprivate fun removeChannelSession(client: Socket) {val channelIdList = mutableListOf<String>()//这里不能直接删除,会破坏迭代器结构for (entry in channelSession) {if (entry.value == client) channelIdList.add(entry.key)}for (channelId in channelIdList) {channelSession.remove(channelId)}}}

class VirtualHost {fun exchangeDeclare(req: ExchangeDeclareReq): Boolean {//执行业务逻辑//...println("[VirtualHost] 创建交换机成功!")return true}fun exchangeDelete(req: ExchangeDeleteReq): Boolean {//执行业务逻辑//...println("[VirtualHost] 删除交换机成功!")return true}fun queueDeclare(req: QueueDeclareReq): Boolean {//执行业务逻辑//...println("[VirtualHost] 创建队列成功!")return true}}

 

三、再谈 Connection、Channel

a)一个 Connection 就是一个 TCP 连接,因此频繁 建立/断开连接(三次握手、四次挥手...)的开销也是相当大的,因此就引入了 Channel. 

b)一个 Connection 下可以有多个 Channel(此处使用 map 来维护).  Channel 只是简单的表示一个逻辑上的连接,可以理解为一个大的项目下被拆分成的多个小的微服务. 实现了 TCP 连接的复用.

c)起初,我们需要先创建出 Connection 与服务端建立连接,初始化构造中只需要写一个死循环,不断的从服务端这边读取响应.

d)接着,通过 Connection 创建出 Channel 来完成具体的业务(Channel 中就提供了一系列方法,就像调用本地的方法一样,调用到远程服务器的接口).

e)例如 Channel 中提供的创建叫交换机方法(channel.exchangeDeclare(...)),这个方法中具体要做的就是将传入的参数,封装到一个对象中,序列化成 二进制 数据,这就是将来协议中要传输的 payload.   进一步的,协议 Request 就构造出来了,通过 IO 写到流中,供服务端读取.

d)为了能够让每次请求和响应都能对的上,Channel 这里我维护了一个 map(key 是 rid、value 是具体的响应),客户端和服务端之间的每个请求和响应都会携带上这个 rid 这个参数,这样将来 Connection 客户端接受到响应的时候,就可以直接把 响应中的 rid 提取出来,交给 Channel 的 map 中(响应来之前,Channel 一直阻塞等待,直到响应来了 -> 能通过 rid  从 map 中得到).

class ConnectionFactory(private val host: String,private val port: Int,
) {fun newConnection() = Connection(host, port)}
class Connection(ip: String,port: Int,
) {private val socket = Socket(ip, port)private val channelMap = ConcurrentHashMap<String, Channel>()//下述这样提前创建好,是为了将来 Channel 在读写请求的时候的方便(Channel 就不用获取输入输出流了)private val inputStream = socket.getInputStream()private val outputStream = socket.getOutputStream()private val dataInputStream = DataInputStream(inputStream)private val dataOutputStream = DataOutputStream(outputStream)init {//此线程负责不停的从服务器这边获取响应Thread {try {while (!socket.isClosed) {//读取服务器响应val resp = readResp()//将响应交给对应的 ChannelputRespToChannel(resp)}} catch (e: SocketException) {println("[Connection] 客户端正常断开连接")} catch (e: Exception) {println("[Connection] 客户端异常断开连接")e.printStackTrace()}}.start()}/*** 将客户端 Connection 接收到的请求,交给对应的 Channel 处理(此时 Channel 还在阻塞等待服务端响应)*/private fun putRespToChannel(resp: Response) {//这里由于不涉及回调,所以每个 type 类型的响应都长一样,就按照一样的方式解析了val baseResp = BinaryTool.bytesToAny(resp.payload) as RespBaseArgumentsval channel = channelMap[baseResp.channelId]?: throw RuntimeException("[Connection] 该响应对应的 Channel 不存在!channelId: ${baseResp.channelId}")//将响应交给 Channelchannel.notifyResp(baseResp)}/*** 创建 Channel*/fun createChannel(): Channel { //1.创建 Channel,保存到 map 种val channelId = "C-${UUID.randomUUID()}"val channel = Channel(channelId, this)channelMap[channelId] = channel//2.告知服务端 Channel 创建val ok = channel.createChannel()//3.如果 Channel 创建不成功,客户端这边也应该要删除对应的 Channel 信息if (!ok) channelMap.remove(channelId)return channel}private fun readResp() = with(dataInputStream) {val type = readInt()val length = readInt()val payload = ByteArray(length)val n = read(payload)if (n != length) throw RuntimeException("[Connection] 客户端读取响应异常!")Response(type, length, payload)}fun writeReq(request: Request) = with(dataOutputStream) {writeInt(request.type)writeInt(request.length)write(request.payload)flush()}}
class Channel(private val channelId: String,private val connection: Connection,  //自己当前属于哪个 Channel
) {//key: rid(为了能让每个 Channel 对应上自己的响应)//value: RespBaseArguments(具体的响应)//当 Connection 的扫描线程接收到响应之后,就会将响应传给这个 mapprivate val ridRespMap = ConcurrentHashMap<String, RespBaseArguments>()//这个锁是用来阻塞等待服务端响应的(避免轮询),当服务端传来响应时,Connection 就会唤醒锁private val locker = Object()private fun generateRid() = "R-${UUID.randomUUID()}"private fun waitResp(rid: String): RespBaseArguments {val respBase: RespBaseArgumentswhile (ridRespMap[rid] == null) { // 如果为空,说明此时服务端还没有传来响应synchronized(locker) { //为了避免轮询,就让其阻塞等待locker.wait()}}//出了这个循环,那么 ridRespMap[rid] 一定不为空return ridRespMap[rid]!!}fun notifyResp(respBase: RespBaseArguments) {ridRespMap[respBase.rid] = respBasesynchronized(locker) {//当前也不直到有多少线程在等待响应,就全部唤醒locker.notifyAll()}}/*** 创建 Channel*/fun createChannel(): Boolean {//1.创建基本请求val reqBase = ReqBaseArguments(rid = generateRid(),channelId = channelId)//2.构造 TCP 通信请求val payload = BinaryTool.anyToBytes(reqBase)val req = Request(type = 1,length = payload.size,payload = payload)//3.发送请求connection.writeReq(req)//4.等待客户端响应val respBase = waitResp(reqBase.rid)return respBase.ok}fun removeChannel(): Boolean {//1.创建基本请求val reqBase = ReqBaseArguments(rid = generateRid(),channelId = channelId)//2.构造 TCP 通信请求val payload = BinaryTool.anyToBytes(reqBase)val req = Request(type = 2,length = payload.size,payload = payload)//3.发送请求connection.writeReq(req)//4.等待客户端响应val respBase = waitResp(reqBase.rid)return respBase.ok}fun exchangeDeclare(name: String,type: ExchangeType,durable: Boolean,autoDelete: Boolean,arguments: MutableMap<String, Any>,): Boolean {val exchangeDeclareReq = ExchangeDeclareReq(name = name,type = type,durable = durable,autoDelete = autoDelete,arguments = arguments,rid = generateRid(),channelId = channelId,)val payload = BinaryTool.anyToBytes(exchangeDeclareReq)val req = Request(type = 3,length = payload.size,payload = payload,)connection.writeReq(req)val respBase = waitResp(exchangeDeclareReq.rid)return respBase.ok}fun exchangeDelete(name: String): Boolean {val exchangeDeleteReq = ExchangeDeleteReq(name = name,rid = generateRid(),channelId = channelId,)val payload = BinaryTool.anyToBytes(exchangeDeleteReq)val req = Request(type = 4,length = payload.size,payload = payload,)connection.writeReq(req)val respBase = waitResp(exchangeDeleteReq.rid)return respBase.ok}fun queueDeclare(name: String,durable: Boolean,exclusive: Boolean,autoDelete: Boolean,arguments: MutableMap<String, Any>,): Boolean {val queueDeclareReq = QueueDeclareReq(name = name,durable = durable,exclusive = exclusive,autoDelete = autoDelete,arguments = arguments,rid = generateRid(),channelId = channelId,)val payload = BinaryTool.anyToBytes(queueDeclareReq)val req = Request(type = 5,length = payload.size,payload = payload,)connection.writeReq(req)val resp = waitResp(queueDeclareReq.rid)return resp.ok}}

 

四、Demo

a)启动服务器

fun main() {val server = BrokerServer(9000)server.start()
}

b)客户端连接

class Test2 {
}fun main() {val factory = ConnectionFactory("127.0.0.1", 9000)val connection = factory.newConnection()val channel = connection.createChannel()val ok1 = channel.createChannel()val ok2 = channel.exchangeDeclare("e1", ExchangeType.DIRECT, false, false, mutableMapOf())val ok3 = channel.removeChannel()println("ok1: $ok1, ok2: $ok2, ok3: $ok3")
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/833996.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CentOs9编译C指令报错的一种解决方案

今天使用centos9编译c代码时&#xff0c;显示bash: gcc: command not found... 下图是我的报错页面&#xff0c;依据提示信息安装gcc之后依旧显示失败 找到其中一种解决方式&#xff0c;完美解决&#xff0c;供参考 输入以下指令更新软件包列表&#xff0c;这里需要等待几分…

MT3031 AK IOI

思路&#xff1a;把每个节点存到堆&#xff08;大根堆&#xff09;里。 如果节点放入后总时间没有超过m则放入堆中&#xff1b;如果总时间超过了&#xff0c;就看堆头元素是否比新元素大。如果大&#xff0c;则删除堆头&#xff08;反悔贪心&#xff09;。 注意别忘记开long l…

keystone学习小结

1 keystone middleware 1.1 工作流程 middleware在客户端和服务端之间&#xff0c;会拦截客户端请求并判断请求身份是否是正确合法的&#xff0c;若是&#xff0c;则继续将请求发给其他middleware或app 具体看&#xff0c;干了这些事 1将请求里的auth header去除&#xff0c…

爬虫:爬取豆瓣电影

文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 前言 上篇我们将到如何利用xpath的规则&#xff0c;那么这一次&#xff0c;我们将通过案例来告诉读者如何使用Xpath来定位到我们需要的数据&#xff0c;就算你不懂H5代码是怎么个嵌套或者十分复…

C++新特性-线程

主要内容 thread、condition、mutexatomicfunction、bind使用新特性实现线程池&#xff08;支持可变参数列表&#xff09;异常协程其他 1 C11多线程thread 重点&#xff1a; join和detach的使用场景thread构造函数参数绑定c函数绑定类函数线程封装基础类互斥锁mutexconditi…

计算机组成原理网课笔记

无符号整数的表示与运算 带符号整数的表示与运算 原反补码的特性对比 移码

搜狗输入法 PC端 v14.4.0.9307 去广告绿化版.

软件介绍 搜狗拼音输入法作为众多用户计算机配置的必备工具&#xff0c;其功能的全面性已为众所周知&#xff0c;并且以其高效便捷的输入体验受到广大使用者的青睐。然而&#xff0c;该软件在提供便利的同时&#xff0c;其内置的广告元素常常为用户带来一定的干扰。为此&#…

Linux中动态库的用法及优缺点?怎样制作动态库和静态库?

一、什么是gcc gcc的全称是GNU Compiler Collection&#xff0c;它是一个能够编译多种语言的编译器。最开始gcc是作为C语言的编译器&#xff08;GNU C Compiler&#xff09;&#xff0c;现在除了c语言&#xff0c;还支持C、java、Pascal等语言。gcc支持多种硬件平台. 在 Linux…

航空电子ARINC818采集卡

ARINC818采集卡是针对航空电子数字视频总线协议&#xff08;Avionics Digital Video BUS&#xff0c;ADVB&#xff09;的高性能PCIe视频光纤采集测试设备。ARINC818协议主要应用于机载设备间的实时高清图像传输&#xff0c;目前已经成功应用于多款民用、军用机型当中&#xff0…

渐进淡出背景个人导航页源码(火影版)

渐进淡出背景个人导航页源码&#xff08;火影版&#xff09; 效果图部分源码领取源码下期更新预报 效果图 部分源码 <!DOCTYPE html> <html> <head> <!--小K网 www.xkwo.com --><meta charset"UTF-8"><title>火影版个人主页<…

每日一博 - 闲聊架构设计中的多级缓存设计

文章目录 方法论概述客户端缓存应用层缓存服务层缓存缓存设计的注意事项总结 思维导图戳这里 方法论概述 从客户端到服务层&#xff0c;缓存的应用广泛而重要。通过合理的缓存设计&#xff0c;能够有效地提高系统的性能并降低延迟。 客户端缓存 在客户端层面&#xff0c;浏览…

多模态融合技术现实世界中的挑战与研究进展

在人工智能的诸多领域中&#xff0c;多模态融合技术正逐渐成为连接不同信息源的桥梁。这种技术通过整合来自视觉、听觉、文本等多种模态的数据&#xff0c;旨在提供更为丰富和精确的预测结果。然而&#xff0c;现实世界的数据往往是不完美和不完整的&#xff0c;这给多模态融合…

[微信小程序] 入门笔记1-滚动视图组件

[微信小程序] 入门笔记1-滚动视图组件 1.页面&组件&渲染 在小程序是由一个个页面page组成, 而页面又是由一个个组件component组成.和网页类似,这里的组件指的就是输入框<input>,按钮<button>,文本<text>,图片<image>等元素.如果你学过网页一…

Linux基础之git与调试工具gdb

目录 一、git的简单介绍和使用方法 1.1 git的介绍 1.2 git的使用方法 1.2.1 三板斧之git add 1.2.2 三板斧之git commit 1.2.3 三板斧之git push 二、gdb的介绍和一些基本使用方法 2.1 背景介绍 2.2 基本的使用方法 一、git的简单介绍和使用方法 1.1 git的介绍 Git是一…

Shell 编程规范与变量

目录 一.Shell 1.shell 的概念 2.Linux 中有哪些 Shell &#xff1f; 二.Shell 脚本概述 1.Shell 脚本的概念 2.shell 脚本应用场景 3.shell 脚本的作用 三.Shell脚本的构成与执行 1.Shell脚本的构成 2.Shell脚本的执行 四.重定向与管道操作 1.交互式硬件设备 2.重…

论文分享[cvpr2018]Non-local Neural Networks非局部神经网络

论文 https://arxiv.org/abs/1711.07971 代码https://github.com/facebookresearch/video-nonlocal-net 非局部神经网络 motivation:受计算机视觉中经典的非局部均值方法[4]的启发&#xff0c;非局部操作将位置的响应计算为所有位置的特征的加权和。 非局部均值方法 NLM&#…

男士内裤什么牌子的好?男士内裤五大排名品牌推荐

夏天快到了&#xff0c;你是不是在为内裤不够舒适透气、质量不好而困扰呢&#xff1f;现在市面上的男士内裤品牌众多&#xff0c;而且还有各种材质的分类&#xff0c;让大家一时也不知道该选什么好。 那么最近我也是特别购置了近期热门的几个男士内裤品牌进行测评&#xff0c;今…

(十六)Servlet教程——Servlet文件下载

Servlet文件下载 文件下载是将服务器上的资源下载到本地&#xff0c;可以通过两种方式来下载服务器上的资源。第一种是使用超链接来下载&#xff0c;第二种是通过代码来下载。 超链接下载 在HTML或者JSP页面中使用超链接时&#xff0c;可以实现页面之间的跳转&#xff0c;但是…

RK3568 学习笔记 : Linux emmc 内核启动 rootfs 根文件系统无法正常挂载问题的分析

问题描述 平台 &#xff1a; NanoPi-R5C 开发板 RK3568 平台。 手动编译的 Linux 内核&#xff0c;结果发现大概率 emmc 无法正常初始化&#xff0c;导致 rootfs 根文件系统无法正常挂载 Linux 内核版本&#xff1a; 6.1 Linux 内核代码位置&#xff1a; https://github.com…

第2章 WebServer进阶

2.1 使用多线程处理多用户请求 2.1.1 多线程Socket通信 在上一章的案例中&#xff0c;服务端显然只能处理一次浏览器请求&#xff0c;请求一次浏览器端就结束程序。如何解决这个问题呢&#xff1f;可以采用多线程Socket通信技术&#xff0c;解决多用户并发请求。 在多线程Sock…