基础篇:多线程所需知识:RAII接口模式对生产者和消费者封装以及多batch实现

我们先来弄一个最基础的infer类:


class Infer{
public: bool load_model(const string &file){context_ = file;return true;} void forward(){if(context_.empty()){printf("加载模型异常\n");return;}printf("使用%s进行推理\n " , context_.c_str());}void destroy(){context_.clear();}
private:string context_;};

正常的模型是很复杂的,但在这里我们就用string代替了。

那这个代码重点就在于异常逻辑,在正常工作代码中,异常逻辑如果没有写好,就会造成封装的不安全性,甚至会导致程序崩溃。

比如之前的load model:如果没有load过还好,但是如果load过了,那不久导致了两次load_model么?

bool load_model(const string &file){if (!context_.empty()){/* code */destroy();}context_ = file;return true;} 

所以在load前要先看一眼,如果load过了就destroy

这个时候就能引出我们的RAII + 接口模式

RAII

RAII -------> 资源获取即初始化

接口i模式 ------> 设计模式,是一种封装模式,实现类与接口分离的模式

那就是将原本的

Infer infer_;
infer.load_model("aa");

合为一步,即获取就可以初始化

shared_ptr <Infer> create_infer(const string& file){shared_ptr <Infer> instance(new Infer);if (!instance->load_model(file)){instance.reset();}return instance;}

所以这里是表示如果模型加载失败,则代表资源获取失败,返回空指针。


int main(){auto infer_ = create_infer("aa");if (infer_ == nullptr){/* code */printf("failed\n");return -1;}infer_->forward();return 0;
}

所以现在的结果非常简单,只要为空指针则表示失败。不为空则成功。

这样就避免了外部执行load_model(RAII制作到一部分没有完全做到)。

避免了load_model不会执行超过一次

获取的模型一定初始化成功,因此forward不用判断模型初始化成功。

load model中可以删除对于重复load的判断,forward可以删掉是否家在成功的判断。

接口模式封装:

那么既然我们已经能用create了,可基于Infer的public属性,还是可以调用load_model的,所以就要进入到接口模式封装了。

接口模式解决问题:

1、解决load_model还能被外部看到的问题。

2、解决成员变量对外可见的问题(比如成员函数是特殊类型的,比如cudaStream_t ,那么头文件必须包含cuda_runtime.h,就会造成命名空间污染,头文件污染的问题 , 不干净的结果就会造成程序错误等异常)

接口类是一个纯虚类

原则是只暴露调用者需要的函数,其他一概隐藏起来

比如说load_model,咱们通过RAII做了定义,因此load_model属于不需要的范畴

内部如果有启动线程等等,start , stop ,也不需要暴露 , 而是初始化的时候自动启动,都是RAII的定义

class InferInterface
{
private:/* data */
public:virtual void forward() = 0 ;};

forward 要做成纯虚函数。

class InferImpl : public InferInterface{
public: bool load_model(const string &file){context_ = file;return true;} virtual void  forward()override{printf("使用%s进行推理\n " , context_.c_str());}void destroy(){context_.clear();}
private:string context_;};

然后要让原来的类继承于接口类

shared_ptr <InferInterface> create_infer(const string& file){shared_ptr <InferImpl> instance(new InferImpl);if (!instance->load_model(file)){instance.reset();}return instance;}

在返回类型的时候返回的是Interface , 在new的时候选的是InferImpl。

 这样作为一个使用者除了forward真的还没有其他的可选项。

之后可以再进一步设置Infer.hpp作为头文件

#ifndef INFER_HPP
#define INFER_HPP
#include<memory>
#include<string>
class InferInterface
{
private:/* data */
public:virtual void forward() = 0 ;};
std::shared_ptr <InferInterface> create_infer(const std::string& file);
#endif  /. INFER_HPP

实现放到Infer.cpp中:

#include "infer.hpp"
using namespace std;
class InferImpl : public InferInterface{
public: bool load_model(const string &file){context_ = file;return !context_.empty();} virtual void  forward()override{printf("使用%s进行推理\n " , context_.c_str());}void destroy(){context_.clear();}
private:string context_;};
shared_ptr <InferInterface> create_infer(const string& file){shared_ptr <InferImpl> instance(new InferImpl);if (!instance->load_model(file)){instance.reset();}return instance;}

这样在main函数里就只需要


#include"infer.hpp"
using namespace std;int main(){auto infer_ = create_infer("aa");if (infer_ == nullptr){/* code */printf("failed\n");return -1;}infer_->forward();return 0;
}

这样就做到了

  • 头文件只依赖最简单的
  • 成员变量看不到
  • 只能访问forward

RAII接口总结原则:

1、头文件应尽量简洁

2、不需要的东西都放到cpp当中,不要让接口类看到

