前言: 小白一个, 没有系统性的学习过python网络通信,只是单纯的cv加修改代码,仅作留念以及参考用,感谢互联网博主们和bito插件,使得chatGPT得以免费使用.
另外该多线程传输图片的速度比没有多线程执行还慢,后续不对python服务端做优化,而改为C++服务端实现.写出来后继续再分享吧
前篇博客地址
python客户端
采用生产者消费者模式、模式2和JoinableQueue库.
客户端实现还是比较简单的,麻烦在server端
import pickle
import time
from multiprocessing import Process, JoinableQueue
from queue import Queue
from multiprocessing.connection import Client, Listener
from client_apart import draw_box
from image import plot_boxes, img_encode
import os
from natsort import ns, natsorted

host = 'localhost'
port = 9006
total_time = 0


def img_product(img_queue, path, path_mode='image'):
    """Producer: encode image file(s) under *path* and put them on *img_queue*.

    path_mode='image' sends the single file at *path*; path_mode='dir' sends
    every file in the directory, in natural-sort order so e.g. frame_2 comes
    before frame_10. In 'dir' mode a sentinel dict with frame_num == 0 marks
    the end of the stream. Blocks until the consumer has task_done()'d every
    queued item.
    """
    if path_mode == 'image':
        img = img_encode(path)
        img_queue.put({'frame_num': 1, 'image': img})
    elif path_mode == 'dir':
        files = natsorted(os.listdir(path), alg=ns.PATH)  # read filenames in natural order
        for i, filename in enumerate(files, start=1):
            img = img_encode(path + '/' + filename)
            img_queue.put({'frame_num': i, 'image': img})
        img_queue.put({'frame_num': 0, 'image': "end"})  # end signal
    img_queue.join()


def server_consumer(img_queue):
    """Consumer: pickle each queued image to the server, then draw the boxes
    the server sends back.

    NOTE(review): relies on the module-global `client` created in __main__;
    this only works with the fork start method (Linux) — confirm before
    running elsewhere.
    """
    while True:
        img_obj = img_queue.get()
        if img_obj is None:  # defensive: explicit shutdown sentinel
            client.close()  # avoid connection-reset-by-peer
            break  # exit end
        data_bytes = pickle.dumps(img_obj)
        start = int(round(time.time() * 1000))
        client.send(data_bytes)  # 40ms/per send img
        img_queue.task_done()
        if img_obj['frame_num'] == 0:
            # End-of-stream sentinel: the server stops after receiving it and
            # never answers, so close instead of blocking forever in recv().
            # (Bug fix: the original only checked for None, which the producer
            # never sends, so the consumer hung on the final recv().)
            client.close()
            break
        try:
            det_result = client.recv()
            end = int(round(time.time() * 1000))
            print('recv cost time: ', (end - start))
        except EOFError:
            break
        draw_box(pickle.loads(det_result), img_obj)


if __name__ == '__main__':
    client = Client((host, port))
    # (Bug fix: a stray no-arg Listener() was constructed here and never used;
    # the client only needs the outbound connection.)
    img_dir = './data'
    one_img = './data/Enterprise001.jpg'
    mode = 'dir'
    img_jq = JoinableQueue()
    producer = Process(target=img_product, args=(img_jq, img_dir, mode,))
    consumer = Process(target=server_consumer, args=(img_jq,))
    consumer.daemon = True  # killed with the main process; no join() set
    producer.start()
    consumer.start()
    producer.join()
Python服务端
此处接收图片和发送结果的线程处在同一个类内,使得可以共享一个Listener,来自chatGPT的idea.
# NOTE(review): this listing was recovered from a whitespace-mangled paste;
# all indentation below is reconstructed and should be verified against the
# original source.
from ctypes import *
import ctypes
import time
import pickle
from PHeader import *
import cv2
import numpy as np
from multiprocessing.connection import Listener
from multiprocessing import JoinableQueue, Process, Queue, connection
import threading


