1.RPC基本原理

文章目录

  • RPC
    • 1.定义
    • 2.概念
    • 3.优缺点
    • 4.RPC结构
    • 5.RPC消息协议
      • 5.1 消息边界
      • 5.2 内容
      • 5.3 压缩
    • 6.RPC的实现
      • 6.1 divide_protocol.py
      • 6.2 server.py
      • 6.3 client.py

RPC

1.定义

远程过程调用(remote procedure call)

2.概念

广义:所有通过网络进行通讯,的调用统称为RPC调用

狭义:不采用http协议的方式,采用自定义格式的二进制方式

3.优缺点

  • 优点
    • 效率高
    • 发起rpc调用的一方,可以忽略RPC的具体实现,如同编写本地函数调用
  • 缺点
    • 通用性不高

4.RPC结构

  • client(caller):调用者
  • client stub(bundle args/unbundle ret vals):客户端存根
  • client network service
  • server network service
  • server stub(bundle ret vals/unbundle args)

5.RPC消息协议

5.1 消息边界

  • 分隔符(\r\n)
  • 长度声明法(例如HTTP中 Content-Length)

5.2 内容

  • 二进制
  • 文本内容

5.3 压缩

  • 压缩处理是一把双刃剑,减少数据量减轻带宽压力同时,额外增加了压缩和解压缩的时间

6.RPC的实现

6.1 divide_protocol.py

import struct
from io import BytesIOclass InvalidOperation(Exception):...class DivideProtocol(object):"""float divide(1:int num1, 2:int num2=1)"""def _read_all(self, size):"""读取指定长度的字节:param size: 长度:return: 读取出的二进制数据"""if isinstance(self.conn, BytesIO):# BytesIO类型,用于演示buff = b''have = 0while have < size:chunk = self.conn.read(size - have)have += len(chunk)buff += chunkreturn buffelse:# socket类型buff = b''have = 0while have < size:chunk = self.conn.recv(size - have)have += len(chunk)buff += chunk# 客户端关闭了连接if len(chunk) == 0:raise EOFError()return buffdef args_encode(self, num1, num2=1):"""对调用参数进行编码:param num1: int:param num2: int:return: 编码后的二进制数据"""# 处理参数num1, 4字节整型buff = struct.pack('!B', 1)buff += struct.pack('!i', num1)# 处理参数num2, 4字节整型,如为默认值1,则不再放到消息中if num2 != 1:buff += struct.pack('!B', 2)buff += struct.pack('!i', num2)# 处理消息总长度,4字节无符号整型length = len(buff)# 处理方法名,字符串类型name = 'divide'# 字符串长度,4字节无符号整型msg = struct.pack('!I', len(name))msg += name.encode()msg += struct.pack('!I', length) + buffreturn msgdef args_decode(self, connection):"""获取调用参数并进行解码:param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据:return: 解码后的参数字典"""# 保存到当前对象中,供_read_all方式使用self.conn = connectionparam_name_map = {1: 'num1',2: 'num2'}param_len_map = {1: 4,2: 4}# 用于保存解码后的参数字典args = dict()# 读取消息总长度,4字无节符号整数buff = self._read_all(4)length = struct.unpack('!I', buff)[0]# 记录已读取的长度have = 0# 读取第一个参数,4字节整型buff = self._read_all(1)have += 1param_seq = struct.unpack('!B', buff)[0]param_len = param_len_map[param_seq]buff = self._read_all(param_len)have += param_lenargs[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]if have >= length:return args# 读取第二个参数,4字节整型buff = self._read_all(1)have += 1param_seq = struct.unpack('!B', buff)[0]param_len = param_len_map[param_seq]buff = self._read_all(param_len)have += param_lenargs[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]return argsdef result_encode(self, result):"""对调用的结果进行编码:param result: float 或 InvalidOperation对象:return: 编码后的二进制数据"""if isinstance(result, float):# 没有异常,正常执行# 处理结果类型,1字节无符号整数buff = struct.pack('!B', 1)# 处理结果值, 4字节floatbuff += struct.pack('!f', result)else:# 发生了InvalidOperation异常# 处理结果类型,1字节无符号整数buff = struct.pack('!B', 2)# 处理异常结果值, 字符串# 处理字符串长度, 4字节无符号整数buff += struct.pack('!I', len(result.message))# 处理字符串内容buff += result.message.encode()return buffdef result_decode(self, connection):"""对调用结果进行解码:param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据:return: 结果数据"""self.conn = connection# 取出结果类型, 1字节无符号整数buff = self._read_all(1)result_type = struct.unpack('!B', buff)[0]if result_type == 1:# float的结果值, 4字节floatbuff = self._read_all(4)result = struct.unpack('!f', buff)[0]return resultelse:# InvalidOperation对象# 取出字符串长度, 4字节无符号整数buff = self._read_all(4)str_len = struct.unpack('!I', buff)[0]buff = self._read_all(str_len)message = buff.decode()return InvalidOperation(message)class MethodProtocol(object):def __init__(self, connection):self.conn = connectiondef _read_all(self, size):"""读取指定长度的字节:param size: 长度:return: 读取出的二进制数据"""if isinstance(self.conn, BytesIO):# BytesIO类型,用于演示buff = b''have = 0while have < size:chunk = self.conn.read(size - have)have += len(chunk)buff += chunkreturn buffelse:# socket类型buff = b''have = 0while have < size:print('have=%d size=%d' % (have, size))chunk = self.conn.recv(size - have)have += len(chunk)buff += chunkif len(chunk) == 0:raise EOFError()return buffdef get_method_name(self):# 获取方法名# 读取字符串长度,4字节无符号整型buff = self._read_all(4)str_len = struct.unpack('!I', buff)[0]# 读取字符串buff = self._read_all(str_len)name = buff.decode()return name

