生产环境使用boost::fiber

简介

boost::fiber是一类用户级线程,也就是纤程。其提供的例子与实际生产环境相距较远,本文将对其进行一定的改造,将其能够投入到生产环境。
同时由于纤程是具有传染性的,使用纤程的代码里也全部要用纤程封装,本文将对一些组件进行简单封装。

fiber封装

boost::fiber支持设置pthread和fiber的比例是1:n还是m:n,同时也支持设置调度方式是随机调度还是抢占调度。
本文中选择使用抢占式调度,并且是m:n的比例,这种选择适用面更加广。
既然pthread和fiber比例是m:n,那么这个m一般等于逻辑核数量,也就是需要设置fiber调度的线程控制在大小为固定的线程池中。fiber中抢占式调度方式也要求固定的线程池数量,外部前程加入时,可能会影响抢占式调度,即不能在外部线程中调用fiber,不然这个线程就加入到了fiber调度的pthread中了。

这时,需要一个设置一个队列,外部线程往这个队列中添加任务;内部线程池从队列中取任务,同时触发fiber,在fiber中可以继续触发fiber。触发队列、内部队列、工作线程、外部线程的关系如下图所示:

图片

运行逻辑被装箱到一个任务中,然后被添加到任务队列,这一步利用模板和上转型实现,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class IFiberTask {public:IFiberTask() = default;virtual ~IFiberTask() = default;IFiberTask(const IFiberTask& rhs) = delete;IFiberTask& operator=(const IFiberTask& rhs) = delete;IFiberTask(IFiberTask&& other) = default;IFiberTask& operator=(IFiberTask&& other) = default;virtual void execute() = 0;public:inline static std::atomic_size_t fibers_size {0};
};template <typename Func>
class FiberTask: public IFiberTask {public:explicit FiberTask(Func&& func) :func_{std::move(func)} { }~FiberTask() override = default;FiberTask(const FiberTask& rhs) = delete;FiberTask& operator=(const FiberTask& rhs) = delete;FiberTask(FiberTask&& other)  noexcept = default;FiberTask& operator=(FiberTask&& other)  noexcept = default;void execute() override {fibers_size.fetch_add(1);func_();fibers_size.fetch_sub(1);}private:Func func_;
};

IFiberTask是任务基类,不可拷贝;FiberTask是模板类,成员变量func_存储算子。使用IFiberTask类指针指向特化后的FiberTask对象,这时就实现的装箱操作,调用execute时,实际调用了子类的execute,触发封装的func_对象。

外部队列基于boost::fibers::buffered_channel实现,这是一个支持并发的队列,队列的元素类型为std::tuple<boost::fibers::launch, std::unique_ptr>,其中tuple第一元素存储任务的触发形式,进入队列还是立即触发。

接着是任务装箱,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template<typename Func, typename... Args>
auto Submit(boost::fibers::launch launch_policy, Func&& func, Args&&... args) {// 捕获lambda极其参数auto capture = [func = std::forward<Func>(func),args = std::make_tuple(std::forward<Args>(args)...)]() mutable {return std::apply(std::move(func), std::move(args));};// 任务的返回值类型using task_result_t = std::invoke_result_t<decltype(capture)>;// 该任务packaged_task的using packaged_task_t = boost::fibers::packaged_task<task_result_t()>;// 创建任务对象packaged_task_t task {std::move(capture)};// 装箱到FiberTask中using task_t = fiber::FiberTask<packaged_task_t>;// 获取packaged_task的futureauto result_future = task.get_future();// 添加到buffered_channel中auto status = work_queue_.push(std::make_tuple(launch_policy, std::make_unique<task_t>(std::move(task))));if (status != boost::fibers::channel_op_status::success) {return std::optional<std::decay_t<decltype(result_future)>> {};}return std::make_optional(std::move(result_future));
}

代码中,先捕获lambda表达式及其参数,获取返回值类型并添加到packaged_task中,然后装箱到FiberTask中,使用packaged_task获取future并返回,FiberTask对象添加到队列中,使用IFiberTask的指针指向这个对象,实现装箱操作。

接着是内部任务触发的逻辑,首先创建一个线程池,每个线程注册调度器,接着从队列中获取任务,触发fiber。
工作线程的执行函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 注册调度算法为抢占式调度
boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(threads_size_, true);
// 创建task类型
auto task_tuple = typename decltype(work_queue_)::value_type {};// 从队列中获取任务
while(boost::fibers::channel_op_status::success == work_queue_.pop(task_tuple)) {// 解包auto& [launch_policy, task_to_run] = task_tuple;// 触发 fiber并detachboost::fibers::fiber(launch_policy, [task = std::move(task_to_run)]() {task->execute();}).detach();
}