class ListenerThread(threading.Thread):
    """Accepts one client connection at a time and runs two worker threads on
    it: one receiving pickled images from the client, one sending classify
    results back. Both directions share the same Listener/connection inside
    this class.
    """

    def __init__(self, address, data_handler, lib):
        super().__init__()
        self.daemon = True
        self.address = address            # (host, port) to listen on
        self.data_handler = data_handler  # DataHandler: Python <-> C marshalling
        self.lib = lib                    # ctypes handle to the C detect/classify lib
        self.listener = Listener(self.address, backlog=5)
        self.conn = None                  # connection of the currently accepted client
        self.is_running = True            # cleared by __main__ to stop the accept loop

    def run(self):
        """Accept loop: for each client, spawn receive/send threads and wait
        for both to finish before accepting the next connection."""
        print('Listening on', self.listener.address)
        while self.is_running:
            try:
                self.conn = self.listener.accept()  # ready to accept data continually
                print('Connected by', self.listener.last_accepted)
                t1 = threading.Thread(target=self.receive_data)
                t2 = threading.Thread(target=self.send_result)
                t1.start()
                t2.start()
                t1.join()
                t2.join()
            except OSError as e:
                if e.errno != 98:  # 98 = EADDRINUSE ("Address already in use")
                    raise
                print('Address already in use, retrying in 1 second...')
                time.sleep(1)

    def destroy_Model(self):
        # Invoked when the GUI (qt) sends a shutdown signal: asks the C
        # library to stop its detect/classify worker loops.
        self.lib.interface_destoryThread()

    def receive_data(self):
        """Receive pickled image dicts until the frame_num == 0 end signal,
        decoding each image and handing it to the C library."""
        time_cost1 = 0  # accumulated recv time in ms (profiling)
        while True:
            try:
                start = int(round(time.time() * 1000))
                received_bytes = self.conn.recv()  # recv Client data
                end = int(round(time.time() * 1000))
                print('recv time cost: ', end - start)
                time_cost1 += end - start
                # SECURITY NOTE: pickle.loads on network data is unsafe if the
                # client is untrusted.
                img_dict = pickle.loads(received_bytes)
                if img_dict["frame_num"] == 0:
                    print("receive's thread already receive all data, close thread!!")
                    print("recv: ", time_cost1)
                    self.lib.interface_setEmptyFlag()  # make send thread break
                    break
                img_dict['image'] = cv2.imdecode(img_dict['image'], cv2.IMREAD_COLOR)
                self.data_handler.sendImgtoC(self.lib, img_dict, 0)  # hand the image to C
            except EOFError:
                print('Connection closed')
                self.conn.close()
                break

    def send_result(self):
        """Poll the C classify queue and forward each result to the client.

        object_num >= 0: real result; -1: queue momentarily empty; -2: all
        frames processed — stop.
        """
        time_cost1 = 0  # accumulated getClsQueue time (ms)
        time_cost2 = 0  # accumulated send time (ms)
        while True:
            self.lib.interface_getClsQueue.restype = ObjClassifyOutput
            start = int(round(time.time() * 1000))
            output = self.lib.interface_getClsQueue()  # get result from model
            end = int(round(time.time() * 1000))
            time_cost1 += end - start
            print('get cls time cost: ', end - start)
            if output.object_list.object_num >= 0:
                cls_result = self.data_handler.CtoP(output)
                cls_result = pickle.dumps(cls_result)
                start = int(round(time.time() * 1000))
                self.conn.send(cls_result)
                end = int(round(time.time() * 1000))
                print('send time cost: ', end - start)
                time_cost2 += end - start
            elif output.object_list.object_num == -1:  # queue is empty for now
                time.sleep(0.04)
                continue
            elif output.object_list.object_num == -2:  # all data is classified
                print("send's thread alreay handle all data, close thread!!")
                print("cls: ", time_cost1, ' send: ', time_cost2)
                # self.close()
                break

    def close(self):  # useless for now
        self.conn.close()
        # self.listener.close()
        self.run()


