网络编程 io_uring

io_uring

1、概述

io_uring是Linux(内核版本在5.1以后)在2019年加入到内核中的一种新型的异步I/O模型;

io_uring使用共享内存,解决高IOPS场景中的用户态和内核态的切换过程,减少系统调用;用户可以直接向共享内存提交要发起的I/O操作,内核线程可以直接获取共享内存中的I/O操作,并进行相应的读写操作;io_uring是一种proactor模式的网络架构;

  • Reactor 是非阻塞同步网络模式,感知的是就绪可读写事件。在每次感知到有事件发生(比如可读就绪事件)后,就需要应用进程主动调用 read 方法来完成数据的读取,也就是要应用进程主动将 socket 接收缓存中的数据读到应用进程内存中,这个过程是同步的,读取完数据后应用进程才能处理数据。

  • Proactor 是异步网络模式, 感知的是已完成的读写事件。在发起异步读写请求时,需要传入数据缓冲区的地址(用来存放结果数据)等信息,这样系统内核才可以自动帮我们把数据的读写工作完成,这里的读写工作全程由操作系统来做,并不需要像 Reactor 那样还需要应用进程主动发起 read/write 来读写数据,操作系统完成读写工作后,就会通知应用进程直接处理数据。

优点
  • 避免了提交I/O事件和完成事件中存在的内存拷贝(使用共享内存)

  • 减少的了I/O任务提交和完成事件任务是的系统调用过程

  • 采取无锁队列,减少了锁资源的竞争

主要内存结构
  • 提交队列(Submission Queue,SQ)连续的内存空间,环形队列,存放将要执行的I/O操作数据
  • 完成队列(Completion Queue, CQ)连续的内存空间,环形队列,存放执行完成I/O操作后的返回结果
  • 提交队列项数组提(Submission Queue Entry,SQE):方便通过环形缓冲区提交内存请求
2、主要接口

io_uring提供三个用户态的系统调用接口

  1. io_uring_setup:初始化一个新的io_uring对象,一个SQ和一个CQ,通过使用共享内存进行数据操作
  2. io_uring_register:注册用于异步I/O的文件或用户缓冲区(buffers)
  3. io_uring_enter:提交I/O任务,等待I/O完成

在这里插入图片描述

SQ和CQ保存的都是SQEs数据的索引,不是真正的请求,真实是请求保存在SQE数组中,在提交请求时可以批量提交一组SQE数值上不连续的请求;

SQ、CQ、SQE中的内存区域都是有内核进行分配的,用户初始化会返回对应的fd,通过fd进行mmap和内核共享内存空间;

3、第三方库

liburing通过对io_uring进行分装,提供了一个简单的API,通过一下命令可以安装该动态库

git clone https://github.com/axboe/liburing.git
cd liburing
./configure
make
sudo make install
sudo ldconfig #更新动态库连接缓存
4、主要使用流程
1. io_uring初始化

io_uring通过io_uring_setup函数初始化,在liburing库中,通过io_uring_queue_init_params函数进行初始化,创建sumbmit队列和complete队列,以及SQE内存数组;

//io_uring实现异步的方式
struct io_uring_params pragma;
memset(&pragma, 0, sizeof(pragma));
struct io_uring ring;
// 初始化io_uring 创建submit队列和complite队列
io_uring_queue_init_params(1024, &ring, &pragma);
2. io_uring 提交(注册)到SQ环形队列

io_uring通过io_uring_register函数提交(注册)到用于异步I/O的缓冲区中,在liburing中通过io_uring_prep_accept函数对io_uring_refister进行封装使用;

// 获取ringbuffer的头
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
connect_info_t accept_info = {sockfd, EVENT_ACCEPT};
// 注册一个I/O事件
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
3. io_uring_enter 提交I/O

io_uring中通过io_uring_enter函数来提交I/O,并等待事件的完成;在liburing中通过io_uring_submit来提交SQE的读写请求,io_uring_wait_cqe来等待I/O的处理结果,io_uring_peek_batch_cqe来获取CQ中的处理结果;

 // 提交worker中执行
