C++编程:利用环形缓冲区优化 TCP 发送流程,避免 Short Write 问题

文章目录

    • 1. 什么是 Short Write 问题?
    • 2. 如何解决 Short Write 问题?
      • 2.1 方法 1:将 Socket 设置为阻塞模式
      • 2.2 方法 2:用户态维护发送缓冲区
    • 3. 用户态维护发送缓冲区实现
      • 3.1 核心要点
      • 3.2 代码实现
      • 3.3 测试程序
    • 参考文档

1. 什么是 Short Write 问题?

在 TCP 编程中,short write 问题指的是在调用 sendwrite 系统调用时,实际发送的数据量比预期要少。这通常是因为网络协议栈发送缓冲区的空间不足,导致不能一次性发送完整的数据。遇到这种情况时,系统调用会返回实际发送的字节数,并将 errno 设置为 EAGAIN,表示缓冲区没有足够的空间来继续发送数据。

2. 如何解决 Short Write 问题?

针对 EPOLL 模型 中的 LT(Level Triggered)模式,可以采取以下几种方案来解决 short write 问题:

2.1 方法 1:将 Socket 设置为阻塞模式

将 Socket 设置为阻塞模式时,send 系统调用会一直阻塞,直到有足够的缓冲区空间发送完整的数据。这种方法能够避免 short write 问题,但会导致线程阻塞,从而影响性能。因此,通常不推荐在高并发或需要高吞吐量的场景中使用此方法。

2.2 方法 2:用户态维护发送缓冲区

更推荐的方法是用户态维护一个发送缓冲区,并结合 EPOLLONESHOTEPOLLOUT 事件来控制数据发送。这种方法不需要阻塞线程,能够有效地处理 short write 问题。

3. 用户态维护发送缓冲区实现

3.1 核心要点

使用环形缓冲区来保存待发送的数据,当系统发送缓冲区不够时,数据会被存入环形缓冲区,并在后续的 EPOLLOUT 事件触发时继续发送。

  • 环形缓冲区设计

环形缓冲区(Circular Buffer)是一个固定大小的缓存,用于暂存数据。当数据无法完全写入网络协议栈缓冲区时,可以将其暂存,并在缓冲区有足够空间时继续写入。通过注册 EPOLLOUT 事件,当缓冲区有空闲空间时,程序可以重新尝试发送数据。

  • 数据收发管理类设计

    • asyncSend:当数据网络协议栈发送缓冲区没有足够空间时,会将数据存储到环形缓冲区,并通过 EPOLLONESHOTEPOLLOUT 事件确保数据能在后续时刻继续发送。

    • doSend:此函数会被 EPOLLOUT 事件触发,它从环形缓冲区中取出数据并尝试发送。如果发送成功,则释放相应的缓冲区空间;如果发送失败,且错误码为 EAGAINEINTER,则会重试。

3.2 代码实现