class DataHandler:
    """Marshals data between the ctypes C structs and plain Python objects."""

    def __init__(self):
        self.data = None  # unused placeholder

    def CtoP(self, output):
        """Convert a C classify-output struct into nested Python lists.

        Layout: cls_out = [cv_object_list], where cv_object_list holds one
        cv_object per detection ([bbox, classes, objectness, prob]) followed
        by the object count.
        NOTE(review): the placement of the trailing appends was reconstructed
        from the mangled paste — verify against the original source.
        """
        # [cv_object_list: [cv_object: [cv_box: [] ]]]
        cv_object_list = []
        cls_out = []
        obj_list = output.object_list
        if obj_list.object_num != 0:
            for i in range(obj_list.object_num):
                cv_box = []
                cv_object = []
                obj = obj_list.object[i]
                # bbox: [left_top_x, left_top_y, w, h]
                cv_box.append(obj.bbox.left_top_x)
                cv_box.append(obj.bbox.left_top_y)
                cv_box.append(obj.bbox.w)
                cv_box.append(obj.bbox.h)
                cv_object.append(cv_box)
                # classes / objectness / prob
                cv_object.append(obj.classes)
                cv_object.append(obj.objectness)
                prob = POINTER(c_float)(obj.prob)
                cv_object.append(prob.contents.value)
                # cv_object
                cv_object_list.append(cv_object)
            cv_object_list.append(obj_list.object_num)
        # cv_object_list
        cls_out.append(cv_object_list)
        return cls_out

    def sendImgtoC(self, lib, img_dict, i):
        """Pack one decoded image into a PythonMat and pass it to the C lib.

        *i* is an unused batch-slot index (callers pass 0).
        """
        lib.interface_receive.argtypes = [PythonMat]
        # 1. combine info to img
        pi = PythonMat()
        pi.frame_num[0] = img_dict["frame_num"]
        img = img_dict['image']
        # 1.1 set width/height
        PARAM = c_int * 32  # presumably CV_MAX_BATCH_SIZE on the C side — confirm
        height = PARAM()
        height[0] = img.shape[0]
        pi.height = height
        width = PARAM()
        width[0] = img.shape[1]
        pi.width = width
        # 1.2 set Mat: pointer into the numpy pixel buffer (no copy)
        frame_data = np.asarray(img, dtype=np.uint8)
        frame_data = frame_data.ctypes.data_as(c_char_p)
        pi.frame[0] = frame_data
        # 2. send img to detect model
        lib.interface_receive(pi)


if __name__ == '__main__':
    address = ('localhost', 9006)
    data_handler = DataHandler()
    ll = ctypes.cdll.LoadLibrary
    lib = ll("../../lib/libDetClsController.so")  # create a C lib handle
    listener_thread = ListenerThread(address, data_handler, lib)
    listener_thread.start()
    try:
        det_process = threading.Thread(target=lib.interface_initDetectImage)
        cls_process = threading.Thread(target=lib.interface_initClassifyImage)
        det_process.start()
        cls_process.start()
        det_process.join()  # need a break signal
        cls_process.join()
    except KeyboardInterrupt:
        # program was force-closed
        print('Program terminated')
        # shut down the ListenerThread object
        listener_thread.is_running = False
        listener_thread.join()
    finally:
        # shut down the ListenerThread object
        listener_thread.is_running = False
        listener_thread.join()
C++模型调用
此处涉及其他人员写的代码,做修改处理
// Image container shared with Python over ctypes; each array is indexed per
// batch slot (only slot 0 is used by the Python code shown in this post).
typedef struct
{
    int width[CV_MAX_BATCH_SIZE];     // image width per slot
    int height[CV_MAX_BATCH_SIZE];    // image height per slot
    char* frame[CV_MAX_BATCH_SIZE];   // raw pixel buffer (CV_8UC3) per slot
    int frame_num[CV_MAX_BATCH_SIZE]; // frame sequence number (Python sends 0 as end-of-stream)
}PythonMat;
// multithreading control
mutex mtxQueueDet; // mutex for detect queue
mutex mtxQueueImg; // mutex for image queue
mutex mtxQueueCls; // mutex for classify queue
mutex mtxif;       // NOTE(review): unused in the code shown here
queue<> queueDetOut; // Det output queue  NOTE(review): element type lost in the paste — presumably the detect model's output type
queue<> queueClsOut; // Det classify queue  NOTE(review): element type lost in the paste — presumably ClassifyOutput
queue<cv::Mat> queueMat; // images received from Python, waiting for detect
bool DetectFlag = true;   // cleared by interface_destoryThread to stop DetectImage
bool ClassifyFlag = true; // cleared by interface_destoryThread to stop ClassifyImage
bool empty_flag = false; // set by Python (interface_setEmptyFlag) when the last frame arrived

// Wrap the raw Python pixel buffer in a cv::Mat (no copy) and enqueue it for
// the detect thread.
void receiveImg(PythonMat &img)
{
    cv::Mat frame(img.height[0], img.width[0], CV_8UC3, img.frame[0]);
    mtxQueueImg.lock();
    queueMat.push(frame);
    cout << "frame num: "<< img.frame_num[0] << endl;
    mtxQueueImg.unlock();
}

