asio之select_interrupter

简介

select_interrupter用于唤醒reactor的事件循环,其是对不同唤醒机制的别名

定义

在有eventfd时,表示的是eventfd_select_interrupter,在window平台下使用的是socket_select_interrupter ,否则表示的是pipe_select_interrupter

#if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__) || defined(__SYMBIAN32__)
typedef socket_select_interrupter select_interrupter;
#elif defined(BOOST_ASIO_HAS_EVENTFD)
typedef eventfd_select_interrupter select_interrupter;
#else
typedef pipe_select_interrupter select_interrupter;
#endif

公共方法有

select_interrupter
+void recreate()
+void interrupt()
+bool reset()
+socket_type read_descriptor()

管道机制

pipe_select_interrupter通过pipe创建一个通道

pipe_select_interrupter
-int read_descriptor_
-int write_descriptor_
+void recreate()
+void interrupt()
+bool reset()
+socket_type read_descriptor()
-void open_descriptors()
-void close_descriptors()

open_descriptors:使用pipe创建读写通道,并且设置为非阻塞

void pipe_select_interrupter::open_descriptors()
{int pipe_fds[2];if (pipe(pipe_fds) == 0){read_descriptor_ = pipe_fds[0];::fcntl(read_descriptor_, F_SETFL, O_NONBLOCK);write_descriptor_ = pipe_fds[1];::fcntl(write_descriptor_, F_SETFL, O_NONBLOCK);#if defined(FD_CLOEXEC)::fcntl(read_descriptor_, F_SETFD, FD_CLOEXEC);::fcntl(write_descriptor_, F_SETFD, FD_CLOEXEC);
#endif // defined(FD_CLOEXEC)}else{boost::system::error_code ec(errno,boost::asio::error::get_system_category());boost::asio::detail::throw_error(ec, "pipe_select_interrupter");}
}

close_descriptors:关闭读写通道

void pipe_select_interrupter::close_descriptors()
{if (read_descriptor_ != -1)::close(read_descriptor_);if (write_descriptor_ != -1)::close(write_descriptor_);
}

recreate:先关闭读写通道,然后新建读写通道

void pipe_select_interrupter::recreate()
{close_descriptors();write_descriptor_ = -1;read_descriptor_ = -1;open_descriptors();
}

interrupt:向写通道写入一个字节,内容为0

void pipe_select_interrupter::interrupt()
{char byte = 0;signed_size_type result = ::write(write_descriptor_, &byte, 1);(void)result;
}

reset:从读通道读取数据,直到没有数据

bool pipe_select_interrupter::reset()
{for (;;){char data[1024];signed_size_type bytes_read = ::read(read_descriptor_, data, sizeof(data));if (bytes_read < 0 && errno == EINTR)continue;bool was_interrupted = (bytes_read > 0);while (bytes_read == sizeof(data))bytes_read = ::read(read_descriptor_, data, sizeof(data));return was_interrupted;}
}

套接字机制

socket_select_interrupter通过网络套接字

socket_select_interrupter
-socket_type read_descriptor_
-socket_type write_descriptor_
+void recreate()
+void interrupt()
+bool reset()
+socket_type read_descriptor()
-void open_descriptors()
-void close_descriptors()

open_descriptors:创建两个套接字,一个作为服务端,一个作为客户端,客户端作为写套接字,服务端作为读套接字

void socket_select_interrupter::open_descriptors()
{boost::system::error_code ec;socket_holder acceptor(socket_ops::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP, ec));if (acceptor.get() == invalid_socket)boost::asio::detail::throw_error(ec, "socket_select_interrupter");int opt = 1;socket_ops::state_type acceptor_state = 0;socket_ops::setsockopt(acceptor.get(), acceptor_state,SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt), ec);using namespace std; // For memset.sockaddr_in4_type addr;std::size_t addr_len = sizeof(addr);memset(&addr, 0, sizeof(addr));addr.sin_family = AF_INET;addr.sin_addr.s_addr = socket_ops::host_to_network_long(INADDR_LOOPBACK);addr.sin_port = 0;if (socket_ops::bind(acceptor.get(), (const socket_addr_type*)&addr,addr_len, ec) == socket_error_retval)boost::asio::detail::throw_error(ec, "socket_select_interrupter");if (socket_ops::getsockname(acceptor.get(), (socket_addr_type*)&addr,&addr_len, ec) == socket_error_retval)boost::asio::detail::throw_error(ec, "socket_select_interrupter");// Some broken firewalls on Windows will intermittently cause getsockname to// return 0.0.0.0 when the socket is actually bound to 127.0.0.1. We// explicitly specify the target address here to work around this problem.addr.sin_addr.s_addr = socket_ops::host_to_network_long(INADDR_LOOPBACK);if (socket_ops::listen(acceptor.get(),SOMAXCONN, ec) == socket_error_retval)boost::asio::detail::throw_error(ec, "socket_select_interrupter");socket_holder client(socket_ops::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP, ec));if (client.get() == invalid_socket)boost::asio::detail::throw_error(ec, "socket_select_interrupter");if (socket_ops::connect(client.get(), (const socket_addr_type*)&addr,addr_len, ec) == socket_error_retval)boost::asio::detail::throw_error(ec, "socket_select_interrupter");socket_holder server(socket_ops::accept(acceptor.get(), 0, 0, ec));if (server.get() == invalid_socket)boost::asio::detail::throw_error(ec, "socket_select_interrupter");ioctl_arg_type non_blocking = 1;socket_ops::state_type client_state = 0;if (socket_ops::ioctl(client.get(), client_state,FIONBIO, &non_blocking, ec))boost::asio::detail::throw_error(ec, "socket_select_interrupter");opt = 1;socket_ops::setsockopt(client.get(), client_state,IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt), ec);non_blocking = 1;socket_ops::state_type server_state = 0;if (socket_ops::ioctl(server.get(), server_state,FIONBIO, &non_blocking, ec))boost::asio::detail::throw_error(ec, "socket_select_interrupter");opt = 1;socket_ops::setsockopt(server.get(), server_state,IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt), ec);read_descriptor_ = server.release();write_descriptor_ = client.release();
}

