【boost网络库从青铜到王者】第六篇:asio网络编程中的socket异步读(接收)写(发送)

文章目录

  • 1、简介
  • 2、异步写 void AsyncWriteSomeToSocketErr(const std::string& buffer)
  • 3、异步写void AsyncWriteSomeToSocket(const std::string& buffer)
  • 4、异步写void AsyncSendToSocket(const std::string& buffer)
  • 5、异步读void AsyncReadSomeToSocket(const std::string& buffer)
  • 6、异步读void AsyncReceiveToSocket(const std::string& buffer)
  • 7、总结

1、简介

本文介绍boost asio的异步读写操作及注意事项,为保证知识便于读者吸收,仅介绍api使用的代码片段。下一节再编写完整的客户端和服务器程序。

所以我们定义一个session类,这个session类表示服务器处理客户端连接的管理类

#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;};#endif // !__ASYNC_DEMO_H_2023_8_22__

session类定义了一个socket成员变量,负责处理对端**(ip+端口)的连接读写,封装了Connect**函数:

#include"async_demo.h"Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket):socket_(socket)
{send_buffer_ = nullptr;
}bool Session::Connect(boost::asio::ip::tcp::endpoint& ep) {socket_->connect(ep);return true;
}

这里只是简单意思一下,下面核心介绍异步读写api的使用。

2、异步写 void AsyncWriteSomeToSocketErr(const std::string& buffer)

在写操作前,我们先封装一个Buffer结构。用来管理要发送和接收的数据,该结构包含数据域首地址,数据的总长度,以及已经处理的长度(已读的长度或者已写的长度)

写了两个构造函数,两个参数的负责构造写节点,一个参数的负责构造读节点。

#pragma once
#include<iostream>//trv
const int RECVSIZE = 1024;
class Buffer {
public://发送消息协议//param 协议首地址,协议总长度Buffer(const char* msg,int32_t total_len):msg_(new char[total_len]),total_len_(total_len),cur_len_(0){memcpy(msg_, msg, total_len);}//接收消息协议//param 协议总长度,当前接收协议长度Buffer(int32_t total_len):total_len_(total_len),cur_len_(0){msg_ = new char[total_len];}~Buffer() {delete[] msg_;}char* GetMsg() {return msg_;}int32_t GetTotalLen() {return total_len_;}void SetTotalLen(int32_t total_len) {total_len_ = total_len;}int32_t GetCurLen() {return cur_len_;}void SetCurLen(int32_t cur_len) {cur_len_ = cur_len;}
private://消息协议的首地址char* msg_;//消息协议的总长度int32_t total_len_;//消息协议的当前发送长度 +上已经发送长度 = total_len (已经处理的长度(已读的长度或者已写的长度))int32_t cur_len_;
};

接下来为Session添加异步写发送数据操作和负责发送写数据的节点。

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);
private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

在这里插入图片描述

AsyncWriteSomeToSocketErr函数为我们封装的写操作,AsyncWriteSomeToSocketErr为异步写操作回调的函数,为什么会有三个参数呢,我们可以看一下asio源码:

  template <typename ConstBufferSequence,BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,std::size_t)) WriteTokenBOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>BOOST_ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(WriteToken,void (boost::system::error_code, std::size_t))async_write_some(const ConstBufferSequence& buffers,BOOST_ASIO_MOVE_ARG(WriteToken) tokenBOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))BOOST_ASIO_INITFN_AUTO_RESULT_TYPE_SUFFIX((async_initiate<WriteToken,void (boost::system::error_code, std::size_t)>(declval<initiate_async_send>(), token,buffers, socket_base::message_flags(0)))){return async_initiate<WriteToken,void (boost::system::error_code, std::size_t)>(initiate_async_send(this), token,buffers, socket_base::message_flags(0));}

async_write_some是异步写的函数,这个异步写函数有两个参数,第一个参数为ConstBufferSequence常引用类型的buffer,就是构造buffer结构。

第二个参数为WriteToken类型,而WriteToken在上面定义了,是一个函数对象类型,返回值为void,参数为error_codesize_t,所以我们为了调用async_write_some函数也要传入一个符合WriteToken定义的函数,就是我们声明的AsyncWriteSomeToSocketErr函数,前两个参数为WriteToken规定的参数,第三个参数为Buffer的智能指针,这样通过智能指针保证我们发送的Buffer数据生命周期延长。

我们看一下AsyncWriteSomeToSocketErr函数的具体实现:

void Session::AsyncWriteSomeToSocketErr(const std::string& buffer) {//先构造一个发送节点send_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());//然后构造async_write_some的参数buffer和回调和函数socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()),//绑定成员函数的地址,类的对象,参数占位符1,参数占位符2std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, send_buffer_));
}//TCP缓冲区 收发端不对等 发11字节 TCP缓冲区只有5字节 那么要分两次发送,假设发送hello world ,第一次只发送hello,\
world未发送,那么如果用户再次调用WriteCallBackErr那么底层不保护发送顺序,那么可能收到的结果hello hello world world \
解决这种就是用一个队列把存储的数据存放到队列里面
void Session::AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer_) {if (err.value() != 0) {std::cout << "error occured!error code: " << err.value() << " . message: " << err.what() << std::endl;return;}if (bytes_transferred + buffer_->GetCurLen() < buffer_->GetTotalLen()) {//buffer_->GetCurLen() = buffer_->GetCurLen() + bytes_transferred;buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred);socket_->async_write_some(boost::asio::buffer(buffer_->GetMsg() + buffer_->GetCurLen(), buffer_->GetTotalLen() - buffer_->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, buffer_));}
}

这段代码的作用是实现异步发送数据的功能,主要包括两个函数:AsyncWriteSomeToSocketErrAsyncWriteSomeCallBackErr

  • AsyncWriteSomeToSocketErr 函数的作用是将数据放入发送队列中,并触发异步写操作。具体步骤如下:

    • 首先,使用 std::make_shared 创建一个 Buffer 对象,这个对象用于存储要发送的数据。
    • 然后,使用 socket_->async_write_some 函数触发异步写操作,将数据写入套接字。在这里,你绑定了回调函数 AsyncWriteSomeCallBackErr
  • AsyncWriteSomeCallBackErr 函数是异步写操作完成后的回调函数。它的主要作用是处理写操作的结果,检查是否发生错误,以及是否需要继续发送剩余的数据。具体步骤如下:

    • 首先,检查 err 参数,如果其值不为 0,表示发送出现错误,就输出错误信息并返回。
    • 然后没有错误,检查已传输的字节数 bytes_transferred 加上 buffer_ 对象中已经发送的字节数 buffer_->GetCurLen() 是否小于总的数据长度 buffer_->GetTotalLen()。如果小于总长度,说明还有剩余数据需要发送。
    • 如果有剩余数据需要发送,就更新 buffer_ 对象中的已发送字节数 buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred),然后继续触发异步写操作,将剩余的数据发送出去。这里再次调用 socket_->async_write_some 并绑定了相同的回调函数,以便在写操作完成后再次检查和处理。
    • 总体来说,这段代码实现了异步发送数据的逻辑,确保了数据的完整性和发送顺序。通过使用回调函数,可以在每次写操作完成后处理相应的逻辑,包括检查错误、更新已发送字节数以及触发下一次写操作。

AsyncWriteSomeToSocketErr函数里判断如果已经发送的字节数没有达到要发送的总字节数,那么久更新节点已经发送的长度,然后计算剩余要发送的长度,如果有数据未发送完,再次调用async_write_some函数异步发送。

但是这个函数并不能投入实际应用,因为async_write_some回调函数返回已发送的字节数可能并不是全部长度。比如TCP发送缓存区总大小为8字节,但是有3字节未发送(上一次未发送完),这样剩余空间为5字节。
在这里插入图片描述
此时我们调用async_write_some发送hello world!实际发送的长度就是为5,也就是只发送了hello,剩余world!通过我们的回调继续发送。

而实际开发的场景用户是不清楚底层tcp的多路复用调用情况的,用户想发送数据的时候就调用WriteToSocketErr,或者循环调用WriteToSocketErr,很可能在一次没发送完数据还未调用回调函数时再次调用WriteToSocketErr,因为boost::asio封装的时epoll和iocp等多路复用模型。当写事件就绪后就发数据,发送的数据按照async_write_some调用的顺序发送,所以回调函数内调用的async_write_some可能并没有被及时调用。

比如我们如下代码:

//用户发送数据
AsyncWriteSomeToSocketErr("Hello World!");
//用户无感知下层调用情况又一次发送了数据
AsyncWriteSomeToSocketErr("Hello World!");

那么很可能第一次只发送了Hello,后面的数据没发完,第二次发送了Hello World!之后又发送了World!

所以对端收到的数据很可能是HelloHello World! World!

3、异步写void AsyncWriteSomeToSocket(const std::string& buffer)

那怎么解决这个问题呢,我们可以通过队列保证应用层的发送顺序。我们在Session中定义一个发送队列,然后重新定义正确的异步发送函数和回调处理:

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);void AsyncWriteSomeToSocket(const std::string& buffer);void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);
private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;std::queue<std::shared_ptr<Buffer>> send_queue_;bool send_padding_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

