grpc python实现异步调用(不用grpc异步接口)

grpc python实现异步调用[不用grpc异步接口]

  • 1.infer_session.proto
  • 2.生成Python库函数
  • 3.infer_session_server.py
  • 4.infer_session_client.py
  • 5.common.py
  • 6.运行
  • 7.输出

grpc同步调用更简单,但是在处理复杂任务时,会导致请求阻塞,影响吞吐。当然,可以采用grpc异步接口解决,本方采用另一种方式:服务端收到请求后放入请求队列,由独立的线程处理各个请求,之后调用客户端的服务,回复处理结果。即客户端也是服务端。

以下DEMO实现的功能:

  • 客户端与服务端之间通过mmap tmpfs文件,实现图像的传输
  • 推理服务有Request和Response二个接口
  • 服务端实现Request接口,客户端实现Response接口,这二个接口只用于发送消息
  • 服务端的消息处理线程处理完客户端的请求之后,调用客户端的Response接口

1.infer_session.proto

syntax = "proto3";
service Inference {rpc Request  (InferMessage) returns (Status) {} //服务端实现rpc Response (InferMessage) returns (Status) {} //客户端实现
}
message InferMessage {int32 frame_id = 1;    //帧号int32 client_port=2;   //客户端端口int32 shm_id=3;        //共享内存块idint32 width=4;         //图像宽度int32 height=5;        //图像高度int32 channels=6;      //图像通道数string session_id=7;   //会话uuid
}
message Status {int32 status = 1;         //状态码string error_message=2;   //错误信息
}

2.生成Python库函数

python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./infer_session.proto

3.infer_session_server.py

from concurrent import futures
import logging
import threading
import grpc
import infer_session_pb2
import infer_session_pb2_grpc
import queue
import traceback
import time
from common import SharedMemory,ThreadSafeDict
import numpy as npclass InferenceServer(infer_session_pb2_grpc.InferenceServicer):def __init__(self) -> None:super().__init__()self.server=Noneself.black_list=set()def Request(self, request, context):self.request_queue.put(request)return infer_session_pb2.Status(status=0,error_message="")def Open(self,port=50051):self.process_running=Trueself.bind_addr="localhost:{}".format(port)self.client_session = ThreadSafeDict()self.request_queue= queue.Queue()self.process_thread = threading.Thread(target=self.Process)self.process_thread.start()self.service_ready_semaphore = threading.Semaphore(0)self.server_thread = threading.Thread(target=self.Run)self.server_thread.start()self.service_ready_semaphore.acquire()    return Truedef Run(self):self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))infer_session_pb2_grpc.add_InferenceServicer_to_server(self, self.server)self.server.add_insecure_port(self.bind_addr)self.server.start()print("Server started, listening on " + self.bind_addr)self.service_ready_semaphore.release()self.server.wait_for_termination()def Process(self):while self.process_running:if not self.request_queue.empty():request=self.request_queue.get(False,2)   if request.session_id in self.black_list:if request.session_id in self.client_session:del self.client_session[request.session_id]                    continuetry:if request.session_id not in self.client_session:record={}print("connect:",request.client_port)record['channel']=grpc.insecure_channel("localhost:{}".format(request.client_port))record['stub']=infer_session_pb2_grpc.InferenceStub(record['channel'])grpc.channel_ready_future(record['channel']).result(timeout=5)self.client_session[request.session_id]=recordshm=SharedMemory(request.width,request.height,request.channels,request.client_port,request.shm_id)data = np.ndarray((request.height,request.width,request.channels), dtype=np.uint8, buffer=shm.get())               data+=1 #修改数据shm.close()ret=self.client_session[request.session_id]['stub'].Response(request,timeout=5) if ret.status!=0:print("Response Error:{} {}".format(ret.status,ret.error_message))except:traceback.print_exc()self.black_list.add(request.session_id)if request.session_id in self.client_session:del self.client_session[request.session_id]else:time.sleep(0.001)def Stop(self):print("Stop")self.server.stop(3)self.process_running=Falseself.process_thread.join()self.server_thread.join()if __name__ == "__main__":logging.basicConfig()server=InferenceServer()server.Open()input()server.Stop()

