【brpc学习实践十二】bthread

概览

bthread(代码)是baidu-rpc使用的M:N线程库,是其稳定和高效的关键组件。能更好地利用多核cpu,能在pthread中运行,需要注意的是,bthread的work stealing机制会da让任务pthread发生切换,从而让thread_local变量不可信,通常在bthread_usleep或这join的时候就有可能发生切换。

thread_local SomeObject obj;
...
SomeObject* p = &obj;
p->bar();
bthread_usleep(1000);
p->bar();

解决方案:

  • 不使用线程级变量传递业务数据。这是一种槽糕的设计模式,依赖线程级数据的函数也难以单测。判断是否滥用:如果不使用线程级变量,业务逻辑是否还能正常运行?线程级变量应只用作优化手段,使用过程中不应直接或间接调用任何可能阻塞的bthread函数。比如使用线程级变量的tcmalloc就不会和bthread有任何冲突。
  • 如果一定要(在业务中)使用线程级变量,使用bthread_key_create和bthread_getspecific。另外,bthread没有优先级,如果需要线程优先级,还是需要使用pthread。

全局视图

在进入代码实现介绍前,通过一张图了解bthread库的主要类/结构体,以及他们之间的关系。
在这里插入图片描述

TaskControl是单例模式的全局控制类。
TaskGroup由每一个Worker线程进行创建,和Worker线程一一对应。
TaskMeta是描述bthread任务的结构体,里面保存了bthread任务的各种信息,包括id,状态,任务函数和参数,栈空间等等。
ParkingLot则是通过futex实现的线程同步工具,用来唤醒(有任务提交时)和阻塞(无任务时)挂在上面的Worker线程(存在多个ParkingLot的目的是减少竞争)。
在关系上,TaskControl中创建了Worker线程,然后Worker线程在线程入口函数里面创建属于自己的TaskGroup。TaskGroup内有rq和remote_rq两个队列,里面保存的是外部提交需要执行的bthread任务id。

所以简单描述bthread库的运行过程就是:外部提交一个bthread任务到某个TaskGroup中的任务队列,然后通过ParkingLot唤醒休眠的Worker线程,Worker线程会从自己的TaskGroup中取出或者其他Worker线程的TaskGroup中“偷”出bthread任务进行执行。某种意义上来说,和线程池有些类似,但是bthread任务粒度更小,所以bthread库也被称为一种类协程库。

这种“偷”任务的实现称为work stealing机制。它保证了即使某一个Worker线程被阻塞,它内部任务队列的bthread任务也可以被其他Worker线程“偷取”执行成功。所以有时候在bthread任务中调用了bthread_join / bthread_usleep等函数把自己切出后,等到函数返回继续执行的时候会发现执行自己的线程已经变了,这时pthread local的数据已经失效,这是在使用的时候一个需要注意的点。

代码剖析

接下来开始介绍bthread库主体功能的代码实现,包括bthread库的初始化、bthread任务的提交、以及bthread任务的执行和较为核心的栈空间切换。
具体的,我们从一个bthread和pthread比较的demo开始。从功能、函数参数和调用形式上,两者非常相似,都是实现后台运行thread_func任务函数。但bthread额外拥有唯一标识自己的bthread id。

#include <iostream>#include "bthread.h"const char* bthread_flag = "bthread";
const char* pthread_flag = "pthread";void* thread_func(void* args) {std::cout << "flag: " << (char*)(args) <<", pthread_id: " << pthread_self() <<", bthread_id: " << bthread_self() << std::endl;return nullptr;
}int main(int argc, char** argv) {bthread_t bth;pthread_t pth;if (bthread_start_background(&bth, nullptr, thread_func, (void*)bthread_flag) != 0) {std::cout << "fail to create bthread" << std::endl;return -1;}if (pthread_create(&pth, nullptr, thread_func, (void*)pthread_flag) != 0) {std::cout << "fail to create pthread" << std::endl;return -1;}bthread_join(bth, nullptr);pthread_join(pth, nullptr);return 0;
}
// $./output/bin/bthread_test 
// flag: pthread, pthread_id: 140281875785472, bthread_id: 0
// flag: bthread, pthread_id: 140282267948800, bthread_id: 4294968064