io_uring_submit(&ring);
struct io_uring_cqe *cqe;
//等待complete队列中的结果
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[128];
// 获取CQ环形队列中的处理结果
int count = io_uring_peek_batch_cqe(&ring, cqes, 128);
5、实现

io_uring_server.c

#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <netinet/in.h>enum event_type {EVENT_ACCEPT,EVENT_READ,EVENT_WRITE
};typedef struct connect_info{int conn_fd;int event;
}connect_info_t;struct conn_info {int fd;int event;
};int init_server(unsigned short port) 
{   int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0) {perror("socket");return -1;}struct sockaddr_in serveraddr;;serveraddr.sin_family = AF_INET;serveraddr.sin_port = htons(port);serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);if (bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {perror("bind error");return -1;}int opt = 1;if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {perror("setsockopt");return -1;}listen(sockfd, 10);return sockfd; 
}int set_event_recv(struct io_uring *ring, int sockfd, void *buf, int len, int flags)
{struct io_uring_sqe *sqe = io_uring_get_sqe(ring);connect_info_t accept_info = {sockfd, EVENT_READ};io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));printf("set event recv----\n");return 0;
}int set_event_send(struct io_uring *ring, int sockfd, const void *buf, int len, int flags)
{struct io_uring_sqe *sqe = io_uring_get_sqe(ring);connect_info_t accept_info = {sockfd, EVENT_WRITE};io_uring_prep_send(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));printf("set event send----\n");return 0;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *clientaddr,socklen_t *addrlen, int flags) {// 获取sqestruct io_uring_sqe *sqe = io_uring_get_sqe(ring);// 初始化accept_infoconnect_info_t accept_info = {sockfd, EVENT_ACCEPT};// 准备accept操作io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);// 设置用户数据memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));printf("set event accept\n");return 0;
}int main(int argc, char *argv[])
{// 初始化服务器unsigned short port = 9999;// 初始化服务器int socketfd = init_server(port);if (socketfd < 0)return -1;//io_uring实现异步的方式struct io_uring_params pragma;// 初始化io_uring 创建submit队列和complite队列memset(&pragma, 0, sizeof(pragma));struct io_uring ring;io_uring_queue_init_params(1024, &ring, &pragma);struct sockaddr_in clientaddr;socklen_t addrlen = sizeof(struct sockaddr);// 提交到submit队列中set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);char buffer[1024] = {0};while (1){// 提交worker中执行io_uring_submit(&ring);printf("complete\n");struct io_uring_cqe *cqe;//等待complete队列中的结果io_uring_wait_cqe(&ring, &cqe);printf("complete end\n");struct io_uring_cqe *cqes[128];int count = io_uring_peek_batch_cqe(&ring, cqes, 128);for (int i = 0; i < count; i++){struct io_uring_cqe *entries = cqes[i];connect_info_t result;//struct conn_info result;memcpy(&result, &entries->user_data, sizeof(connect_info_t));if (result.event == EVENT_ACCEPT) {// 设置读事件set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);printf("accept success\n");int conn_fd = entries->res;printf("conn_fd = %d  res = %d\n", conn_fd, entries->res);// 设置读事件set_event_recv(&ring, conn_fd, buffer, 1024,0);}else if (result.event == EVENT_READ){int ret = entries->res;printf("set_event_recv ret: %d, %s\n", ret, buffer);if (ret == 0){close(result.conn_fd);continue;}else if (ret > 0){// 设置写事件set_event_send(&ring, result.conn_fd, buffer, ret,0);}printf("read success\n");}else if (result.event == EVENT_WRITE){int ret = entries->res;set_event_recv(&ring, result.conn_fd, buffer, 1024,0);printf("write success\n");}}io_uring_cq_advance(&ring, count);}return 0;
}

