Linux编程:基于 Unix Domain Socket 的进程/线程间通信实时性优化

文章目录

    • 0. 引言
    • 1. 使用 `epoll` 边缘触发模式
      • 非不要不选择阻塞模式
      • 边缘触发(ET)模式
      • 优点
      • 示例
    • 2. 使用实时调度策略
    • 3. CPU 绑定
    • 4. 使用无锁缓冲区
    • 5. 优化消息传递的大小和频率
    • 6. 使用 `SO_RCVTIMEO` 和 `SO_SNDTIMEO`
    • 7. 示例代码
    • 其他阅读

0. 引言

前几天被问到“如何优化Linux中Domain Socket的线程间通信实时性?”当时的回答感觉不够好,经过仔细思考后,我整理出以下优化策略,考虑的是高并发和低延迟场景中的应用优化。

1. 使用 epoll 边缘触发模式

非不要不选择阻塞模式

阻塞式 read() 在单客户端的情况下,能够立即响应数据的到达,但其局限性在于:

  • 无法同时处理多个 I/O 操作。如果同时需要接收和发送数据,阻塞式 read() 会在读取数据时阻塞当前线程,直到数据可用,这使得线程无法在等待数据时执行其他任务(例如发送数据)。 也就是处理双向通信不够高效。
  • 阻塞导致线程空闲。即使线程处于阻塞状态,系统仍需要为其调度,但线程无法做任何实际工作。这样会浪费 CPU 时间,降低系统的响应性和资源利用率。

边缘触发(ET)模式

epoll边缘触发 模式(ET)在文件描述符的状态发生变化时仅触发一次事件。当状态从“不可读”变为“可读”时,epoll 只会通知一次,后续不会触发事件直到状态再次变化。这减少了重复触发事件的系统调用,降低了上下文切换的频率。

优点

  • 减少系统调用和上下文切换:边缘触发模式比水平触发模式(LT)减少了不必要的系统调用。
  • 更低延迟:每个事件只触发一次,避免了多次触发导致的等待时间。
  • 更高效率:配合非阻塞 I/O 使用,避免了重复的事件通知。

示例

struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;  // 设置为边缘触发模式
ev.data.fd = sockfd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {perror("epoll_ctl");exit(EXIT_FAILURE);
}

2. 使用实时调度策略

Linux 提供了 SCHED_FIFOSCHED_RR 等实时调度策略,可以降低调度延迟。通过 sched_setscheduler() 函数设置线程调度策略,有助于提升线程的响应速度。

struct sched_param param;
param.sched_priority = 99;  // 设置较高的优先级
sched_setscheduler(pid, SCHED_FIFO, &param);  // 设置实时调度策略

3. CPU 绑定

将线程绑定到特定的 CPU 核,减少跨核调度和缓存失效,降低延迟。

cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu_id, &cpuset);  // 将线程绑定到指定的 CPU 核
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);

4. 使用无锁缓冲区

使用无锁缓冲区可以减少CPU时间片切换次数:

  • 无锁队列:使用原子操作管理数据结构,避免传统锁机制的性能瓶颈,减少线程同步的开销。

实现请见:C++生产者-消费者无锁缓冲区的简单实现

5. 优化消息传递的大小和频率

每次发送或接收的数据大小直接影响通信延迟。频繁的小数据传输会增加 I/O 操作次数,导致延迟增加。优化措施包括:

  • 批量传输:将多个小消息合并为一个大消息,减少系统调用次数和上下文切换频率。
  • 调整缓冲区大小:根据应用需求调整套接字的发送和接收缓冲区大小,以避免缓冲区过小导致频繁的上下文切换。
int bufsize = 8192;  // 请根据实际设置合适的缓冲区大小
setsockopt(socket_fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));

6. 使用 SO_RCVTIMEOSO_SNDTIMEO

SO_RCVTIMEOSO_SNDTIMEO 是用来防止套接字在接收或发送数据时无限期阻塞的选项。当设置了这些超时选项后,套接字在等待数据时会在超时后返回错误(如 EAGAINEWOULDBLOCK),从而提高应用程序的响应性。然而,这些选项不能直接解决由于 CPU 调度延迟引起的实时性问题。它们的作用仅仅是在指定时间内没有完成操作时返回错误,而不是保证操作在一定时间内完成。