#include <atomic>
#include <cstring>
#include <memory>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>
#include <mutex>
#include <cassert>
#include <fcntl.h>class LockFreeBytesBuffer {public:static const std::size_t kBufferSize = 10240U;  // 缓冲区大小LockFreeBytesBuffer() noexcept : reader_index_(0U), writer_index_(0U) {std::memset(buffer_, 0, kBufferSize);}bool append(const char* data, std::size_t length) noexcept;std::size_t beginRead(const char** target) noexcept;void endRead(std::size_t length) noexcept;private:char buffer_[kBufferSize];std::atomic<std::size_t> reader_index_;std::atomic<std::size_t> writer_index_;
};bool LockFreeBytesBuffer::append(const char* data, std::size_t length) noexcept {const std::size_t current_write_index = writer_index_.load(std::memory_order_relaxed);const std::size_t current_read_index = reader_index_.load(std::memory_order_acquire);const std::size_t free_space = (current_read_index + kBufferSize - current_write_index - 1U) % kBufferSize;if (length > free_space) {return false;  // 缓冲区满}const std::size_t pos = current_write_index % kBufferSize;const std::size_t first_part = std::min(length, kBufferSize - pos);std::memcpy(&buffer_[pos], data, first_part);std::memcpy(&buffer_[0], data + first_part, length - first_part);writer_index_.store(current_write_index + length, std::memory_order_release);return true;
}std::size_t LockFreeBytesBuffer::beginRead(const char** target) noexcept {const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed);const std::size_t current_write_index = writer_index_.load(std::memory_order_acquire);const std::size_t available_data = (current_write_index - current_read_index) % kBufferSize;if (available_data == 0U) {return 0U;  // 缓冲区空}const std::size_t pos = current_read_index % kBufferSize;*target = &buffer_[pos];return std::min(available_data, kBufferSize - pos);
}void LockFreeBytesBuffer::endRead(std::size_t length) noexcept {const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed);reader_index_.store(current_read_index + length, std::memory_order_release);
}class SocketContext {public:SocketContext(int epoll_fd, int sock_fd): epoll_fd_(epoll_fd), sock_fd_(sock_fd) {setNonblocking();addFd();}~SocketContext() {removeFd();close(sock_fd_);}bool asyncSend(const char* data, int size) {bool result = buffer_.append(data, static_cast<std::size_t>(size));if (result) {modifyEvent(false, true);  // 修改 EPOLLONESHOT 和 EPOLLOUT}return result;}int doRecv() {char buffer[1024] = {};int count = read(sock_fd_, buffer, sizeof(buffer));if (count <= 0) {return count;  // 读取失败或连接关闭}modifyEvent(true, false);  // 恢复 EPOLLIN 事件fprintf(stderr, "Received: %s\n", buffer);return count;}int doSend() {const char* pdata = nullptr;std::size_t data_size = buffer_.beginRead(&pdata);if (data_size == 0) {return 0;  // 没有数据可以发送}int send_size = send(sock_fd_, pdata, static_cast<int>(data_size), MSG_DONTWAIT);if (send_size > 0) {buffer_.endRead(static_cast<std::size_t>(send_size));  // 更新已发送数据} else if (send_size == -1 && errno != EAGAIN) {fprintf(stderr, "send failed, error: %s\n", strerror(errno));}return send_size;}protected:void setNonblocking() {int flags = fcntl(sock_fd_, F_GETFL, 0);if (flags == -1) {fprintf(stderr, "fcntl GETFL failed: %s\n", strerror(errno));return;}fcntl(sock_fd_, F_SETFL, flags | O_NONBLOCK);}void addFd() {struct epoll_event event;event.data.ptr = this;event.events = EPOLLIN | EPOLLONESHOT | EPOLLOUT;if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, sock_fd_, &event) == -1) {fprintf(stderr, "epoll_ctl add failed: %s\n", strerror(errno));}}void removeFd() {epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, sock_fd_, nullptr);}inline void modifyEvent(bool in_event = true, bool out_event = true) {struct epoll_event event;event.data.ptr = this;event.events = EPOLLONESHOT;if (in_event) {event.events |= EPOLLIN;}if (out_event) {event.events |= EPOLLOUT;}epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, sock_fd_, &event);}private:int epoll_fd_;int sock_fd_;LockFreeBytesBuffer buffer_;
};

代码说明

  • 无锁环形缓冲区LockFreeBytesBuffer 类通过原子操作(std::atomic)来确保线程安全,避免了传统的锁机制。
    更多请见:C++生产者-消费者无锁缓冲区的简单实现
  • 事件驱动机制:通过 EPOLLINEPOLLOUT 事件来控制数据的接收和发送,避免了阻塞操作。
  • 非阻塞发送:通过 send 函数的 MSG_DONTWAIT 标志来确保发送操作不会阻塞,遇到 EAGAIN 错误时会重试。

在这里插入图片描述

3.3 测试程序

#include <iostream>
#include <memory>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <cassert>#define MAX_EVENTS 10int createServerSocket(int port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {fprintf(stderr, "socket creation failed: %s\n", strerror(errno));return -1;}sockaddr_in server_addr;std::memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(port);if (bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {fprintf(stderr, "bind failed: %s\n", strerror(errno));close(sockfd);return -1;}if (listen(sockfd, 5) == -1) {fprintf(stderr, "listen failed: %s\n", strerror(errno));close(sockfd);return -1;}return sockfd;
}int main() {int epoll_fd = epoll_create1(0);if (epoll_fd == -1) {fprintf(stderr, "epoll_create1 failed: %s\n", strerror(errno));return -1;}int server_fd = createServerSocket(8080);if (server_fd == -1) {return -1;}// Set the server socket to non-blocking modefcntl(server_fd, F_SETFL, O_NONBLOCK);// Add server socket to epollstruct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = server_fd;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {fprintf(stderr, "epoll_ctl: server_fd failed: %s\n", strerror(errno));return -1;}fprintf(stderr, "Server listening on port 8080...\n");while (true) {struct epoll_event events[MAX_EVENTS];int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);for (int i = 0; i < n; ++i) {if (events[i].data.fd == server_fd) {// Accept new client connectionint client_fd = accept(server_fd, NULL, NULL);if (client_fd == -1) {fprintf(stderr, "accept failed: %s\n", strerror(errno));continue;}fcntl(client_fd, F_SETFL, O_NONBLOCK);std::unique_ptr<SocketContext> client = std::make_unique<SocketContext>(epoll_fd, client_fd);ev.events = EPOLLIN | EPOLLOUT | EPOLLONESHOT;ev.data.ptr = client.get();if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {fprintf(stderr, "epoll_ctl: client_fd failed: %s\n", strerror(errno));}} else {SocketContext* client = static_cast<SocketContext*>(events[i].data.ptr);if (events[i].events & EPOLLIN) {client->doRecv();}if (events[i].events & EPOLLOUT) {client->doSend();}}}}close(server_fd);close(epoll_fd);return 0;
}

参考文档

  • tcp 解决short write问题
  • C++生产者-消费者无锁缓冲区的简单实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/60275.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第十三届交通运输研究(上海)论坛┆智能网联汽车技术现状与研究实践

0.简介 交通运输研究&#xff08;上海&#xff09;论坛&#xff08;简称为TRF&#xff09;是按照国际会议的组织原则&#xff0c;为综合交通运输领域学者们构建的良好合作交流平台。交通运输研究&#xff08;上海&#xff09;论坛已经成功举办了十二届&#xff0c;凝聚了全国百…

中仕公考:大三大四考公备考时间线

大三大四想要考公务员的可以借鉴以下时间线&#xff1a; 大三下学期&#xff1a; 基础看课阶段&#xff0c;刚开始先打好基础很重要&#xff0c;根据课程和教材理解知识点&#xff0c;按照模块学习&#xff0c;对考试科目的题型有深入的认识和掌握。 大四初&#xff1a; 强…

Android Studio 将项目打包成apk文件

第一步&#xff1a;选择Build -> Generate Signed APK 会出现&#xff1a; 我们选择 Create new… 然后选择你要存放密钥的地方 点击ok之后&#xff0c;则选择好了文件&#xff0c;并生成了jks文件了。 点击ok之后&#xff0c; 会出现&#xff1a; 选择release&#xf…

FFmpeg存放压缩后的音视频数据的结构体:AVPacket简介,结构体,函数

如下图的解码流程&#xff0c;AVPacket中的位置 FFmpeg源码中通过AVPacket存储压缩后的音视频数据。它通常由解复用器&#xff08;demuxers&#xff09;输出&#xff0c;然后作为输入传递给解码器。 或者从编码器作为输出接收&#xff0c;然后传递给多路复用器&#xff08;mux…

纺织品缺陷检测

项目源码获取方式见文章末尾&#xff01; 600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【基于CNN-RNN的影像报告生成】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模型实现…

SSM房屋销售管理系统-计算机毕业设计源码49529

摘 要 随着国民生活水平的提高&#xff0c;人们日益重视休闲旅游&#xff0c;而互联网的普及也为房屋销售管理带来了新的机遇。将房屋租赁产业与互联网相结合&#xff0c;利用 SSM 框架建设房屋销售管理系统&#xff0c;实现房屋销售管理的网络化&#xff0c;对提高国民经济发展…

【PLC一体机】触摸屏上一直显示ERR警示灯的原因和解决方法

博主之前买了一个PLC一体机&#xff0c;在触摸屏上有ERR、RUN和SYS三个显示灯&#xff08;如下图&#xff09; 其中触摸屏工作时&#xff0c;RUN显示灯会显示为绿色&#xff0c;证明触摸屏正常工作。 SYS是触摸屏上刷写PLC程序和触摸屏程序时&#xff0c;会显示为橙色&#xf…

(十四)JavaWeb后端开发——MyBatis

目录 1.MyBatis概述 2.MyBatis简单入门 3.JDBC&#xff08;了解即可&#xff09; 4.数据库连接池​ 5.lombok 6.MyBatis基本操作 7.XML映射文件 8.动态SQL 8.1 if标签 8.2 foreach标签 8.3 sql/include标签​ 1.MyBatis概述 MyBatis是一款优秀的持久层&#xff08…

Elasticsearch可视化工具Elasticvue插件用法

目录 1.打开浏览器扩展程序(示例Edge浏览器) ​2.搜索elasticvue并安装 3.打开elasticvue ​4.连接Es 5.有些浏览器无法下载安装扩展&#xff0c;例如谷歌。可以打包扩展给别的浏览器使用。 5.1打开浏览器扩展&#xff0c;打开开发人员模式&#xff0c;记住扩展程序id 5…

数据结构之排序补充

1. 非比较排序 上一篇文章我们罗列了数据结构中排序的八种方法。这八种方法都是需要比较才能实现的&#xff0c;那怎么样才可以通过非比较的方法来实现数组的排序呢&#xff1f;这里就提供一种非比较排序的方法。 具体的操作思路如下&#xff1a; 1. 先统计待比较数组arr中重…

1.62亿元!812个项目立项!上海市2024年度“科技创新行动计划”自然科学基金项目立项

本期精选SCI&EI ●IEEE 1区TOP 计算机类&#xff08;含CCF&#xff09;&#xff1b; ●EI快刊&#xff1a;最快1周录用&#xff01; 知网(CNKI)、谷歌学术期刊 ●7天录用-检索&#xff08;100%录用&#xff09;&#xff0c;1周上线&#xff1b; 免费稿件评估 免费匹配期…

危!这些高危端口再不知道问题就大了

号主&#xff1a;老杨丨11年资深网络工程师&#xff0c;更多网工提升干货&#xff0c;请关注公众号&#xff1a;网络工程师俱乐部 下午好&#xff0c;我的网工朋友。 端口作为网络通信的基本单元&#xff0c;用于标识网络服务和应用程序。 但某些端口由于其开放性和易受攻击的…

Excel中截取中文地址转换为省、市、区

使用方法/步骤 首先我们在网页打上方方格子&#xff0c;进入官网&#xff0c;下载方方格子。 解压后进行安装&#xff0c;打开OFFIE中的EXCEL&#xff0c;可以发现新新添加一个DIY工具箱&#xff0c;其中的提取地址功能可以将字符串地址解析为省、市、区 如下图所示

引入 axios,根据 api 文档生成调用接口

起步 | Axios Docs 安装 axios npm install axios 生成 api 调用接口【可选】 https://github.com/ferdikoomen/openapi-typescript-codegen 安装 npm install openapi-typescript-codegen --save-dev 然后执行生成代码 # http://localhost:8805/api/user/v3/api-docs&a…

2款使用.NET开发的数据库系统

今天大姚给大家分享2款使用.NET开发且开源的数据库系统。 Garnet Garnet是一款由微软研究院基于.NET开源的高性能、跨平台的分布式缓存存储数据库&#xff0c;该项目提供强大的性能&#xff08;吞吐量和延迟&#xff09;、可扩展性、存储、恢复、集群分片、密钥迁移和复制功能…

ARM-8 定位发布版本 pstree 程序的 main 地址

逆向时如何找到main&#xff0c;如下&#xff1a; 1.readelf -h pstree ELF Header: Magic: 7f 45 4c 46 02 01 01 00 00 00 00 00 00 00 00 00 Class: ELF64 Data: 2s complement, little endian Versi…

13.UE5流星火雨,引导施法技能制作

2-15 流星火雨&#xff0c;引导施法技能制作、随机数_哔哩哔哩_bilibili 目录 1.为流星火雨添加按键映射 2.创建流星火雨的动画蒙太奇 3.实现播放动画蒙太奇的逻辑 ​编辑 4.定义发射一波流星火雨的发射物 5.使用动画通知释放流星火雨 1.为流星火雨添加按键映射 创建名为流…

Web大型网站的性能测试要求和工具方法

Web大型网站的性能测试要求和工具方法涉及多个层面的考量&#xff0c;旨在确保网站在高并发访问、大数据量处理、复杂交互场景下仍能保持良好的用户体验和系统稳定性。以下是针对大型网站性能测试的主要要求和相应的工具与方法&#xff1a; 性能测试要求 1. 高并发处理能力&…

贪心算法day3(最长递增序列问题)

目录 1.最长递增三元子序列 2.最长连续递增序列 1.最长递增三元子序列 题目链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a;我们只需要设置两个数进行比较就好。设a为nums[0]&#xff0c;b 为一个无穷大的数&#xff0c;只要有比a小的数字就赋值…

在CentOS7传统部署wordpress

1 环境准备 所需环境说明CentOS7.9ip地址&#xff1a;10.0.0.7&#xff0c;可以上网PHP72系列软件下面会介绍MySQL数据库暴露端口3306&#xff0c;用户wordpress&#xff0c;库wordpressnginx版本任意wordpres v6.5.2代码下载地址&#xff1a;https://cn.wordpress.org/wordpr…