4.infer_session_client.py

from __future__ import print_function
from concurrent import futures
import logging
import grpc
import infer_session_pb2
import infer_session_pb2_grpc
import threading
import numpy as np
import os
import queue
from common import SharedMemory
import time
import argparse
import uuidclass InferenceClient(infer_session_pb2_grpc.InferenceServicer):def __init__(self) -> None:super().__init__()self.send_index=0self.recv_index=Noneself.uuid=uuid.uuid4()print(self.uuid)def Response(self, response, context):request=self.busy_q.get()pred_data = np.ndarray((request.height,request.width,request.channels), dtype=np.uint8, buffer=request.get())golden=np.ones(pred_data.shape,dtype=np.uint8)golden.fill(response.frame_id+1)result=(golden==pred_data).all()if not result:print("ID:{} ShmId:{} Pass:{}".format(response.frame_id,response.shm_id,result))self.free_q.put(request)self.recv_index=response.frame_idreturn infer_session_pb2.Status(status=0,error_message="")def WaitFinish(self):while True:if self.send_index==self.recv_index:returntime.sleep(0.001)def Open(self,client_port,width,height,channel,qsize,remote_addr="localhost:50051"):try:self.client_port=client_portself.bind_addr="localhost:{}".format(client_port)self.free_q= queue.Queue(qsize*2)self.busy_q= queue.Queue(qsize*2)for shm_id in range(qsize):self.free_q.put(SharedMemory(width,height,channel,self.client_port,shm_id))            self.channel=grpc.insecure_channel(remote_addr)grpc.channel_ready_future(self.channel).result(timeout=5)self.stub = infer_session_pb2_grpc.InferenceStub(self.channel)self.server_ready=Falseself.service_ready_semaphore = threading.Semaphore(0)self.server_thread = threading.Thread(target=self.Run)self.server_thread.start()self.service_ready_semaphore.acquire()return self.server_readyexcept:return Falsedef Stop(self):print("Stop")self.server.stop(3)self.server_thread.join()def Request(self,frame_index):request=self.free_q.get()   data = np.ndarray((request.height,request.width,request.channels), dtype=np.uint8, buffer=request.get())data.fill(frame_index)response = self.stub.Request(infer_session_pb2.InferMessage(frame_id=frame_index,client_port=self.client_port,shm_id=request.shm_id,width=request.width,height=request.height,channels=request.channels,session_id="{}".format(self.uuid)))self.busy_q.put(request)self.send_index=frame_indexreturn response.status==0def Run(self):try:self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))infer_session_pb2_grpc.add_InferenceServicer_to_server(self, self.server)self.server.add_insecure_port(self.bind_addr)self.server.start()self.server_ready=Trueprint("Server started, listening on " + self.bind_addr)self.service_ready_semaphore.release()self.server.wait_for_termination()except:self.service_ready_semaphore.release()if __name__ == "__main__":parser = argparse.ArgumentParser(description="Demo of argparse")parser.add_argument('--port', type=int,  default=50000)parser.add_argument('--remote_addr', type=str, default="localhost:50051")args = parser.parse_args()logging.basicConfig()client=InferenceClient()client.Open(client_port=args.port,width=320,height=240,channel=1,qsize=10,remote_addr=args.remote_addr)while True:t0=time.time()count=128for i in range(count):client.Request(i)client.WaitFinish()t1=time.time()print("{} FPS:{:.3f}".format(args.port,count/(t1-t0)))   client.Stop()

5.common.py

