混合编程之多线程

前言: 小白一个, 没有系统性的学习过python网络通信,只是单纯的cv加修改代码,仅作留念以及参考用,感谢互联网博主们和bito插件,使得chatGPT得以免费使用.
另外该多线程传输图片的速度比没有多线程执行还满,后续不对python服务端做优化,而改为C++服务端实现.写出来继续再分享把

前篇博客地址

python客户端

采用生存者消费者模式、模式2和joinablequeue库.
客户端实现还是比较简单的,麻烦在server端

import pickle
import time
from multiprocessing import Process, JoinableQueue
from queue import Queuefrom multiprocessing.connection import Client, Listenerfrom client_apart import draw_box
from image import plot_boxes, img_encode
import os
from natsort import ns, natsortedhost = 'localhost'
port = 9006
total_time = 0def img_product(img_queue, path, path_mode='image'):if path_mode == 'image':img = img_encode(path)img_obj = {'frame_num': 1, 'image': img}  # need frame_num?img_queue.put(img_obj)elif path_mode == 'dir':dir_list = os.listdir(path)files = natsorted(dir_list, alg=ns.PATH)  # 顺序读取文件名i = 1for filename in files:img_path = path + '/' + filenameimg = img_encode(img_path)img_obj = {'frame_num': i, 'image': img}  # need frame_num?i += 1img_queue.put(img_obj)img_queue.put({'frame_num': 0, 'image': "end"})  # end signal img_queue.join()def server_consumer(img_queue):# 1. send datawhile True:img_obj = img_queue.get()if img_obj is None:client.close()  # avoid connection-reset-by-peerbreak  # exit enddata_bytes = pickle.dumps(img_obj)start = int(round(time.time() * 1000))start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())client.send(data_bytes)  # 40ms/per send img# print('send cost time: ', (end - start))img_queue.task_done()try:det_result = client.recv()end = int(round(time.time() * 1000))end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())print('recv cost time: ', (end-start))except EOFError:breakdet_result = pickle.loads(det_result)draw_box(det_result, img_obj)if __name__ == '__main__':client = Client((host, port))Listener()img_dir = './data'one_img = './data/Enterprise001.jpg'mode = 'dir'img_jq = JoinableQueue()producer = Process(target=img_product, args=(img_jq, img_dir, mode,))consumer = Process(target=server_consumer, args=(img_jq,))consumer.daemon = True  # set daemon but not set join()producer.start()consumer.start()producer.join()

Python服务端

此处接受图片和发送结果线程处在一个类内,使得可以共享一个Listener,来自chatGPT的idea.