比如:

#include <cudaruntime.h>
class InferInterface
{
private:cudaStream_t stream;
public:virtual void forward() = 0 ;};

这样就不符合尽量简洁的原则,因为这里用不到stream,应该尽可能放到cpp里。

3、尽量不要在头文件写using  namespace,因为写了这个就代表命名空间被打开了,这样就使得所有包含了这个头文件的人的所有命名空间都被打开。但CPP里可以写

多batch实现:

在这里我们主要是进行一个推理的实现。

老规矩:弄一个线程worker_thread_,为了尽量使得资源尽量那里分配那里释放,这样可以保证程序足够简单。在worker内实现模型的家在,使用,与释放。

那样的话

  
bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单worker_thread_ = thread(&InferImpl::worker , this , file);return context_.empty();} 

这肯定是不可以的,毕竟这个线程还没来的及进行worker,就要return了

这时候就要使用future

bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单promise<bool> pro;worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));context_ = file;return pro.get_future().get();} 

对应的worker:

void worker(string file  , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}while (true){/* code */}}

这样我们就完成了在局部内加载使用释放

仿照上一节完成一下消费者模式:


struct Job{shared_ptr<promise<string>>pro;string input; 
};class InferImpl : public InferInterface{
public: bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单promise<bool> pro;worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));return pro.get_future().get();} virtual void  forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);}//实际的推理阶段void worker(string file  , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}while (true){if (!qjobs_.empty()){lock_guard<mutex> l(job_lock);auto job = qjobs_.front();qjobs_.pop();//inference:printf();}this_thread::yield();}  }
private:thread worker_thread_;queue<Job> qjobs_;mutex  job_lock;condition_variable cv_;
};

但这里每次都是一个一个地拿出来pop,我们要的是多batch,最好是一次能拿batchsize个出来。

这是就可以用vector:

  int maxbatchsize  =  5;vector<Job> jobs;while (true){if (!qjobs_.empty()){lock_guard<mutex> l(job_lock);while (jobs.size < maxbatchsize && !qjobs_.empty()){jobs.emplace_back( qjobs_.front());qjobs_.pop();}//inference:printf();}this_thread::yield();}  

通知模式

那我们这个消费者不能一直while(true)啊,于是就要对其进行一个通知,也就是有东西进来了,再进行消费。所以就要使用到condition_variable的wait()和notify_one()了。

    virtual void  forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);cv_.notify_one();}

在传入的时候通知一下我。

    void worker(string file  , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}int maxbatchsize  =  5;vector<Job> jobs;//i消费者:while (true){unique_lock<mutex> l(job_lock);cv_.wait(l , [&]{return !qjobs_.empty();});while (jobs.size() < maxbatchsize && !qjobs_.empty()){jobs.emplace_back( qjobs_.front());qjobs_.pop();}//inference:printf();}}

在这里要进行一个等待。之后还可以去除qjobs_.empty()的判断。

这种通知的模式也要比我们之前主动检查模式要更好。

之后就要进行推理了:

for (int  i = 0; i < jobs.size(); i++){auto& job = jobs[i];char result [100];sprintf(result ,"%s : batch-> %d [%d]" , job.input.c_str() , batch_id , jobs.size());job.pro->set_value(result);}batch_id++;jobs.clear();this_thread::sleep_for(chrono::milliseconds(1000));

 而这个时候set_value过后我们就要考虑一下返回的操作了。

返回future格式

如果forward是采用了

    virtual string  forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);cv_.notify_one();return job.pro->get_future().get();}