// Detect worker loop: pop images from queueMat, run the detect model, push
// results to queueDetOut. Exits when DetectFlag is cleared and the queue is
// drained.
void DetectImage()
{
    Detect detect_output; // NOTE(review): declared as Detect but assigned from detectmodel.Run() — verify the type against the full source
    Detect detectmodel;
    detectmodel.Init(config_path, 1); // NOTE(review): config_path is not declared in this snippet
    cv::Mat frame;
    while(1)
    {
        // NOTE(review): empty() is read without holding mtxQueueImg
        if (queueMat.empty()) {
            if(!DetectFlag){
                break;
            }
            usleep(2000);
            continue;
        }
        mtxQueueImg.lock();
        frame = queueMat.front();
        queueMat.pop();
        mtxQueueImg.unlock();
        detect_output = detectmodel.Run(detect_input); // NOTE(review): detect_input is undefined here — presumably built from `frame`
        // lock
        mtxQueueDet.lock();
        queueDetOut.push(detect_output);
        cout << "detect run !!" << endl;
        mtxQueueDet.unlock();
    }
    return;
}

// Classify worker loop: pop detect results from queueDetOut, run the
// classify model, push outputs to queueClsOut. Exits when ClassifyFlag is
// cleared and the queue is drained.
void ClassifyImage()
{
    Classify objclassify;
    objclassify.Init(config_path, 1);
    ClassifyInput input;
    ClassifyOutput output;
    cv::Mat frame; // NOTE(review): unused in this snippet
    while(1)
    {
        // NOTE(review): empty() is read without holding mtxQueueDet
        if (queueDetOut.empty()) {
            if(!ClassifyFlag){
                break;
            }
            usleep(2000);
            continue;
        }
        mtxQueueDet.lock();
        detect_result = queueDetOut.front(); // NOTE(review): detect_result is not declared in this snippet
        queueDetOut.pop();
        mtxQueueDet.unlock();
        output = objclassify.Run(input); // NOTE(review): `input` is never filled from detect_result here — verify against the full source
        mtxQueueCls.lock();
        queueClsOut.push(output);
        mtxQueueCls.unlock();
    }
    return;
}

// Pop one classify result for the Python side. Returns a struct whose
// object_num is >= 0 for a real result, -1 while the queue is momentarily
// empty, or -2 once empty_flag says all data has been processed.
ClassifyOutput getClsQueue(){
    ObjClassifyOutput output; // NOTE(review): declared ObjClassifyOutput but the function returns ClassifyOutput — verify these are the same typedef
    if (queueClsOut.empty()){
        usleep(2000);
        output.object_list.object_num = -1; // -1 is now empty; -2 is all empty
        if (empty_flag){
            output.object_list.object_num = -2;
            empty_flag = false; // consume the end signal so later calls report -1 again
        }
        return output;
    }
    mtxQueueCls.lock();
    output = queueClsOut.front();
    queueClsOut.pop();
    cout << "cls_out pop " << output.object_list.object_num << endl;
    mtxQueueCls.unlock();
    return output;
}

// C ABI wrappers exported to Python via ctypes.
extern "C" {
int i = 0; // call counter for interface_receive logging

void interface_initDetectImage(){
    // if (flag ) exit thread detect/classify
    DetectImage();
}

void interface_initClassifyImage(){
    ClassifyImage();
}

// NOTE(review): takes PythonMat by reference, which is not a C-compatible
// signature; ctypes passes the struct by value — confirm this matches the
// Python argtypes declaration.
void interface_receive(PythonMat &img){
    printf("run %d times\n", i++);
    receiveImg(img);
}

void interface_setEmptyFlag(){
    empty_flag = true;
}

void testThread(){
    printf("C++ method!!\n");
}

ClassifyOutput interface_getClsQueue(){
    ClassifyOutput output;
    output = getClsQueue();
    return output;
}

void interface_destoryThread(){ // note: "destory" spelling is part of the exported ABI — do not rename without updating the Python side
    DetectFlag = false;
    ClassifyFlag = false;
}
}
至此这个混合编程项目结束,但是server接收到client耗时居然300ms,需要优化…
参考博客
- 共享内存
- Address already in use
- connection-reset-by-peer 我是把client.close()加上就不报错
- (最终版)linux下python和c++相互调用共享内存通信
- 常见python通信错误
- EOFError
- OSError: handle is closed
- Broken pipe