简介
select_interrupter用于唤醒reactor的事件循环,其是对不同唤醒机制的别名
定义
在有eventfd时,表示的是eventfd_select_interrupter,在window平台下使用的是socket_select_interrupter ,否则表示的是pipe_select_interrupter
#if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__) || defined(__SYMBIAN32__)
typedef socket_select_interrupter select_interrupter;
#elif defined(BOOST_ASIO_HAS_EVENTFD)
typedef eventfd_select_interrupter select_interrupter;
#else
typedef pipe_select_interrupter select_interrupter;
#endif
公共方法有
管道机制
pipe_select_interrupter通过pipe创建一个通道
open_descriptors:使用pipe创建读写通道,并且设置为非阻塞
void pipe_select_interrupter::open_descriptors()
{int pipe_fds[2];if (pipe(pipe_fds) == 0){read_descriptor_ = pipe_fds[0];::fcntl(read_descriptor_, F_SETFL, O_NONBLOCK);write_descriptor_ = pipe_fds[1];::fcntl(write_descriptor_, F_SETFL, O_NONBLOCK);#if defined(FD_CLOEXEC)::fcntl(read_descriptor_, F_SETFD, FD_CLOEXEC);::fcntl(write_descriptor_, F_SETFD, FD_CLOEXEC);
#endif // defined(FD_CLOEXEC)}else{boost::system::error_code ec(errno,boost::asio::error::get_system_category());boost::asio::detail::throw_error(ec, "pipe_select_interrupter");}
}
close_descriptors:关闭读写通道
void pipe_select_interrupter::close_descriptors()
{if (read_descriptor_ != -1)::close(read_descriptor_);if (write_descriptor_ != -1)::close(write_descriptor_);
}
recreate:先关闭读写通道,然后新建读写通道
void pipe_select_interrupter::recreate()
{close_descriptors();write_descriptor_ = -1;read_descriptor_ = -1;open_descriptors();
}
interrupt:向写通道写入一个字节,内容为0
void pipe_select_interrupter::interrupt()
{char byte = 0;signed_size_type result = ::write(write_descriptor_, &byte, 1);(void)result;
}
reset:从读通道读取数据,直到没有数据
bool pipe_select_interrupter::reset()
{for (;;){char data[1024];signed_size_type bytes_read = ::read(read_descriptor_, data, sizeof(data));if (bytes_read < 0 && errno == EINTR)continue;bool was_interrupted = (bytes_read > 0);while (bytes_read == sizeof(data))bytes_read = ::read(read_descriptor_, data, sizeof(data));return was_interrupted;}
}
套接字机制
socket_select_interrupter通过网络套接字
open_descriptors:创建两个套接字,一个作为服务端,一个作为客户端,客户端作为写套接字,服务端作为读套接字
void socket_select_interrupter::open_descriptors()
{boost::system::error_code ec;socket_holder acceptor(socket_ops::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP, ec));if (acceptor.get() == invalid_socket)boost::asio::detail::throw_error(ec, "socket_select_interrupter");int opt = 1;socket_ops::state_type acceptor_state = 0;socket_ops::setsockopt(acceptor.get(), acceptor_state,SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt), ec);using namespace std; // For memset.sockaddr_in4_type addr;std::size_t addr_len = sizeof(addr);memset(&addr, 0, sizeof(addr));addr.sin_family = AF_INET;addr.sin_addr.s_addr = socket_ops::host_to_network_long(INADDR_LOOPBACK);addr.sin_port = 0;if (socket_ops::bind(acceptor.get(), (const socket_addr_type*)&addr,addr_len, ec) == socket_error_retval)boost::asio::detail::throw_error(ec, "socket_select_interrupter");if (socket_ops::getsockname(acceptor.get(), (socket_addr_type*)&addr,&addr_len, ec) == socket_error_retval)boost::asio::detail::throw_error(ec, "socket_select_interrupter");// Some broken firewalls on Windows will intermittently cause getsockname to// return 0.0.0.0 when the socket is actually bound to 127.0.0.1. We// explicitly specify the target address here to work around this problem.addr.sin_addr.s_addr = socket_ops::host_to_network_long(INADDR_LOOPBACK);if (socket_ops::listen(acceptor.get(),SOMAXCONN, ec) == socket_error_retval)boost::asio::detail::throw_error(ec, "socket_select_interrupter");socket_holder client(socket_ops::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP, ec));if (client.get() == invalid_socket)boost::asio::detail::throw_error(ec, "socket_select_interrupter");if (socket_ops::connect(client.get(), (const socket_addr_type*)&addr,addr_len, ec) == socket_error_retval)boost::asio::detail::throw_error(ec, "socket_select_interrupter");socket_holder server(socket_ops::accept(acceptor.get(), 0, 0, ec));if (server.get() == invalid_socket)boost::asio::detail::throw_error(ec, "socket_select_interrupter");ioctl_arg_type non_blocking = 1;socket_ops::state_type client_state = 0;if (socket_ops::ioctl(client.get(), client_state,FIONBIO, &non_blocking, ec))boost::asio::detail::throw_error(ec, "socket_select_interrupter");opt = 1;socket_ops::setsockopt(client.get(), client_state,IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt), ec);non_blocking = 1;socket_ops::state_type server_state = 0;if (socket_ops::ioctl(server.get(), server_state,FIONBIO, &non_blocking, ec))boost::asio::detail::throw_error(ec, "socket_select_interrupter");opt = 1;socket_ops::setsockopt(server.get(), server_state,IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt), ec);read_descriptor_ = server.release();write_descriptor_ = client.release();
}
close_descriptors:关闭创建的两个套接字
void socket_select_interrupter::close_descriptors()
{boost::system::error_code ec;socket_ops::state_type state = socket_ops::internal_non_blocking;if (read_descriptor_ != invalid_socket)socket_ops::close(read_descriptor_, state, true, ec);if (write_descriptor_ != invalid_socket)socket_ops::close(write_descriptor_, state, true, ec);
}
recreate:先关闭两个套接字,然后新建两个套接字
void socket_select_interrupter::recreate()
{close_descriptors();write_descriptor_ = invalid_socket;read_descriptor_ = invalid_socket;open_descriptors();
}
interrupt:向服务端发送一个字节,内容为0
void socket_select_interrupter::interrupt()
{char byte = 0;socket_ops::buf b;socket_ops::init_buf(b, &byte, 1);boost::system::error_code ec;socket_ops::send(write_descriptor_, &b, 1, 0, ec);
}
reset:服务端接收客户端的数据,直到数据读完
bool socket_select_interrupter::reset()
{char data[1024];socket_ops::buf b;socket_ops::init_buf(b, data, sizeof(data));boost::system::error_code ec;int bytes_read = socket_ops::recv(read_descriptor_, &b, 1, 0, ec);bool was_interrupted = (bytes_read > 0);while (bytes_read == sizeof(data))bytes_read = socket_ops::recv(read_descriptor_, &b, 1, 0, ec);return was_interrupted;
}
eventfd机制
eventfd_select_interrupter是通过eventfd机制创建读定两个文件描述符,如果失败时则使用pipe
open_descriptors:使用eventfd创建读写两个文件描述符,如果失败则使用pipe
void eventfd_select_interrupter::open_descriptors()
{
#if __GLIBC__ == 2 && __GLIBC_MINOR__ < 8write_descriptor_ = read_descriptor_ = syscall(__NR_eventfd, 0);if (read_descriptor_ != -1){::fcntl(read_descriptor_, F_SETFL, O_NONBLOCK);::fcntl(read_descriptor_, F_SETFD, FD_CLOEXEC);}
#else // __GLIBC__ == 2 && __GLIBC_MINOR__ < 8
# if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)write_descriptor_ = read_descriptor_ =::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
# else // defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)errno = EINVAL;write_descriptor_ = read_descriptor_ = -1;
# endif // defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)if (read_descriptor_ == -1 && errno == EINVAL){write_descriptor_ = read_descriptor_ = ::eventfd(0, 0);if (read_descriptor_ != -1){::fcntl(read_descriptor_, F_SETFL, O_NONBLOCK);::fcntl(read_descriptor_, F_SETFD, FD_CLOEXEC);}}
#endif // __GLIBC__ == 2 && __GLIBC_MINOR__ < 8if (read_descriptor_ == -1){int pipe_fds[2];if (pipe(pipe_fds) == 0){read_descriptor_ = pipe_fds[0];::fcntl(read_descriptor_, F_SETFL, O_NONBLOCK);::fcntl(read_descriptor_, F_SETFD, FD_CLOEXEC);write_descriptor_ = pipe_fds[1];::fcntl(write_descriptor_, F_SETFL, O_NONBLOCK);::fcntl(write_descriptor_, F_SETFD, FD_CLOEXEC);}else{boost::system::error_code ec(errno,boost::asio::error::get_system_category());boost::asio::detail::throw_error(ec, "eventfd_select_interrupter");}}
}
close_descriptors:关闭读写两个文件描述符
void eventfd_select_interrupter::close_descriptors()
{if (write_descriptor_ != -1 && write_descriptor_ != read_descriptor_)::close(write_descriptor_);if (read_descriptor_ != -1)::close(read_descriptor_);
}
recreate:先关闭读写文件描述符,然后新建读写文件描述符
void eventfd_select_interrupter::recreate()
{close_descriptors();write_descriptor_ = -1;read_descriptor_ = -1;open_descriptors();
}
interrupt:向写文件描述符写入一字节,内容为0
void eventfd_select_interrupter::interrupt()
{uint64_t counter(1UL);int result = ::write(write_descriptor_, &counter, sizeof(uint64_t));(void)result;
}
reset:从读文件描述符读取数据,直至读写
bool eventfd_select_interrupter::reset()
{if (write_descriptor_ == read_descriptor_){for (;;){// Only perform one read. The kernel maintains an atomic counter.uint64_t counter(0);errno = 0;int bytes_read = ::read(read_descriptor_, &counter, sizeof(uint64_t));if (bytes_read < 0 && errno == EINTR)continue;bool was_interrupted = (bytes_read > 0);return was_interrupted;}}else{for (;;){// Clear all data from the pipe.char data[1024];int bytes_read = ::read(read_descriptor_, data, sizeof(data));if (bytes_read < 0 && errno == EINTR)continue;bool was_interrupted = (bytes_read > 0);while (bytes_read == sizeof(data))bytes_read = ::read(read_descriptor_, data, sizeof(data));return was_interrupted;}}
}
epoll_reactor中是如何调用的
在epoll_reactor构造函数中将读描述符添加到epoll中,并且开始调用select_interrupter的interrupt方法写入一字节内容
epoll_reactor::epoll_reactor(boost::asio::io_service& io_service): boost::asio::detail::service_base<epoll_reactor>(io_service),io_service_(use_service<io_service_impl>(io_service)),mutex_(),interrupter_(),epoll_fd_(do_epoll_create()),timer_fd_(do_timerfd_create()),shutdown_(false)
{// Add the interrupter's descriptor to epoll.epoll_event ev = { 0, { 0 } };ev.events = EPOLLIN | EPOLLERR | EPOLLET;ev.data.ptr = &interrupter_;epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);interrupter_.interrupt();// Add the timer descriptor to epoll.if (timer_fd_ != -1){ev.events = EPOLLIN | EPOLLERR;ev.data.ptr = &timer_fd_;epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);}
}
在事件循环中,如果事件描述符为interrupter_,没有做任何操作,即没有调用reset读取数据
void epoll_reactor::run(bool block, op_queue<operation>& ops)
{// This code relies on the fact that the task_io_service queues the reactor// task behind all descriptor operations generated by this function. This// means, that by the time we reach this point, any previously returned// descriptor operations have already been dequeued. Therefore it is now safe// for us to reuse and return them for the task_io_service to queue again.// Calculate a timeout only if timerfd is not used.int timeout;if (timer_fd_ != -1)timeout = block ? -1 : 0;else{mutex::scoped_lock lock(mutex_);timeout = block ? get_timeout() : 0;}// Block on the epoll descriptor.epoll_event events[128];int num_events = epoll_wait(epoll_fd_, events, 128, timeout);#if defined(BOOST_ASIO_HAS_TIMERFD)bool check_timers = (timer_fd_ == -1);
#else // defined(BOOST_ASIO_HAS_TIMERFD)bool check_timers = true;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)// Dispatch the waiting events.for (int i = 0; i < num_events; ++i){void* ptr = events[i].data.ptr;if (ptr == &interrupter_){// No need to reset the interrupter since we're leaving the descriptor// in a ready-to-read state and relying on edge-triggered notifications// to make it so that we only get woken up when the descriptor's epoll// registration is updated.#if defined(BOOST_ASIO_HAS_TIMERFD)if (timer_fd_ == -1)check_timers = true;
#else // defined(BOOST_ASIO_HAS_TIMERFD)check_timers = true;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)}
#if defined(BOOST_ASIO_HAS_TIMERFD)else if (ptr == &timer_fd_){check_timers = true;}
#endif // defined(BOOST_ASIO_HAS_TIMERFD)else{// The descriptor operation doesn't count as work in and of itself, so we// don't call work_started() here. This still allows the io_service to// stop if the only remaining operations are descriptor operations.descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);descriptor_data->set_ready_events(events[i].events);ops.push(descriptor_data);}}if (check_timers){mutex::scoped_lock common_lock(mutex_);timer_queues_.get_ready_timers(ops);#if defined(BOOST_ASIO_HAS_TIMERFD)if (timer_fd_ != -1){itimerspec new_timeout;itimerspec old_timeout;int flags = get_timeout(new_timeout);timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);}
#endif // defined(BOOST_ASIO_HAS_TIMERFD)}
}
唤醒是通过使用EPOLL_CTL_MOD重新修改interrupter_
void epoll_reactor::interrupt()
{epoll_event ev = { 0, { 0 } };ev.events = EPOLLIN | EPOLLERR | EPOLLET;ev.data.ptr = &interrupter_;epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
}