抢占式调度在注册时需要指定线程池大小,这时不能在外部线程中调用fiber,因为调用fiber的时候会把该线程添加到fiber调度的线程中,也就调整了fiber的worker线程数量。

以上代码实现了fiber触发器、任务队列、工作线程池等逻辑。
理论上可以创建多个fiber调度组件对象,每个组件根据自己的需要设置资源情况。
但实际应用中,还是建议使用一个全局调度组件,因为当A调度器中的任务依赖B调度器的任务的同时,就会出现阻塞工作线程,影响实际性能。

下面封装一个全局调度器,提供递交任务的接口和结束调度的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class DefaultPool {private:static auto* Pool() {const static size_t size = std::thread::hardware_concurrency();static fiber::FiberPool pool(size, size*8);return &pool;}public:template<typename Func, typename... Args>static auto SubmitJob(boost::fibers::launch launch_policy, Func &&func, Args &&... args) {return Pool()->Submit(launch_policy, std::forward<Func>(func), std::forward<Args>(args)...);}template<typename Func, typename... Args>static auto SubmitJob(Func &&func, Args &&... args) {return Pool()->Submit(std::forward<Func>(func), std::forward<Args>(args)...);}static void Close() {Pool()->CloseQueue();}private:DefaultPool() = default;
};

其他组件封装

上面对boost::fiber进行封装,得到一个能投入生产环境的调度器。
但是仅仅是这些是不够的,毕竟对于生产环境中的服务而言,外部服务、中间件的依赖是不能少的。
纤程是具有传染性的,对于外部组件提供的sdk,发送请求并进行同步等待会阻塞纤程对应的工作线程,影响整套机制。
为此,需要对现有的组件进行封装,对于同步接口,需要使用线程池配合fiber::promise;对于异步接口,可以改造成fiber::promise、future机制。下面介绍几种常用组件的fiber封装。

redis客户端封装

同步接口加线程池的方式将同步接口改造成异步接口的方案,存在较大的安全隐患。
线程池的容量不可控,当流量突然增加时,需要大量线程去等待,从而耗尽线程池资源,造成任务大量积压,服务崩溃。
而扩大线程池数量,又消耗了大量的资源。

综上,对于fiber化封装,还是建议采用异步接口。hiredis库支持异步接口,redis_plus_plus库对hiredis进行了c++封装,同时也提供了异步接口,本节将面向这个接口进行改造。

