问题
在使用非阻塞connect时,发现在服务端没有运行时,连接成功,发现是对于connect返回为-1时,对错误码没有做正确的处理,只是对于errno为EINTR做了处理,其它错误码时使用select判断是否可写,可写就将套接字添加到reactor中,这种处理方式是有问题的,应该是在errno为EINPROGRESS时,才使用select判断是否可写,由于出错时,套接字也会变为可写,所以在判断为可写时,需要调用getsockopt(s, SOL_SOCKET, SO_ERROR,…)判断没有出错,才认为是连接成功的。
asio的异步连接处理
linux的处理
首先创建reactive_socket_connect_op操作,调用start_connect_op,在套接字为非阻塞,或者将套接字设置为非阻塞,调用connect发起连接,如果错误码为in_progress或者would_block,将reactive_socket_connect_op操作添加到事件循环中,在套接字变为可写时,会执行reactive_socket_connect_op_base基类的do_perform方法,其会调用socket_ops#non_blocking_connect方法
template <typename Handler>void async_connect(implementation_type& impl,const endpoint_type& peer_endpoint, Handler& handler){bool is_continuation =boost_asio_handler_cont_helpers::is_continuation(handler);// Allocate and construct an operation to wrap the handler.typedef reactive_socket_connect_op<Handler> op;typename op::ptr p = { boost::asio::detail::addressof(handler),boost_asio_handler_alloc_helpers::allocate(sizeof(op), handler), 0 };p.p = new (p.v) op(impl.socket_, handler);BOOST_ASIO_HANDLER_CREATION((p.p, "socket", &impl, "async_connect"));start_connect_op(impl, p.p, is_continuation,peer_endpoint.data(), peer_endpoint.size());p.v = p.p = 0;}
};```cpp
void reactive_socket_service_base::start_connect_op(reactive_socket_service_base::base_implementation_type& impl,reactor_op* op, bool is_continuation,const socket_addr_type* addr, size_t addrlen)
{if ((impl.state_ & socket_ops::non_blocking)|| socket_ops::set_internal_non_blocking(impl.socket_, impl.state_, true, op->ec_)){if (socket_ops::connect(impl.socket_, addr, addrlen, op->ec_) != 0){if (op->ec_ == boost::asio::error::in_progress|| op->ec_ == boost::asio::error::would_block){op->ec_ = boost::system::error_code();reactor_.start_op(reactor::connect_op, impl.socket_,impl.reactor_data_, op, is_continuation, false);return;}}}reactor_.post_immediate_completion(op, is_continuation);
}class reactive_socket_connect_op_base : public reactor_op
{
public:reactive_socket_connect_op_base(socket_type socket, func_type complete_func): reactor_op(&reactive_socket_connect_op_base::do_perform, complete_func),socket_(socket){}static bool do_perform(reactor_op* base){reactive_socket_connect_op_base* o(static_cast<reactive_socket_connect_op_base*>(base));return socket_ops::non_blocking_connect(o->socket_, o->ec_);}private:socket_type socket_;
};
windows的处理
template <typename Handler>void async_connect(implementation_type& impl,const endpoint_type& peer_endpoint, Handler& handler){// Allocate and construct an operation to wrap the handler.typedef win_iocp_socket_connect_op<Handler> op;typename op::ptr p = { boost::asio::detail::addressof(handler),boost_asio_handler_alloc_helpers::allocate(sizeof(op), handler), 0 };p.p = new (p.v) op(impl.socket_, handler);BOOST_ASIO_HANDLER_CREATION((p.p, "socket", &impl, "async_connect"));start_connect_op(impl, impl.protocol_.family(), impl.protocol_.type(),peer_endpoint.data(), static_cast<int>(peer_endpoint.size()), p.p);p.v = p.p = 0;}
};void win_iocp_socket_service_base::start_connect_op(win_iocp_socket_service_base::base_implementation_type& impl,int family, int type, const socket_addr_type* addr,std::size_t addrlen, win_iocp_socket_connect_op_base* op)
{// If ConnectEx is available, use that.if (family == BOOST_ASIO_OS_DEF(AF_INET)|| family == BOOST_ASIO_OS_DEF(AF_INET6)){if (connect_ex_fn connect_ex = get_connect_ex(impl, type)){union address_union{socket_addr_type base;sockaddr_in4_type v4;sockaddr_in6_type v6;} a;using namespace std; // For memset.memset(&a, 0, sizeof(a));a.base.sa_family = family;socket_ops::bind(impl.socket_, &a.base,family == BOOST_ASIO_OS_DEF(AF_INET)? sizeof(a.v4) : sizeof(a.v6), op->ec_);if (op->ec_ && op->ec_ != boost::asio::error::invalid_argument){iocp_service_.post_immediate_completion(op, false);return;}op->connect_ex_ = true;update_cancellation_thread_id(impl);iocp_service_.work_started();BOOL result = connect_ex(impl.socket_,addr, static_cast<int>(addrlen), 0, 0, 0, op);DWORD last_error = ::WSAGetLastError();if (!result && last_error != WSA_IO_PENDING)iocp_service_.on_completion(op, last_error);elseiocp_service_.on_pending(op);return;}}// Otherwise, fall back to a reactor-based implementation.reactor& r = get_reactor();update_cancellation_thread_id(impl);if ((impl.state_ & socket_ops::non_blocking) != 0|| socket_ops::set_internal_non_blocking(impl.socket_, impl.state_, true, op->ec_)){if (socket_ops::connect(impl.socket_, addr, addrlen, op->ec_) != 0){if (op->ec_ == boost::asio::error::in_progress|| op->ec_ == boost::asio::error::would_block){op->ec_ = boost::system::error_code();r.start_op(reactor::connect_op, impl.socket_,impl.reactor_data_, op, false, false);return;}}}r.post_immediate_completion(op, false);
}class win_iocp_socket_connect_op_base : public reactor_op
{
public:win_iocp_socket_connect_op_base(socket_type socket, func_type complete_func): reactor_op(&win_iocp_socket_connect_op_base::do_perform, complete_func),socket_(socket),connect_ex_(false){}static bool do_perform(reactor_op* base){win_iocp_socket_connect_op_base* o(static_cast<win_iocp_socket_connect_op_base*>(base));return socket_ops::non_blocking_connect(o->socket_, o->ec_);}socket_type socket_;bool connect_ex_;
};
其中win_iocp_socket_connect_op_base 和reactive_socket_connect_op_base 都会调用non_blocking_connect,其内部对window和linux作了区分处理,window是调用select,其设置的超时时间为0,将套接字添加到可写以及异常集合中上。linux是调用poll,添加POLLOUT事件,超时时间设置为0。两个都是立即返回。如果返回的事件数为0,说明还在尝试连接中。否则调用getsockopt判断是否出错,没有出错说明连接成功了
bool non_blocking_connect(socket_type s, boost::system::error_code& ec)
{// Check if the connect operation has finished. This is required since we may// get spurious readiness notifications from the reactor.
#if defined(BOOST_ASIO_WINDOWS) \|| defined(__CYGWIN__) \|| defined(__SYMBIAN32__)fd_set write_fds;FD_ZERO(&write_fds);FD_SET(s, &write_fds);fd_set except_fds;FD_ZERO(&except_fds);FD_SET(s, &except_fds);timeval zero_timeout;zero_timeout.tv_sec = 0;zero_timeout.tv_usec = 0;int ready = ::select(s + 1, 0, &write_fds, &except_fds, &zero_timeout);
#else // defined(BOOST_ASIO_WINDOWS)// || defined(__CYGWIN__)// || defined(__SYMBIAN32__)pollfd fds;fds.fd = s;fds.events = POLLOUT;fds.revents = 0;int ready = ::poll(&fds, 1, 0);
#endif // defined(BOOST_ASIO_WINDOWS)// || defined(__CYGWIN__)// || defined(__SYMBIAN32__)if (ready == 0){// The asynchronous connect operation is still in progress.return false;}// Get the error code from the connect operation.int connect_error = 0;size_t connect_error_len = sizeof(connect_error);if (socket_ops::getsockopt(s, 0, SOL_SOCKET, SO_ERROR,&connect_error, &connect_error_len, ec) == 0){if (connect_error){ec = boost::system::error_code(connect_error,boost::asio::error::get_system_category());}elseec = boost::system::error_code();}return true;
}
qt的非阻塞连接处理
是通过QAbstractSocket::waitForConnected
首先开启QElapsedTimer定时器,调用QAbstractSocketPrivate::_q_startConnecting发起连接,内部调用QAbstractSocketPrivate::_q_connectToNextAddress,调用QNativeSocketEngine::connectToHost,通过调用QNativeSocketEnginePrivate::nativeConnect即connect来发起连接 ,在连接没有成功时,会设置套接字的可写socketEngine->setWriteNotificationEnabled(true);
会为套接字添加写处理器QWriteNotifier,当事件触发并且当前连接状态为ConnectingState时,再次发起连接,如果errno为EISCONN则更新状态为ConnectedState。同时会调用waitForWrite等待可写事件
对于windows系统,如果返回的事件为不为0,则表示连接成功,否则调用getsockopt(d->socketDescriptor, SOL_SOCKET, SO_ERROR, (char *) &value, &valueSize)
获取错误原因。
对于 linux系统,当事件触发并且当前连接状态为ConnectingState时,再次发起连接,如果errno为EISCONN则更新状态为ConnectedState。waitForWrite时,如果没有超时,并且当前状态为ConnectingState,则发起重连,同时调用QAbstractSocketPrivate::_q_testConnection判断连接状态。
bool QNativeSocketEngine::waitForWrite(int msecs, bool *timedOut)
{Q_D(QNativeSocketEngine);Q_CHECK_VALID_SOCKETLAYER(QNativeSocketEngine::waitForWrite(), false);Q_CHECK_NOT_STATE(QNativeSocketEngine::waitForWrite(),QAbstractSocket::UnconnectedState, false);if (timedOut)*timedOut = false;int ret = d->nativeSelect(msecs, false);// On Windows, the socket is in connected state if a call to// select(writable) is successful. In this case we should not// issue a second call to WSAConnect()
#if defined (Q_OS_WIN)if (state() == QAbstractSocket::ConnectingState) {if (ret > 0) {setState(QAbstractSocket::ConnectedState);d_func()->fetchConnectionParameters();return true;} else {int value = 0;int valueSize = sizeof(value);if (::getsockopt(d->socketDescriptor, SOL_SOCKET, SO_ERROR, (char *) &value, &valueSize) == 0) {if (value == WSAECONNREFUSED) {d->setError(QAbstractSocket::ConnectionRefusedError, QNativeSocketEnginePrivate::ConnectionRefusedErrorString);d->socketState = QAbstractSocket::UnconnectedState;return false;} else if (value == WSAETIMEDOUT) {d->setError(QAbstractSocket::NetworkError, QNativeSocketEnginePrivate::ConnectionTimeOutErrorString);d->socketState = QAbstractSocket::UnconnectedState;return false;} else if (value == WSAEHOSTUNREACH) {d->setError(QAbstractSocket::NetworkError, QNativeSocketEnginePrivate::HostUnreachableErrorString);d->socketState = QAbstractSocket::UnconnectedState;return false;}}}}
#endifif (ret == 0) {if (timedOut)*timedOut = true;d->setError(QAbstractSocket::SocketTimeoutError,QNativeSocketEnginePrivate::TimeOutErrorString);d->hasSetSocketError = false; // A timeout error is temporary in waitFor functionsreturn false;} else if (state() == QAbstractSocket::ConnectingState || (state() == QAbstractSocket::BoundState && d->socketDescriptor != -1)) {connectToHost(d->peerAddress, d->peerPort);}return ret > 0;
}