from ctypes import *
import ctypes
import time
import pickle
from PHeader import *import cv2
import numpy as np
from multiprocessing.connection import Listener
from multiprocessing import JoinableQueue, Process, Queue, connection
import threadingclass ListenerThread(threading.Thread):def __init__(self, address, data_handler, lib):super().__init__()self.daemon = Trueself.address = addressself.data_handler = data_handlerself.lib = libself.listener = Listener(self.address, backlog=5)self.conn = Noneself.is_running = Truedef run(self):print('Listening on', self.listener.address)while self.is_running:try:self.conn = self.listener.accept()  # ready to accept data continuallyprint('Connected by', self.listener.last_accepted)t1 = threading.Thread(target=self.receive_data)t2 = threading.Thread(target=self.send_result)t1.start()t2.start()t1.join()t2.join()except OSError as e:if e.errno != 98:  # Address already in useraiseprint('Address already in use, retrying in 1 second...')time.sleep(1)def destroy_Model(self):  # when qt send a signalself.lib.interface_destoryThread()def receive_data(self):time_cost1 = 0while True:try:start = int(round(time.time() * 1000))received_bytes = self.conn.recv()  # recv Client dataend = int(round(time.time() * 1000))print('recv time cost: ', end-start)time_cost1 += end-startimg_dict = pickle.loads(received_bytes)if img_dict["frame_num"] == 0:print("receive's thread already receive all data, close thread!!")print("recv: ", time_cost1)self.lib.interface_setEmptyFlag()  # make send thread breakbreakimg_dict['image'] = cv2.imdecode(img_dict['image'], cv2.IMREAD_COLOR)self.data_handler.sendImgtoC(self.lib, img_dict, 0)  # prepare to send imgexcept EOFError:print('Connection closed')self.conn.close()breakdef send_result(self):time_cost1 = 0time_cost2 = 0while True:self.lib.interface_getClsQueue.restype = ObjClassifyOutputstart = int(round(time.time() * 1000))output = self.lib.interface_getClsQueue()  # get result from modelend = int(round(time.time() * 1000))time_cost1 += end-startprint('get cls time cost: ', end-start)if output.object_list.object_num >= 0:cls_result = self.data_handler.CtoP(output)cls_result = pickle.dumps(cls_result)start = int(round(time.time() * 1000))self.conn.send(cls_result)end = int(round(time.time() * 1000))print('send time cost: ', end-start)time_cost2 += end-startelif output.object_list.object_num == -1:   # queue is empty for nowtime.sleep(0.04)continueelif output.object_list.object_num == -2:   # all data is classifyprint("send's thread alreay handle all data, close thread!!")print("cls: ", time_cost1, ' send: ', time_cost2)# self.close()breakdef close(self):  # useless for nowself.conn.close()# self.listener.close()self.run()class DataHandler:def __init__(self):self.data = Nonedef CtoP(self, output):  # 将模型结果解析为python列表# [cv_object_list: [cv_object: [cv_box: [] ]]]cv_object_list = []cls_out = []obj_list = output.object_listif obj_list.object_num != 0:for i in range(obj_list.object_num):cv_box = []cv_object = []obj = obj_list.object[i]# bboxcv_box.append(obj.bbox.left_top_x)cv_box.append(obj.bbox.left_top_y)cv_box.append(obj.bbox.w)cv_box.append(obj.bbox.h)cv_object.append(cv_box)# classes/objectness/probcv_object.append(obj.classes)cv_object.append(obj.objectness)prob = POINTER(c_float)(obj.prob)cv_object.append(prob.contents.value)# cv_objectcv_object_list.append(cv_object)cv_object_list.append(obj_list.object_num)# cv_object_listcls_out.append(cv_object_list)return cls_outdef sendImgtoC(self, lib, img_dict, i):lib.interface_receive.argtypes = [PythonMat]# 1. combine info to imgpi = PythonMat()pi.frame_num[0] = img_dict["frame_num"]img = img_dict['image']# 1.1 set width/heightPARAM = c_int * 32height = PARAM()height[0] = img.shape[0]pi.height = heightwidth = PARAM()width[0] = img.shape[1]pi.width = width# 1.2 set Matframe_data = np.asarray(img, dtype=np.uint8)frame_data = frame_data.ctypes.data_as(c_char_p)pi.frame[0] = frame_data# 2. send img to detect modellib.interface_receive(pi)if __name__ == '__main__':address = ('localhost', 9006)data_handler = DataHandler()ll = ctypes.cdll.LoadLibrarylib = ll("../../lib/libDetClsController.so")  # create a C liblistener_thread = ListenerThread(address, data_handler, lib)listener_thread.start()try:det_process = threading.Thread(target=lib.interface_initDetectImage)cls_process = threading.Thread(target=lib.interface_initClassifyImage)det_process.start()cls_process.start()det_process.join()  # need a break signalcls_process.join()except KeyboardInterrupt:# 程序被强制关闭print('Program terminated')# 关闭ListenerThread对象listener_thread.is_running = Falselistener_thread.join()finally:# 关闭ListenerThread对象listener_thread.is_running = Falselistener_thread.join()

C++模型调用

此处涉及其他人员写的代码,做修改处理