io_uring_test.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>#include <sys/socket.h>
#include <arpa/inet.h>#define TIMESUB_MS(tv1, tv2)  (((tv2).tv_sec - (tv1).tv_sec) * 1000 + ((tv2).tv_usec - (tv1).tv_usec) / 1000)
#define TEST_MESSAGE   "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048typedef struct test_conttext
{char server_ip[16];int server_port;int thread_num;int connection_num;int request_num;int fail_num;
} test_conttext_t;int send_recv_tcp(int sockfd)
{char wbuffer[WBUFFER_LENGTH];char rbuffer[RBUFFER_LENGTH];memset(wbuffer, 0, sizeof(wbuffer));memset(rbuffer, 0, sizeof(rbuffer));for (int i = 0; i < 8; i++){strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);}int res = send(sockfd, wbuffer, strlen(wbuffer), 0);if (res <= 0){return -1;}res = recv(sockfd, rbuffer, sizeof(rbuffer), 0);if (res <= 0){return -1;}if (strcmp(rbuffer, wbuffer) != 0){printf("failed: '%s' != '%s'\n", rbuffer, wbuffer);return -1;}return 0;
}int connect_tcpserver(char *ip, int port)
{int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){perror("socket");return -1;}struct sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_port = htons(port);server_addr.sin_addr.s_addr = inet_addr(ip);if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0){perror("connect");close(sockfd);return -1;}return sockfd;
}static void *test_qps(void *arg)
{test_conttext_t *ctx = (test_conttext_t *)arg;int sockfd = connect_tcpserver(ctx->server_ip, ctx->server_port);if (sockfd < 0){printf("connect server failed\n");return NULL;}int conut = ctx->request_num / ctx->connection_num;int indx = 0;int res;while (indx++ < conut){res = send_recv_tcp(sockfd);if (res < 0){printf("send_recv_tcp failed\n");ctx->fail_num++;continue;}}return NULL;
}int main(int argc, char *argv[])
{int i;printf("----%d\n", argc);// for (i = 1; i < argc; i++)//     printf("%s\n", argv[i]);test_conttext_t ctx = {0};int opt;while ((opt = getopt(argc, argv, "s:p:t:c:n:")) != -1){switch (opt){case 's':strcpy(ctx.server_ip, optarg);printf("-s: %s\n", optarg);break;case 'p':ctx.server_port = atoi(optarg);printf("-p: %s\n", optarg);break;case 't':ctx.thread_num = atoi(optarg);printf("-t: %s\n", optarg);break;case 'c':ctx.connection_num = atoi(optarg);printf("-c: %s\n", optarg);break;case 'n':ctx.request_num = atoi(optarg);printf("-n: %s\n", optarg);break;default:return EXIT_FAILURE;}}pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * ctx.thread_num);struct timeval start, end;gettimeofday(&start, NULL);for (i = 0; i < ctx.thread_num; i++){printf("thread %d pthread_create\n", i);pthread_create(&threads[i], NULL, test_qps, &ctx);}for (i = 0; i < ctx.thread_num; i++){pthread_join(threads[i], NULL);printf("thread %d finished\n", i);}gettimeofday(&end, NULL);int time_used = TIMESUB_MS(start, end);printf("success :%d, failed:%d,  time used: %d , qps %d\n", ctx.request_num-ctx.fail_num, ctx.fail_num, time_used, ctx.request_num * 1000 / time_used);free(threads);return EXIT_SUCCESS;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/713948.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity(第二十二部)官方的反向动力学一般使用商城的IK插件,这个用的不多

反向动力学&#xff08;Inverse Kinematic&#xff0c;简称IK&#xff09;是一种通过子节点带动父节点运动的方法。 正向动力学 在骨骼动画中&#xff0c;大多数动画是通过将骨架中的关节角度旋转到预定值来生成的&#xff0c;子关节的位置根据父关节的旋转而改变&#xff0c;这…

Vmware esxi虚拟主机状态无效,无法注销重启等操作修复解决

问题 装有ESXI系统的服务器在强制关机启动后&#xff0c;显示虚拟机状态是无效的&#xff0c;并且无法进行任何操作。 解决办法 对出问题的虚拟机重新注册 1、开启esxi系统的ssh功能 2、取消注册出问题的虚拟机 找到问题的虚拟机 [rootlocalhost:~] vim-cmd vmsvc/getal…

烧脑问题解决办法:如何选择一款合适自己的手机流量卡

现在社会人们越来越离不开手机了&#xff0c;手机给我们生活带来了翻天覆地的变化&#xff0c;手机需要最多的就是流量了&#xff0c;所以选择一款合适自己的手机流量卡就显得尤为重要了&#xff0c;今天小编就给大家来分享一下我的经验&#xff0c;希望对大家能有帮助&#xf…

【Web安全靶场】sqli-labs-master 54-65 Challenges 与62关二分法和like模糊搜索

sqli-labs-master 54-65 Challenges 其他关卡和靶场见专栏… 文章目录 sqli-labs-master 54-65 Challenges第五十四关-联合注入第五十五关-联合注入第五十六关-联合注入第五十七关-联合注入第五十八关-报错注入第五十九关-报错注入第六十关-报错注入第六十一关-报错注入第六十…

Mysql与StarRocks语法上的不同

&#x1f413; 序言 StarRocks 是新一代极速全场景 MPP (Massively Parallel Processing) 数据库。StarRocks 的愿景是能够让用户的数据分析变得更加简单和敏捷。用户无需经过复杂的预处理&#xff0c;可以用StarRocks 来支持多种数据分析场景的极速分析。 &#x1f413; 语法…

嵌入式驱动学习第一周——linux的休眠与唤醒

前言 本文介绍进程的休眠与唤醒。 嵌入式驱动学习专栏将详细记录博主学习驱动的详细过程&#xff0c;未来预计四个月将高强度更新本专栏&#xff0c;喜欢的可以关注本博主并订阅本专栏&#xff0c;一起讨论一起学习。现在关注就是老粉啦&#xff01; 行文目录 前言1. 阻塞和非阻…

第二节 数学知识补充

一、线性代数 向量的 L 2 L_2 L2​范数&#xff08;Euclidean范数/Frobenius范数&#xff09;&矩阵的元素形式范数 向量的 L 2 L_2 L2​范数&#xff1a; ∣ ∣ x ∣ ∣ 2 ( ∣ x 1 ∣ 2 ⋯ ∣ x m ∣ 2 ) 1 2 ||x||_2(|x_1|^2\cdots|x_m|^2)^{\frac12} ∣∣x∣∣2​(∣…

电脑桌面便签哪个好,好用的电脑桌面便签推荐

在如今信息爆炸的时代&#xff0c;人们的工作和生活节奏越来越快&#xff0c;记事和备忘变得尤为重要。而电脑桌面便签作为一种方便快捷的记录工具&#xff0c;备受广大用户青睐。那么&#xff0c;电脑桌面便签哪个好&#xff0c;哪个更加出色呢&#xff1f; 作为一名人事专员…

CryoEM - 使用 cryoSPARC 基于单颗粒图像从头重构蛋白质三维结构

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://blog.csdn.net/caroline_wendy/article/details/136384544 基于冷冻电镜单颗粒图像重构蛋白质三维结构,利用冷冻电镜技术测定生物大分子结构的方法。原理是从冷冻电镜获得大量同一种蛋白质分子的二维投影图…

【数据结构】实现队列

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解队列&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 一. 队列的概念及结构二. 队列的实现队列的结构体初始化销毁队尾插入队头删除显示第一个节点的值…

【Java程序员面试专栏 算法思维】六 高频面试算法题:动态规划

一轮的算法训练完成后,对相关的题目有了一个初步理解了,接下来进行专题训练,以下这些题目就是汇总的高频题目,本篇主要聊聊回溯算法,主要就是排列组合问题,所以放到一篇Blog中集中练习 题目关键字解题思路时间空间零钱兑换动态规划+双重循环dp[i]表示兑换金额为i元的最少…

iOS消息转发流程

当向Objc对象发送消息时&#xff0c;如果找到对象对应的方法&#xff0c;就会进入消息转发流程&#xff0c;给开发者提供一些最后的机会处理消息无法发送问题&#xff0c;以免出现程序崩溃。 1. 回调对象的resolveInstanceMethod方法&#xff0c;在这个方法中&#xff0c;允许开…

阿里云定价_ECS产品价格_云服务器收费标准 - 阿里云官方活动

2024年最新阿里云服务器租用费用优惠价格表&#xff0c;轻量2核2G3M带宽轻量服务器一年61元&#xff0c;折合5元1个月&#xff0c;新老用户同享99元一年服务器&#xff0c;2核4G5M服务器ECS优惠价199元一年&#xff0c;2核4G4M轻量服务器165元一年&#xff0c;2核4G服务器30元3…

基于 LLaMA 和 LangChain 实践本地 AI 知识库

有时候,我难免不由地感慨,真实的人类世界,本就是一个巨大的娱乐圈,即使是在英雄辈出的 IT 行业。数日前,Google 正式对外发布了 Gemini 1.5 Pro,一个建立在 Transformer 和 MoE 架构上的多模态模型。可惜,这个被 Google 寄予厚望的产品并未激起多少水花,因为就在同一天…

S2---FPGA-A7板级原理图硬件实战

视频链接 FPGA-A7板级系统硬件实战01_哔哩哔哩_bilibili FPGA-A7板级原理图硬件实战 基于XC7A100TFGG484的FPGA硬件设计流程图 A7核心板&#xff0c;是基于XILINX公司的ARTIX-7系列100T的XC7A100T,2FGG484I这款芯片开发的高性能核心板&#xff0c;具有高速&#xff0c;高带宽&a…

Android 签名机制

V1是内部文件单个签 但是增加apk文件目录下面随意增加文件并不会有影响,它只关心meta-info文件 mf汇总清单的各个文件sha256 V2 整个APK文件,按文件进行hash 那么便不能随便在这里面增加文件了,增加了签名分块&#xff08;不然签名信息存哪里&#xff09;这里涉及一个文件概念 …

如何修炼成“神医”——《OceanBase诊断系列》之一

本系列是基于OcenaBase 开发工程师在工作中的一些诊断经验&#xff0c;也欢迎大家分享相关经验。 1. 关于神医的故事 扁鹊&#xff0c;中国古代第一个被正史记载的医生&#xff0c;他的成才之路非常传奇。年轻时&#xff0c;扁鹊是一家客栈的主管。有一位名叫长桑君的客人来到…

性能优化篇(二) 静态合批步骤与所有注意事项\游戏运行时使用代码启动静态合批

静态合批步骤: 1.开启Project Settings —>Player–>Other Setting里勾选Static Batching选项(一般情况下unity都是默认勾选状态) 2.勾选需要合批的静态物体上的Batching Static项,勾选后此物体下的所有子物体都默认参与静态合批(勾选后物体不能进行移动/旋转/缩放操作,…

02-设计概述

上一篇&#xff1a;01-导言 本章重点讨论 JNI 中的主要设计问题。本节中的大多数设计问题都与本地方法有关。调用 API 的设计将在第 5 章&#xff1a;调用 API 中介绍。 2.1 JNI 接口函数和指针 本地代码通过调用 JNI 函数来访问 Java 虚拟机功能。JNI 函数可通过接口指针使用…

LeetCode383. 赎金信(C++)

LeetCode383. 赎金信 题目链接代码 题目链接 https://leetcode.cn/problems/ransom-note/description/ 代码 class Solution { public:bool canConstruct(string ransomNote, string magazine) {int record[26] {0};if(ransomNote.size() > magazine.size()) return fa…