import mmap
import numpy as np
import os
import threading# 定义一个SharedMemory类,用于在共享内存中读取和写入数据
class SharedMemory(object):def __init__(self,width,height,channels,port,shm_id) -> None:self.width=widthself.height=heightself.channels=channelsself.shm_id=shm_idself.filepath="/sys/fs/cgroup/{}_{}".format(port,shm_id)self.size=width*height*channelsif not os.path.exists(self.filepath):os.mknod(self.filepath)        self.fd=os.open(self.filepath,os.O_RDWR|os.O_CREAT)os.ftruncate(self.fd,self.size)else:self.fd=os.open(self.filepath,os.O_RDWR)                self.mm=mmap.mmap(self.fd,self.size,access=mmap.ACCESS_WRITE)self.mm.seek(0)# 获取共享内存中的数据def get(self):return self.mm# 关闭共享内存def close(self):self.mm.close()os.close(self.fd)# 定义一个ThreadSafeDict类,用于在多线程中安全地操作字典
class ThreadSafeDict:def __init__(self, initial_dict=None):self._dict = {} if initial_dict is None else initial_dict.copy()self.lock = threading.Lock()# 获取字典中的值def __getitem__(self, key):with self.lock:return self._dict[key]# 设置字典中的值def __setitem__(self, key, value):with self.lock:self._dict[key] = value# 删除字典中的值def __delitem__(self, key):with self.lock:del self._dict[key]# 检查字典中是否存在某个键def __contains__(self, item):with self.lock:return item in self._dict# 获取字典中的值,如果不存在则返回默认值def get(self, key, default=None):with self.lock:return self._dict.get(key, default)# 设置字典中的值,如果键已经存在则不改变值def setdefault(self, key, default):with self.lock:return self._dict.setdefault(key, default)# 更新字典def update(self, other_dict):with self.lock:self._dict.update(other_dict)

6.运行

python3 infer_session_server.py &
python3 infer_session_client.py --port 50001

7.输出

50001 FPS:2296.293
50001 FPS:2222.019
50001 FPS:2347.274
50001 FPS:2124.001

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/753987.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器人在果园内行巡检仿真

文章目录 创建工作空间仿真果园场景搭建小车模型搭建将机器人放在仿真世界中创建工作空间 mkdir -p ~/catkin_ws/src cd ~/catkin_ws仿真果园场景搭建 cd ~/catkin_ws/src git clone https://gitcode.com/clearpathrobotics/cpr_gazebo.git小车模型搭建 DiffBot是一种具有两个…

【English Learning】Day16

2024/03/18 和小录打卡的第16天 目录 Words & phrases Words & phrases weakness a sign of weakness 懦弱的表现strengthens and weaknesses 优点和缺点a weakness for 对---迷恋qualify qualify for the Olympics 取得参加奥运会资格gloomy a gloomy room 昏暗的房间…

Java 快速幂

在Java中,实现快速幂算法可以极大地提高计算大整数幂次的效率。快速幂算法的基本思想是,将幂次转化为二进制形式,然后利用二进制位的特性,通过不断平方和乘法操作来得到结果。 以下是一个Java实现的快速幂算法: java…

Vmware虚拟机配置虚拟网卡

背景 今天同事咨询了我一个关于虚拟机的问题,关于内网用Vmware安装的虚拟机,无法通过本机访问虚拟上的Jenkins的服务。   验证多次后发现有如下几方面问题。 Jenkins程序包和JDK版本不兼容(JDK1.8对应Jenkins不要超过2.3.57)虚…

信号量——生产消费者模型

前文 在这一篇博客(信号量博客)中我曾经提及过信号量的知识,而当对信号量进行提炼总结时,大致是以下三点: 1. 信号量本质是一个计数器(代表资源的数量) 2. 申请信号量本质就是对资源的一种预定机…

final关键字

final关键字 基本介绍final使用细节 基本介绍 final 中文意思:最后的,最终的。 final 可以修饰类、属性、方法和局部变量。 在某些情况下会使用到final: 1) 当不希望类被继承时,可以用 final 修饰; // 如…

Python--成员方法、@staticmethod将成员方法静态化、self参数释义

在 Python 中,成员方法是指定义在类中的函数,用于操作类的实例对象。成员方法通过第一个参数通常命名为 self,用来表示调用该方法的实例对象本身。通过成员方法,可以实现类的行为和功能。 成员方法的定义 在类中定义成员…

深入理解数据结构树

文章目录 一、树是什么二、树的相关概念三、树常见类型四、树的应用应用场景五、树的代码实现 一、树是什么 数据结构树是一种非线性的数据结构,它由节点和边组成。树的节点之间通过边连接,形成层次结构。树的顶部节点称为根节点,每个节点可以…