定义了bool变量send_padding_,该变量为true表示一个节点还未发送完,false代表发送完成。send_padding_ 用来缓存要发送的消息协议节点,是一个队列。

我们实现异步发送功能:

Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket):socket_(socket),send_padding_(false)
{send_buffer_ = nullptr;if (!send_queue_.empty()) {send_queue_.pop();}
}

函数实现:

void Session::AsyncWriteSomeToSocket(const std::string& buffer) {//发送节点插入队列send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));//判断是否还有未发完的数据,false,表示没有,true表示还有if (send_padding_) {return;}//异步发送数据socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()), std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));send_padding_ = true;
}void Session::AsyncWriteSomeCallBack(const boost::system::error_code& err, size_t bytes_transferred) {if (err.value() != 0) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//取出队列中队首元素std::shared_ptr<Buffer> send_data = send_queue_.front();send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred);//数据未发送完,继承调用异步函数取出队首元素发送if (send_data->GetCurLen() < send_data->GetTotalLen()) {socket_->async_write_some(boost::asio::buffer(send_data->GetMsg() + send_data->GetCurLen(),send_data->GetTotalLen() - send_data->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}//如果这个数据发送完了,把数据节点取出来send_queue_.pop();//判断队列里面是否还有下一个数据if (send_queue_.empty()) {send_padding_ = false;return;}//有数据则继续发送if (!send_queue_.empty()) {std::shared_ptr<Buffer> send_data_next = send_queue_.front();//异步发送的地址偏移socket_->async_write_some(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),send_data_next->GetTotalLen() - send_data_next->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));}
}

这段代码的作用是实现异步发送数据并保证发送顺序的逻辑,主要包括两个函数:AsyncWriteSomeToSocketAsyncWriteSomeCallBack

  • AsyncWriteSomeToSocket 函数的作用是将数据放入发送队列中,并触发异步写操作。具体步骤如下:

    • 首先,将一个新的 Buffer 对象(用于存储要发送的数据)插入到 send_queue_ 队列中。
    • 接着,检查是否还有未发完的数据,如果有,说明还在等待前一次异步写操作完成,直接返回。
    • 如果没有未发完的数据,说明可以触发异步发送操作,使用 socket_->async_write_some 函数将数据写入套接字,并绑定回调函数 AsyncWriteSomeCallBack
  • AsyncWriteSomeCallBack 函数是异步写操作完成后的回调函数。其主要作用是处理写操作的结果,继续发送队列中的下一个数据。具体步骤如下:

    • 首先,检查 err 参数,如果其值不为 0,表示发送出现错误,就输出错误信息并返回。
    • 然后,取出队列中队首元素,该元素是一个 Buffer 对象,表示待发送的数据。
    • 接着,更新这个数据的已发送字节数 send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred)
    • 然后,检查数据是否已经全部发送完,如果未发送完,则继续触发异步写操作,将剩余的数据发送出去。
    • 如果这个数据已经发送完毕,就从队列中移除这个数据节点,并检查队列是否还有下一个数据。
    • 如果队列不为空,表示还有数据需要发送,就取出下一个数据节点,更新已发送字节数,并触发下一个异步写操作,以便发送下一个数据。

这段代码的设计确保了数据的发送顺序,即使在异步发送的情况下也可以保持数据的完整性和顺序。如果发送错误,它也会正确地处理错误情况。

async_write_some函数不能保证每次回调函数触发时发送的长度为要总长度,这样我们每次都要在回调函数判断发送数据是否完成,asio提供了一个更简单的发送函数async_send,这个函数在发送的长度未达到我们要求的长度时就不会触发回调,所以触发回调函数时要么时发送出错了要么是发送完成了,其内部的实现原理就是帮我们不断的调用async_write_some直到完成发送,所以async_send不能和async_write_some混合使用,我们基于async_send封装另外一个发送函数。

4、异步写void AsyncSendToSocket(const std::string& buffer)

函数定义:

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);void AsyncWriteSomeToSocket(const std::string& buffer);void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);//优先取这个void AsyncSendToSocket(const std::string& buffer);void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);
private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;std::queue<std::shared_ptr<Buffer>> send_queue_;bool send_padding_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

函数实现:

void Session::AsyncSendToSocket(const std::string& buffer) {//把发送消息协议构造成节点插入队列send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));//判断是否还有未发完数据if (send_padding_) {return;}//异步发送数据socket_->async_send(boost::asio::buffer(buffer.c_str(), buffer.length()),std::bind(&Session::AsyncSendCallBack, this, std::placeholders::_1, std::placeholders::_2));send_padding_ = true;
}void Session::AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {//发送数据失败std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//因为调用的是async_send()它的设计目标是简化发送数据的过程,\让用户不必关心数据的细节,只需提供要发送的数据和回调函数即可send_queue_.pop();if (send_queue_.empty()) {send_padding_ = false;return;}if (!send_queue_.empty()) {std::shared_ptr<Buffer> send_data_next = send_queue_.front();//异步发送发生地址偏移socket_->async_send(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),send_data_next->GetTotalLen() - send_data_next->GetCurLen()),std::bind(&Session::AsyncSendCallBack, this,std::placeholders::_1, std::placeholders::_2));}
}

这段代码的目的是实现异步发送数据,并在发送完成后调用回调函数进行处理。这与你之前提到的代码逻辑类似,但使用了 async_send 函数代替了 async_write_some,并且没有需要手动维护已发送字节数。

  • 具体的逻辑如下:

    • AsyncSendToSocket 函数用于将数据包装成一个 Buffer 对象并插入发送队列 send_queue_ 中。

    • 接着,检查是否已经有数据正在等待发送(send_padding_ 是否为 true),如果是,则说明还在等待前一次异步发送完成,直接返回。

    • 如果没有等待发送的数据,就调用 socket_->async_send 函数进行异步发送。这个函数会将数据发送到套接字,并在发送完成后调用回调函数 AsyncSendCallBack

    • AsyncSendCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示发送出现错误,输出错误信息并返回。

    • 如果发送成功,就从发送队列中弹出已发送的数据 (send_queue_.pop()),并检查队列是否为空。如果队列为空,说明没有待发送的数据,将 send_padding_ 设置为 false 表示没有数据需要发送。

    • 如果队列不为空,表示还有待发送的数据,就取出队列的头部元素,即下一个要发送的数据,然后调用 socket_->async_send 再次异步发送数据。这个过程会重复,直到队列中的数据全部发送完毕。

总体而言,这段代码实现了异步发送数据的功能,保证了发送的顺序,同时也能正确处理发送过程中的错误。不同之处在于,它使用了 async_send 函数,该函数封装了发送的细节,使得发送数据更加方便。

5、异步读void AsyncReadSomeToSocket(const std::string& buffer)

接下来介绍异步读操作,异步读操作和异步的写操作类似同样又async_read_someasync_receive函数,前者触发的回调函数获取的读数据的长度可能会小于要求读取的总长度,后者触发的回调函数读取的数据长度等于读取的总长度。

先基于async_read_some封装一个读取的函数AsyncReadSomeToSocket,同样在Session类的声明中添加一些变量:

函数定义:

#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);void AsyncWriteSomeToSocket(const std::string& buffer);void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);//优先取这个void AsyncSendToSocket(const std::string& buffer);void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);//异步读 优先取这个void AsyncReadSomeToSocket(const std::string& buffer);void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;std::queue<std::shared_ptr<Buffer>> send_queue_;std::shared_ptr<Buffer> recv_buffer_;bool send_padding_;bool recv_padding_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

函数实现:

void Session::AsyncReadSomeToSocket(const std::string& buffer) {//判断是否正在读数据,这里第一次读数据if (recv_padding_) {return;}recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());//异步读取数据socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));recv_padding_ = true;
}void Session::AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//判断读取的字节数,没有读取完继续读取recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);if (recv_buffer_->GetCurLen() < recv_buffer_->GetTotalLen()) {socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}//将数据投递到队列里交给逻辑线程处理,此处略去//如果读完了则将标记置为falserecv_padding_ = false;recv_buffer_ = nullptr;
}

这段代码的主要功能是异步读取数据,并在读取完成后调用回调函数 AsyncReadSomeCallBack 处理数据。以下是代码逻辑的详细解释:

  • AsyncReadSomeToSocket 函数用于异步读取数据。在这个函数中,首先检查 recv_padding_ 是否为 true。如果为 true,表示正在读取数据,直接返回,避免重复读取。

  • 如果 recv_padding_false,说明可以开始读取数据。这时,创建一个 Buffer 对象 recv_buffer_,并初始化为要读取的数据。

  • 接着,调用 socket_->async_read_some 函数进行异步读取数据。这个函数会在读取完成后调用回调函数 AsyncReadSomeCallBack

  • AsyncReadSomeCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示读取出现错误,输出错误信息并返回。

  • 如果读取成功,将已读取的字节数添加到 recv_buffer_ 的当前长度 CurLen 中。然后,检查是否已经读取完所有数据,即 CurLen 是否小于 TotalLen

  • 如果未读取完,继续调用 socket_->async_read_some 函数继续异步读取剩余的数据,直到读取完所有数据。

  • 如果读取完了,将 recv_padding_ 置为 false,表示没有正在读取的数据。最后,清空 recv_buffer_ 对象,以便下次读取新的数据。

