分享一个asio下使用channel来实现无需队列的安全的连续async_write的方法

分享一个asio下使用channel来实现无需队列的安全的连续async_write的方法

问题:不能直接用asio::async_write连续发送数据

下面这段代码是错误的(为了代码的可读性和易理解,请先忽略函数调用中参数不正确的问题):

asio::async_write(sock, "abc", [](){});
asio::async_write(sock, "def", [](){});

为什么不能这样用?

传统的经验和说法是,比如在Windows下,WSASend,WSARecv不能连续调用,必须等待上一个调用结束,才能开始下一个调用,所谓的上一个调用结束,指的是该调用的回调函数已经触发了,这里就是指上面代码中的lambda即:[](){}触发了。

在Windows下,asio::async_write调用的是WSASend,所以asio::async_write不能连续调用可以理解。

那为什么WSASend,WSARecv不能连续调用呢?这可能得问比尔盖次了,他这么设计的吧,你非要想能够这样用,你去通知比尔盖次,要求他限期整改,也许是个办法。

针对这个问题的常规解决办法

在asio官方代码示例中,其中有对这个问题的处理办法:

https://github.com/chriskohlhoff/asio/blob/master/asio/src/examples/cpp11/chat/chat_client.cpp

简单归纳一下就是如下这样:

std::deque<std::string> write_msgs_;void write(std::string msg)
{asio::post(io_context_,[this, msg = std::move(msg)]() mutable{bool write_in_progress = !write_msgs_.empty();write_msgs_.push_back(std::move(msg));if (!write_in_progress){do_write();}});
}void do_write(){asio::async_write(socket_,asio::buffer(write_msgs_.front().data(),write_msgs_.front().length()),[this](std::error_code ec, std::size_t /*length*/){if (!ec){write_msgs_.pop_front();if (!write_msgs_.empty()){do_write();}}else{socket_.close();}});}

即构造一个队列,将要发送的消息都保存在该队列里,然后,循环发送队列里的每条数据,每次发送完了(发送完了是指async_write的回调函数触发了),再发送队列里的下一个,依次类推。

如何使用asio的channel来解决这个问题

我是看到asio的这两段示例代码忽然想到的这个办法。

代码1:

https://github.com/chriskohlhoff/asio/blob/master/asio/src/examples/cpp20/channels/mutual_exclusion_2.cpp

  • 协程A
co_await write_lock_.async_send(deferred);co_await async_write(socket_, "<line>"_buf, deferred); // 1
co_await async_write(socket_, dynamic_buffer(data, length), deferred); // 2write_lock_.try_receive([](auto...){});
  • 协程B
co_await write_lock_.async_send(deferred);co_await async_write(socket_, "<heartbeat>\n"_buf, deferred); // 3write_lock_.try_receive([](auto...){});

注意,协程A中有两个连续async_write的代码,协程B中有一个async_write的代码,这段代码的本意是按照这个执行顺序:1 -> 2 -> 3

如果没有write_lock_这个锁的话,当代码执行到 1 这里,假设已经投递了write请求,但是请求还没结束,此时协程B执行了,协程B的async_write也被调用了,那么代码的执行顺序就变成了1 -> 3 -> 2 这就不符合程序意图了。但是如果没有协程B的话,即使没有write_lock_这个锁,这段代码也是没问题的,协程A中的那两个连续async_write的代码也没有问题(因为是协程,不是异步,如果是异步的连续两个async_write是有问题的,异步的连续async_write有问题怎么解决前面刚刚说过)。

代码2

using default_token = as_tuple_t<use_awaitable_t<>>;
using tcp_socket = default_token::as_default_on_t<tcp::socket>;auto [e2, nwritten] = co_await async_write(socket, asio::buffer(data, nread));

上面这段代码先放一放,先按照从简单到复杂的过程捋一遍代码:

  • 我们知道asio::async_write同时支持异步和协程的写法,异步的不再说了,协程的用法是这样的:
auto result = co_await asio::async_write(sock, asio::buffer("abc"), asio::use_awaitable);

问题1是:上面这段代码中,返回值 result 的类型是什么?

我在vs调试里的局部变量窗口看了一下,它的类型是 std::size_t ,也就是返回值表示写入了多少个字节的数据。

问题2是:上面这段代码是有可能抛异常的,怎么办?

通常用 try catch 包起来就行了,如下:

try
{auto result = co_await asio::async_write(sock, asio::buffer("abc"), asio::use_awaitable);
}
catch(...){}

但是有的人特别讨厌 try catch 这种方式,那能不能不抛异常,而是让错误信息作为返回值直接返回呢?

可以,代码改成下面这样即可:

auto [e1, n1] = co_await asio::async_write(sock, asio::buffer("abc"), asio::experimental::as_tuple(asio::use_awaitable));
// 返回值中的e1的类型是asio::error_code表示错误信息
// 返回值中的n1的类型是std::size_t表示实际发送了多少字节数据
// 上面这种用法就不会抛异常,只需要判断e1的值是否有错误即可

注意,此代码和前面的代码的区别是多了这个东西:asio::experimental::as_tuple

上面这个用法来自于asio官方示例:https://github.com/chriskohlhoff/talking-async/blob/master/episode1/step_6.cpp

有兴趣的可以看下这个视频,asio作者的演讲视频,演示了asio网络处理代码从异步到协程的演变过程,我个人觉得非常值得看看:https://www.bilibili.com/video/BV1yf4y1T7df/?spm_id_from=333.337.search-card.all.click

当我看到上面的 asio::experimental::as_tuple(asio::use_awaitable) 这个用法时,我很好奇他内部到底是怎么做的,于是我跟踪了一下代码,发现他是通过 rebind_executor 重新定制了一个特殊的 executor

asio::ip::tcp::socket 这个类型的定义是这样的:

template <typename Protocol, typename Executor>
class basic_stream_socket

他的第二个模板参数是 executor 的类型,这就意味着,第二个模板参数我们可以定制,可以自己做一个符合要求的 executor 类型,然后传给这个类。
asio::experimental::as_tuple(asio::use_awaitable) 这个东西,大致就是这个逻辑来实现的(我思考了一下,还是选择不贴as_tuple的实现代码了,贴了代码感觉反而导致整个文章看上去难度更大了,有兴趣的自己去跟踪代码看一下就行了)。

OK,到这里,就差不多了,此时我想,既然能定制出 as_tuple 这种东西,那我能不能也定制一个 as_lock 呢(as_lock是我随便取的一个名字)?然后在这个 as_lock 里再放一个 write_lock_ 这个锁呢?由于定制了 as_lock 之后,就相当于,这个socket和这个 as_lock 绑定了,同时这个 as_lock 里面还自带了一个锁,就相当于这个socket和这个锁也绑定了,那么我在调用 asio::async_write 时,就可以直接取出这个锁,先上锁,write完了,再释放锁,这样不就解决 asio::async_write 不能连续调用的问题了吗?

下面贴上部分代码和大致过程

  1. 首先定制一个类型
template <typename CompletionToken>
class with_lock_t
{
public:// 定制一个自己的executor类template <typename InnerExecutor>struct executor_with_default : InnerExecutor{template <typename InnerExecutor1>executor_with_default(const InnerExecutor1& ex,typename constraint<conditional<!is_same<InnerExecutor1, executor_with_default>::value,is_convertible<InnerExecutor1, InnerExecutor>,false_type>::type::value>::type = 0) noexcept: InnerExecutor(ex){ch = std::make_shared<experimental::channel<void()>>(ex, 1);}// 在自己定制的executor类中增加一个channel变量,用来当锁用的std::shared_ptr<experimental::channel<void()>> ch;};// 这段代码的目的是提供rebind_executor的功能(这段代码是照抄过来的)template <typename T>using as_default_on_t = typename T::template rebind_executor<executor_with_default<typename T::executor_type> >::other;
};

我记得with_lock_t的实现代码是参考 as_tuple_t 这个类来实现的,asio里面有好几个这种类,就是类里面有个executor_with_defaultrebind_executor<executor_with_default>这样的东西,实际上as_tuple_t类里面的实现细节我看了代码之后只是知道个大概,那个代码看起来太痛苦了,所以够用就行了,我实在是不想去扣那些细节了

  1. 自己封装一个 async_send 函数,用来替代 async_write 函数。