【Linux】Linux上代码的编译与调试

目录 Linux上常用的编译器gcc\g 如何使用gcc/g 编译过程: 如何使用gcc编译? 进行预处理 进行编译 进行汇编 进行链接 函数库 函数库的分类 gcc选项 Linux调试器-gdb的使用 gdb的常用参数 Linux项目自动化构建工具make/Makefile 原理 利用…

MYSQL日志 redo_log更新流程 bin_log以及bin_log数据恢复

Redo_log写入策略 Redo log的Innodb_flush_log_at_trx_commit:: 这个参数有三个取值 取值为0:每次事务提交时,只是把redo_log留在 redo log buffer中,宕机会丢失数据; 取值为1(默认值):每次事…

1.中医学习-总论

目录 1.为什么要学中医 2.什么是中医 介绍 中医例子1: 中医例子2: 中医最高境界“大道至简” 中医讲究的是本质 中医核心:阴阳、表里、寒热、虚实 ​编辑医不叩门 3.阴阳 1.一天中的阴阳 2.一年中的阴阳 3.阴阳之间的关系 4.阴阳四季的变化 …

解决:visio导出公式为pdf图片乱码问题

今天需要将Visio编辑好的以后的图输出pdf,但是点击保存后公式部分一直乱码,如下图所示 保存为pdf后会变成: 解决方案:保存时点击文件下方的快速打印,存到桌面,不要直接点击保存

代码随想录算法训练营第二十五天|216.组合总和III,17.电话号码的字母组合

216.组合总和III 题目 找出所有相加之和为 n 的 k 个数的组合。组合中只允许含有 1 - 9 的正整数,并且每种组合中不存在重复的数字。 说明: 所有数字都是正整数。 解集不能包含重复的组合。 示例 1: 输入: k 3, n 7 输出: [[1,2,4]] 示例 2: 输入…

24计算机考研调剂 | 【官方】山东师范大学(22自命题)

山东师范大学2024年拟接收调剂 考研调剂信息 调剂专业目录如下: 计算机技术(085404)、软件工程(085405) 补充内容 我校2024年硕士研究生调剂工作将于4月8日教育部“中国研究生招生信息网”(https://yz.ch…

【模板题】:并查集

【模板】并查集 题目描述 如题,现在有一个并查集,你需要完成合并和查询操作。 输入格式 第一行包含两个整数 N , M N,M N,M ,表示共有 N N N 个元素和 M M M 个操作。 接下来 M M M 行,每行包含三个整数 Z i , X i , Y i Z_i,X_i,Y_i Zi​,Xi​,Yi​ 。 当 Z i …

mysql 常见问题

1、count(*) 、 count(1) 和 count(字段)区别 在MySQL中,COUNT(*)、COUNT(1) 和 COUNT(字段) 是用于统计行数的函数,它们的主要区别在于: COUNT(*):会统计符合条件的所有行的数量,不管这些行中…

深入了解JVM底层原理

一、JVM内存结构 1、方法区:存储编译后的类、常量等(.class字节码文件) 2、堆内存:存储对象 3、程序计数器:存储当前执行的指令地址(计算机处理器(CPU)正在执行的下一条指令在内存…

缺失的数字(c++题解)

题目描述 给出一个0~n组成的数组[0, 1, 2, 3 ... n],从中随机去掉一个数字,给你新的数组,求解被去掉的数字。比如给你[0, 1, 3],返回2。 输入格式 第一行是n。 第二行有n个数字,每个数字用空格隔开,表示…

openwrt下部署clouddrive2

在启动项上增加启动参数 在exit 0前面增加 mount --make-shared /mnt/data480g注意,后面的/mnt/data480g要替换成你设置的共享映射券。 拉取镜像 docker pull cloudnas/clouddrive2启动镜像 一定要用ssh在后台用docker run命令启动,因为openwrt前台…

函数-Python

师从黑马程序员 函数初体验 str1"asdf" str2"qewrew" str3"rtyuio" def my_len(data):count0for i in data:count1print(f"字符串{data}的长度是{count}")my_len(str1) my_len(str2) my_len(str3) 函数的定义 函数的调用 函数名&a…