这段代码实现了异步读取数据的逻辑,确保数据被正确读取并处理。如果数据没有完全读取,它会继续异步读取剩余的部分,直到读取完整个数据。如果有新的数据需要读取,可以再次调用 AsyncReadSomeToSocket

6、异步读void AsyncReceiveToSocket(const std::string& buffer)

我们基于async_receive再封装一个接收数据的函数:

函数声明:

#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);void AsyncWriteSomeToSocket(const std::string& buffer);void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);//优先取这个void AsyncSendToSocket(const std::string& buffer);void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);//异步读 优先取这个void AsyncReadSomeToSocket(const std::string& buffer);void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);void AsyncReceiveToSocket(const std::string& buffer);void AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred);private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;std::queue<std::shared_ptr<Buffer>> send_queue_;std::shared_ptr<Buffer> recv_buffer_;bool send_padding_;bool recv_padding_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

函数实现:

void Session::AsyncReceiveToSocket(const std::string& buffer) {//判断是否有数据正在读取if (recv_padding_) {return;}recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());socket_->async_receive(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReceiveCallBack, this, std::placeholders::_1, std::placeholders::_2));recv_padding_ = true;
}void Session::AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);recv_padding_ = false;recv_buffer_ = nullptr;
}

这段代码看起来非常类似于前面提到的异步读取数据的代码。它实现了异步接收数据的逻辑,以下是代码的详细解释:

  • AsyncReceiveToSocket 函数用于异步接收数据。首先,它检查 recv_padding_ 是否为 true,如果为 true,表示已经有数据在读取,直接返回,以避免重复接收。

  • 如果 recv_padding_false,说明可以开始接收数据。这时,创建一个 Buffer 对象 recv_buffer_,并初始化为要接收的数据。

接着,调用 socket_->async_receive 函数进行异步接收数据。这个函数会在接收完成后调用回调函数 AsyncReceiveCallBack

AsyncReceiveCallBack 回调函数中,首先检查错误码 err,如果不为 0,表示接收出现错误,输出错误信息并返回。

如果接收成功,将已接收的字节数添加到 recv_buffer_ 的当前长度 CurLen 中。然后,将 recv_padding_ 置为 false,表示没有正在接收的数据。

最后,清空 recv_buffer_ 对象,以便下次接收新的数据。

这段代码实现了异步接收数据的逻辑,确保数据被正确接收并处理。如果数据没有完全接收,它会继续异步接收剩余的部分,直到接收完整个数据。如果有新的数据需要接收,可以再次调用 AsyncReceiveToSocket

同样async_read_someasync_receive不能混合使用,否则会出现逻辑问题。

7、总结

总体代码声明:

#pragma once
#ifndef __ASYNC_DEMO_H_2023_8_22__
#define __ASYNC_DEMO_H_2023_8_22__
#include<iostream>
#include<boost/asio.hpp>
#include<memory>
#include<string>
#include<queue>
#include"buffer.h"class Session {
public:Session(std::shared_ptr <boost::asio::ip::tcp::socket> socket);bool Connect(boost::asio::ip::tcp::endpoint& ep);//异步写  这个异步写存在问题void AsyncWriteSomeToSocketErr(const std::string& buffer);void AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer);void AsyncWriteSomeToSocket(const std::string& buffer);void AsyncWriteSomeCallBack(const boost::system::error_code& err, std::size_t bytes_transferred);//优先取这个void AsyncSendToSocket(const std::string& buffer);void AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred);//异步读 优先取这个void AsyncReadSomeToSocket(const std::string& buffer);void AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred);void AsyncReceiveToSocket(const std::string& buffer);void AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred);private:std::shared_ptr<boost::asio::ip::tcp::socket> socket_;std::shared_ptr<Buffer> send_buffer_;std::queue<std::shared_ptr<Buffer>> send_queue_;std::shared_ptr<Buffer> recv_buffer_;bool send_padding_;bool recv_padding_;
};#endif // !__ASYNC_DEMO_H_2023_8_22__

总体代码定义:

#include"async_demo.h"Session::Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket):socket_(socket),send_padding_(false),recv_padding_(false)
{send_buffer_ = nullptr;recv_buffer_ = nullptr;if (!send_queue_.empty()) {send_queue_.pop();}
}bool Session::Connect(boost::asio::ip::tcp::endpoint& ep) {socket_->connect(ep);return true;
}void Session::AsyncWriteSomeToSocketErr(const std::string& buffer) {//先构造一个发送节点send_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());//然后构造async_write_some的参数buffer和回调和函数socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()),//绑定成员函数的地址,类的对象,参数占位符1,参数占位符2std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, send_buffer_));
}//TCP缓冲区 收发端不对等 发11字节 TCP缓冲区只有5字节 那么要分两次发送,假设发送hello world ,第一次只发送hello,\
world未发送,那么如果用户再次调用WriteCallBackErr那么底层不保护发送顺序,那么可能收到的结果hello hello world world \
解决这种就是用一个队列把存储的数据存放到队列里面
void Session::AsyncWriteSomeCallBackErr(const boost::system::error_code& err, std::size_t bytes_transferred, std::shared_ptr<Buffer> buffer_) {if (err.value() != 0) {std::cout << "error occured!error code: " << err.value() << " . message: " << err.what() << std::endl;return;}if (bytes_transferred + buffer_->GetCurLen() < buffer_->GetTotalLen()) {//buffer_->GetCurLen() = buffer_->GetCurLen() + bytes_transferred;buffer_->SetCurLen(buffer_->GetCurLen() + bytes_transferred);socket_->async_write_some(boost::asio::buffer(buffer_->GetMsg() + buffer_->GetCurLen(), buffer_->GetTotalLen() - buffer_->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBackErr, this, std::placeholders::_1, std::placeholders::_2, buffer_));}
}void Session::AsyncWriteSomeToSocket(const std::string& buffer) {//发送节点插入队列send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));//判断是否还有未发完的数据,false,表示没有,true表示还有if (send_padding_) {return;}//异步发送数据socket_->async_write_some(boost::asio::buffer(buffer.c_str(), buffer.length()), std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));send_padding_ = true;
}void Session::AsyncWriteSomeCallBack(const boost::system::error_code& err, size_t bytes_transferred) {if (err.value() != 0) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//取出队列中队首元素std::shared_ptr<Buffer> send_data = send_queue_.front();send_data->SetCurLen(send_data->GetCurLen() + bytes_transferred);//数据未发送完,继承调用异步函数取出队首元素发送if (send_data->GetCurLen() < send_data->GetTotalLen()) {socket_->async_write_some(boost::asio::buffer(send_data->GetMsg() + send_data->GetCurLen(),send_data->GetTotalLen() - send_data->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}//如果这个数据发送完了,把数据节点取出来send_queue_.pop();//判断队列里面是否还有下一个数据if (send_queue_.empty()) {send_padding_ = false;return;}//有数据则继续发送if (!send_queue_.empty()) {std::shared_ptr<Buffer> send_data_next = send_queue_.front();//异步发送的地址偏移socket_->async_write_some(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),send_data_next->GetTotalLen() - send_data_next->GetCurLen()),std::bind(&Session::AsyncWriteSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));}
}void Session::AsyncSendToSocket(const std::string& buffer) {//把发送消息协议构造成节点插入队列send_queue_.emplace(new Buffer(buffer.c_str(), buffer.length()));//判断是否还有未发完数据if (send_padding_) {return;}//异步发送数据socket_->async_send(boost::asio::buffer(buffer.c_str(), buffer.length()),std::bind(&Session::AsyncSendCallBack, this, std::placeholders::_1, std::placeholders::_2));send_padding_ = true;
}void Session::AsyncSendCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {//发送数据失败std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//因为调用的是async_send()它的设计目标是简化发送数据的过程,\让用户不必关心数据的细节,只需提供要发送的数据和回调函数即可send_queue_.pop();if (send_queue_.empty()) {send_padding_ = false;return;}if (!send_queue_.empty()) {std::shared_ptr<Buffer> send_data_next = send_queue_.front();//异步发送发生地址偏移socket_->async_send(boost::asio::buffer(send_data_next->GetMsg() + send_data_next->GetCurLen(),send_data_next->GetTotalLen() - send_data_next->GetCurLen()),std::bind(&Session::AsyncSendCallBack, this,std::placeholders::_1, std::placeholders::_2));}
}void Session::AsyncReadSomeToSocket(const std::string& buffer) {//判断是否正在读数据,这里第一次读数据if (recv_padding_) {return;}recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());//异步读取数据socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));recv_padding_ = true;
}void Session::AsyncReadSomeCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}//判断读取的字节数,没有读取完继续读取recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);if (recv_buffer_->GetCurLen() < recv_buffer_->GetTotalLen()) {socket_->async_read_some(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReadSomeCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}//将数据投递到队列里交给逻辑线程处理,此处略去//如果读完了则将标记置为falserecv_padding_ = false;recv_buffer_ = nullptr;
}void Session::AsyncReceiveToSocket(const std::string& buffer) {//判断是否有数据正在读取if (recv_padding_) {return;}recv_buffer_ = std::make_shared<Buffer>(buffer.c_str(), buffer.length());socket_->async_receive(boost::asio::buffer(recv_buffer_->GetMsg() + recv_buffer_->GetCurLen(),recv_buffer_->GetTotalLen() - recv_buffer_->GetCurLen()),std::bind(&Session::AsyncReceiveCallBack, this, std::placeholders::_1, std::placeholders::_2));recv_padding_ = true;
}void Session::AsyncReceiveCallBack(boost::system::error_code& err, std::size_t bytes_transferred) {if (0 != err.value()) {std::cout << "error occured!error code: " << err.value() << " .message: " << err.what() << std::endl;return;}recv_buffer_->SetCurLen(recv_buffer_->GetCurLen() + bytes_transferred);recv_padding_ = false;recv_buffer_ = nullptr;
}

