文章目录
- 前言
- 一、共享内存管理
- 二、图像算法服务中的IPC通信流程
- 三、demo实验结果
- 总结
前言
在一个系统比较复杂的时候,将模块独立成单独的进程有助于错误定位以及异常重启恢复,不至于某个模块发生崩溃导致整个系统崩溃。当通信数据量比较大时,例如图像数据,可以使用共享内存在进程间交互,比socket快很多。下面介绍一个利用Boost.interprocess和Boost.process模块进行进程间图像数据交互,以及子进程调用、卡死、崩溃检测的demo。代码链接在文章结尾。
一、共享内存管理
共享内存依靠字符串名称来对应,A进程创建的变量或者内存空间,在B进程都是依靠唯一的命名来查找的,下面看一段代码:
//进程A
//创建共享内存空间
bip::managed_shared_memory shm(bi::create_only, shm_name.c_str(), size);
// 创建分配器
SharedImage::ShmemAllocator allocator(shm.get_segment_manager());
// 在共享内存中构造 Image 对象s
hared_image = shm.construct<SharedImage>("Image")(allocator);quit = shm.construct<bool>("quit")();//进程B
//打开内存空间
shm = bip::managed_shared_memory(bip::open_only, shm_name);
//查找变量
quit = shm.find<bool>("quit").first; image = shm.find<SharedImage>("Image").first;
共享内存变量可以是任意类型,但是你要能保证它的内存都在申请的共享内存之上,就拿我们自己定义的SharedImage类型来举例:
// 使用共享内存分配器定义的 vector 来存储图像数据
typedef bip::allocator<unsigned char, bip::managed_shared_memory::segment_manager> ShmemAllocator;
typedef bip::vector<unsigned char, ShmemAllocator> ShmemVector;
struct SharedImage
{ int imgWidth;int imgHeight;int imgChannels;int imgStep;bool used;ShmemVector data;// 构造函数SharedImage(const ShmemAllocator& allocator) : data(allocator) {}
};
存储图像数据的data字段我们用了ShmemVector,必须给它分配一个内存分配器才可以使用。必须保证所有的内存都在共享内存之上。如果我们直接创建一个带指针的类型是不行的,例如cv::Mat,它指针所指向的图像数据不在共享内存之上。当然Mat也提供了Matallocator接口,理论上通过在 共享内存之上重写这个分配接口Mat是可以直接创建的,但奈何太复杂了,还是不建议这么做,容易出错。
要注意的是共享内存的销毁,内存不需要使用或者变量不用时:
bi::shared_memory_object::remove(shm_name);
shm.destroy<bool>("quit");
当程序异常结束时共享内存时不会自动释放的,我们重新打开时需要判断内存空间是否存在,然后销毁后重新创建:
static bip::managed_shared_memory create_shared_memory(std::string shm_name, size_t size) {try {// Check if the shared memory already existsbool shm_exists = false;try {bi::shared_memory_object shm(bi::open_only, shm_name.c_str(), bi::read_write);shm_exists = true;} catch (const bi::interprocess_exception& e) {// Shared memory does not existshm_exists = false;}// If shared memory exists, remove itif (shm_exists) {std::cout << "Shared memory already exists. Removing it..." << std::endl;bi::shared_memory_object::remove(shm_name.c_str());}// Create new shared memory bip::managed_shared_memory shm(bi::create_only, shm_name.c_str(), size);std::cout << "Shared memory created successfully." << std::endl;return shm;} catch (const std::exception& e) {std::cerr << "Exception: " << e.what() << std::endl;}return bip::managed_shared_memory();
}
二、图像算法服务中的IPC通信流程
本文讨论的IPC通信场景是用来替代算法库API的,即将真正的算法代码跑在一个进程里面,API作为一个中间件跑在调用者的进程中,将输入写到共享内存,等待算法进程处理完毕后返回结果转给调用方。大致流程如下图所示,在两个进程中都有一个心跳保持线程来监控对方是否还存活,数据输入处理是串行的过程,通过自旋等待监控标记位变化来判断是有新数据需要处理,以及是否处理完毕。这边我没有用进程锁(进程间如果用锁以及条件变量同步有很坑的地方,只要一个进程崩溃,另一个就可能卡死!!!),因为是串行处理,不可能同时写一个标记位。
父进程部分逻辑:
void run(int argc, char* argv[]){try{std::vector<std::string> image_files;cv::glob(argv[1], image_files);// Read the images from the file systemfor (auto &file : image_files){cv::Mat img = cv::imread(file);if (img.empty())continue;std::cout << "read image from file: " << file << std::endl;while(!child.running()){//std::cout<<"wait for child process run"<<std::endl;std::this_thread::yield();}process(img);}}catch (const std::exception &e){std::cerr << "Exception: " << e.what() << std::endl;}}void process(cv::Mat img){TickCount tic;std::cout << "send data to child process" << std::endl;shared_image->imgWidth = img.cols;shared_image->imgHeight = img.rows;shared_image->imgChannels = img.channels();shared_image->imgStep = img.step;shared_image->data.assign(img.data, img.data + img.total() * img.elemSize());shared_image->used = false;std::cout << "Waiting for child process" << std::endl;static int count = 0;static int timeout_count=0;std::cout << "wait" << std::endl;bool timeout_flag = !my_timed_wait(construct_timeout_seconds(2), [&](){ return shared_image->used || *quit; });if (!timeout_flag){count++; timeout_count=0;}else{timeout_count++;if(timeout_count > 2){std::cout<<"child process timeout too many times, may be in dead loop, restart child process"<<std::endl;stop_child_process();}}std::cout <<"time:"<<tic.elapsed()<<"ms"<<std::endl;std::cout << "child process done" << std::endl;std::cout << "count: " << count << std::endl;}void heartbeat(){while (!*quit){auto now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();if (now - *timestamp_child > 3000 && check_start_timestamp()){// 子进程崩溃if(last_start_timestamp > 0){printf("child process crash\n");}start_child_process();//wait for child process to start//std::this_thread::sleep_for(std::chrono::milliseconds(5000));}*timestamp_parent = now;std::this_thread::yield();}}void start_child_process(){if(!check_start_timestamp()) return;if(child.running()) child.terminate();std::cout<<"start child process"<<std::endl;child = boost::process::child("child.exe"); std::cout<<"child pid:"<<child.id()<<std::endl; std::cout<<"start child process end"<<std::endl; last_start_timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();}void stop_child_process(){child.terminate();}bool check_start_timestamp(){auto now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();if (now - last_start_timestamp < 5000)return false;return true;}
子进程逻辑:
void run(){cv::Mat img;while (*quit == false){//std::cout << "Waiting for task" << std::endl;auto timeout = construct_timeout_seconds(2);my_timed_wait(timeout, [&](){ return *quit == true || image->used == false; }); // Wait until the condition variable is notifiedif (*quit)break;if (image->used == true)continue;//std::cout << "get task" << std::endl;img = cv::Mat(image->imgHeight, image->imgWidth, CV_8UC3, image->data.data(), image->imgStep).clone();image->used = true;//printf("Image size: %d x %d\n", img.cols, img.rows);// cond->notify_all();cv::imshow("image", img);cv::waitKey(100);}}void heartbeat(){while (!*quit){auto now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();if (now - *timestamp_parent > 3000){// 父进程退出或崩溃,退出子进程printf("parent process crash, exit\n");exit(0);}*timestamp_child = now;std::this_thread::yield();}}
三、demo实验结果
成功检测到卡死、崩溃并重启,耗时非常短!
总结
- 共享内存创建后不会随着进程结束而释放,需要手动释放。共享内存根据字符串命名作为唯一标识,所以注意命名。
- 变量的所有内存都需要在共享内存之上才可以在进程间传递,所以需要注意含有指针的结构,指针本身以及指针所指向内容都需要在共享内存上,图像可以利用bip::vector存放内容。
- 慎用进程锁和条件变量进行同步,一方面是有进程崩溃导致锁和条件变量卡死的BUG,另一方面是发生竞争时比较耗时。
完整项目代码链接