6.2 server.py

import socket
import threadingfrom customize_rpc.divide_protocol import DivideProtocol, MethodProtocol, InvalidOperationclass Handlers:@staticmethoddef divide(num1, num2=1):"""除法:param num1::param num2::return:"""if num2 == 0:raise InvalidOperation()val = num1 / num2return valclass ServerStub(object):def __init__(self, connection, handlers):"""服务器存根:param connection: 与客户端的socket连接:param handlers: 存放被调用的方法"""self._process_map = {'divide': self._process_divide,}self.conn = connectionself.method_proto = MethodProtocol(self.conn)self.handlers = handlersdef process(self):"""被服务器调用的入口,服务器收到请求后调用该方法"""# 获取解析调用请求的方法名name = self.method_proto.get_method_name()# 调用对应的处理方法self._process_map[name]()def _process_divide(self):"""执行divide本地调用,并将结果返回给客户端"""# 接收调用参数proto = DivideProtocol()args = proto.args_decode(self.conn)# 进行本地divide调用try:result = self.handlers.divide(**args)except InvalidOperation as e:result = e# 构造返回值消息并返回result = proto.result_encode(result)self.conn.sendall(result)class Server(object):def __init__(self, host, port, handlers):self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.host = hostself.port = portself.sock.bind((host, port))self.handlers = handlersdef serve(self):"""开始服务"""self.sock.listen(128)print("开始监听")while True:conn, addr = self.sock.accept()print("建立链接%s" % str(addr))stub = ServerStub(conn, self.handlers)try:while True:stub.process()except EOFError:print("客户端关闭连接")# 关闭服务端连接conn.close()class ThreadServer(object):def __init__(self, host, port, handlers):self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.host = hostself.port = portself.sock.bind((host, port))self.handlers = handlersdef serve(self):"""开始服务"""self.sock.listen(128)print("开始监听")while True:conn, addr = self.sock.accept()print("建立链接%s" % str(addr))t = threading.Thread(target=self.handle, args=(conn,))t.start()def handle(self, client):stub = ServerStub(client, self.handlers)try:while True:stub.process()except EOFError:print("客户端关闭连接")client.close()if __name__ == '__main__':server = Server('127.0.0.1', 8000, Handlers)server.serve()    

6.3 client.py

import time
import socketfrom customize_rpc.divide_protocol import DivideProtocol, InvalidOperationclass Channel(object):"""连接通道"""def __init__(self, host, port):self.host = hostself.port = portdef get_connection(self):"""获取一个tcp连接"""sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.connect((self.host, self.port))return sockclass ClientStub(object):"""客户端存根"""def __init__(self, channel: Channel):self.channel = channelself.conn = self.channel.get_connection()def divide(self, num1, num2=1):# 构造proto = DivideProtocol()args = proto.args_encode(num1, num2)self.conn.sendall(args)result = proto.result_decode(self.conn)if isinstance(result, InvalidOperation):raise resultelse:return resultif __name__ == '__main__':channel = Channel('127.0.0.1', 8000)stub = ClientStub(channel)for i in range(5):try:val = stub.divide(i * 100, 10)except InvalidOperation as e:print(e.message)else:print(val)time.sleep(1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/65462.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