本文介绍了boost asio异步读写的操作,仅仅是代码片段和api的封装便于大家理解,下一篇利用这些异步api写一个异步的服务器展示收发效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/51143.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一文看懂 iova、IOMMU、DMA

目录 一、概念解释 二、深入浅出 三、应用 四、常见问题 一、概念解释 IOVA&#xff08;IO Virtual Address&#xff0c;输入/输出虚拟地址&#xff09; IOMMU&#xff08;I/O Memory Management Unit&#xff09;&#xff1a;IOMMU是一种硬件单元&#xff0c;用于管理设备…

springboot sl4j2 写入日志到mysql

问题描述 springboot初始化的时候&#xff0c;会先初始化日志然后再加载数据源如果用配置文件进行初始化&#xff0c;那么会出现数据源没有加载成功&#xff0c;导致空指针异常 报错排查如下&#xff1a; 搜索报错信息&#xff0c;OBjects.invoke is Null打断点发现。dataso…

前端基础踩坑记录

前言&#xff1a;在做vue项目时&#xff0c;有时代码没有报错&#xff0c;但运行时却各种问题&#xff0c;没有报错排查起来就很费劲&#xff0c;本人感悟&#xff1a;写前端&#xff0c;需要好的眼神&#xff01;&#xff01;&#xff01;谨以此博客记录下自己的踩坑点。 一、…

【Maven教程】(三)基础使用篇:入门使用指南——POM编写、业务代码、测试代码、打包与运行、使用Archetype生成项目骨架~

Maven基础使用篇 1️⃣ 编写 POM2️⃣ 编写业务代码3️⃣ 编写测试代码4️⃣ 打包和运行5️⃣ 使用 Archetype生成项目骨架 1️⃣ 编写 POM 到目前为止&#xff0c;已经大概了解并安装好了Maven环境, 现在&#xff0c;我们开始创建一个最简单的 Hello World 项目。如果你是初次…

IDEA下SpringBoot指定环境、配置文件启动

1、idea下的SpringBoot启动&#xff1a;指定配置文件 Springboot项目有如下配置文件 主配置文件application.yml&#xff0c; 测试环境&#xff1a;application-test.yml 生产环境&#xff1a;application-pro.yml 开发环境&#xff1a;application-dev.yml 1.1.配置文件…

【FreeRTOS】【STM32】中断详细介绍

文章目录 一、三种优先级的概念辨析1. 先理清楚两个概念&#xff1a;CPU 和 MPU2. Cortex-M3 内核与 STM32F1XX 控制器有什么关系3. 优先级的概念辨析① Cortex-M3 内核和 STM32F1XX 的中断优先级② FreeRTOS 的任务的优先级 二、 Cortex-M3 内核的中断优先级1. 中断编号2. 优先…

Android 系统桌面 App —— Launcher 开发(1)

Android 系统桌面 App —— Launcher 开发&#xff08;1&#xff09; Launcher简介 Launcher就是Android系统的桌面&#xff0c;俗称“HomeScreen”也就是我们开机后看到的第一个App。launcher其实就是一个app&#xff0c;它的作用是显示和管理手机上其他App。目前市场上有很…

VIT Swin Transformer

VIT&#xff1a;https://blog.csdn.net/qq_37541097/article/details/118242600 Swin Transform&#xff1a;https://blog.csdn.net/qq_37541097/article/details/121119988 一、VIT 模型由三个模块组成&#xff1a; Linear Projection of Flattened Patches(Embedding层) Tran…

星际争霸之小霸王之小蜜蜂(六)--让子弹飞

