文章目录
- 前言
- 一、计时器使用
- 1. 创建计时器,阻塞等待
- 2. 创建计时器,非阻塞等待
- 3. 周期性计时器
- 4.将3写入类封装
- 5.多线程程序的同步完成事件处理函数
- 二、Sockets简介
- 1.同步TCP客户端
- 2.同步TCP服务端
- 3.异步TCP服务端
- 4 同步UDP客户端
- 5 同步UDP服务端
- 6 异步UDP服务端
- 7 异步TCP/UDP服务
- 参考网址
前言
The Boost.Asio库是为使用C++进行系统编程的程序员设计的,在系统编程中,通常需要访问操作系统功能(如网络)。特别是Boost。Asio致力于实现以下目标:
-
便携性。该库应支持一系列常用的操作系统,并在这些操作系统之间提供一致的行为。
-
可扩展性。该库应促进可扩展到数千个并发连接的网络应用程序的开发。每个操作系统的库实现都应该使用最能实现这种可伸缩性的机制。
-
效率该库应支持分散采集I/O等技术,并允许程序最大限度地减少数据复制。
-
从已建立的API(如BSD套接字)中建模概念。BSD套接字API被广泛地实现和理解,并且在许多文献中都有涉及。其他编程语言通常使用类似的接口来连接API。在合理的范围内,Boost。Asio应该利用现有的做法。
-
易于使用。图书馆应采用工具包而非框架方法,为新用户提供较低的进入门槛。也就是说,它应该尽量减少前期投资,及时学习一些基本规则和指导方针。之后,库用户应该只需要了解正在使用的特定函数。
-
进一步抽象的基础。该库应允许开发提供更高抽象级别的其他库。例如,常用协议(如HTTP)的实现。
尽管Boost.Asio最初主要专注于网络,其异步I/O的概念已经扩展到包括其他操作系统资源,如串行端口、文件描述符等。
即: Boost.Asio是可实现异步IO的网络编程库,它封装了socket, bind, listen, epoll等操作
一、计时器使用
在Linux系统上,boost::asio::steady_timer 是通过timerfd_create 和 timerfd_settime 等系统调用函数实现的。这些函数允许应用程序创建和设置定时器,并且可以以稳定的时间间隔触发定时器事件。
#include <sys/timerfd.h>
int timerfd_create(int clockid, int flags);
1. 创建计时器,阻塞等待
// 只有一个线程
#include <boost/asio.hpp>
// 使用asio库必须建立至少一个io执行上下文对象,如io_context,thread_pool
// io执行上下文提供了对io功能的访问
boost::asio::io_contex io;
boost::asio::steady_timer t(io, boost::asio::chrono::second(5)); // 对象构造出来后计时器就已经执行了
/*若这里的执行时间较长,则wait不阻塞*/
t.wait(); // wait 至到计时器结束
2. 创建计时器,非阻塞等待
// 只有一个线程
void print(const boost::system::error_code& e)
{}
boost::asio::io_contex io;
boost::asio::steady_timer t(io, boost:;asio::chrono::seconds(5));
// 该函数接受一个可调用对象,函数签名:void(const boost::system::error_code&)
t.async_wait(&print)
io.run(); // 阻塞,至到所有工作都处理完
3. 周期性计时器
// 只有一个线程
#include <functional>
#include <boost/asio.hpp>
void print(const boost::system::error_code&, boost::asio::steady_timer*, t, int *count)
{if (*count < 5){++(*count);// expires_at设置到期时间点,expiry()返回到期的时间点// expiry返回一个time_pointt->expires_at(t->expiry() + boost::asio::chrono::seconds(1));// 异步等待,当到期时注册的函数会被调用t->async_wait(std::bind(print, boost::asio::placeholders::error, t, count));}
}boost::asio::io_context io;
int count = 0;
boost::asio::steady_timer t(io, boost::asio::chrono::seconds(1));
t.async_wait(std::bind(print, boost::asio::placeholders::error, &t, &count));
io.run()
std::bind为创建一个可调用对象,可更改该对象的函数签名
void fun(int a, int b, int c, int d) {}
auto bind_fun = std::bind(fun, a, b, std::placeholders::_1, d);
bind_fun(c);std::_Placeholder<9> s;auto b_fun = std::bind(fun, 1, s); // fun后面为传入函数的参数b_fun(1,2,3,4,5,6,7,8,9); // 分别是placeholders::_1 _2 _3 ... _9
4.将3写入类封装
#include <functional>
#include <iostream>
#include <boost/asio.hpp>class printer
{
public:printer(boost::asio::io_context& io): timer_(io, boost::asio::chrono::seconds(1)),count_(0){timer_.async_wait(std::bind(&printer::print, this)); // 没有placeholders也是可以的}~printer(){std::cout << "Final count is " << count_ << std::endl;}void print(){if (count_ < 5){std::cout << count_ << std::endl;++count_;timer_.expires_at(timer_.expiry() + boost::asio::chrono::seconds(1));timer_.async_wait(std::bind(&printer::print, this));}}
private:boost::asio::steady_timer timer_;int count_;
};int main()
{boost::asio::io_context io;printer p(io);io.run();return 0;
}
5.多线程程序的同步完成事件处理函数
注册的函数会在调用run的线程中执行,如果有多个线程同时调用了run呢?
// 多线程
如果有两个线程同时调用 run() 函数,每个线程都将从 io_context 的事件队列中获取事件并执行它们。由于您的 printer 类在构造函数中设置了两个定时器 t1_ 和 t2_,并且它们都是异步等待的,因此在每个定时器到期时,将触发相应的处理函数 print1() 和 print2()。
当两个线程同时调用 run() 时,它们将竞争 io_context 的事件队列,但由于 io_context 本身是线程安全的,因此它们不会相互干扰或导致竞态条件。每个线程将按照事件在队列中的顺序处理它们。
总体来说,两个线程将会以并发的方式执行 io_context 中的事件,但由于事件的触发和处理是按照事件队列的顺序进行的,因此您的打印机类的 print1() 和 print2() 函数也将按照预期的顺序执行。
#include <functional>
#include <iostream>
#include <thread>
#include <boost/asio.hpp>using namespace std;
namespace asio = boost::asio;class printer
{
private:// 提供线程安全的序列化执行,以确保异步操作在多线程环境中的顺序性asio::strand<asio::io_context::executor_type> strand_;asio::steady_timer t1_;asio::steady_timer t2_;int count_;
public:printer(asio::io_context& io): strand_(asio::make_strand(io)),t1_(io, asio::chrono::seconds(1)),t2_(io, asio::chrono::seconds(1)),count_(0){// strand_ 执行器与 print1 方法绑定在一起,这样就确保了 print1 方法在 strand_ 执行器上执行t1_.async_wait(asio::bind_executor(strand_, bind(&printer::print1, this)));t2_.async_wait(asio::bind_executor(strand_, bind(&printer::print2, this)));}~printer(){cout << "final count is " << count_ << endl;}void print1(){if (count_ < 10){cout << "T1: " << count_ << " thid: " << this_thread::get_id() << endl;++count_;t1_.expires_at(t1_.expiry() + asio::chrono::milliseconds(100));t1_.async_wait(asio::bind_executor(strand_, bind(&printer::print1, this)));}else if (count_ < 1000){cout << "T1: " << count_ << " thid: " << this_thread::get_id() << endl;++count_;t1_.expires_at(t1_.expiry() + asio::chrono::milliseconds(200));t1_.async_wait(asio::bind_executor(strand_, bind(&printer::print1, this)));}}void print2(){if (count_ < 1000){cout << "T2: " << count_<< " thid: " << this_thread::get_id() << endl;++count_;t2_.expires_at(t2_.expiry() + asio::chrono::milliseconds(100));t2_.async_wait(asio::bind_executor(strand_, bind(&printer::print2, this)));}}
};int main()
{asio::io_context io;printer p(io);thread t([&]{io.run();});// 不调用run,注册函数就不会被执行io.run();t.join();return 0;
}
二、Sockets简介
Daytime Protocol:
服务端无论接受到什么都返回当前日期
Weekday, Month Day, Year Time-Zone
Tuesday, February 22, 1982 17:37:43-PST
1.同步TCP客户端
socket
connet
using boost::asio::ip::tcp;
namespace asio = boost::asio;
try
{// 使用asio库必须建立至少一个io执行上下文对象asio::io_context io_context;// 将xxx解析成xxxtcp::resolver resolver(io_context);tcp::resolver::results_type endpoints = resolver.resolve(argv[1], "daytime");// 创建socket,建立连接tcp::socket socket(io_context);boost::asio::connect(socket, endpoints);for (;;){std::array<char, 128> buf;boost::system::error_code error;// 阻塞读取数据size_t len = socket.read_some(boost::asio::buffer(buf), error);if (error == boost::asio::error::eof)break; // Connection closed cleanly by peer.else if (error)throw boost::system::system_error(error); // Some other error.std::cout.write(buf.data(), len);}
}
catch (std::exception& e)
{std::cerr << e.what() << std::endl;
}
2.同步TCP服务端
socket
bind
listen
accept
boost::asio::io_context io_context;tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 13));for (;;)
{tcp::socket socket(io_context);acceptor.accept(socket); // 阻塞,等待客户端连接std::string message = make_daytime_string();boost::system::error_code ignored_error;boost::asio::write(socket, boost::asio::buffer(message), ignored_error);
}
3.异步TCP服务端
std::enable_shared_from_this 是一个 C++11 标准库提供的模板类,位于 <memory> 头文件中。它的作用是允许一个对象可以在 tcp_connection 的成员函数中调用 shared_from_this() 方法来获取指向当前对象的 std::shared_ptr
// 函数原型
template<typename AsyncWriteStream, typename ConstBufferSequence, typename WriteHandler>
void async_write(AsyncWriteStream & s, const ConstBufferSequence & buffers, WriteHandler handler);io_context io;ip::tcp::socket socket(io);socket.connect(ip::tcp::endpoint(ip::address::from_string("127.0.0.1"), 8080));std::string data = "Hello, world!";async_write(socket, boost::asio::buffer(data),[](const boost::system::error_code& error, std::size_t bytes_transferred) {if (!error) {std::cout << "Data sent successfully." << std::endl;} else {std::cerr << "Error: " << error.message() << std::endl;}});io.run();
//using namespace boost::asio; // asio is namespace, 将namespace 中的内容释放出来
//namespace asio = boost::asio;
//using boost::asio::ip::tcp; // tcp is class#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <boost/asio.hpp>
namespace asio = boost::asio;
using boost::asio::ip::tcp;
class tcp_connection : public std::enable_shared_from_this<tcp_connection > {private:tcp_connection(asio::io_context& io):socket_(io) {}void handle_write(const boost::system::error_code&, size_t){}tcp::socket socket_;std::string message_;public:typedef std::shared_ptr<tcp_connection> pointer;static pointer create(asio::io_context& io){ return pointer(new tcp_connection(io)); }tcp::socket& socket(){ return socket_; }void start(){// message_ = get_time();// 该函数保证整个数据块被发送message_ = "HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=utf-8\r\n\r\n"; asio::async_write( // 这里用shared_from_this而非socket_, asio::buffer(message_), std::bind(&tcp_connection::handle_write, shared_from_this(),asio::placeholders::error, asio::placeholders::bytes_transferred));}};
class tcp_server{private:asio::io_context& io_;tcp::acceptor acceptor_;void start_accept() {tcp_connection::pointer new_conn = tcp_connection::create(io_);acceptor_.async_accept(new_conn->socket(), std::bind(&tcp_server::handle_accept, this, new_conn, asio::placeholders::error));}void handle_accept(tcp_connection::pointer new_conn, const boost::system::error_code& error) {if (!error) {new_conn->start();}start_accept();}public:tcp_server(asio::io_context& io) :io_(io), acceptor_(io, tcp::endpoint(tcp::v4(), 9000)) {start_accept();}
};
int main()
{try{asio::io_context io; // 使用asio必须有io上下文对象tcp_server server(io);io.run();} catch (std::exception& e) {std::cerr << e.what() << std::endl;}return 0;
}
只有一个线程
执行流解析:
tcp_server server(io);->start_accept()->async_accept(注册异步接收事件,当事件发生是调用handle_accept)-> io.run()在此处阻塞。
当有事件发生时:从io.run的阻塞处跳转至handle_accept执行->start->async_write(异步写数据)-> start_accept(再次注册) -> 跳回到 io.run()在此处阻塞
4 同步UDP客户端
#include <array>
#include <iostream>
#include <boost/asio.hpp>using namespace std;
using boost::asio::ip::udp;
namespace ip = boost::asio::ip;
namespace asio = boost::asio;int main(int argc, char* argv[])
{try{asio::io_context io;// 通过host,services获取远程endpoint// 获取远程信息,如远程地址,端口,udp::resolver resolver(io);udp::endpoint receiver_endpoint = // 解引用是安全的,至少返回一个*resolver.resolve(udp::v4(), argv[1], "daytime").begin(); ip::address::from_string("127.0.0.1");// udp::socket socket(io);socket.open(udp::v4());array<char, 1> send_buf{0};socket.send_to(asio::buffer(send_buf), udp::endpoint(ip::address::from_string(argv[1]), 9000));array<char, 128> recv_buf;udp::endpoint sender_endpoint;// 由receive_from初始化endpoint 获取服务器相关信息size_t len = socket.receive_from(asio::buffer(recv_buf), sender_endpoint);cout.write(recv_buf.data(), len);// cout << recv_buf.data() << endl;}catch(const std::exception& e){std::cerr << "WAHT?:" << e.what() << '\n';}
}
5 同步UDP服务端
#include <array>
#include <ctime>
#include <iostream>
#include <string>
#include <boost/asio.hpp>using boost::asio::ip::udp;std::string make_daytime_string()
{using namespace std; // For time_t, time and ctime;time_t now = time(0);return ctime(&now);
}int main()
{try{boost::asio::io_context io_context;udp::socket socket(io_context, udp::endpoint(udp::v4(), 9000));for (;;){std::array<char, 1> recv_buf;udp::endpoint remote_endpoint;socket.receive_from(boost::asio::buffer(recv_buf), remote_endpoint);std::string message = "HTTP/1.1 200 OK\r\nContent-Type:text\r\n\r\n";message += make_daytime_string();boost::system::error_code ignored_error;socket.send_to(boost::asio::buffer(message),remote_endpoint, 0, ignored_error);}}catch (std::exception& e){std::cerr << e.what() << std::endl;}return 0;
}
6 异步UDP服务端
#include <array>
#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <boost/asio.hpp>using boost::asio::ip::udp;std::string make_daytime_string()
{using namespace std; // For time_t, time and ctime;time_t now = time(0);return ctime(&now);
}class udp_server
{
public:udp_server(boost::asio::io_context& io_context): socket_(io_context, udp::endpoint(udp::v4(), 13)){start_receive();}private:void start_receive(){socket_.async_receive_from(boost::asio::buffer(recv_buffer_), remote_endpoint_,std::bind(&udp_server::handle_receive, this,boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));}void handle_receive(const boost::system::error_code& error,std::size_t /*bytes_transferred*/){if (!error){std::shared_ptr<std::string> message(new std::string(make_daytime_string()));socket_.async_send_to(boost::asio::buffer(*message), remote_endpoint_,std::bind(&udp_server::handle_send, this, message,boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));start_receive();}}void handle_send(std::shared_ptr<std::string> /*message*/,const boost::system::error_code& /*error*/,std::size_t /*bytes_transferred*/){}udp::socket socket_;udp::endpoint remote_endpoint_;std::array<char, 1> recv_buffer_;
};int main()
{try{boost::asio::io_context io_context;udp_server server(io_context);io_context.run();}catch (std::exception& e){std::cerr << e.what() << std::endl;}return 0;
}
7 异步TCP/UDP服务
#include <array>
#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <boost/asio.hpp>using boost::asio::ip::tcp;
using boost::asio::ip::udp;std::string make_daytime_string()
{using namespace std; // For time_t, time and ctime;time_t now = time(0);return ctime(&now);
}class tcp_connection: public std::enable_shared_from_this<tcp_connection>
{
public:typedef std::shared_ptr<tcp_connection> pointer;static pointer create(boost::asio::io_context& io_context){return pointer(new tcp_connection(io_context));}tcp::socket& socket(){return socket_;}void start(){message_ = make_daytime_string();boost::asio::async_write(socket_, boost::asio::buffer(message_),std::bind(&tcp_connection::handle_write, shared_from_this()));}private:tcp_connection(boost::asio::io_context& io_context): socket_(io_context){}void handle_write(){}tcp::socket socket_;std::string message_;
};class tcp_server
{
public:tcp_server(boost::asio::io_context& io_context): io_context_(io_context),acceptor_(io_context, tcp::endpoint(tcp::v4(), 13)){start_accept();}private:void start_accept(){tcp_connection::pointer new_connection =tcp_connection::create(io_context_);acceptor_.async_accept(new_connection->socket(),std::bind(&tcp_server::handle_accept, this, new_connection,boost::asio::placeholders::error));}void handle_accept(tcp_connection::pointer new_connection,const boost::system::error_code& error){if (!error){new_connection->start();}start_accept();}boost::asio::io_context& io_context_;tcp::acceptor acceptor_;
};class udp_server
{
public:udp_server(boost::asio::io_context& io_context): socket_(io_context, udp::endpoint(udp::v4(), 9000)){start_receive();}private:void start_receive(){socket_.async_receive_from(boost::asio::buffer(recv_buffer_), remote_endpoint_,std::bind(&udp_server::handle_receive, this,boost::asio::placeholders::error));}void handle_receive(const boost::system::error_code& error){if (!error){std::shared_ptr<std::string> message(new std::string(make_daytime_string()));socket_.async_send_to(boost::asio::buffer(*message), remote_endpoint_,std::bind(&udp_server::handle_send, this, message));start_receive();}}void handle_send(std::shared_ptr<std::string> /*message*/){}udp::socket socket_;udp::endpoint remote_endpoint_;std::array<char, 1> recv_buffer_;
};int main()
{try{boost::asio::io_context io_context;tcp_server server1(io_context);udp_server server2(io_context);io_context.run();}catch (std::exception& e){std::cerr << e.what() << std::endl;}return 0;
}
参考网址
https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio.html
https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/tutorial.html