quinn源码解析:QUIC数据包是如何发送的

quinn源码解析:QUIC数据包是如何发送的

  • 简介
  • QUIC协议中的概念
    • endpoint(端点)
    • connection(连接)
    • Stream(流)
    • Frame (帧)
  • 发包过程解析
    • SendStream::write_all
    • ConnectionDriver
    • EndpointDriver

简介

quinn是Rust编程语言中用于实现QUIC(Quick UDP Internet Connections)协议的一个crate(包)。它提供了一个高级别的API,用于构建基于QUIC的网络应用程序。quinn crate的设计目标是提供一个简洁、安全和高性能的QUIC实现。它内部使用了Rust的异步编程模型(async/await),使得编写异步网络代码更加方便和高效。
本文主要介绍其发送数据的流程

QUIC协议中的概念

endpoint(端点)

在QUIC(Quick UDP Internet Connections)协议中,Endpoint(端点)是指QUIC连接的一端,可以是客户端或服务器。每个端点都有自己的网络地址,并与其他端点进行通信以建立和管理QUIC连接。在quinn中,endpoint对应一个操作系统的socket。例如client的Endpoint创建时就是bind了一个本地的地址。

    pub fn client(addr: SocketAddr) -> io::Result<Self> {let socket = std::net::UdpSocket::bind(addr)?;let runtime = default_runtime().ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no async runtime found"))?;Self::new_with_runtime(EndpointConfig::default(),None,runtime.wrap_udp_socket(socket)?,runtime,)}

connection(连接)

两个endpoint之间可以建立connection,并且一个endpoint可以向多个endpoint建立连接。

注意与TCP不同的是,QUIC的一个socket可以同时向多个其他socket建立连接。而TCP中每一个连接都对应client和server端的两个socket。

在这里插入图片描述

Stream(流)

一条连接可以同时存在多条流,每条流上的数据相互独立,一条流发生阻塞不会影响其他流。(TCP相当于只有一条流,所以会有对头阻塞的缺陷。)

client的流ID为奇数,server的流ID为偶数

在这里插入图片描述

Frame (帧)

流是抽象出的概念,而实际上在链路上传输的只是不同的帧,不同流的帧中会有流ID用于标识此帧属于哪条流,接收端收到后根据流ID将对应的帧放入对应的流缓冲区。
在这里插入图片描述

发包过程解析

以官方的client Example为例。其关键步骤如下述伪代码所示,主要包括:创建endpoint、创建连接、创建流、最后写入数据。

 //创建endpointlet mut endpoint = quinn::Endpoint::client("[::]:0".parse().unwrap())?; ...//创建连接let conn = endpoint.connect(remote, host)?.await.map_err(|e| anyhow!("failed to connect: {}", e))?;//创建流let (mut send, mut recv) = conn.open_bi().await.map_err(|e| anyhow!("failed to open stream: {}", e))?;//写数据send.write_all(request.as_bytes()).await.map_err(|e| anyhow!("failed to send request: {}", e))?;

SendStream::write_all

首先我们以流写入数据为切入点来看。
write_all接口实际上是产生了一个WriteAllFuture,数据会暂时放在WriteAll结构体里。当Runtime(默认为Tokio的运行时)下一次pollFuture时才会将数据写入到该流的缓冲区中。

impl<'a> Future for WriteAll<'a> {type Output = Result<(), WriteError>;fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {let this = self.get_mut();loop {if this.buf.is_empty() {return Poll::Ready(Ok(()));}let buf = this.buf;#将数据写入缓冲区let n = ready!(this.stream.execute_poll(cx, |s| s.write(buf)))?;this.buf = &this.buf[n..];}}
}

注意向流的缓冲区写数据时,是经过了流控逻辑的:当可写空间为0时,写操作会被block。可写空间一般由send_window-unacked_datasend_windowunacked_data都是连接级的,所有流都受此限制。send_window是开始时设置的,此值决定整个连接的发送缓冲区的峰值大小。当应用连接数较多时应该谨慎设置此值,避免因内存占用过多而引起OOM。

    /// Returns the maximum amount of data this is allowed to be written on the connectionpub(crate) fn write_limit(&self) -> u64 {(self.max_data - self.data_sent).min(self.send_window - self.unacked_data)}

写入的数据最终又被暂时放在SendBufferunacked_segments里。

impl SendBuffer {/// Append application data to the end of the streampub(super) fn write(&mut self, data: Bytes) {self.unacked_len += data.len();self.offset += data.len() as u64;self.unacked_segments.push_back(data);}
}

到这里,write_all这个操作就算是结束了。那么放入缓冲区的数据又是如何进一步被发送的呢?

ConnectionDriver

我们把视线回到 endpoint.connect(remote, host)?.await,在连接建立时,产生了一个ConnectionDriverFuture,此ConnectionDriver一产生就被丢进runtime中去持续地执行了。

        runtime.spawn(Box::pin(ConnectionDriver(conn.clone()).instrument(Span::current()),));

而这个ConnectionDriver在被poll时最终会调用Connection::poll_transmit–>Connection::populate_packet获取将要发送的帧

    fn populate_packet(&mut self,now: Instant,space_id: SpaceId,buf: &mut BytesMut,max_size: usize,pn: u64,) -> SentFrames {let mut sent = SentFrames::default();......// STREAMif space_id == SpaceId::Data {sent.stream_frames = self.streams.write_stream_frames(buf, max_size);self.stats.frame_tx.stream += sent.stream_frames.len() as u64;}sent}

StreamsState::write_stream_frames方法中从优先级队列中取出优先级最高的流并将其数据写入buf,如果流的数据都已发送完毕则将此流从优先级队列中取出。

pub(crate) fn write_stream_frames(&mut self,buf: &mut BytesMut,max_buf_size: usize,) -> StreamMetaVec {let mut stream_frames = StreamMetaVec::new();while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size {if max_buf_size.checked_sub(buf.len() + frame::Stream::SIZE_BOUND).is_none(){break;}//不同优先级的数量let num_levels = self.pending.len();//获取优先级最高的队列let mut level = match self.pending.peek_mut() {Some(x) => x,None => break,};// Poppping data from the front of the queue, storing as much data// as possible in a single frame, and enqueing sending further// remaining data at the end of the queue helps with fairness.// Other streams will have a chance to write data before we touch// this stream again.//从队列中拿到第一个流let id = match level.queue.get_mut().pop_front() {Some(x) => x,None => {debug_assert!(num_levels == 1,"An empty queue is only allowed for a single level");break;}};//拿到具体的流let stream = match self.send.get_mut(&id) {Some(s) => s,// Stream was reset with pending data and the reset was acknowledgedNone => continue,};// Reset streams aren't removed from the pending list and still exist while the peer// hasn't acknowledged the reset, but should not generate STREAM frames, so we need to// check for them explicitly.if stream.is_reset() {continue;}// Now that we know the `StreamId`, we can better account for how many bytes// are required to encode it.let max_buf_size = max_buf_size - buf.len() - 1 - VarInt::size(id.into());//从流中获取到本次要写的偏移量let (offsets, encode_length) = stream.pending.poll_transmit(max_buf_size);//如果流中的数据都已经发送完,则将此流从pending队列中移除let fin = offsets.end == stream.pending.offset()&& matches!(stream.state, SendState::DataSent { .. });if fin {stream.fin_pending = false;}if stream.is_pending() {if level.priority == stream.priority {// Enqueue for the same levellevel.queue.get_mut().push_back(id);} else {// Enqueue for a different level. If the current level is empty, drop itif level.queue.borrow().is_empty() && num_levels != 1 {// We keep the last level around even in empty form so that// the next insert doesn't have to reallocate the queuePeekMut::pop(level);} else {drop(level);}push_pending(&mut self.pending, id, stream.priority);}} else if level.queue.borrow().is_empty() && num_levels != 1 {// We keep the last level around even in empty form so that// the next insert doesn't have to reallocate the queuePeekMut::pop(level);}let meta = frame::StreamMeta { id, offsets, fin };trace!(id = %meta.id, off = meta.offsets.start, len = meta.offsets.end - meta.offsets.start, fin = meta.fin, "STREAM");//写入帧的头部meta.encode(encode_length, buf);// The range might not be retrievable in a single `get` if it is// stored in noncontiguous fashion. Therefore this loop iterates// until the range is fully copied into the frame.let mut offsets = meta.offsets.clone();while offsets.start != offsets.end {let data = stream.pending.get(offsets.clone());offsets.start += data.len() as u64;//写入具体数据buf.put_slice(data);}stream_frames.push(meta);}stream_frames}

到了这里,要发送的数据实际上还是暂存在缓冲区了。然后又以EndpointEvent::Transmit事件的方式通过channel发送到endpoint的协程里。

fn drive_transmit(&mut self) -> bool {let now = Instant::now();let mut transmits = 0;let max_datagrams = self.socket.max_transmit_segments();let capacity = self.inner.current_mtu();let mut buffer = BytesMut::with_capacity(capacity as usize);while let Some(t) = self.inner.poll_transmit(now, max_datagrams, &mut buffer) {transmits += match t.segment_size {None => 1,Some(s) => (t.size + s - 1) / s, // round up};// If the endpoint driver is gone, noop.let size = t.size;//将要发送的数据发送到endpoint协程let _ = self.endpoint_events.send((self.handle,EndpointEvent::Transmit(t, buffer.split_to(size).freeze()),));if transmits >= MAX_TRANSMIT_DATAGRAMS {// TODO: What isn't ideal here yet is that if we don't poll all// datagrams that could be sent we don't go into the `app_limited`// state and CWND continues to grow until we get here the next time.// See https://github.com/quinn-rs/quinn/issues/1126return true;}}false}

ConnectionDriver的任务到这里就完成了,总的来说ConnectionDriver的任务就是从流中取出数据,并最终将数据通过channel发送给endpoint

EndpointDriver

connection的逻辑类似,endpoints建立时就已经spawn了一个EndpointDriver在后台一直poll,正是在poll方法中会处理来自ConnectionDriver发来的events,并写入outgoing缓冲区中。

    fn handle_events(&mut self, cx: &mut Context, shared: &Shared) -> bool {use EndpointEvent::*;for _ in 0..IO_LOOP_BOUND {match self.events.poll_recv(cx) {Poll::Ready(Some((ch, event))) => match event {......//接受从ConnectionDriver发过来的Transmit,并写入到outgoing缓冲区中Transmit(t, buf) => {let contents_len = buf.len();self.outgoing.push_back(udp_transmit(t, buf));self.transmit_queue_contents_len = self.transmit_queue_contents_len.saturating_add(contents_len);}},Poll::Ready(None) => unreachable!("EndpointInner owns one sender"),Poll::Pending => {return false;}}}true}

drive_send中从outgoing缓冲区中取出数据并写入socket

 fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {self.send_limiter.start_cycle();let result = loop {if self.outgoing.is_empty() {break Ok(false);}if !self.send_limiter.allow_work() {break Ok(true);}//实际写入match self.socket.poll_send(cx, self.outgoing.as_slices().0) {Poll::Ready(Ok(n)) => {let contents_len: usize =self.outgoing.drain(..n).map(|t| t.contents.len()).sum();self.transmit_queue_contents_len = self.transmit_queue_contents_len.saturating_sub(contents_len);// We count transmits instead of `poll_send` calls since the cost// of a `sendmmsg` still linearly increases with number of packets.self.send_limiter.record_work(n);}Poll::Pending => {break Ok(false);}Poll::Ready(Err(e)) => {break Err(e);}}};self.send_limiter.finish_cycle();result}

至此,整个发送过程就算完了。写入socket的数据由具体的操作系统底层去实现了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/148756.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

R语言和RStudio的下载安装(非常简便舒适)

目录 R语言和RStudio的关系R语言和Tableau下载R语言进入官网选择清华镜像源Download R for Windows选择base版本开始下载进行安装配置环境变量检查是否安装成功 下载RStudio进入官网点击下载进行安装检查是否安装成功打开选择R语言环境成功打开显示四个工作区 R语言和RStudio的…

【预处理详解】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 1. 预定义符号 2. #define定义常量 3. #define定义宏 4. 带有副作用的宏参数 5. 宏替换的规则 6. 宏函数的对比 7. #和## 7.1 #运算符 7.2 ## 运算符 8. 命名约定 …

你知道什么是SaaS吗?

你知道什么是SaaS吗&#xff1f; 云服务架构的三个概念 PaaS 英文就是 Platform-as-a-Service&#xff08;平台即服务&#xff09;PaaS&#xff0c;某些时候也叫做中间件。就是把客户采用提供的开发语言和工具&#xff08;例如Java&#xff0c;python, .Net等&#xff09;开…

计算两个向量的叉积numpy.cross()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 计算两个向量的叉积 numpy.cross() [太阳]选择题 请问代码中最后输出正确的是&#xff1f; import numpy as np a np.array([1, 2, 3]) b np.array([4, 5, 6]) c np.cross(a, b) pri…

nrm的安装以及使用

1&#xff0c;什么是nrm nrm 是一个 npm 源管理器&#xff0c;允许你快速地在 npm源间切换。 什么意思呢&#xff0c;npm默认情况下是使用npm官方源&#xff08;使用npm config ls命令可以查看&#xff09;&#xff0c;在国内用这个源肯定是不靠谱的&#xff0c;一般我们都会…

云原生专栏丨基于服务网格的企业级灰度发布技术

灰度发布&#xff08;又名金丝雀发布&#xff09;是指在黑与白之间&#xff0c;能够平滑过渡的一种发布方式。在其上可以进行A/B testing&#xff0c;即让一部分用户继续用产品特性A&#xff0c;一部分用户开始用产品特性B&#xff0c;如果用户对B没有什么反对意见&#xff0c;…

ssm+vue的高校疫情防控管理系统(有报告)。Javaee项目,ssm vue前后端分离项目。

演示视频&#xff1a; ssmvue的高校疫情防控管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;ssm vue前后端分离项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结…

项目自动化构建工具——make/Makefile

目录 一、概念 二、使用实例 三、原理 四、进度条程序 1、缓冲区问题 1、概念 2、\r和\n 2、代码编写 一、概念 一个工程中的源文件不计数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;makefile定义了一系列的规则来指定&#xff0c;哪些文件需要先…

Open AI开发者大会:AI“科技春晚”

ChatGPT的亮相即将满一年之时&#xff0c;OpenAI举行了自己的首次开发者大会。OpenAI首席执行官Sam Altman宣布推出最新的大模型GPT-4 Turbo。正如“Turbo”一词的中文含义“涡轮增压器”一样&#xff0c;本次发布会上&#xff0c;OpenAI的这款最新大模型在长文本、知识库、多模…

Django+Vue项目创建 跑通

参考链接&#xff1a; 【精选】DjangoVue项目构建_django vue-CSDN博客 一、背景 主要介绍如何使用后端Django 前端Vue 的技术栈快速地搭建起一套web项目的框架。 为什么使用Django和Vue? Django是Python体系下最成熟的web框架之一&#xff0c;由于Python语言的易用…

【心得】PHP的文件上传个人笔记

目录 1 php的文件上传绕过 黑名单绕过 2 php文件上传的00截断 3 iconv字符转换异常后造成了字符截断 4 文件后缀是白名单的时候的绕过 web服务器的解析漏洞绕过 5.高级文件上传绕过 1 .htaccess nginx.htaccess 2 服务端内容检测 3 配合伪协议来绕过 4.配合日志包含绕…

Word2Vec浅谈

论文地址&#xff1a;Efficient Estimation of Word Representations in Vector Space 个人认为&#xff0c;word2vec主要解决的问题是one-hot中维度过高并且稀疏的问题。word2vec是Google团队在2013年发表的一篇paper&#xff0c;当时一经问世直接将NLP领域带到了一个新的高度…

038、语义分割

之——介绍与数据集 杂谈 语义分割&#xff0c;语义分割(Semantic Segmentation)方法-CSDN博客&#xff1a; 语义分割是计算机视觉领域的一项重要任务&#xff0c;旨在将图像中的每个像素分配到其对应的语义类别中。与物体检测或图像分类不同&#xff0c;语义分割不仅要识别图像…

Golang起步篇(Windows、Linux、mac三种系统安装配置go环境以及IDE推荐以及入门语法详细释义)

Golang起步篇 Golang起步篇一. 安装Go语言开发环境1. Wondows下搭建Go开发环境(1). 下载SDK工具包(2). 解压下载的压缩包&#xff0c;放到特定的目录下&#xff0c;我一般放在d:/programs下(路径不能有中文或者特殊符号如空格等)(3). 配置环境变量步骤1&#xff1a;先打开环境变…

数据结构【DS】特殊二叉树

完全二叉树 叶子结点只能出现在最下层和次下层, 最下层的叶子结点集中在树的左部完全二叉树中, 度为1的节点数 0个或者1个【计算时可以用这个快速计算, 配合&#x1d45b;0&#x1d45b;21】若n为奇数&#xff0c;则分支节点每个都有左右孩子&#xff1b;若n为偶数&#xff0…

【STM32】ADC(模拟/数字转换)

一、ADC的简介 1.什么是ADC 1&#xff09;将【电信号】-->【电压】-->【数字量】 2&#xff09;ADC可以将引脚上连续变化的模拟电压转换为内存中存储的数字量&#xff0c;建立模拟电路到数字电路的桥梁。 3&#xff09;12位逐次逼近型ADC&#xff0c;1us转换时间&#xf…

六大排序(插入排序、希尔排序、冒泡排序、选择排序、堆排序、快速排序)未完

文章目录 排序一、 排序的概念1.排序&#xff1a;2.稳定性&#xff1a;3.内部排序&#xff1a;4.外部排序&#xff1a; 二、插入排序1.直接插入排序2.希尔排序 三、选择排序1.直接选择排序方法一方法二直接插入排序和直接排序的区别 2.堆排序 四、交换排序1.冒泡排序2.快速排序…

“新KG”视点 | 知识图谱与大语言模型协同模式探究

OpenKG 大模型专辑 导读 知识图谱和大型语言模型都是用来表示和处理知识的手段。大模型补足了理解语言的能力&#xff0c;知识图谱则丰富了表示知识的方式&#xff0c;两者的深度结合必将为人工智能提供更为全面、可靠、可控的知识处理方法。在这一背景下&#xff0c;OpenKG组织…

数字IC前端学习笔记:异步复位,同步释放

相关阅读 数字IC前端https://blog.csdn.net/weixin_45791458/category_12173698.html?spm1001.2014.3001.5482 异步复位 异步复位是一种常见的复位方式&#xff0c;可以使电路进入一个可知的状态。但是不正确地使用异步复位会导致出现意想不到的错误&#xff0c;复位释放便是…

读像火箭科学家一样思考笔记03_第一性原理(上)

1. 思维的两种障碍 1.1. 为什么知识会成为一种缺陷而非一种美德 1.1.1. 知识是一种美德 1.1.2. 知识同样的特质也会把它变成一种缺点 1.1.3. 知识确实是个好东西&#xff0c;但知识的作用应该是给人们提供信息&#xff0c;而不是起约束作用 1.1.4. 知识应该启发智慧&#…