redis提供了挺多的接口,这里只对del、get、set三个接口做个示范:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
template<typename Type>
using Promise = boost::fibers::promise<Type>;template<typename Type>
using Future = boost::fibers::future<Type>;Future<long long > Del(const StringView &key) {auto promise = std::make_unique<Promise<long long >>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.del(key, [promise =promise.release()](sw::redis::Future<long long > &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}Future<OptionalString> Get(const StringView &key) {auto promise = std::make_unique<Promise<OptionalString>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.get(key, [promise = promise.release()](sw::redis::Future<OptionalString> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}Future<bool> Set(const StringView &key, const StringView &val) {auto promise = std::make_unique<Promise<bool>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.set(key, val, [promise = promise.release()](sw::redis::Future<bool> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future;
}

注意,redis_plus_plus对每个回调函数通过模板进行判断,因此无法使用mutable+移动捕获promise,只能使用指针赋值的方式实现。redis_plus_plus在1.3.6以后的版本才有回调函数机制,之前的版本不支持。
上面原理是,创建fiber的promise和future,然后让redis的回调函数中捕获promise,并在promise中对数据进行赋值。而外部使用fiber的future进行等待,并不会阻塞工作线程。

grpc客户端封装

跟上面的redis客户端类似,这里也建议对grpc的异步客户端进行改造,支持fiber的promise、future机制。
grpc的异步客户端需要牵扯到grpc::CompletionQueue,里面实现了一套poll engine,需要绑定一个线程去进行epoll_wait操作。首先定义一个GrpcClient类,包含四个成员变量、两个成员函数,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class GrpcClient {public:explicit GrpcClient(const ClientOption& option);~GrpcClient();// 对外提供的接口Future<meta::HelloResponse> Call(const meta::HelloRequest& request);private:// worker线程执行的逻辑void Work();private:std::unique_ptr<grpc::CompletionQueue> completion_queue_;std::thread worker_;std::shared_ptr<grpc::Channel> channel_;gpr_timespec timespec_{};
};

异步客户端分为三个部分逻辑,第一个是请求发送(Call函数),第二个是io线程批量处理,第三个是外部等待Future。
为了能够让io线程里给Promise进行赋值,需要Call函数中将Promise及其相关上下文传递到io线程中,这里定义一个上下文结构体:

1
2
3
4
5
6
struct CallData {grpc::ClientContext context;          // grpc上下文Promise<meta::HelloResponse> promise; // Promise对象grpc::Status status;                  // grpc调用状态meta::HelloResponse response;         // 相应包
};

Call函数中的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
// 创建上下文对象
auto data = new CallData;
// 设置超时时间
data->context.set_deadline(timespec_);
// 创建桩
meta::HelloService::Stub stub(channel_);
auto future = data->promise.get_future();
// 异步调用,添加到完成队列中
auto rpc = stub.Asynchello(&data->context, request, completion_queue_.get());
// 绑定response、status,并将上下文对象作为tag传下去
rpc->Finish(&data->response, &data->status, reinterpret_cast<void*>(data));
return future;

data对象在该函数中创建,在Work函数中释放,不存在内存泄漏问题。
grpc的异步稍微有点麻烦,发送之后,还要绑定数据。
接着是Work线程中的逻辑了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CallData* data = nullptr;
bool ok = false;
// 获取状态完毕的数据
while (completion_queue_->Next((void**)&data, &ok)) {// 判断队列是否已经结束if (!ok) {break;}// 如果grpc状态ok,则赋值if (data->status.ok()) {data->promise.set_value(std::move(data->response));} else {// 否则设置异常data->promise.set_exception(std::make_exception_ptr(std::runtime_error(data->status.error_message())));}// 删除数据delete data;data = nullptr;
}

调用完成队列的Next函数会阻塞,如果队列中存在状态达到最终状态的数据,则返回一条。从完成对于中取到的数据的顺序与入队顺序不同。

上面两个函数组合实现了Future获取和Promise赋值的操作,使得grpc客户端能在fiber中使用。

参考

  • fiberpool代码
  • 生产环境使用fiber
  • grpc异步客户端
  • hiredis
  • 生产环境使用boost::fiber

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/127590.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

appium如何连接多台设备

我们在做app自动化的时候&#xff0c;若要考虑兼容性问题&#xff0c;需要跑几台设备&#xff0c;要是一台一台的跑比较耗时&#xff0c;因此需要考虑使用多线程来同时操作多台设备。 1.我们拿两台设备来模拟操作下&#xff0c;使用&#xff1a;adb devices查看连接状况&#…

sass制作一个简单的星空背景

最近遇到一个有意思的东西&#xff0c;需要制作一个如下图的背景&#xff1a; 如果使用js或者canvas应该是比较简单的&#xff0c;正好最近在使用sass&#xff0c;那么纯sass能否实现这种效果呢&#xff1f;来试一下 首先来生成这些点&#xff1a; <div class"conten…

“恒山光量子”首秀!玻色量子联合移动云发表物理1区Top期刊SCPMA论文

2023年5月&#xff0c;北京玻色量子科技有限公司&#xff08;以下简称“玻色量子”&#xff09;联合移动云在我国知名科技期刊平台《中国科学&#xff1a;物理学 力学 天文学》英文版上发表了以“Optical experimental solution for the multiway number partitioning problem …

K8s集群

统一时间&#xff1a;ntpdate(都做) ntpdate -b ntp1.aliyun.com */1 * * * * /usr/sbin/ntpdate -b ntp1.aliyun.com systemctl status docker vi /etc/docker/daemon.json systemctl restart docker m: vim kubernetes.sh cat >> /etc/yum.repos.d/kubernetes.repo…

中国电子学会主办 第四届ATEC科技精英赛报名启动

11月1日由中国电子学会主办的第四届ATEC科技精英赛&#xff08;ATEC2023&#xff09;正式启动报名。 ATEC科技精英赛是主要面向中国籍计算机等专业在校学生、人工智能及网络安全行业研究者和从业者的一场高水平的智能科技挑战赛&#xff0c;意在贯彻落实党中央、国务院关于推动…

【OpenCV实现图像梯度,Canny边缘检测】

文章目录 概要图像梯度Canny边缘检测小结 概要 OpenCV中&#xff0c;可以使用各种函数实现图像梯度和Canny边缘检测&#xff0c;这些操作对于图像处理和分析非常重要。 图像梯度通常用于寻找图像中的边缘和轮廓。在OpenCV中&#xff0c;可以使用cv2.Sobel()函数计算图像的梯度…

期 货 跟 单/资 管 分 仓/镜像跟单/外 盘 分 仓的全面介绍!

期货跟单是经过科学的筛选找出合格的目标样本数据然后利用样本交易数据转化成未来实际账面利润的综合性过程。 期货跟单分为正向跟单和反向跟单&#xff0c;简单地说就是找出期高手正向跟随高手赚取收益或找出期货低手反向跟随赚取收益。 期货跟单软件是实现跟单过程自动化的工…

设计思想培养:装饰者模式下的RecyclerView添加头、尾

用一个设计模式培养高复用、低耦合思想 前言Android中的装饰者代码实现第一步&#xff1a;创建装饰器DecorateAdapter第二步&#xff1a;处理头部、中间内容、尾部的绑定关系第三步&#xff1a;装饰器的使用第四步&#xff1a;改进、直接封装一个View出来 总结 前言 一个高复用…

SpringCloud Gateway实现请求解密和响应加密

文章目录 前言正文一、项目简介二、核心代码2.1 自定义过滤器2.2 网关配置2.3 自定义配置类2.4 加密组件接口2.5 加密组件实现&#xff0c;AES算法2.6 启动类&#xff0c;校验支持的算法配置 三、请求报文示例四、测试结果4.1 网关项目启动时4.2 发生请求时 前言 本文环境使用比…

UI动效的都可以用哪些工作来制作

随着UI设计的不断发展&#xff0c;UI动效越来越多地应用于现实生活中。手机&#xff0c;iPad、计算机、网页和其他设备被广泛使用&#xff0c;所以问题来了&#xff0c;为什么UI动态效果越来越被广泛使用&#xff1f;它的优点是什么&#xff1f;哪些软件可以设计UI动态效果&…

车载测试相比软件测试,前景会稍好一点吗?

> 如果个人是汽车、电气、工业工程相关专业的学历背景&#xff0c;那可以考虑从事车载测试&#xff08;看上图&#xff09;。> 如果不是以上专业&#xff0c;那就要慎重啦。 车载测试是测试行业的一个分支&#xff0c;最近十年一直存在&#xff0c;并不是这一两年才有的…

python 数据挖掘库orange3 介绍

orange3 是一个非常适合初学者的data mining library. 它让使用者通过拖拽内置的组件来形成工作流。让你不需要写任何代码就可以体验到数据挖掘和可视化的魅力。 它的桌面如下&#xff0c;这里我创建了 3 个节点&#xff0c;分别是数据集、小提琴图&#xff0c;散点图 其中 …

UE5——网络——RPC

RPC&#xff08;这个是官方文档的资料&#xff09; 要将一个函数声明为 RPC&#xff0c;您只需将 Server、Client 或 NetMulticast 关键字添加到 UFUNCTION 声明。 例如&#xff0c;若要将某个函数声明为一个要在服务器上调用、但需要在客户端上执行的 RPC&#xff0c;您可以…

【Linux】配置JDKTomcat开发环境及MySQL安装和后端项目部署

目录 一. JDK及tomcat安装 二&#xff0c;安装Tomcat 三&#xff0c;MySQL安装 四、后端部署 前言&#xff1a; 今天我们就来在Linux上安装JDK及tomcat&#xff0c;MySQL&#xff0c;希望你可以通过这一博客&#xff0c;找到你的答案&#xff01;&#xff01;&#xff01; …

2023-macOS下安装anaconda,终端自动会出现(base)字样,如何取消

2023-macOS下安装anaconda&#xff0c;终端自动会出现(base)字样&#xff0c;如何取消 安装后&#xff0c;我们再打开终端&#xff0c;就会自动出现了&#xff08;base&#xff09; 就会出现这样子的&#xff0c;让人头大&#xff0c; 所以我们要解决它 具体原因是 安装了anac…

从JDBD的封装方面重新认识Mybaits

前言&#xff1a;SQLSession是对JDBC的封装 MyBatis 是一个 Java 持久化框架&#xff0c;它通过对 JDBC 的封装来简化数据库访问操作。核心的 SQLSession 对象是 MyBatis 的核心组件之一&#xff0c;负责管理数据库连接、执行 SQL 语句以及映射查询结果等功能。 具体来说&…

Windows11恢复组策略编辑器功能的方法

原因分析 日常工作学习中,对 Windows 计算机上的问题进行故障排除时,有些高级用户经常使用组策略编辑器轻松修复它。通过其分层结构,您可以快速调整应用于用户或计算机的设置。如果搜索结果中缺少组策略编辑器,则可能必须使用注册表编辑器作为疑难解答工具,这是一种更复杂…

力扣:147. 对链表进行插入排序(Python3)

题目&#xff1a; 给定单个链表的头 head &#xff0c;使用 插入排序 对链表进行排序&#xff0c;并返回 排序后链表的头 。 插入排序 算法的步骤: 插入排序是迭代的&#xff0c;每次只移动一个元素&#xff0c;直到所有元素可以形成一个有序的输出列表。每次迭代中&#xff0c…

0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)

在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》我们介绍了不会有重复数据的时间滚动窗口。本节我们将介绍存在重复计算数据的时间滑动窗口。 关于滑动窗口&#xff0c;可以先看下《0基础学习PyFlink——个数滑动窗口&#xff08;Sliding Count Windows&#x…