分享一个asio下使用channel来实现无需队列的安全的连续async_write的方法
问题:不能直接用asio::async_write连续发送数据
下面这段代码是错误的(为了代码的可读性和易理解,请先忽略函数调用中参数不正确的问题):
asio::async_write(sock, "abc", [](){});
asio::async_write(sock, "def", [](){});
为什么不能这样用?
传统的经验和说法是,比如在Windows下,WSASend,WSARecv
不能连续调用,必须等待上一个调用结束,才能开始下一个调用,所谓的上一个调用结束,指的是该调用的回调函数已经触发了,这里就是指上面代码中的lambda即:[](){}
触发了。
在Windows下,asio::async_write
调用的是WSASend,所以asio::async_write
不能连续调用可以理解。
那为什么WSASend,WSARecv
不能连续调用呢?这可能得问比尔盖次了,他这么设计的吧,你非要想能够这样用,你去通知比尔盖次,要求他限期整改,也许是个办法。
针对这个问题的常规解决办法
在asio官方代码示例中,其中有对这个问题的处理办法:
https://github.com/chriskohlhoff/asio/blob/master/asio/src/examples/cpp11/chat/chat_client.cpp
简单归纳一下就是如下这样:
std::deque<std::string> write_msgs_;void write(std::string msg)
{asio::post(io_context_,[this, msg = std::move(msg)]() mutable{bool write_in_progress = !write_msgs_.empty();write_msgs_.push_back(std::move(msg));if (!write_in_progress){do_write();}});
}void do_write(){asio::async_write(socket_,asio::buffer(write_msgs_.front().data(),write_msgs_.front().length()),[this](std::error_code ec, std::size_t /*length*/){if (!ec){write_msgs_.pop_front();if (!write_msgs_.empty()){do_write();}}else{socket_.close();}});}
即构造一个队列,将要发送的消息都保存在该队列里,然后,循环发送队列里的每条数据,每次发送完了(发送完了是指async_write
的回调函数触发了),再发送队列里的下一个,依次类推。
如何使用asio的channel来解决这个问题
我是看到asio的这两段示例代码忽然想到的这个办法。
代码1:
https://github.com/chriskohlhoff/asio/blob/master/asio/src/examples/cpp20/channels/mutual_exclusion_2.cpp
- 协程A
co_await write_lock_.async_send(deferred);co_await async_write(socket_, "<line>"_buf, deferred); // 1
co_await async_write(socket_, dynamic_buffer(data, length), deferred); // 2write_lock_.try_receive([](auto...){});
- 协程B
co_await write_lock_.async_send(deferred);co_await async_write(socket_, "<heartbeat>\n"_buf, deferred); // 3write_lock_.try_receive([](auto...){});
注意,协程A中有两个连续async_write
的代码,协程B中有一个async_write
的代码,这段代码的本意是按照这个执行顺序:1 -> 2 -> 3
如果没有write_lock_
这个锁的话,当代码执行到 1
这里,假设已经投递了write请求,但是请求还没结束,此时协程B执行了,协程B的async_write
也被调用了,那么代码的执行顺序就变成了1 -> 3 -> 2
这就不符合程序意图了。但是如果没有协程B的话,即使没有write_lock_
这个锁,这段代码也是没问题的,协程A中的那两个连续async_write
的代码也没有问题(因为是协程,不是异步,如果是异步的连续两个async_write
是有问题的,异步的连续async_write
有问题怎么解决前面刚刚说过)。
代码2
using default_token = as_tuple_t<use_awaitable_t<>>;
using tcp_socket = default_token::as_default_on_t<tcp::socket>;auto [e2, nwritten] = co_await async_write(socket, asio::buffer(data, nread));
上面这段代码先放一放,先按照从简单到复杂的过程捋一遍代码:
- 我们知道
asio::async_write
同时支持异步和协程的写法,异步的不再说了,协程的用法是这样的:
auto result = co_await asio::async_write(sock, asio::buffer("abc"), asio::use_awaitable);
问题1是:上面这段代码中,返回值 result
的类型是什么?
我在vs调试里的局部变量窗口看了一下,它的类型是 std::size_t
,也就是返回值表示写入了多少个字节的数据。
问题2是:上面这段代码是有可能抛异常的,怎么办?
通常用 try catch
包起来就行了,如下:
try
{auto result = co_await asio::async_write(sock, asio::buffer("abc"), asio::use_awaitable);
}
catch(...){}
但是有的人特别讨厌 try catch
这种方式,那能不能不抛异常,而是让错误信息作为返回值直接返回呢?
可以,代码改成下面这样即可:
auto [e1, n1] = co_await asio::async_write(sock, asio::buffer("abc"), asio::experimental::as_tuple(asio::use_awaitable));
// 返回值中的e1的类型是asio::error_code表示错误信息
// 返回值中的n1的类型是std::size_t表示实际发送了多少字节数据
// 上面这种用法就不会抛异常,只需要判断e1的值是否有错误即可
注意,此代码和前面的代码的区别是多了这个东西:asio::experimental::as_tuple
上面这个用法来自于asio官方示例:https://github.com/chriskohlhoff/talking-async/blob/master/episode1/step_6.cpp
有兴趣的可以看下这个视频,asio作者的演讲视频,演示了asio网络处理代码从异步到协程的演变过程,我个人觉得非常值得看看:https://www.bilibili.com/video/BV1yf4y1T7df/?spm_id_from=333.337.search-card.all.click
当我看到上面的 asio::experimental::as_tuple(asio::use_awaitable)
这个用法时,我很好奇他内部到底是怎么做的,于是我跟踪了一下代码,发现他是通过 rebind_executor
重新定制了一个特殊的 executor
asio::ip::tcp::socket
这个类型的定义是这样的:
template <typename Protocol, typename Executor>
class basic_stream_socket
他的第二个模板参数是 executor
的类型,这就意味着,第二个模板参数我们可以定制,可以自己做一个符合要求的 executor
类型,然后传给这个类。
asio::experimental::as_tuple(asio::use_awaitable)
这个东西,大致就是这个逻辑来实现的(我思考了一下,还是选择不贴as_tuple
的实现代码了,贴了代码感觉反而导致整个文章看上去难度更大了,有兴趣的自己去跟踪代码看一下就行了)。
OK,到这里,就差不多了,此时我想,既然能定制出 as_tuple
这种东西,那我能不能也定制一个 as_lock
呢(as_lock是我随便取的一个名字)?然后在这个 as_lock
里再放一个 write_lock_
这个锁呢?由于定制了 as_lock
之后,就相当于,这个socket和这个 as_lock
绑定了,同时这个 as_lock
里面还自带了一个锁,就相当于这个socket和这个锁也绑定了,那么我在调用 asio::async_write
时,就可以直接取出这个锁,先上锁,write完了,再释放锁,这样不就解决 asio::async_write
不能连续调用的问题了吗?
下面贴上部分代码和大致过程
- 首先定制一个类型
template <typename CompletionToken>
class with_lock_t
{
public:// 定制一个自己的executor类template <typename InnerExecutor>struct executor_with_default : InnerExecutor{template <typename InnerExecutor1>executor_with_default(const InnerExecutor1& ex,typename constraint<conditional<!is_same<InnerExecutor1, executor_with_default>::value,is_convertible<InnerExecutor1, InnerExecutor>,false_type>::type::value>::type = 0) noexcept: InnerExecutor(ex){ch = std::make_shared<experimental::channel<void()>>(ex, 1);}// 在自己定制的executor类中增加一个channel变量,用来当锁用的std::shared_ptr<experimental::channel<void()>> ch;};// 这段代码的目的是提供rebind_executor的功能(这段代码是照抄过来的)template <typename T>using as_default_on_t = typename T::template rebind_executor<executor_with_default<typename T::executor_type> >::other;
};
我记得with_lock_t
的实现代码是参考 as_tuple_t
这个类来实现的,asio里面有好几个这种类,就是类里面有个executor_with_default
和rebind_executor<executor_with_default>
这样的东西,实际上as_tuple_t
类里面的实现细节我看了代码之后只是知道个大概,那个代码看起来太痛苦了,所以够用就行了,我实在是不想去扣那些细节了
- 自己封装一个
async_send
函数,用来替代async_write
函数。
struct tcp_async_send_op
{auto operator()(auto state, auto sock_ref, auto buffer) -> void{auto& sock = sock_ref.get();co_await asio::dispatch(sock.get_executor(), asio::use_nothrow_deferred);// 此时socket的executor就是我们前面定制的那个// `with_lock_t`里面的`executor_with_default`// 取出里面的ch并调用async_send进行加锁操作即可co_await sock.get_executor().ch->async_send(asio::use_nothrow_deferred);// 发送数据auto [e1, n1] = co_await asio::async_write(sock, buffer, asio::use_nothrow_deferred);// 写完了,释放锁sock.get_executor()->ch.try_receive([](auto...) {});co_return{ e1, n1 };}
};template<typename AsyncStream,typename SendToken = asio::default_token_type<AsyncStream>>
inline auto async_send(AsyncStream& sock,auto buffer,SendToken&& token = asio::default_token_type<AsyncStream>())
{// 这里有个较陌生的函数co_composed后面有简单介绍return async_initiate<SendToken, void(asio::error_code, std::size_t)>(experimental::co_composed<void(asio::error_code, std::size_t)>(tcp_async_send_op{}, sock),token,std::ref(sock),buffer);
}
注意:上面这段代码作为演示用,保留和简化了代码,实际使用可能会有小错误。
- 可以无限制的调用了
经过上面的封装之后,我们就可以直接调用 async_send
函数来发送数据了,不需要自己做队列了,而且:
这样调用是安全的(使用协程,协程下这样调用本来就是安全的):
co_await async_send(socket_, "abc", asio::use_awaitable);
co_await async_send(socket_, "def", asio::use_awaitable);
这样调用也是安全的(使用异步):
async_send(socket_, "abc", [](auto...){});
async_send(socket_, "def", [](auto...){});
注意上面的 async_send
不仅仅是不需要队列了,它也是线程安全的,可以在任何线程中直接调用。
实际上 asio::channel
内部有一个 queue
,也就是说这种实现方式实际上使用了 asio::channel
里面的那个 queue
,让它来代替了我们自己实现的那个队列了。
上面的代码仅是精简后的演示代码,实际代码比这复杂不少。实际代码可参考这里:https://github.com/zhllxt/asio3/blob/main/include/asio3/core/with_lock.hpp
注意:async_send
并不能解决前面所提到的这个问题:
- 协程A
co_await async_write(socket_, "<line>"_buf, deferred); // 1
co_await async_write(socket_, dynamic_buffer(data, length), deferred); // 2
- 协程B
co_await async_write(socket_, "<heartbeat>\n"_buf, deferred); // 3
也就是说,即使将前面代码中的 async_write
替换为自己封装的 async_send
依然是不行的。
上面这段代码可以简单理解为,你开了两个线程,在A线程中,你先发送包A的包头
,接着发送包A的包体
,在B线程中,又发送了一个包B
,此时包B
是有可能恰好夹在包A的包头
和包A的包体
中间被发送的。
这实际上是业务上的数据发送顺序问题,上面所说的解决办法只能解决无法连续调用async_write
的问题,不能解决数据发送顺序的问题。
最后说下co_composed函数
co_composed是用来解决,自己封装的异步函数中,无法直接使用c++20的协程的问题的。
如果你想自己封装适用于asio的异步函数,asio的官方示例提供了这样的方式:
struct my_writer : public asio::coroutine
{asio::ip::tcp::socket& sock;template <typename Self>void operator()(Self& self, error_code ec = {}, std::size_t bytes_transferred = 0){ASIO_CORO_REENTER(*this){// 在这里使用asio通过宏模拟的协程,这种方式的协程使用起来缚手缚脚的// 很不灵活// 注意:这里无法使用 co_await asio::async_write(sock, "abc");ASIO_CORO_YIELDasio::async_write(sock, "abc", std::move(self));if (ec)goto end;// ...end:// complete 会调用回调函数,这里回调函数有一个参数即asio::error_codeself.complete(ec);}}
};template <typename CompletionToken>
auto do_my_write(asio::ip::tcp::socket& sock)
{return asio::async_compose<CompletionToken, void(asio::error_code)>(my_writer{ sock }, token, sock);
}
之后,你就可以这样来调用了:
- 异步方式:
do_my_write(sock, [](asio::error_code){});
- 协程方式
do_my_write(sock, asio::use_awaitable);
这种实现方式的问题在于:它只能使用asio通过宏模拟的协程,这就导致在 struct my_writer::operator
函数中,无法使用c++20语言的原生协程,即无法使用 co_await co_return
这些关键字,解决办法就是使用asio的co_composed
但是我在使用co_composed
的过程中发现,它目前还有问题,就是通过co_composed
包装的代码在debug下运行正常,但是在release下崩溃,我通过修改vs的编译参数,主要包括禁止内联和禁止内部函数,一定程度能解决,但依然无法完全解决,而我对asio的协程框架的底层实现也没有兴趣和耐心去仔细的琢磨,所以先放一边。如果想避开这个问题,那么不使用co_composed
即可,使用co_composed
的目的是,想让自己包装的函数同时支持异步调用和协程调用,如果你不考虑这个问题,那就直接写c++20原生协程,就没问题了。也就是说如下这样是没问题的:
asio::awaitable<void> my_writer(asio::ip::tcp::socket& sock)
{co_await asio::async_write(sock, "abc");
}
见:https://github.com/chriskohlhoff/asio/issues/1354
见:https://github.com/chriskohlhoff/asio/issues/1413