强化特种作业管理,筑牢安全生产防线

在各类生产经营活动中&#xff0c;特种作业由于其操作的特殊性和高风险性&#xff0c;一直是安全生产管理的重点领域。有效的特种作业管理体系涵盖多个关键方面&#xff0c;从作业人员的资质把控到安全设施的配备维护&#xff0c;再到特种设备的精细管理以及作业流程的严格规范…

iOS 苹果开发者账号: 查看和添加设备UUID 及设备数量

参考链接&#xff1a;苹果开发者账号下添加新设备UUID - 简书 如果要添加新设备到 Profiles 证书里&#xff1a; 1.登录开发者中心 Sign In - Apple 2.找到证书设置&#xff1a; Certificate&#xff0c;Identifiers&Profiles > Profiles > 选择对应证书 edit &g…

如何让Tplink路由器自身的IP网段 与交换机和电脑的IP网段 保持一致?

问题分析&#xff1a; 正常情况下&#xff0c;我的需求是&#xff1a;电脑又能上网&#xff0c;又需要与路由器处于同一局域网下&#xff08;串流Pico4 VR眼镜&#xff09;&#xff0c;所以&#xff0c;我是这么连接 交换机、路由器、电脑 的&#xff1a; 此时&#xff0c;登录…

(南京观海微电子)——GH7009开机黑屏案例分析

一、 现象描述&#xff1a; 不良现象: LVDS模组&#xff0c;开机大概2秒后就黑屏。 二、问题分析 等主机进入Kernel 后做以下测试&#xff1a; 1、手动reset LCM 后 可以显示正常&#xff1b; 总结&#xff1a; 1&#xff09;uboot 部分HS 太窄&#xff0c;仅有4个clk宽度&am…

第10章 初等数论

2024年12月27日一稿&#xff08;P341&#xff09; 2024年12月28日二稿 2024年12月29日三稿 当命运这扇大门向你打开的时候&#xff0c;不要犹豫和害怕&#xff0c;一直往前跑就是了&#xff01; 10.1 素数 这里写错了&#xff0c;不能整除应该表示为 10.2 最大公约数与最小公…

XXE漏洞 黑盒测试 白盒测试 有无回显问题

前言 什么是XXE&#xff08;xml外部实体注入漏洞&#xff09;&#xff1f; 就是网站以xml传输数据 的时候我们截取他的传输流进行修改&#xff08;网站没有对我们的输入进行过滤&#xff09; 添加恶意代码 导致数据传输到后台 后台解析xml形式 导致恶意代码被执行 几种常见的…

yolov5 yolov6 yolov7 yolov8 yolov9目标检测、目标分类 目标切割 性能对比

文章目录 YOLOv1-YOLOv8之间的对比如下表所示&#xff1a;一、YOLO算法的核心思想1. YOLO系列算法的步骤2. Backbone、Neck和Head 二、YOLO系列的算法1.1 模型介绍1.2 网络结构1.3 实现细节1.4 性能表现 2. YOLOv2&#xff08;2016&#xff09;2.1 改进部分2.2 网络结构 3. YOL…

jdk版本介绍

1.JDK版本编号 • 主版本号&#xff1a;表示JDK的主要版本&#xff0c;如JDK 8、JDK 11中的8和11。主版本号的提升通常意味着引入了重大的新特性或变更。 • 次版本号&#xff1a;在主版本号之后&#xff0c;有时会跟随一个或多个次版本号&#xff08;如JDK 11.0.2中的0.2&…

低代码开源项目Joget的研究——基本概念和Joget7社区版应用

大纲 1. 基本概念1.1 Form1.1.1 Form1.1.1.1 概述1.1.1.2 主要特点和用途1.1.1.3 创建和使用 Form1.1.1.4 示例 1.1.2 Section1.1.2.1 概述1.1.2.2 主要特点和用途1.1.2.3 示例 1.1.3 Column1.1.4 Field1.1.5 示例 1.2 Datalist1.2.1 Datalist1.2.1.1 主要特点和用途1.2.1.2 创…

【LeetCode 04】 209. 长度最小的子数组

