本文搭建一个Unix环境下的、局域网内的、简易的本地时间获取服务。
主要用于验证:
- 当TCP连接成功后,可以在两个线程中分别进行读操作、写操作动作
- 当客户端自行终止连接后,服务端会在写操作时收到 SIGPIPE 信号
- 当客户端执行shutdown写操作后,客户端会在写操作时收到 SIGPIPE 信号
- 当客户端执行shutdown写操作后,服务端会在读操作时得到返回值 0
服务端功能:
- 轮询监听Client的连接(阻塞式)
- 创建并缓存会话对象
- 开启会话对象的读操作线(阻塞式IO)、写操作线程(阻塞式IO)
- 当读写操作线程退出时通过回调来执行资源释放(fd,会话对象)
客户端功能:
- 连接成功后直接开启读操作线程(阻塞式IO)、写操作线程(阻塞式IO)
- 在2秒后shutdown写端
- 在3秒后退出工作线程
(本文对打印进行了加锁,确保输出信息看起来更清晰,否则信息会混乱交错)
服务端源码(局域网ip、端口port 按需自行修改噢😊):
// TimeServer.cpp#include <iostream>
#include <thread>
#include <vector>
#include <map>
#include <atomic>
#include <exception>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>#include "TimeConn.hpp"std::map<int, TimeConn> conn_map;int initServer(const std::string& ip, uint16_t port) {int server = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if (server == -1) {std::cout << "socket failed, errno: " << strerror(errno) << std::endl;_exit(0);}sockaddr_in addr{.sin_family = AF_INET,.sin_port = htons(port)};int success = inet_pton(AF_INET, ip.c_str(), &addr.sin_addr);if (success == 0) {std::cout << "invalid ip address, errno: " << strerror(errno) << std::endl;_exit(0);} else if (success == -1) {std::cout << "inet_pton error, errno: " << strerror(errno) << std::endl;_exit(0);}success = bind(server, reinterpret_cast<const sockaddr*>(&addr), sizeof(sockaddr_in));if (success == -1) {std::cout << "bind failed, errno: " << strerror(errno) << std::endl;_exit(0);}success = listen(server, 50);if (success == -1) {std::cout << "listen failed, errno: " << strerror(errno) << std::endl;_exit(0);}return server;
}void handleConn(int conn) noexcept(false) {sockaddr client_addr;socklen_t len = sizeof(decltype(client_addr));// 读取连接建立时client的信息auto success = getpeername(conn, &client_addr, &len);if (success == 0) {if (client_addr.sa_family == AF_INET) {sockaddr_in* ipv4 = reinterpret_cast<sockaddr_in*>(&client_addr);std::cout << "client ip: " << std::hex << ipv4->sin_addr.s_addr << std::dec << " port: " << ipv4->sin_port << std::endl;}} else if (success == -1) {std::cout << "getpeername failed, errno: " << strerror(errno) << std::endl;close(conn);return;}TimeConn& timeConn = conn_map[conn];timeConn.initConnFd(conn);timeConn.startRead([&timeConn](int conn_fd){if (timeConn.canClose()) {close(conn_fd);conn_map.erase(conn_fd);}});timeConn.startWrite([&timeConn](int conn_fd){if (timeConn.canClose()) {close(conn_fd);conn_map.erase(conn_fd);}});
}int main(int argc, char* argv[]) {std::cout << "Hello, I am server" << std::endl;std::string ip{"192.168.0.110"};auto server = initServer(ip, 10080);while (true) {std::cout << "Server accepting..." << std::endl;int conn = accept(server, nullptr, nullptr);if (conn == -1) {if (errno == EAGAIN) {continue;} else {std::cout << "accept failed, errno: " << strerror(errno) << std::endl;_exit(0);}}std::cout << "new connect! conn fd: " << conn << std::endl;try {handleConn(conn);} catch (std::exception& e) {// std::cout << "handleConn exception: " << e.what() << std::endl;}}close(server);return 0;
}
服务端会话源码:
// TimeConn.hpp#ifndef __TIMECONN_HPP__
#define __TIMECONN_HPP__#include <unistd.h>
#include <atomic>
#include <thread>class TimeConn
{
public:TimeConn(int conn = -1) : mConnFd{conn}, isReading{false}, isWriting{false}{// ...};virtual ~TimeConn(){close(mConnFd);};// constexpr TimeConn &operator=(const TimeConn &);public:void initConnFd(int conn);void startRead(std::function<void(int)> callback);void startWrite(std::function<void(int)> callback);void stopRead();void stopWrite();bool canClose();private:int mConnFd;std::atomic_bool isReading;std::atomic_bool isWriting;
};#endif
// TimeConn.cpp#include <chrono>
#include <iostream>
#include <mutex>
#include <sstream>
#include <sys/socket.h>
#include <signal.h>#include "TimeConn.hpp"// constexpr TimeConn &TimeConn::operator=(const TimeConn & other) {
// this->mConnFd = other.mConnFd;
// return *this;
// }static std::mutex m;static void print_log(const std::stringstream& ss) {std::lock_guard<std::mutex> lock(m);std::cout << ss.str() << std::endl;
}void TimeConn::initConnFd(int conn) {this->mConnFd = conn;
}void TimeConn::startRead(std::function<void(int)> callback) {using namespace std::literals;isReading = true;std::thread([this, callback]{std::stringstream ss;ss << "conn fd: " << this->mConnFd << " start read";print_log(ss);while (this->isReading) {char buffer[512];ssize_t res = recv(this->mConnFd, &buffer, sizeof(buffer), 0);if (res == 0) {std::stringstream ss1;ss1 << "no data or remote end";print_log(ss1);break;} else if (res == -1) {// errorstd::stringstream ss2;ss2 << "conn fd:" << this->mConnFd << " recv failed: " << strerror(errno);print_log(ss2);break;} else {std::stringstream ss3;ss3 << "recv success, count: " << res << " data: " << buffer;print_log(ss3);}}this->isReading = false;callback(this->mConnFd);// 注意!// 在经过callback后,若map进行了erase操作,则该TimeConn obj内存被清除,this->mConnFd值是不确定的,大概率是0,但也可能已被其它值占用std::stringstream ss4;ss4 << "conn fd " << this->mConnFd << " Reading finish";print_log(ss4);}).detach();
}void TimeConn::startWrite(std::function<void(int)> callback) {using namespace std::literals;isWriting = true;std::thread([this, callback]{std::stringstream ss;ss << "conn fd: " << this->mConnFd << " start write";print_log(ss);// send 时若该连接已关闭,则会产生SIGPIPE信号,程序默认执行动作是“退出进程”// 解决方案一 使用signal忽略SIGPIPE// signal(SIGPIPE, SIG_IGN);while (this->isWriting) {const auto now = std::chrono::system_clock::now();const std::time_t t_c = std::chrono::system_clock::to_time_t(now);const auto* t = std::ctime(&t_c);ssize_t res = -1;// 发送数据// send 时若该连接已关闭,则会产生SIGPIPE信号,程序默认执行动作是“退出进程”// 解决方案二(操作系统受限) 若操作系统支持,可以加上flag MSG_NOSIGNALres = send(this->mConnFd, t, strlen(t) + sizeof('\0'), MSG_DONTROUTE | MSG_NOSIGNAL);if (res == -1) {// errorstd::stringstream ss1;ss1 << "conn fd:" << this->mConnFd << " send failed: " << strerror(errno);print_log(ss1);break;} else {std::stringstream ss2;ss2 << "send success, count: " << res << " data: " << t;print_log(ss2);}// std::this_thread::sleep_for(1s);}this->isWriting = false;callback(this->mConnFd);// 注意!// 在经过callback后,若map进行了erase操作,则该TimeConn obj内存被清除,this->mConnFd值是不确定的,大概率是0,但也可能已被其它值占用std::stringstream ss3;ss3 << "conn fd " << this->mConnFd << " Writing finish";print_log(ss3);}).detach();
}void TimeConn::stopRead() {isReading = false;
}void TimeConn::stopWrite() {isWriting = false;
}bool TimeConn::canClose() {// 鉴于该示例启动线程的时机与退出线程的时机比较简单,所以无需加锁return !isReading && !isWriting;
}
本文中使用的recv、send函数都是用阻塞式IO,所以相应的返回值处理都是按照阻塞式时的错误来进行处理的。若采用非阻塞式IO,则处理方式并不是如此的。