初始化

bthread_start_background是我们启动一个bthread任务常用的接口函数。
它会首先判断自己的调用是不是由某个Worker线程发起的。这在BRPC框架下是很常见的场景,因为RPC回调中的主体业务代码都是运行在Worker线程内的。它判断的依据是pthread local级别的TaskGroup指针变量tls_task_group是否为空。之前有提到TaskGroup和Worker线程一一对应,实现的方式就是在Worker线程创建好TaskGroup后,赋值给自己的线程变量tls_task_group。

所以如果tls_task_group不为空,这说明bthread库已经初始化过了,并且位于Worker线程中,直接调用TaskGroup的start_background函数提交bthread任务到它的任务队列;否则就需要尝试进行bthread库的初始化,然后从其中随机挑选一个TaskGroup同样调用它的start_background函数进行任务提交。前面的demo很明显走的是后者的逻辑,需要进行bthread库的初始化。

int bthread_start_background(bthread_t* __restrict tid,const bthread_attr_t* __restrict attr,void * (*fn)(void*),void* __restrict arg) __THROW {bthread::TaskGroup* g = bthread::tls_task_group;if (g) {// start from workerreturn g->start_background<false>(tid, attr, fn, arg);}return bthread::start_from_non_worker(tid, attr, fn, arg);
}BASE_FORCE_INLINE int
start_from_non_worker(bthread_t* __restrict tid,const bthread_attr_t* __restrict attr,void * (*fn)(void*),void* __restrict arg) {TaskControl* c = get_or_new_task_control();if (NULL == c) {return ENOMEM;}...return c->choose_one_group()->start_background<true>(tid, attr, fn, arg);
}

get_or_new_task_control函数实现了全局单例类TaskControl的构造和初始化。它是通过double-checked locking pattern(DCLP)方式实现的单例模式,使用了memory fence保证了正确性(感兴趣的可以看这篇论文(https://www.aristeia.com/Papers/DDJ_Jul_Aug_2004_revised.pdf)和文章(https://zh.wikipedia.org/wiki/%E5%8F%8C%E9%87%8D%E6%A3%80%E6%9F%A5%E9%94%81%E5%AE%9A%E6%A8%A1%E5%BC%8F#Microsoft_Visual_C++_%E4%B8%AD%E7%9A%84%E4%BD%BF%E7%94%A8),C++11后通常用std::call_once实现单例模式)。

inline TaskControl* get_or_new_task_control() {base::atomic<TaskControl*>* p = (base::atomic<TaskControl*>*)&g_task_control;TaskControl* c = p->load(base::memory_order_consume);if (c != NULL) {return c;}BAIDU_SCOPED_LOCK(g_task_control_mutex);c = p->load(base::memory_order_consume);if (c != NULL) {return c;}c = new (std::nothrow) TaskControl;if (NULL == c) {return NULL;}int concurrency = FLAGS_bthread_min_concurrency > 0 ?FLAGS_bthread_min_concurrency :FLAGS_bthread_concurrency;if (c->init(concurrency) != 0) {LOG(ERROR) << "Fail to init g_task_control";delete c;return NULL;}p->store(c, base::memory_order_release);return c;
}

TaskControl的init函数,首先检查了入参concurrency的正确性,然后创建了concurrency数量的Worker线程。
默认情况下会创建concurrency = FLAGS_bthread_concurrency = 8 + 1 = 9条线程。
BRPC框架下默认情况会设置concurrency为系统CPU核心数 + 1,也可以通过ServerOptions.num_threads配置Worker线程的数量。

int TaskControl::init(int concurrency) {if (_concurrency != 0) {LOG(ERROR) << "Already initialized";return -1;}if (concurrency <= 0) {LOG(ERROR) << "Invalid concurrency=" << concurrency;return -1;}_concurrency = concurrency;..._workers.resize(_concurrency);   for (int i = 0; i < _concurrency; ++i) {const int rc = pthread_create(&_workers[i], NULL, worker_thread<true>, this);if (rc) {LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);return -1;}}...while (_ngroup == 0) {usleep(100);  // TODO: Elaborate}return 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/184735.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Agent举例与应用

什么是Agent OpenAI 应用研究主管 Lilian Weng 在一篇长文中提出了 Agent LLM&#xff08;大型语言模型&#xff09;记忆规划技能工具使用这一概念&#xff0c;并详细解释了Agent的每个模块的功能。她对Agent未来的应用前景充满信心&#xff0c;但也表明到挑战无处不在。 现…

Linux RN6752 驱动编写

一、概述 关于 RN6752V1 这个芯片这里就不做介绍了&#xff0c;看到这篇笔记的小伙伴应该都明白&#xff0c;虽然说 RN6752V1 芯片是 AHD 信号的解码芯片&#xff0c;但是也可以把芯片当做是一个 YUV 信号的 MIPI 摄像头&#xff0c;所以驱动的编写和 MIPI 摄像头无太大的区别。…

虚拟机虚拟化原理

目录 什么是虚拟化广义虚拟化狭义虚拟化 虚拟化指令集敏感指令集虚拟化指令集的工作模式监视器对敏感指令的处理过程&#xff1a; 虚拟化类型全虚拟化类虚拟化硬件辅助虚拟化 虚拟化架构裸金属架构宿主机模式架构 什么是虚拟化 虚拟化就是通过模仿下层原有的功能模块创造接口来…

中国版的 GPTs:InsCode AI 生成应用

前言 在上一篇文章 《InsCode&#xff1a;这可能是下一代应用开发平台&#xff1f;》中&#xff0c;我们介绍了一个新的应用开发平台 InsCode&#xff0c;它是基于云原生开发环境 云 IDE AI 辅助编程的一站式在线开发平台。 最近&#xff0c;InsCode 又推出了另一种全新的开…

Python三十个常见的脚本汇总

1、冒泡排序 2、计算x的n次方的方法 3、计算a*a b*b c*c …… 4、计算阶乘 n! 5、列出当前目录下的所有文件和目录名 6、把一个list中所有的字符串变成小写&#xff1a; 7、输出某个路径下的所有文件和文件夹的路径 8、输出某个路径及其子目录下的所有文件路径 9、输出某个路…

深度学习毕设项目 医学大数据分析 - 心血管疾病分析

# 1 前言 &#x1f6a9; 基于大数据的心血管疾病分析 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;4分 1 课题背景 本项目的任务是利用患者的检查结果预测心血管疾病(CVD)的存在与否。 2 数据…

【开源视频联动物联网平台】流媒体传输协议HLS,FLV的功能和特点

HLS&#xff08;HTTP Live Streaming&#xff09;和FLV&#xff08;Flash Video&#xff09;都是用于视频流传输的协议或容器格式&#xff0c;但它们在某些方面有着显著的区别和特点。 HLS是一种由苹果公司开发的用于流媒体传输的协议&#xff0c;而FLV则是Adobe公司开发的用于…

【ArcGIS Pro二次开发】:CC工具箱1.1.4更新_免费_50+工具

CC工具箱1.1.4更新【2023.11.30】 使用环境要求&#xff1a;ArcGIS Pro 3.0 一、下载链接 工具安装文件及使用文档&#xff1a; https://pan.baidu.com/s/1OJmO6IPtMfX_vob3bMtvEg?pwduh5rhttps://pan.baidu.com/s/1OJmO6IPtMfX_vob3bMtvEg?pwduh5r 二、使用方法 1、在下…

从物理机到K8S:应用系统部署方式的演进及其影响

公众号「架构成长指南」&#xff0c;专注于生产实践、云原生、分布式系统、大数据技术分享。 概述 随着科技的进步&#xff0c;软件系统的部署架构也在不断演进&#xff0c;从以前传统的物理机到虚拟机、Docker和Kubernetes&#xff0c;我们经历了一系列变化。 这些技术的引入…

VBA技术资料MF88:测试Excel文件名是否有效

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的工作效率&#xff0c;而且可以提高数据的准确度。我的教程一共九套&#xff0c;分为初级、中级、高级三大部分。是对VBA的系统讲解&#xff0c;从简单的入门&#xff0c;到…

系列二十五、Spring设计模式之适配器模式

一、适配器模式 1.1、概述 适配器模式&#xff08;Adapter Pattern&#xff09;用于兼容不相关的接口之间&#xff0c;类似于一个桥梁&#xff0c;它结合了两个独立接口的功能&#xff0c;这种类型的设计属于结构型模式&#xff0c;为了方便大家伙的理解&#xff0c;我举个例子…

什么是美颜sdk?集成第三方美颜sdk的步骤

本文将深入探讨如何集成第三方美颜sdk&#xff0c;为直播平台引入更先进、更具吸引力的美颜特效。 第一步&#xff1a;选择合适的第三方美颜sdk 在开始集成美颜sdk之前&#xff0c;首要任务是选择适合自己直播平台需求的第三方美颜sdk。不同的sdk可能具有不同的特色和性能&a…

如何修改.exe文件的修改时间,亲测有效

&#x1f482; 个人网站:【 海拥】【神级代码资源网站】【办公神器】&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交流的小伙伴&#xff0c;请点击【全栈技术交流群】 演示视频&#xff1a; 10秒钟实现将文件的修改…

Linux:可视化管理工具Webmin的安装

一、下载 地址&#xff1a;Webmin官网 我这里下载的是1.700-1版本 二、安装 1、在虚拟机上新建目录并安装软件 mkdir /opt/webmin rpm -ivh webmin-1.700-1.noarch.rpm2、修改webmin的root密码 /usr/libexec/webmin/changepass.pl /etc/webmin root 1234563、修改端口(可…

docker读取字体异常

解决方法 docker容器中执行 apk add ttf-freefont 根据版本不同 apk add ttf-dejavu-fonts apk add ttf-bernoulli

【开源】基于Vue.js的医院门诊预约挂号系统的设计和实现

项目编号&#xff1a; S 033 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S033&#xff0c;文末获取源码。} 项目编号&#xff1a;S033&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 功能性需求2.1.1 数据中心模块2.1.2…

Ranger安装和使用

Ranger部署 1.准备 1.1 编译 Ranger编译&#xff08;已经编译过的话&#xff0c;直接看1.2&#xff09; 1.1.1 准备到Ranger官网下载ranger的源码&#xff1a;http://ranger.apache.org/download.html 1.1.2 Ranger编译的过程实在非虚拟机环境下完成的&#xff0c;下载好r…

chapter10-homework-Java

第十章作业 Homework01知识点 Homework02知识点 Homework03知识点 Homework04知识点 Homework05知识点 Homework06Homework07Homework08 Homework01 分析执行结果。 public static void main(String[] args) {Car_ c new Car_();Car_ c1 new Car_(100);System.out.println(…

Vue中 实现自定义指令(directive)及应用场景

一、Vue2 1. 指令钩子函数 一个指令定义对象可以提供如下几个钩子函数 (均为可选)&#xff1a; bind 只调用一次&#xff0c;指令第一次绑定到元素时调用。在这里可以进行一次性的初始化设置。inserted 被绑定元素插入父节点时调用 (仅保证父节点存在&#xff0c;但不一定已…

三.排序与分页

目录 一.排序数据二.分页 一.排序数据 1.排序规则 使用ORDER BY 子句排序 ASC&#xff08;ascend&#xff09;升序DESC&#xff08;descend&#xff09;降序 ORDER BY 子句在SELECT语句的结尾 2.单列排序 SELECT last_name, job_id, department_id, hire_date FROM e…