暴力解法&#xff1a; 测试通过✅提交超时❌ class Solution {public int minSubArrayLen(int target, int[] nums) {//暴力解法int sum0;int subLength0;int resultInteger.MAX_VALUE;int lennums.length;for(int i0;i<len;i){//起始位置sum0;for(int ji;j<len;j){//终…

【已解决】图片png转ico格式

起因&#xff1a; pyinstaller 打包时需要 ico 格式图片&#xff0c;但是通常手上只有png格式的图片&#xff0c;为了将png转为ico&#xff0c;直接改后缀会报错“struct.error: unpack requires a buffer of 16 bytes”&#xff0c;我就上网搜了一下&#xff0c;发现都是一些…

AMD | GPU | 深度学习 | 如何使用

问题&#xff1a;我在复现代码的时候&#xff0c;发现自己只拥有AMD的GPU&#xff0c;对于一个硬件小白来说&#xff0c;怎么办呢&#xff1f;我想看看怎么使用&#xff1b;解决&#xff1a; 首先要安装支持AMD的GPU的pytorch&#xff0c;pytorch&#xff1b; 使程序在安装了支…

Blender高效优化工作流程快捷小功能插件 Haggis Tools V1.1.5

Haggis Tools V1.1.5 是一款专为Blender设计的插件&#xff0c;旨在优化工作流程、减少单调和重复的任务&#xff0c;从而为艺术家节省时间。这款插件适用于多个版本的Blender&#xff0c;能够有效提升工作效率。 Blender插件特点&#xff1a; 工作流程优化&#xff1a;专门设…

数据采集背后的效率革命:如何优化你的爬虫性能

在爬虫技术日益发展的今天&#xff0c;性能优化成为提升数据采集效率的关键。面对日益复杂的网页结构和庞大的数据量&#xff0c;高效的爬虫能够显著降低运行时间和资源成本。本文将围绕爬虫性能优化的核心方法展开讨论&#xff0c;并通过实例对比多进程、多线程以及普通爬取的…

OpenHarmony-5.PM 子系统(2)

电池服务组件OpenHarmony-4.1-Release 1.电池服务组件 Battery Manager 提供了电池信息查询的接口&#xff0c;同时开发者也可以通过公共事件监听电池状态和充放电状态的变化。电池服务组件提供如下功能&#xff1a; 电池信息查询。充放电状态查询。关机充电。 电池服务组件架…

测试冰淇淋模型

测试领域的冰淇淋模型&#xff08;Ice Cream Cone Model&#xff09;是一个相对于传统的测试金字塔模型的反转&#xff0c;是一种与经典金字塔模型相对的测试策略。在这种模型中&#xff0c;测试的分布和重点与传统金字塔模型相反。以下是冰淇淋模型的主要特点和原因&#xff1…

短视频矩阵账号管理技术源码搭建详解,支持OEM

一、引言 在短视频矩阵系统中&#xff0c;账号管理是至关重要的一环&#xff0c;它涉及到多平台账号的接入、用户信息的安全存储与高效管理、权限的精准控制以及账号数据的同步与更新等关键功能。一个健壮、灵活且安全的账号管理技术架构&#xff0c;能够为整个短视频矩阵系统的…

【驱动开发】设备分类、设备号申请和注销,注册和移除字符设备,以及一个基本的内核驱动程序框架代码

一、Linux内核对设备的分类 Linux的文件种类 序号符号类型文件内容文件名原信息1-普通文件√√√2d目录文件√√√3p管道文件√√4s本地socket文件√√5l链接文件软链接有;硬链接相当于别名√√6c字符设备√√7b块设备√√设备类型 Linux内核按驱动程序实现模型框架的不同,…

黑马Java面试教程_P2_MySQL

系列博客目录 文章目录 系列博客目录前言1. 优化1.1 MySQL中&#xff0c;如何定位慢查询&#xff1f;面试文稿 1.2 面试官接着问&#xff1a;那这个SQL语句执行很慢,如何分析 ( 如何优化&#xff09;呢?面试文稿 1.3 了解过索引吗?(什么是索引)1.4 继续问 索引的底层数据结构…

Learning Multi-Scale Photo Exposure Correction

Abstract 用错误的曝光捕捉照片仍然是相机成像的主要错误来源。曝光问题可分为以下两类:(i)曝光过度&#xff0c;即相机曝光时间过长&#xff0c;导致图像区域明亮和褪色;(ii)曝光不足&#xff0c;即曝光时间过短&#xff0c;导致图像区域变暗。曝光不足和曝光过度都会大大降低…