typedef struct 
{
int width[CV_MAX_BATCH_SIZE];
int height[CV_MAX_BATCH_SIZE];
char* frame[CV_MAX_BATCH_SIZE]; 
int frame_num[CV_MAX_BATCH_SIZE]; 
}PythonMat;// 多线程控制相关
mutex mtxQueueDet;  // mutex for detect queue
mutex mtxQueueImg;  // mutex for image queue
mutex mtxQueueCls;  // mutex for classify queue
mutex mtxif;
queue<> queueDetOut;// Det output queue
queue<> queueClsOut;// Det classify queue
queue<cv::Mat> queueMat;
bool DetectFlag = true;
bool ClassifyFlag = true;
bool empty_flag = false;void receiveImg(PythonMat &img)
{cv::Mat frame(img.height[0], img.width[0], CV_8UC3, img.frame[0]);mtxQueueImg.lock();queueMat.push(frame);cout << "frame num: "<<  img.frame_num[0] << endl;mtxQueueImg.unlock();
}void DetectImage()
{Detect detect_output;Detect detectmodel;detectmodel.Init(config_path, 1);cv::Mat frame;while(1){ if (queueMat.empty()) {if(!DetectFlag){break;}usleep(2000);continue;}mtxQueueImg.lock();frame = queueMat.front();queueMat.pop();mtxQueueImg.unlock();detect_output = detectmodel.Run(detect_input);// lockmtxQueueDet.lock();queueDetOut.push(detect_output);cout << "detect run !!" << endl;mtxQueueDet.unlock();}return;
}void ClassifyImage()
{Classify objclassify;objclassify.Init(config_path, 1);ClassifyInput input;ClassifyOutput output;cv::Mat frame;while(1){if (queueDetOut.empty()) {if(!ClassifyFlag){break;}usleep(2000);continue;}mtxQueueDet.lock();detect_result = queueDetOut.front();queueDetOut.pop();mtxQueueDet.unlock();output = objclassify.Run(input);mtxQueueCls.lock();queueClsOut.push(output);mtxQueueCls.unlock();}return;
}ClassifyOutput getClsQueue(){ObjClassifyOutput output;if (queueClsOut.empty()){usleep(2000);output.object_list.object_num = -1;  // -1 is now empty; -2 is all empty if (empty_flag){output.object_list.object_num = -2;empty_flag = false;}return output;}mtxQueueCls.lock();output = queueClsOut.front();queueClsOut.pop();cout << "cls_out pop " << output.object_list.object_num << endl;mtxQueueCls.unlock();return output;}extern "C" {
int i = 0;
void interface_initDetectImage(){// if (flag ) exit thread detect/classify DetectImage();
}
void interface_initClassifyImage(){ClassifyImage();
}
void interface_receive(PythonMat &img){printf("run %d times\n", i++);receiveImg(img); 
}
void interface_setEmptyFlag(){empty_flag = true;
}
void testThread(){printf("C++ method!!\n");
}
ClassifyOutput interface_getClsQueue(){ClassifyOutput output;output = getClsQueue();return output;
}void interface_destoryThread(){DetectFlag = false;ClassifyFlag = false;
}
}

至此这个混合编程项目结束,但是server接收到client耗时居然300ms,需要优化…

参考博客

  1. 共享内存
  2. Address already in use
  3. connection-reset-by-peer 我是把client.close()加上就不报错
  4. (最终版)linux下python和c++相互调用共享内存通信
  5. 常见python通信错误
    1. EOFError
    2. OSError: handle is closed
    3. Broken pipe

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/66446.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

科技资讯|苹果发布新专利:可在车内定位苹果的智能设备

根据美国商标和专利局近期公示的清单&#xff0c;苹果公司获得了一项名为《车内定位移动设备的系统和方式》专利&#xff0c;概述了在车内狭窄空间内如何定位 iPhone 等移动设备。 Find My 服务现阶段没有使用 UWB 来追踪 iPhone 或者 iPad&#xff0c;而是依赖 GPS 等相关辅…

为什么删除Windows 11上的Bloatware可以帮助加快你的电脑速度

如果你感觉你的电脑迟钝&#xff0c;彻底清除软件会有所帮助&#xff0c;而且这个过程对Windows用户来说越来越容易。 微软正在使删除以前难以删除的其他预装Windows应用程序成为可能。专家表示&#xff0c;这项新功能可能会改变用户的游戏规则。 科技公司Infatica的主管Vlad…

深度解读零信任身份安全—— 全面身份化:零信任安全的基石

事实上&#xff0c;无论是零信任安全在数据中心的实践&#xff0c;还是通用的零信任安全架构实践&#xff0c;全面身份化都是至关重要的&#xff0c;是“企业边界正在瓦解&#xff0c;基于边界的安全防护体系正在失效”这一大背景下&#xff0c;构筑全新的零信任身份安全架构的…

【C++】多态学习

多态 多态的概念与定义多态的概念构成多态的两个条件虚函数与重写重写的两个特例 final 和 override重载、重写(覆盖)、重定义(隐藏)的对比抽象类多态的原理静态绑定与动态绑定 单继承与多继承关系下的虚函数表(派生类)单继承中的虚函数表查看多继承中的虚函数表查看 菱形继承与…

关于一个git的更新使用流程

1.第一步使用git bash 使用git bash命令来进行操作&#xff08;当然我是个人比较喜欢用这种方法的&#xff09; 2. 第二步&#xff1a;连接 3.第三步&#xff1a;进入 4.第四步&#xff1a;查看分支 5.第五步&#xff1a;切换分支 将本地文件更新后之后进行提交 6.第六步&am…

【个人博客系统网站】框架升级 · 工程目录 · 数据库设计

【JavaEE】进阶 个人博客系统&#xff08;1&#xff09; 文章目录 【JavaEE】进阶 个人博客系统&#xff08;1&#xff09;1. 使用Spring全家桶 MyBatis框架进行开发2. 页面2.1 登录页2.2 注册页2.3 详情页2.4 我的博客列表页3.5 所有人的博客列表页3.6 添加博客页3.7 修改文…

华为云 sfs 服务浅谈

以root用户登录弹性云服务器。 以root用户登录弹性云服务器。 安装NFS客户端。 查看系统是否安装NFS软件包。 CentOS、Red Hat、Oracle Enterprise Linux、SUSE、Euler OS、Fedora或OpenSUSE系统下&#xff0c;执行如下命令&#xff1a; rpm -qa|grep nfs Debian或Ubuntu系统下…

设计模式—观察者模式(Observer)

目录 思维导图 一、什么是观察者模式&#xff1f; 二、有什么优点吗&#xff1f; 三、有什么缺点吗&#xff1f; 四、什么时候使用观察者模式&#xff1f; 五、代码展示 ①、双向耦合的代码 ②、解耦实践一 ③、解耦实践二 ④、观察者模式 六、这个模式涉及到了哪些…

开发一个npm包

1 注册一个npm账号 npm https://www.npmjs.com/ 2 初始化一个npm 项目 npm init -y3编写一段代码 function fn(){return 12 }exports.hellofn;4发布到全局node_module npm install . -g5测试代码 创建一个text文件 npm link heath_apisnode index.js6登录(我默认的 https…

docker,nvidia-docker安装

卸载先前的docker Docker 的旧版本被称为 docker&#xff0c;docker.io 或 docker-engine 。如果已安装&#xff0c;请卸载它们&#xff1a; sudo apt-get remove docker docker-engine docker.io containerd runc使用 Docker 仓库进行安装 设置仓库 更新 apt 包索引 sudo…

基于单片机教室人数实时检测系统

一、系统方案 主程序中main函数主要是引脚的初始化&#xff0c;给单片机引脚初始化&#xff0c;初始化LCD1602&#xff0c;初始化红外对管&#xff0c;通过对LCD1602赋值&#xff0c;采集进入教室的人数&#xff0c;显示在LCD1602上面进出人数我们采用按键的形式&#xff0c;检…

opencv鼠标事件函数setMouseCallback()详解

文章目录 opencv鼠标事件函数setMouseCallback()详解1、鼠标事件函数&#xff1a;&#xff08;1&#xff09;鼠标事件函数原型&#xff1a;setMouseCallback()&#xff0c;此函数会在调用之后不断查询回调函数onMouse()&#xff0c;直到窗口销毁&#xff08;2&#xff09;回调函…

系列九、Java操作RocketMQ之顺序消息

一、概述 消息有序指的是可以按照消息发送的顺序来消费&#xff08;FIFO&#xff09;&#xff0c;RocketMQ可以严格的保证消息有序&#xff0c;按照分区的不同&#xff0c;可以分为分区有序和全局有序。顺序消费的原理解析&#xff0c;在默认情况下消息发送会按照轮询的方式把消…

肖sir__linux详解__001

linux详解: 1、ifconfig 查看ip地址 2、6版本&#xff1a;防火墙的命令&#xff1a; service iptables status 查看防火墙状态 service iptables statrt 开启防火墙 service iptables stop 关闭防火墙 service iptables restart 重启防火墙状态 7版本&#xff1a; systemctl s…

考前冲刺上岸浙工商MBA的备考经验分享

2023年对于许多人来说都是不平凡的一年&#xff0c;历经三年的抗争&#xff0c;我们终于成功结束了疫情。而我也很幸运的被浙工商MBA项目录取&#xff0c;即将开始全新的学习生活。身为一名已在职工作6年的人&#xff0c;能够重回校园真是一种特别令人激动的体验。今天&#xf…

域内密码喷洒

在Kerberos阶段认证的AS-REQ阶段&#xff0c;请求包cname对应的值是用户名&#xff0c;当用户名存在时候&#xff0c;密码正确和错误两种情况下&#xff0c;AS-REP返回包不一样&#xff0c;所以可以利用这一点对域用户名进行密码喷洒攻击 域内密码喷洒工具 Kerbrute kerbrut…

【Redis】redis入门+java操作redis

目录 一、Redis入门 1.1 Redis简介 1.2 Redis下载与安装 1.2.1 下载 1.2.2 linux安装 1.2.3 windows安装 1.3 Redis服务启动与停止 1.3.1 linux启动、停止Redis服务 1.3.2 windows启动、停止Redis服务 1.4 修改Redis启动密码 1.4.1 Linux修改设置 1.4.2 windows设…

使用HTTPS模式建立高效爬虫IP服务器详细步骤

嘿&#xff0c;各位爬虫小伙伴们&#xff01;想要自己建立一个高效的爬虫IP服务器吗&#xff1f;今天我就来分享一个简单而强大的解决方案——使用HTTPS模式建立工具&#xff01;本文将为你提供详细的操作步骤和代码示例&#xff0c;让你快速上手&#xff0c;轻松建立自己的爬虫…

Scala并发编程的react、loop方法详解

Scala并发编程的react、loop方法详解 在 Scala 中编写并发应用程序&#xff0c;我们通常会使用 Actor 和 ActorSystem 来创建和管理 Actor&#xff0c;而 react 和 loop 方法则是 Actor 的两个重要方法。 1. react 方法&#xff1a; react 方法是 Actor 类中最基本的消息处理…

python爬虫13:pymysql库

python爬虫13&#xff1a;pymysql库 前言 ​ python实现网络爬虫非常简单&#xff0c;只需要掌握一定的基础知识和一定的库使用技巧即可。本系列目标旨在梳理相关知识点&#xff0c;方便以后复习。 申明 ​ 本系列所涉及的代码仅用于个人研究与讨论&#xff0c;并不会对网站产生…