struct tcp_async_send_op
{auto operator()(auto state, auto sock_ref, auto buffer) -> void{auto& sock = sock_ref.get();co_await asio::dispatch(sock.get_executor(), asio::use_nothrow_deferred);// 此时socket的executor就是我们前面定制的那个// `with_lock_t`里面的`executor_with_default`// 取出里面的ch并调用async_send进行加锁操作即可co_await sock.get_executor().ch->async_send(asio::use_nothrow_deferred);// 发送数据auto [e1, n1] = co_await asio::async_write(sock, buffer, asio::use_nothrow_deferred);// 写完了,释放锁sock.get_executor()->ch.try_receive([](auto...) {});co_return{ e1, n1 };}
};template<typename AsyncStream,typename SendToken = asio::default_token_type<AsyncStream>>
inline auto async_send(AsyncStream& sock,auto buffer,SendToken&& token = asio::default_token_type<AsyncStream>())
{// 这里有个较陌生的函数co_composed后面有简单介绍return async_initiate<SendToken, void(asio::error_code, std::size_t)>(experimental::co_composed<void(asio::error_code, std::size_t)>(tcp_async_send_op{}, sock),token,std::ref(sock),buffer);
}

注意:上面这段代码作为演示用,保留和简化了代码,实际使用可能会有小错误。

  1. 可以无限制的调用了

经过上面的封装之后,我们就可以直接调用 async_send 函数来发送数据了,不需要自己做队列了,而且:

这样调用是安全的(使用协程,协程下这样调用本来就是安全的):

co_await async_send(socket_, "abc", asio::use_awaitable); 
co_await async_send(socket_, "def", asio::use_awaitable); 

这样调用也是安全的(使用异步):

async_send(socket_, "abc", [](auto...){}); 
async_send(socket_, "def", [](auto...){}); 

注意上面的 async_send 不仅仅是不需要队列了,它也是线程安全的,可以在任何线程中直接调用。

实际上 asio::channel 内部有一个 queue ,也就是说这种实现方式实际上使用了 asio::channel里面的那个 queue ,让它来代替了我们自己实现的那个队列了。

上面的代码仅是精简后的演示代码,实际代码比这复杂不少。实际代码可参考这里:https://github.com/zhllxt/asio3/blob/main/include/asio3/core/with_lock.hpp

注意:async_send 并不能解决前面所提到的这个问题:

  • 协程A
co_await async_write(socket_, "<line>"_buf, deferred); // 1
co_await async_write(socket_, dynamic_buffer(data, length), deferred); // 2
  • 协程B
co_await async_write(socket_, "<heartbeat>\n"_buf, deferred); // 3

也就是说,即使将前面代码中的 async_write 替换为自己封装的 async_send 依然是不行的。

上面这段代码可以简单理解为,你开了两个线程,在A线程中,你先发送包A的包头,接着发送包A的包体,在B线程中,又发送了一个包B,此时包B是有可能恰好夹在包A的包头包A的包体中间被发送的。

这实际上是业务上的数据发送顺序问题,上面所说的解决办法只能解决无法连续调用async_write的问题,不能解决数据发送顺序的问题。

最后说下co_composed函数

co_composed是用来解决,自己封装的异步函数中,无法直接使用c++20的协程的问题的。

如果你想自己封装适用于asio的异步函数,asio的官方示例提供了这样的方式:

struct my_writer : public asio::coroutine
{asio::ip::tcp::socket& sock;template <typename Self>void operator()(Self& self, error_code ec = {}, std::size_t bytes_transferred = 0){ASIO_CORO_REENTER(*this){// 在这里使用asio通过宏模拟的协程,这种方式的协程使用起来缚手缚脚的// 很不灵活// 注意:这里无法使用 co_await asio::async_write(sock, "abc");ASIO_CORO_YIELDasio::async_write(sock, "abc", std::move(self));if (ec)goto end;// ...end:// complete 会调用回调函数,这里回调函数有一个参数即asio::error_codeself.complete(ec);}}
};template <typename CompletionToken>
auto do_my_write(asio::ip::tcp::socket& sock)
{return asio::async_compose<CompletionToken, void(asio::error_code)>(my_writer{ sock }, token, sock);
}

之后,你就可以这样来调用了:

  1. 异步方式:
do_my_write(sock, [](asio::error_code){});
  1. 协程方式
do_my_write(sock, asio::use_awaitable);

这种实现方式的问题在于:它只能使用asio通过宏模拟的协程,这就导致在 struct my_writer::operator 函数中,无法使用c++20语言的原生协程,即无法使用 co_await co_return 这些关键字,解决办法就是使用asio的co_composed 但是我在使用co_composed的过程中发现,它目前还有问题,就是通过co_composed 包装的代码在debug下运行正常,但是在release下崩溃,我通过修改vs的编译参数,主要包括禁止内联和禁止内部函数,一定程度能解决,但依然无法完全解决,而我对asio的协程框架的底层实现也没有兴趣和耐心去仔细的琢磨,所以先放一边。如果想避开这个问题,那么不使用co_composed 即可,使用co_composed 的目的是,想让自己包装的函数同时支持异步调用和协程调用,如果你不考虑这个问题,那就直接写c++20原生协程,就没问题了。也就是说如下这样是没问题的:

asio::awaitable<void> my_writer(asio::ip::tcp::socket& sock)
{co_await asio::async_write(sock, "abc");
}

见:https://github.com/chriskohlhoff/asio/issues/1354

见:https://github.com/chriskohlhoff/asio/issues/1413

最后更新于 2024-01-22

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/640176.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

xshell配置隧道转移规则

钢铁知识库&#xff0c;一个学习python爬虫、数据分析的知识库。人生苦短&#xff0c;快用python。 xshell是什么 通俗点说就是一款强大ssh远程软件&#xff0c;可以方便运维人员对服务器进行管理操作&#xff0c;功能很多朋友们自行探索&#xff0c;今天只聊其中一个功能点那…

Numpy笔记:安装Numpy+ndarray基本属性+常用方法+索引和切片+广播+轴+范数

Numpy Python库&#xff0c;用于数组快速操作的各种API 支持常见的数组和矩阵操作ndarray处理多维数组 安装Numpy 检查PyCharm的Python运行环境 File–>Settings–>Project–>Python Interpreter检查Python Interpreter环境&#xff0c;例如base 点击Anaconda Prom…

从0开始学习C++ 第三十课 插入排序和快速排序

插入排序 (Insertion Sort) 概念&#xff1a; 插入排序是一种简单直观的排序算法&#xff0c;它的工作原理是通过构建有序序列&#xff0c;对于未排序数据&#xff0c;在已排序序列中从后向前扫描&#xff0c;找到相应位置并插入。 逐步分析&#xff1a; 从数组第二个元素开…

HNU-数据挖掘-实验2-数据降维与可视化

数据挖掘课程实验实验2 数据降维与可视化 计科210X 甘晴void 202108010XXX 文章目录 数据挖掘课程实验<br>实验2 数据降维与可视化实验背景实验目标实验数据集说明实验参考步骤实验过程1.对数据进行初步降维2.使用无监督数据降维方法&#xff0c;比如PCA&#xff0c;I…

既是API调试平台也是自动化测试工具?Apipost

Apipost提供可视化的API自动化测试功能&#xff0c;使用Apipost研发人员可以设计、调试接口&#xff0c;测试人员可以基于同一数据源进行测试&#xff0c;Apipost 接口自动化功能在上次更新中进行了逻辑调整&#xff0c;带来更好的交互操作、更多的控制器选择&#xff0c;同时新…

代码随想录算法训练营第四十一天 | 343.整数拆分、66.不同的二叉搜索树

343.整数拆分 题目链接&#xff1a;343.整数拆分 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数 的和&#xff08; k > 2 &#xff09;&#xff0c;并使这些整数的乘积最大化。 返回 你可以获得的最大乘积 。 文章讲解/视频讲解&#xff1a;https://programmerca…

SpringMvc中拦截器的配置及应用

拦截器原理 在 Spring MVC 中&#xff0c;拦截器&#xff08;Interceptor&#xff09;是一种机制&#xff0c;用于拦截请求并在处理程序&#xff08;Controller&#xff09;执行之前或之后执行一些操作。拦截器允许您在请求的不同阶段&#xff08;如处理程序执行前、处理程序执…

AI大模型中的Bert

1.全方位上下文理解&#xff1a;与以前的模型&#xff08;例如GPT&#xff09;相比&#xff0c;BERT能够双向理解上下文&#xff0c;即同时考虑一个词 的左边和右边的上下文。这种全方位的上下文理解使得BERT能够更好地理解语言&#xff0c;特别是在理解词义、 消歧等复杂任务上…

智慧安防GB28181视频监控EasyCVR v3.5系统增加录像保存地址的配置

智慧安防监控EasyCVR视频管理平台能在复杂的网络环境中&#xff0c;将前端设备统一集中接入。在网络传输上&#xff0c;平台支持设备通过4G、5G、WIFI、有线等方式进行视频流的快捷传输&#xff0c;视频流经平台处理后可对外进行多格式的分发&#xff0c;实现多展示终端观看&am…

消息中间件之Kafka(二)

1.Kafka线上常见问题 1.1 为什么要对topic下数据进行分区存储? 1.commit log文件会受到所在机器的文件系统大小的限制&#xff0c;分区之后可以将不同的分区放在不同的机器上&#xff0c; 相当于对数据做了分布式存储&#xff0c;理论上一个topic可以处理任意数量的数据2.提…

TCP高并发服务器简介(select、poll、epoll实现与区别)

select、poll、epoll三者的实现&#xff1a; select实现TCP高并发服务器的流程&#xff1a; 一、创建套接字&#xff08;socket函数&#xff09;&#xff1a;二、填充服务器的网络信息结构体&#xff1a;三、套接字和服务器的网络信息结构体进行绑定&#xff08;bind函数&…

9、numpy当中维度的变化

在NumPy中&#xff0c;可以使用不同的函数和方法来处理数据的维度。 创建数组&#xff1a;可以使用numpy.array()函数来创建数组&#xff0c;可以是一维、二维、多维数组。 import numpy as np# 一维数组 a np.array([1, 2, 3])# 二维数组 b np.array([[1, 2, 3],[4, 5, 6]…

大模型笔记【3】 gem5 运行模型框架LLama

一 LLama.cpp LLama.cpp 支持x86&#xff0c;arm&#xff0c;gpu的编译。 1. github 下载llama.cpp https://github.com/ggerganov/llama.cpp.git 2. gem5支持arm架构比较好&#xff0c;所以我们使用编译LLama.cpp。 以下是我对Makefile的修改 开始编译&#xff1a; make UNAME…

Kotlin协程的JVM实现源码分析(下)

协程 根据 是否保存切换 调用栈 &#xff0c;分为&#xff1a; 有栈协程&#xff08;stackful coroutine&#xff09;无栈协程&#xff08;stackless coroutine&#xff09; 在代码上的区别是&#xff1a;是否可在普通函数里调用&#xff0c;并暂停其执行。 Kotlin协程&…

一、基础数据结构——2.队列——3.双端队列和单调队列1

参考资料&#xff1a;《算法竞赛》&#xff0c;罗勇军 郭卫斌 著 本博客作为阅读本书的学习笔记&#xff0c;仅供交流学习。 建议关注 罗勇军老师博客 删除线格式 今天想到考完研去找工作面试被问到的问题&#xff1a; C与C有什么区别&#xff1f; 我当时的答案&#xff08;毫无…

【git分支管理策略】

文章目录 前言一、分支管理策略简介二、git基本操作三、git分支远程分支本地分支 四、gitflow分支管理策略分支定义gitflow分支管理策略评价 五、GITHUB FLOW分支管理策略分支使用流程创建分支&#xff08;Create a branch&#xff09;新增提交(add and commit)提出 Pull 请求&…

C++泛型编程-类模板的项目实战实现基础的Vector的编写

请设计一个数组模板类&#xff08; Vector &#xff09;&#xff0c;完成对 int 、 char 、 float 、 double 以 及任意的自定义类等类型元素进行管理。 需求 a. 实现构造函数 b. 实现拷贝构造函数 c. 实现 cout << 操作 d. 实现下标访问符 [] 的重载操作 …

webench源码阅读

简介 webbench是一款用C编写的开源工具&#xff0c;主要用来在Linux下进行网站压力测试。最多可以模拟3万个连接去测试网站的负载能力&#xff0c;并可以设置运行的客户端数、测试时间、使用的http协议版本、请求方法、是否需要等待服务器响应等选项&#xff0c;最后统计每分钟…

CTF-PWN-堆-【chunk extend/overlapping-1】

文章目录 chunk extend/overlappingfastbin与topchunk相邻free时候不会合并unsortedbinchunk中与topchunk相邻的被free时会合并extend向后overlapping先修改header&#xff0c;再free&#xff0c;再malloc先free&#xff0c;再修改header&#xff0c;再malloc extend向前overla…

Filter简单了解

1、filter能干嘛 过滤器实际上就是对web资源进行拦截&#xff0c;做一些处理后交给下一个过滤器或者servlet处理&#xff0c;通常都是拦截request的&#xff0c;也可以对response进行拦截处理&#xff1b; 2、面试考点&#xff1a;filter能干嘛&#xff08;应用场景&#xff0…