目录 前言 一、添加子弹设置 二、创建子弹 三、创建绘制和移动子弹函数 四、让子弹飞 五、效果 总结 前言 小蜜蜂的基本操作已经完成了&#xff0c;现在开始编写子弹的代码了。 一、添加子弹设置 在我的预想里&#xff0c;我们的小蜜蜂既然是一只猫&#xff0c;那么放出的子弹…

微信小程序开发教学系列(1)- 开发入门

第一章&#xff1a;微信小程序简介与入门 1.1 简介 微信小程序是一种基于微信平台的应用程序&#xff0c;可以在微信内直接使用&#xff0c;无需下载和安装。它具有小巧、高效、便捷的特点&#xff0c;可以满足用户在微信中获取信息、使用服务的需求。 微信小程序采用前端技…

自定义WEB框架结合Jenkins实现全自动测试

自定义WEB框架结合Jenkins实现全自动测试 allure生成 allure生成 1.allure–纯命令运行 -固定的–稍微记住对应的单词即可。2 安装&#xff0c;2个步骤: 1.下载allure包&#xff0c;然后配置环境变量。 https://github.com/allure-framework/allure2/releases/tag/2.22.4 2.在…

mysql 、sql server 临时表、表变量、

sql server 临时表 、表变量 mysql 临时表 创建临时表 create temporary table 表名 select 字段 [&#xff0c;字段2…&#xff0c;字段n] from 表

[JavaWeb]【十】web后端开发-SpringBootWeb案例(配置文件)

目录 一、参数配置化 1.1 问题分析 1.2 问题解决&#xff08;application.properties&#xff09; 1.2.1 application.properties 1.2.2 AliOSSUtils 1.2.3 启动服务-测试 二、yml配置文件 2.1 配置格式 2.1.1 新增 application.yml 2.1.2 启动服务 2.2 XML与prope…

LeetCode438.找到字符串中所有字母异位词

因为之前写过一道找字母异位词分组的题&#xff0c;所以这道题做起来还是比较得心应手。我像做之前那道字母异位词分组一样&#xff0c;先把模板p排序&#xff0c;然后拿滑动窗口去s中从头到尾滑动&#xff0c;窗口中的这段字串也给他排序&#xff0c;然后拿这两个排完序的stri…

GEE/PIE 遥感大数据处理与典型案例

查看原文>>>【399三天】GEE/PIE遥感大数据处理与典型案例实践 随着航空、航天、近地空间等多个遥感平台的不断发展&#xff0c;近年来遥感技术突飞猛进。由此&#xff0c;遥感数据的空间、时间、光谱分辨率不断提高&#xff0c;数据量也大幅增长&#xff0c;使其越来…

数据结构(6)

2-3查找树 2-结点&#xff1a;含有一个键(及其对应的值)和两条链&#xff0c;左链接指向2-3树中的键都小于该结点&#xff0c;右链接指向的2-3树中的键都大于该结点。 3-结点&#xff1a;含有两个键(及其对应的值)和三条链&#xff0c;左链接指向的2-3树中的键都小于该结点&a…

python中的matplotlib画散点图(数据分析与可视化)

python中的matplotlib画散点图&#xff08;数据分析与可视化&#xff09; import numpy as np import pandas as pd import matplotlib.pyplot as pltpd.set_option("max_columns",None) plt.rcParams[font.sans-serif][SimHei] plt.rcParams[axes.unicode_minus]Fa…

完全免费的GPT,最新整理,2023年8月24日,已人工验证,不用注册,不用登录,更不用魔法,点开就能用

完全免费的ChatGPT&#xff0c;最新整理&#xff0c;2023年8月24日&#xff0c;已人工验证&#xff0c; 不用注册&#xff0c;不用登录&#xff0c;更不用魔法&#xff0c;点开就能用&#xff01; 第一个&#xff1a;网址地址统一放在文末啦&#xff01;文末直达 看上图你就能…

Spring Boot+Atomikos进行多数据源的分布式事务管理详解和实例

文章目录 0.前言1.参考文档2.基础介绍3.步骤1. 添加依赖到你的pom.xml文件:2. 配置数据源及其对应的JPA实体管理器和事务管理器:3. Spring BootMyBatis集成Atomikos4. 在application.properties文件中配置数据源和JPA属性&#xff1a; 4.使用示例5.底层原理 0.前言 背景&#x…

YOLO目标检测——足球比赛中球员检测数据集下载分享

足球比赛中球员检测数据集&#xff0c;真实场景的高质量图片数据&#xff0c;数据场景丰富&#xff0c;图片格式为jpg&#xff0c;共500张图片 数据集点击下载&#xff1a;YOLO足球比赛中球员检测数据集500图片.rar