// 设置接收超时时间
struct timeval recv_timeout = { 1, 0 }; // 1 seconds
if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, sizeof(recv_timeout)) == -1) {perror("setsockopt SO_RCVTIMEO");close(sock);exit(EXIT_FAILURE);
}// 设置发送超时时间
struct timeval send_timeout = { 1, 0 }; // 1 seconds
if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, sizeof(send_timeout)) == -1) {perror("setsockopt SO_SNDTIMEO");close(sock);exit(EXIT_FAILURE);
}

7. 示例代码

// g++ -o uds_server uds_server.cpp -pthread
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/un.h>
#include <cstring>
#include <cerrno>
#include <atomic>
#include <pthread.h>
#include <sched.h>#define SOCKET_PATH "/tmp/uds_socket"
#define MAX_EVENTS 10
#define BUF_SIZE 1024
#define SOCKET_BACKLOG 5// 无锁环形缓冲区
class LockFreeBytesBuffer {
public:static const std::size_t kBufferSize = 10240U;  // 缓冲区大小LockFreeBytesBuffer() noexcept : readerIndex_(0U), writerIndex_(0U) {std::memset(buffer_, 0, kBufferSize);}bool append(const char* data, std::size_t length) noexcept;std::size_t beginRead(const char** target) noexcept;void endRead(std::size_t length) noexcept;private:char buffer_[kBufferSize];std::atomic<std::size_t> readerIndex_;std::atomic<std::size_t> writerIndex_;
};bool LockFreeBytesBuffer::append(const char* data, std::size_t length) noexcept {const std::size_t currentWriteIndex = writerIndex_.load(std::memory_order_relaxed);const std::size_t currentReadIndex = readerIndex_.load(std::memory_order_acquire);const std::size_t freeSpace = (currentReadIndex + kBufferSize - currentWriteIndex - 1U) % kBufferSize;if (length > freeSpace) {return false;  // 缓冲区满}const std::size_t pos = currentWriteIndex % kBufferSize;const std::size_t firstPart = std::min(length, kBufferSize - pos);std::memcpy(&buffer_[pos], data, firstPart);std::memcpy(&buffer_[0], data + firstPart, length - firstPart);writerIndex_.store(currentWriteIndex + length, std::memory_order_release);return true;
}std::size_t LockFreeBytesBuffer::beginRead(const char** target) noexcept {const std::size_t currentReadIndex = readerIndex_.load(std::memory_order_relaxed);const std::size_t currentWriteIndex = writerIndex_.load(std::memory_order_acquire);const std::size_t availableData = (currentWriteIndex - currentReadIndex) % kBufferSize;if (availableData == 0U) {return 0U;  // 缓冲区空}const std::size_t pos = currentReadIndex % kBufferSize;*target = &buffer_[pos];return std::min(availableData, kBufferSize - pos);
}void LockFreeBytesBuffer::endRead(std::size_t length) noexcept {const std::size_t currentReadIndex = readerIndex_.load(std::memory_order_relaxed);readerIndex_.store(currentReadIndex + length, std::memory_order_release);
}// 设置套接字为非阻塞
int setSocketNonBlocking(int sockfd) {int flags = fcntl(sockfd, F_GETFL, 0);if (flags == -1) {fprintf(stderr, "Error getting socket flags: %s\n", strerror(errno));return -1;}if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1) {fprintf(stderr, "Error setting socket to non-blocking: %s\n", strerror(errno));return -1;}return 0;
}// 设置实时调度策略
void setRealTimeScheduling() {struct sched_param param;param.sched_priority = 99;  // 设置较高的优先级if (sched_setscheduler(0, SCHED_FIFO, &param) == -1) {fprintf(stderr, "Error setting real-time scheduler: %s\n", strerror(errno));exit(EXIT_FAILURE);}
}// 绑定线程到指定 CPU
void setThreadAffinity(int cpuId) {cpu_set_t cpuset;CPU_ZERO(&cpuset);CPU_SET(cpuId, &cpuset);if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) {fprintf(stderr, "Error setting thread affinity: %s\n", strerror(errno));exit(EXIT_FAILURE);}
}// 处理新连接
void handleNewConnection(int epollFd, int sockfd) {struct epoll_event ev;int connfd = accept(sockfd, nullptr, nullptr);if (connfd == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {return;}fprintf(stderr, "Error accepting connection: %s\n", strerror(errno));return;}if (setSocketNonBlocking(connfd) == -1) {close(connfd);return;}ev.events = EPOLLIN | EPOLLET;  // 设置为边缘触发模式ev.data.fd = connfd;if (epoll_ctl(epollFd, EPOLL_CTL_ADD, connfd, &ev) == -1) {fprintf(stderr, "Error adding connection to epoll: %s\n", strerror(errno));close(connfd);}
}// 处理读取数据
void handleRead(int epollFd, struct epoll_event& event, LockFreeBytesBuffer& buffer) {char buf[BUF_SIZE];ssize_t nread = read(event.data.fd, buf, sizeof(buf));if (nread == -1) {if (errno != EAGAIN) {fprintf(stderr, "Error reading data: %s\n", strerror(errno));epoll_ctl(epollFd, EPOLL_CTL_DEL, event.data.fd, nullptr);close(event.data.fd);}} else if (nread == 0) {epoll_ctl(epollFd, EPOLL_CTL_DEL, event.data.fd, nullptr);close(event.data.fd);  // 连接关闭} else {fprintf(stdout, "Received data: %.*s\n", static_cast<int>(nread), buf);if (!buffer.append(buf, nread)) {fprintf(stderr, "Error appending to buffer: Buffer overflow!\n");}}
}// 处理写操作
void handleWrite(int epollFd, struct epoll_event& event, LockFreeBytesBuffer& buffer) {const char* data;std::size_t len = buffer.beginRead(&data);if (len > 0) {ssize_t nwrite = write(event.data.fd, data, len);if (nwrite == -1) {if (errno != EAGAIN) {fprintf(stderr, "Error writing data: %s\n", strerror(errno));epoll_ctl(epollFd, EPOLL_CTL_DEL, event.data.fd, nullptr);close(event.data.fd);}} else {buffer.endRead(nwrite);}}
}// 主函数
int main() {int sockfd, epollFd;struct sockaddr_un addr;struct epoll_event ev, events[MAX_EVENTS];// 设置实时调度setRealTimeScheduling();// 设置线程亲和性setThreadAffinity(0);  // 绑定到 CPU 0// 创建 Unix Domain Socketsockfd = socket(AF_UNIX, SOCK_STREAM, 0);if (sockfd == -1) {fprintf(stderr, "Error creating socket: %s\n", strerror(errno));return EXIT_FAILURE;}// 设置套接字为非阻塞if (setSocketNonBlocking(sockfd) == -1) {close(sockfd);return EXIT_FAILURE;}// 绑定套接字到文件路径std::memset(&addr, 0, sizeof(struct sockaddr_un));addr.sun_family = AF_UNIX;std::strcpy(addr.sun_path, SOCKET_PATH);unlink(SOCKET_PATH);if (bind(sockfd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == -1) {fprintf(stderr, "Error binding socket: %s\n", strerror(errno));close(sockfd);return EXIT_FAILURE;}// 监听连接请求if (listen(sockfd, SOCKET_BACKLOG) == -1) {fprintf(stderr, "Error listening on socket: %s\n", strerror(errno));close(sockfd);return EXIT_FAILURE;}// 创建 epoll 实例epollFd = epoll_create1(0);if (epollFd == -1) {fprintf(stderr, "Error creating epoll instance: %s\n", strerror(errno));close(sockfd);return EXIT_FAILURE;}// 将服务器套接字加入 epollev.events = EPOLLIN | EPOLLET;  // 边缘触发模式ev.data.fd = sockfd;if (epoll_ctl(epollFd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {fprintf(stderr, "Error adding socket to epoll: %s\n", strerror(errno));close(sockfd);close(epollFd);return EXIT_FAILURE;}LockFreeBytesBuffer buffer;// 主循环,等待并处理事件while (true) {int n = epoll_wait(epollFd, events, MAX_EVENTS, -1);if (n == -1) {fprintf(stderr, "Error in epoll_wait: %s\n", strerror(errno));break;}for (int i = 0; i < n; i++) {if (events[i].data.fd == sockfd) {// 处理新连接handleNewConnection(epollFd, sockfd);} else if (events[i].events & EPOLLIN) {// 处理读取数据handleRead(epollFd, events[i], buffer);} else if (events[i].events & EPOLLOUT) {// 处理写操作handleWrite(epollFd, events[i], buffer);}}}close(epollFd);close(sockfd);return EXIT_SUCCESS;
}

这个程序监听 Unix 域套接字 /tmp/uds_socket,能够处理多个客户端的连接,并异步地读取和写入数据:

  • 监听和接受连接:服务器首先通过 bindlisten 绑定套接字,然后通过 accept 等待来自客户端的连接。
  • 异步 I/O 事件处理:使用 epoll 来监听并处理事件(如接收数据、发送数据、错误等)。
  • epoll边缘触发:通过设置非阻塞 I/O 和边缘触发模式,程序能够高效地处理大量并发连接。
  • 缓冲区管理:使用环形缓冲区管理接收的数据。

其他阅读

  • 非Domain Socket的优化请参考:Linux编程:嵌入式ARM平台Linux网络实时性能优化
  • Linux 编程:高实时性场景下的内核线程调度与网络包发送优化
  • Linux I/O编程:I/O多路复用与异步 I/O对比

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/59396.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++入门基础知识147—【关于C++ 一元运算符重载】

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【14后&#x1f60a;///C爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于C 一元运算符重载的相关内容&#xff0…

2022年蓝桥杯JavaB组 省赛 题目解析(含AC_Code)

目录 前言&#xff08;必读&#xff09;第一题&#xff1a;星期计算 &#xff08;简单&#xff09;问题描述思路AC代码总结 第二题 山 &#xff08;简单&#xff09;问题描述题目分析山形数定义解题思路代码实现解析代码详解回文和“山形”判断函数主函数 AC代码复杂度分析 总结…

NLP论文速读(微软出品)|使用GPT-4进行指令微调(Instruction Tuning with GPT-4)

论文速读|Instruction Tuning with GPT-4 论文信息&#xff1a; 简介&#xff1a; 这篇论文试图解决的问题是如何通过指令调优&#xff08;instruction-tuning&#xff09;提升大型语言模型&#xff08;LLMs&#xff09;在执行新任务时的零样本&#xff08;zero-shot&#xff0…

C++20 概念与约束(3)—— 约束的进阶用法

《C20 概念与约束&#xff08;1&#xff09;—— SFINAE》 《C20 概念与约束&#xff08;2&#xff09;—— 初识概念与约束》 ●《C20 概念与约束&#xff08;3&#xff09;—— 约束的进阶用法》 1、再谈约束主句与从句 上一篇文章中提到过约束可以无限嵌套。末尾也提到不…

c#使用COM接口设置excel单元格宽高匹配图片,如何计算?

c#使用COM接口设置excel单元格宽高如何换算 在实际工作中&#xff0c;经常需要在excel中插入图片。并设置单元格与图片对齐。但是excel单元格的宽度和高度使用不同的单位。单元格的宽度以字符宽度为单位&#xff0c;而高度以点为单位。如果按照实际值来设置&#xff0c;例如设…

【activiti工作流源码集成】springboot+activiti+mysql+vue+redis工作流审批流集成整合业务绑定表单流程图会签驳回

工作流集成实际项目案例&#xff0c;demo提供 源码获取方式&#xff1a;本文末个人名片直接获取。 前言 activiti工作流引擎项目&#xff0c;企业erp、oa、hr、crm等企事业办公系统轻松落地&#xff0c;请假审批demo从流程绘制到审批结束实例。 一、项目形式 springbootvue…

CKA认证 | Day2 K8s内部监控与日志

第三章 Kubernetes监控与日志 1、查看集群资源状态 在 Kubernetes 集群中&#xff0c;查看集群资源状态和组件状态是非常重要的操作。以下是一些常用的命令和解释&#xff0c;帮助你更好地管理和监控 Kubernetes 集群。 1.1 查看master组件状态 Kubernetes 的 Master 组件包…

推荐一款好用的postman替代工具2024

Apifox 是国内团队自主研发的 API 文档、API 调试、API Mock、API 自动化测试一体化协作平台&#xff0c;是非常好的一款 postman 替代工具。 它通过一套系统、一份数据&#xff0c;解决多个系统之间的数据同步问题。只要定义好接口文档&#xff0c;接口调试、数据 Mock、接口…

gdb调试redis。sudo

1.先启动redis-server和一个redis-cli。 2.ps -aux|grep reids查看redis相关进程。 3.开始以管理员模式附加进程调试sudo gdb -p 2968.注意这里不能不加sudo&#xff0c;因为Redis 可能以 root 用户启动&#xff0c;普通用户无法附加到该进程。否则就会出现可能下列情形&#…

YUM 的使用

YUM 是一个用于 Fedora 和 Red Hat 以及 CentOS 操作系统的前端软件包管理器&#xff0c;它可以自动处理依赖关系并一次性安装所有必需的软件包。 镜像站点选择 1. 备份原有的镜像源配置文件 系统默认的 yum 镜像源配置文件存储在 /etc/yum.repos.d/ 目录下&#xff0c;可以…

力扣 LeetCode 242. 有效的字母异位词(Day3:哈希表)

解题思路&#xff1a; 哈希表三种数据结构的选择 1. 数组&#xff1a;适用于数据量小的情况 2. set&#xff1a;适用于数据量大的情况 3. map&#xff1a;适用于key-value 什么时候用哈希表&#xff1f; 给你一个元素&#xff0c;判断该元素在这个集合里是否出现过 本题使…

【MYSQL】锁详解(全局锁、表级锁、行级锁)【快速理解】

目录 一、全局锁 二、表级锁 1.表锁 2.元数据锁 3.意向锁 三、行级锁 1. 行锁 2.间隙锁 3.临建锁 锁是处理并发情况下&#xff0c;对数据的一致性的关键因素&#xff0c;也是并发情况下对效率影响非常大的。 1、全局锁&#xff1a;锁定表中所有数据。 2、表级锁&#xff1a;…

蓝桥杯每日真题 - 第11天

题目&#xff1a;&#xff08;合并数列&#xff09; 题目描述&#xff08;14届 C&C B组D题&#xff09; 解题思路&#xff1a; 题意理解&#xff1a;给定两个数组&#xff0c;目标是通过若干次合并操作使两个数组相同。每次合并操作可以将数组中相邻的两个数相加&#xff…

contos7.9 部署3节点 hadoop3.4 集群 非高可用

contos7.9 部署3节点 hadoop3.4 集群 非高可用 contos7.9 部署3节点 hadoop3.4 集群 非高可用环境信息服务器角色分配服务器配置服务器配置初始化 init_server.sh配置主机名映射所有节点配置 hosts文件 配置免密登录 hadoop 安装环境配置下载安装包下载 jdk1.8hadoop3.4 分发安…

人工智能:重塑医疗、企业与生活的未来知识管理——以HelpLook为例

一、医疗行业&#xff1a;AI引领的医疗革新 随着人工智能&#xff08;AI&#xff09;技术的持续飞跃&#xff0c;我们正身处一场跨行业的深刻变革之中。在医疗健康的广阔舞台上&#xff0c;人工智能技术正扮演着日益重要的角色。它不仅能够辅助医生进行病例的精准诊断&#xf…

第四十五章 Vue之Vuex模块化创建(module)

目录 一、引言 二、模块化拆分创建方式 三、模块化拆分完整代码 3.1. index.js 3.2. module1.js 3.3. module2.js 3.4. module3.js 3.5. main.js 3.6. App.vue 3.7. Son1.vue 3.8. Son2.vue 四、访问模块module的state ​五、访问模块中的getters ​六、mutati…

论文笔记 SuDORMRF:EFFICIENT NETWORKS FOR UNIVERSAL AUDIO SOURCE SEPARATION

SUDORMRF: EFFICIENT NETWORKS FOR UNIVERSAL AUDIO SOURCE SEPARATION 人的精神寄托可以是音乐&#xff0c;可以是书籍&#xff0c;可以是运动&#xff0c;可以是工作&#xff0c;可以是山川湖海&#xff0c;唯独不可以是人。 Depthwise Separable Convolution 深度分离卷积&a…

69页可编辑PPT | 大数据基础知识培训课件

课件全面介绍了大数据的基础知识&#xff0c;包括大数据的定义、特征、发展演进、产业链、关键技术以及市场规模等多个方面&#xff0c;旨在为观众提供一个关于大数据领域的综合性概览。 大数据基本概念 广义的定义(哲学) :大数据&#xff0c;是指物理世界到数字世界的映射和提…

仓储管理系统-综合管理(源码+文档+部署+讲解)

本文将深入解析“仓储管理系统-综合管理”的项目&#xff0c;探究其架构、功能以及技术栈&#xff0c;并分享获取完整源码的途径。 系统概述 仓储管理系统-综合管理是一个全面的仓库管理解决方案&#xff0c;旨在通过集成多种功能模块来优化仓库操作和管理流程。该系统提供了…

MYSQL中的两种转义操作

在 MySQL 中&#xff0c;转义字符用于处理特殊字符,以防止语法错误或 SQL 注入攻击,而单双引号都是需要重点注意的字符 可以用转义符\ 和 两个连续的引号 来起到转义引号的作用 转义符转义: 这是users表中的数据 如果查询admin 或者 admin" 用户,可以用转义符\ 两个连…