close_descriptors:关闭创建的两个套接字

void socket_select_interrupter::close_descriptors()
{boost::system::error_code ec;socket_ops::state_type state = socket_ops::internal_non_blocking;if (read_descriptor_ != invalid_socket)socket_ops::close(read_descriptor_, state, true, ec);if (write_descriptor_ != invalid_socket)socket_ops::close(write_descriptor_, state, true, ec);
}

recreate:先关闭两个套接字,然后新建两个套接字

void socket_select_interrupter::recreate()
{close_descriptors();write_descriptor_ = invalid_socket;read_descriptor_ = invalid_socket;open_descriptors();
}

interrupt:向服务端发送一个字节,内容为0

void socket_select_interrupter::interrupt()
{char byte = 0;socket_ops::buf b;socket_ops::init_buf(b, &byte, 1);boost::system::error_code ec;socket_ops::send(write_descriptor_, &b, 1, 0, ec);
}

reset:服务端接收客户端的数据,直到数据读完

bool socket_select_interrupter::reset()
{char data[1024];socket_ops::buf b;socket_ops::init_buf(b, data, sizeof(data));boost::system::error_code ec;int bytes_read = socket_ops::recv(read_descriptor_, &b, 1, 0, ec);bool was_interrupted = (bytes_read > 0);while (bytes_read == sizeof(data))bytes_read = socket_ops::recv(read_descriptor_, &b, 1, 0, ec);return was_interrupted;
}

eventfd机制

eventfd_select_interrupter是通过eventfd机制创建读定两个文件描述符,如果失败时则使用pipe

eventfd_select_interrupter
-int read_descriptor_
-int write_descriptor_
+void recreate()
+void interrupt()
+bool reset()
+socket_type read_descriptor()
-void open_descriptors()
-void close_descriptors()

open_descriptors:使用eventfd创建读写两个文件描述符,如果失败则使用pipe

void eventfd_select_interrupter::open_descriptors()
{
#if __GLIBC__ == 2 && __GLIBC_MINOR__ < 8write_descriptor_ = read_descriptor_ = syscall(__NR_eventfd, 0);if (read_descriptor_ != -1){::fcntl(read_descriptor_, F_SETFL, O_NONBLOCK);::fcntl(read_descriptor_, F_SETFD, FD_CLOEXEC);}
#else // __GLIBC__ == 2 && __GLIBC_MINOR__ < 8
# if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)write_descriptor_ = read_descriptor_ =::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
# else // defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)errno = EINVAL;write_descriptor_ = read_descriptor_ = -1;
# endif // defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)if (read_descriptor_ == -1 && errno == EINVAL){write_descriptor_ = read_descriptor_ = ::eventfd(0, 0);if (read_descriptor_ != -1){::fcntl(read_descriptor_, F_SETFL, O_NONBLOCK);::fcntl(read_descriptor_, F_SETFD, FD_CLOEXEC);}}
#endif // __GLIBC__ == 2 && __GLIBC_MINOR__ < 8if (read_descriptor_ == -1){int pipe_fds[2];if (pipe(pipe_fds) == 0){read_descriptor_ = pipe_fds[0];::fcntl(read_descriptor_, F_SETFL, O_NONBLOCK);::fcntl(read_descriptor_, F_SETFD, FD_CLOEXEC);write_descriptor_ = pipe_fds[1];::fcntl(write_descriptor_, F_SETFL, O_NONBLOCK);::fcntl(write_descriptor_, F_SETFD, FD_CLOEXEC);}else{boost::system::error_code ec(errno,boost::asio::error::get_system_category());boost::asio::detail::throw_error(ec, "eventfd_select_interrupter");}}
}