这种形式进行返回的话,get() will wait until the whole inference , this not the meaning of batchs

    virtual shared_future<string>  forward(string pic) override{// printf("使用%s进行推理\n " , context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock);qjobs_.push(job);cv_.notify_one();return job.pro->get_future();}

这样的话就可以直接返回future,让执行人有更多的选择。

int main(){auto infer_a = create_infer("aa");auto result_a =  infer_a->forward("aa");auto result_b =infer_a->forward("bb");auto result_c =infer_a->forward("cc");printf("%s   \n" , result_a.get().c_str());printf("%s   \n" , result_b.get().c_str());printf("%s   \n" , result_c.get().c_str());return 0;
}

执行过后:

 可见是一个batch去实现的。

但是如果要是先get的话,

 那就是分批次了,因为get就代表必须完成整个步骤之后返回。

退出模型

    atomic<bool> running_{false};

设立私有变量,表示程序是否还在运行。

在loadmodel时设立running_为true

bool load_model(const string &file){//使得资源尽量那里分配那里释放,这样可以保证程序足够简单running_ = true;promise<bool> pro;worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));return pro.get_future().get();} 
 void worker(string file  , promise<bool>&pro){string context_ = file;if (context_.empty()){/* code */pro.set_value(false);}else{pro.set_value(true);}int maxbatchsize  =  5;vector<Job> jobs;int batch_id = 0;//i消费者:while (running_){unique_lock<mutex> l(job_lock);cv_.wait(l , [&]{return !qjobs_.empty() ||  !running_;});if(! running_){break;}while (jobs.size() < maxbatchsize && !qjobs_.empty()){jobs.emplace_back( qjobs_.front());qjobs_.pop();}for (int  i = 0; i < jobs.size(); i++){auto& job = jobs[i];char result [100];sprintf(result ,"%s : batch-> %d [%d]" , job.input.c_str() , batch_id , jobs.size());job.pro->set_value(result);}batch_id++;jobs.clear();this_thread::sleep_for(chrono::milliseconds(1000));}printf("释放->%s \n", context_.c_str());context_.clear();printf("Worker Done\n");}

同时worker也要改变,while的判断机制也需要改变,cv_.wait也不能是只有qjobs为空,还要加上是否还在运行。如果不运行了要break出while循环。

在结束之后要释放模型。

析构函数

    virtual ~InferImpl(){running_ =  false;cv_.notify_one();if(worker_thread_.joinable()){worker_thread_.join();}} 

重点在于析沟函数,在该类退出后设置线程同步,并且在running_设置为false的时候通知一次wait。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/21639.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学会这样提问,你就超过了82.7%的老网工

下午好&#xff0c;我的网工朋友 很多朋友会说&#xff0c;我怎么问问题&#xff0c;在群里&#xff0c;或者后台&#xff0c;还有给老杨总发消息&#xff0c;都没收到比较详细的回复&#xff1f;尤其是问技术问题的时候。 除了我们回复消息的时间和精力的确比较有限之外&…

解密外接显卡:笔记本能否接外置显卡?如何连接外接显卡?

伴随着电脑游戏和图形处理的需求不断增加&#xff0c;很多笔记本电脑使用者开始考虑是否能够通过外接显卡来提升性能。然而&#xff0c;外接显卡对于笔记本电脑是否可行&#xff0c;以及如何连接外接显卡&#xff0c;对于很多人来说仍然是一个迷。本文将为您揭秘外接显卡的奥秘…

单价20块蓝牙耳机卖爆越南市场,现象级爆款出现?

以儒道为文化底蕴的越南&#xff0c;是与中国最为相近的东南亚国家&#xff0c;"快速增长的劳动人口相对年轻的社会群体"是很多人对越南这个国家的基本认知。背靠庞大的Z世代用户群体&#xff0c;越南社会年轻化消费需求暴涨&#xff0c;手机与数码品类商品作为“年轻…

Linux中使用verdaccio 搭建私有npm 服务器

安装verdaccio npm i -g verdaccio安装完成 输入verdaccio,出现下面信息代表安装成功&#xff0c;同时输入verdaccio后verdaccio已经处于运行状态&#xff0c;当然这种启动时暂时的&#xff0c;我们需要通过pm2让verdaccio服务常驻 ygiZ2zec61wsgbo9t9i346jbZ:~$ verdacciowar…

Vue使用QuillEditor富文本编辑器问题记录

1.内容绑定的问题 绑定内容要使用 v-model:content"xxx" 的形式。 2.设置字体字号 字体以及字号大小的设置需要先注册。 <script> import { QuillEditor,Quill } from vueup/vue-quill import vueup/vue-quill/dist/vue-quill.snow.css; // 设置字体大小 c…

Elasticsearch和Kibana的安装及验证

金翅大鹏盖世英&#xff0c;展翅金鹏盖世雄。 穿云燕子锡今鸽&#xff0c;踏雪无痕花云平。 ---------------- 2023.7.31.101 ----------------- 本文密钥&#xff1a;365 Elasticsearch 是一个分布式的 RESTful 风格的搜索和数据分析引擎&#xff0c;常用来进行全文检索、…

c语言指针的运算

1、通过指针计算数组的元素&#xff08;指针相减&#xff0c;类型需要一致&#xff09;&#xff0c;比如数组元素指针相减得到的是中间相差的元素个数&#xff0c;可以用于计算数组元素的个数等 #include "stdio.h" #include <stdlib.h>int main() {int a[10]…

【树状数组】讲解

一.介绍 树状数组&#xff08;Fenwick Tree&#xff09;&#xff0c;也称为二叉索引树&#xff08;Binary Indexed Tree&#xff0c;BIT&#xff09;&#xff0c;是一种用于高效处理动态数组前缀和的数据结构。它可以在O(log n)的时间复杂度内完成单点更新和区间查询操作。 树…

C# Blazor 学习笔记(3):路由管理

文章目录 前言路由管理App.razor设置登录页面设置空布局 前言 我们知道使用Blazor的官方模板&#xff0c;我们会自动得到一个拥有侧边栏的布局页面。但是我们发现我们所有新建的页面都有侧边栏。有时候我们需要跳出这个布局&#xff0c;比如我要做登录页面的时候&#xff0c;我…

微信小程序中的全局数据共享(状态管理)使用介绍

开发工具&#xff1a;微信开发者工具Stable 1.06 一、状态管理简介 微信小程序全局状态是指可以在不同页面之间共享的数据或状态。 它可以存储用户的登录状态、个人信息、全局配置信息等。 二、安装MobX 1、安装NPM 在资源管理器的空白地方点右键&#xff0c;选择“在外部…

css在线代码生成器

这里收集了许多有意思的css效果在线代码生成器适合每一位前端开发者 布局&#xff0c;效果类&#xff1a; 网格生成器https://cssgrid-generator.netlify.app/ CSS Grid Generator可帮助开发人员使用CSS Grid创建复杂的网格布局。网格布局是创建Web页面的灵活和响应式设计的强…

解密HTTP代理爬虫中的IP代理选择与管理策略

在当今数据驱动的世界中&#xff0c;HTTP代理爬虫作为一项重要的数据采集工具&#xff0c;其成功与否往往取决于IP代理的选择与管理策略。作为一家专业的HTTP代理产品供应商&#xff0c;我们深知IP代理在数据采集中的重要性。在本文中&#xff0c;我们将分享一些关于HTTP代理爬…

如何使用 ChatGPT 为 Midjourney 或 DALL-E 等 AI 图片生成提示词

人工智能为创意产业开辟了一个充满可能性的全新世界。人工智能最令人兴奋的应用之一是生成独特且原创的艺术品。Midjourney 和 DALL-E 是人工智能生成艺术的两个突出例子&#xff0c;吸引了艺术家和艺术爱好者的注意。在本文中&#xff0c;我们将探索如何使用 ChatGPT 生成 AI …

某科技公司提前批测试岗

文章目录 题目 今天给大家带来一家提前批测试岗的真题&#xff0c;目前已经发offer 题目 1.自我介绍 2.登录页面测试用例设计 3.如何模拟多用户登录 可以使用Jmeter,loadRunner性能测试工具来模拟大量用户登录操作去观察一些参数变化 4.有使用过Jmeter,loadRunner做过性能压…

Jmeter组件作用域及执行顺序

目录 一、Jmeter八大可执行元件 二、组件执行顺序 三、组件作用域 四、特殊说明 一、Jmeter八大可执行元件 配置元件---Config Element 用于初始化默认值和变量&#xff0c;以便后续采样器使用。配置元件大其作用域的初始阶段处理&#xff0c;配置元件仅对其所在的测试树分…

数学知识(一)

一、数论 1.1质数 定义:在所有大于1的自然数&#xff0c;如果只包含1和本身这两个约数&#xff0c;就被称为质数(素数). 质数的判断:试除法 bool is_prime(int n) {if(n < 2) return false;for(int i 2;i < n / i;i ){if(n % i 0)return false;}return true; } 分…

【LangChain】向量存储(Vector stores)

LangChain学习文档 【LangChain】向量存储(Vector stores)【LangChain】向量存储之FAISS 概要 存储和搜索非结构化数据的最常见方法之一是嵌入它并存储生成的嵌入向量&#xff0c;然后在查询时嵌入非结构化查询并检索与嵌入查询“最相似”的嵌入向量。向量存储负责存储嵌入数…

数据泄露的平均成本创历史新高

IBM Security 发布了年度数据泄露成本报告&#xff0c;显示数据泄露的全球平均成本在 2023 年达到 445 万美元&#xff0c;创下该报告的历史新高&#xff0c;并且比过去 3 年增加了 15%。 检测和升级成本在同一时间段内跃升了 42%&#xff0c;占违规成本的最高部分&#xff0c…

无涯教程-Lua - 垃圾回收

Lua使用自动内存管理&#xff0c;该管理使用基于Lua内置的某些算法的垃圾回收。 垃圾收集器暂停 垃圾收集器暂停用于控制垃圾收集器之前需要等待多长时间&#xff1b; Lua的自动内存管理再次调用它。值小于100意味着Lua将不等待下一个周期。同样&#xff0c;此值的较高值将导…

使用socket实现UDP版的回显服务器

文章目录 1. Socket简介2. DatagramSocket3. DatagramPacket4. InetSocketAddress5. 实现UDP版的回显服务器 1. Socket简介 Socket&#xff08;Java套接字&#xff09;是Java编程语言提供的一组类和接口&#xff0c;用于实现网络通信。它基于Socket编程接口&#xff0c;提供了…