close_descriptors:关闭读写两个文件描述符

void eventfd_select_interrupter::close_descriptors()
{if (write_descriptor_ != -1 && write_descriptor_ != read_descriptor_)::close(write_descriptor_);if (read_descriptor_ != -1)::close(read_descriptor_);
}

recreate:先关闭读写文件描述符,然后新建读写文件描述符

void eventfd_select_interrupter::recreate()
{close_descriptors();write_descriptor_ = -1;read_descriptor_ = -1;open_descriptors();
}

interrupt:向写文件描述符写入一字节,内容为0

void eventfd_select_interrupter::interrupt()
{uint64_t counter(1UL);int result = ::write(write_descriptor_, &counter, sizeof(uint64_t));(void)result;
}

reset:从读文件描述符读取数据,直至读写

bool eventfd_select_interrupter::reset()
{if (write_descriptor_ == read_descriptor_){for (;;){// Only perform one read. The kernel maintains an atomic counter.uint64_t counter(0);errno = 0;int bytes_read = ::read(read_descriptor_, &counter, sizeof(uint64_t));if (bytes_read < 0 && errno == EINTR)continue;bool was_interrupted = (bytes_read > 0);return was_interrupted;}}else{for (;;){// Clear all data from the pipe.char data[1024];int bytes_read = ::read(read_descriptor_, data, sizeof(data));if (bytes_read < 0 && errno == EINTR)continue;bool was_interrupted = (bytes_read > 0);while (bytes_read == sizeof(data))bytes_read = ::read(read_descriptor_, data, sizeof(data));return was_interrupted;}}
}

epoll_reactor中是如何调用的

在epoll_reactor构造函数中将读描述符添加到epoll中,并且开始调用select_interrupter的interrupt方法写入一字节内容

epoll_reactor::epoll_reactor(boost::asio::io_service& io_service): boost::asio::detail::service_base<epoll_reactor>(io_service),io_service_(use_service<io_service_impl>(io_service)),mutex_(),interrupter_(),epoll_fd_(do_epoll_create()),timer_fd_(do_timerfd_create()),shutdown_(false)
{// Add the interrupter's descriptor to epoll.epoll_event ev = { 0, { 0 } };ev.events = EPOLLIN | EPOLLERR | EPOLLET;ev.data.ptr = &interrupter_;epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);interrupter_.interrupt();// Add the timer descriptor to epoll.if (timer_fd_ != -1){ev.events = EPOLLIN | EPOLLERR;ev.data.ptr = &timer_fd_;epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);}
}

在事件循环中,如果事件描述符为interrupter_,没有做任何操作,即没有调用reset读取数据

void epoll_reactor::run(bool block, op_queue<operation>& ops)
{// This code relies on the fact that the task_io_service queues the reactor// task behind all descriptor operations generated by this function. This// means, that by the time we reach this point, any previously returned// descriptor operations have already been dequeued. Therefore it is now safe// for us to reuse and return them for the task_io_service to queue again.// Calculate a timeout only if timerfd is not used.int timeout;if (timer_fd_ != -1)timeout = block ? -1 : 0;else{mutex::scoped_lock lock(mutex_);timeout = block ? get_timeout() : 0;}// Block on the epoll descriptor.epoll_event events[128];int num_events = epoll_wait(epoll_fd_, events, 128, timeout);#if defined(BOOST_ASIO_HAS_TIMERFD)bool check_timers = (timer_fd_ == -1);
#else // defined(BOOST_ASIO_HAS_TIMERFD)bool check_timers = true;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)// Dispatch the waiting events.for (int i = 0; i < num_events; ++i){void* ptr = events[i].data.ptr;if (ptr == &interrupter_){// No need to reset the interrupter since we're leaving the descriptor// in a ready-to-read state and relying on edge-triggered notifications// to make it so that we only get woken up when the descriptor's epoll// registration is updated.#if defined(BOOST_ASIO_HAS_TIMERFD)if (timer_fd_ == -1)check_timers = true;
#else // defined(BOOST_ASIO_HAS_TIMERFD)check_timers = true;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)}
#if defined(BOOST_ASIO_HAS_TIMERFD)else if (ptr == &timer_fd_){check_timers = true;}
#endif // defined(BOOST_ASIO_HAS_TIMERFD)else{// The descriptor operation doesn't count as work in and of itself, so we// don't call work_started() here. This still allows the io_service to// stop if the only remaining operations are descriptor operations.descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);descriptor_data->set_ready_events(events[i].events);ops.push(descriptor_data);}}if (check_timers){mutex::scoped_lock common_lock(mutex_);timer_queues_.get_ready_timers(ops);#if defined(BOOST_ASIO_HAS_TIMERFD)if (timer_fd_ != -1){itimerspec new_timeout;itimerspec old_timeout;int flags = get_timeout(new_timeout);timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);}
#endif // defined(BOOST_ASIO_HAS_TIMERFD)}
}

唤醒是通过使用EPOLL_CTL_MOD重新修改interrupter_

void epoll_reactor::interrupt()
{epoll_event ev = { 0, { 0 } };ev.events = EPOLLIN | EPOLLERR | EPOLLET;ev.data.ptr = &interrupter_;epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/29344.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何去掉el-input-number两侧的上下按键

期初做法 输入的数据都是数字&#xff0c;就想着使用number类型的输入框但是输入框很小的情况下&#xff0c;就会导致上下按键特别占地方&#xff0c;所以想去掉使用deep 语法进行样式覆盖 <style scoped> /deep/ input::-webkit-outer-spin-button, /deep/ input::-webk…

适合度与挑战并存的计算机相关专业!计算机行业前景看好!

作为即将踏入大学校门的高考生&#xff0c;我在选择专业时&#xff0c;计算机相关专业无疑是一个热门选项。然而&#xff0c;随着市场竞争的加剧和行业饱和度的提高&#xff0c;我对此类专业的未来发展产生了些许疑虑。  首先&#xff0c;计算机科学与技术、人工智能、网络安…

char 型变量中能不能存贮一个中文汉字?为什么?

在 Java 中&#xff0c;char 类型变量可以存储一个中文汉字&#xff0c;这是因为 char 类型使用 Unicode 编码&#xff0c;而 Unicode 编码字符集包括了大量的汉字。详细解释如下&#xff1a; Unicode 和 Java 中的 char 类型 Unicode 编码&#xff1a; Unicode 是一种字符编码…

数据仓库之Kappa架构

Kappa架构是一种简化的数据处理架构&#xff0c;旨在处理实时数据流&#xff0c;解决传统Lambda架构中批处理和实时处理的复杂性。Kappa架构完全基于流处理&#xff0c;不区分批处理和实时处理&#xff0c;所有数据都是通过流处理系统进行处理。以下是对Kappa架构的详细介绍&am…

【ArcGIS微课1000例】0120:ArcGIS批量修改符号的样式(轮廓)

ArcGIS可以批量修改符号的样式,如样式、填充颜色、轮廓等等。 文章目录 一、加载实验数据二、土地利用符号化三、批量修改符号样式四、注意事项一、加载实验数据 订阅专栏后,从私信查收专栏配套的完整实验数据包,打开0120.rar中的土地利用数据,如下图所示: 查看属性表: …

事务所管理系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;客户管理&#xff0c;评论管理&#xff0c;基础数据管理&#xff0c;公告信息管理 客户账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;律师管理&#xff0…

如果搜索一定超时,如何用dp来以空间换时间

E - Alphabet Tiles (atcoder.jp) 题目大意&#xff1a;1到k长度的字符串时&#xff0c;在A-Z给定数量下&#xff0c;搭配出多少种不同的字符串 思路 排列组合&#xff0c;会死人的 暴搜&#xff1a;可以解决&#xff0c;但是时间太长 dp&#xff1a;考虑前 i 个字母&…

下载lombok.jar包,简化类的代码

Download (projectlombok.org) 去这个网站下载lombok.jar包 打开这个包文件的位置,拖到项目lib文件夹: 在这里右键添加为库(Add as library)。 添加这三个注解即可&#xff0c;类里面不需要其他东西了

浏览器 插件 Lighthouse

Lighthouse是一款开源的自动化工具&#xff0c;它能够帮助改善你的web应用程序的性能、质量和正确性。你可以在任何网页上运行它&#xff0c;无论是公开的还是需要认证的。 Lighthouse有一款Chrome扩展插件&#xff0c;也可以在Chrome DevTools中使用&#xff0c;或者作为Node…

【代码随想录】【算法训练营】【第41天】 [416]分割等和子集

前言 思路及算法思维&#xff0c;指路 代码随想录。 题目来自 LeetCode。 day 40&#xff0c;休息&#xff0c;休息一下~ day 41&#xff0c;艰难的周一~ 题目详情 [416] 分割等和子集 题目描述 416 分割等和子集 解题思路 前提&#xff1a;是否可以将数组分为和相等的…

css中content属性你了解多少?

在CSS中&#xff0c;content属性通常与伪元素&#xff08;如 ::before 和 ::after&#xff09;一起使用&#xff0c;用于在元素的内容之前或之后插入生成的内容。这个属性不接受常规的HTML内容&#xff0c;而是接受一些特定的值&#xff0c;如字符串、属性值、计数器值等。 以…

计算机组成原理-期末考前常见简答题总结

1、简述冯-诺伊曼计算机的主要设计思想&#xff0c;它有哪些部件构成&#xff1f; 主要思想&#xff1a;存储程序和程序控制。将解题程序防止存储器中&#xff0c;程序控制&#xff1a;控制器顺序执行程序&#xff0c;按指令功能控制全机协调地完成运算任务。主要部件有控制器…

从零开始精通Onvif之图片抓拍

&#x1f4a1; 如果想阅读最新的文章&#xff0c;或者有技术问题需要交流和沟通&#xff0c;可搜索并关注微信公众号“希望睿智”。 概述 在视频监控系统中&#xff0c;图片抓拍功能&#xff08;也称为快照功能&#xff09;是指通过摄像头或其他视频采集设备&#xff0c;将实时…

NPM 包管理器简介

目录 官方数据 npm 简介 包 安装所有依赖 安装单个包 更新包 版本控制 运行任务 官方数据 包量高达310w&#xff0c; 6月份的第三周下载量高达600亿&#xff0c;5月份下载量更是高达2473亿&#xff0c;这惊人的数字无外乎体现当今互联网的活跃程度和仍旧处于高速发展阶…

Web前端开发实战:HTML5+CSS3+JavaScript+Vue+Bootstrap

&#x1f482; 个人网站:【 摸鱼游戏】【神级代码资源网站】【工具大全】&#x1f91f; 一站式轻松构建小程序、Web网站、移动应用&#xff1a;&#x1f449;注册地址&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交…

IT入门知识第四部分《数据库》(4/10)

目录 1. 数据库基础 1.1 数据库的定义 1.2 数据库的关键概念 数据模型 数据库架构 数据库操作语言&#xff08;DML 和 DDL&#xff09; 总结 2. 关系型数据库 2.1 MySQL MySQL 的历史和特点 MySQL 的安装和配置 MySQL 的基本操作 2.2 PostgreSQL PostgreSQL 的特…

相似性搜索揭秘:向量嵌入与机器学习应用

引言 在当今数据驱动的世界中&#xff0c;有效地检索和利用信息是一项关键挑战。在数据库、搜索引擎和众多应用程序中&#xff0c;寻找相似数据是一项基本操作。传统数据库中&#xff0c;基于固定数值标准的相似项搜索相对直接&#xff0c;通过查询语言即可实现&#xff0c;如…

聚四氟乙烯离心管 四氟反应管 消解管 PTFE螺口带盖管 特氟龙试管

一、产品介绍 样品悬浮液盛放在管状试样容器中&#xff0c;在离心机的高速旋转下&#xff0c;由于巨大的离心力作用&#xff0c;使悬浮的微小颗粒 以一定的速度沉降&#xff0c;从而与溶液得以分离。这种带密封盖或压盖的管状试样容器&#xff0c;就是离心管。 PTFE离心管&…

【机器学习】第9章 降维算法——PCA降维

一、概念 1.PCA &#xff08;1&#xff09;主成分分析&#xff08;Principal ComponentAnalysis&#xff0c;PCA&#xff09;一种经典的线性降维分析算法。 &#xff08;2&#xff09;原理&#xff0c;这里以二维转一维为例&#xff0c;原来的平面变成了一条直线 这是三维变二…

车载学习:UDS诊断、ECU刷写、OTA升级、Tbox测试、CANoe实操

每天的直播时间&#xff1a; 周一至周五&#xff1a;20&#xff1a;00-23&#xff1a;00 周六与周日&#xff1a;9&#xff1a;00-12&#xff1a;00&#xff0c;14&#xff1a;00-17&#xff1a;00 TBOX 深圳 涉及过T-BOX测试吗Ota升级涉及的台架环境是什么样的&#xff1f;上…