rust学习-异步流

Stream trait

trait Stream {// 由 `stream` 产生的值的类型type Item;// 尝试解析 `stream` 中的下一项fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)-> Poll<Option<Self::Item>>;
}

Stream trait 是一个定义了异步流(asynchronous stream)的接口。
异步流表示一个异步可迭代集合,每个元素都可由异步代码异步生成
该模式常用于流式处理或与数据吞吐量无关的处理,带来性能提升。

Stream 定义了一个具有一个相关联类型 Self::Item 的抽象类型
这个类型是流内部元素的类型。
通过将此类型与 Option 结合使用,可以有效地表示流终止的状态。

Stream trait 的最重要的方法是 poll_next()
该方法是一个异步实现的访问器,它用于从流中获取下一个元素
在异步编程中,self 参数必须是 Pin<&mut Self> 类型
以确保安全地存储 stream 的内存,同时保持它们可变性和引用关系。

poll_next() 中,需要使用 Context 类型来跟踪和管理对流的异步访问。
Context 是一个包含有关异步上下文的上下文,例如调度器、唤醒器和运行程序的线程等。
了解上下文非常重要,因为它是在执行异步任务期间进行重要决策的关键因素。

poll_next() 方法返回一个 Poll<OptionSelf::Item> 枚举值
它表示将下一个元素轮询到结果的状态:

  • 如果流仍具有元素,则返回 Poll::Ready(Some(element))
  • 如果流结束,则返回 Poll::Ready(None)
  • 如果无法轮询下一个元素,则返回 Poll::Pending

Stream trait 还定义了许多其他方法,如 map() 和 filter() 等
它们与 Iterator trait 相似,并允许转换、过滤和操作流中的元素。

例如
可以使用如下代码来实现 Stream trait,从而使类型成为异步流:


use std::pin::Pin;
use std::task::{Context, Poll};pub trait Stream {type Item;fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)-> Poll<Option<Self::Item>>;
}struct MyStream {// implement fields...
}impl Stream for MyStream {type Item = String;fn poll_next(self: Pin<&mut Self>,cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {// implement poll_next() method}
}

在这个例子中,定义了一个 MyStream 类型,它实现了 Stream trait 并包含了一个 String 类型的元素。
使用大多数异步流通常具备的 poll_next() 方法,该方法将 String 类型元素作为下一个生成项返回。

示例

[dependencies]
futures = "0.3"
tokio = { version = "1.16.0", features = ["full"] }
tokio-stream = "0.1.14"

生成1~9的序列

use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::Stream; // 引入第三方 crate `tokio_stream`
use tokio_stream::StreamExt;
use tokio::main;struct MyStream {value: i32,
}impl Stream for MyStream {type Item = i32;fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {// 模拟一些计算let dst = self.value + 1;// 这里不需要返回值std::mem::replace(&mut self.value, dst);// 如果值小于 10,返回它,否则返回 Noneif self.value < 10 {Poll::Ready(Some(self.value))} else {Poll::Ready(None)}}
}#[tokio::main]
async fn main() {let stream = MyStream { value: 0 };println!("main begin");// 使用异步方式处理每个生成项tokio::pin!(stream); // 将流钉住以确保安全性// await必须要在async块中while let Some(val) = stream.next().await {println!("Got {}", val);}
}

迭代

有很多不同的方法可以迭代处理 Stream 中的值
map,filter 和 fold,以及“遇错即断”版本 try_map,try_filter 和 try_fold
for 循环不能用在 Stream 上,while let 以及 next/try_next 函数还可以使用

计算 stream 中所有元素的和

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {// StreamExt trait 中包含了一些非常有用的方法// 例如 fold、fold_while、 for_each 和 next 等// 这里是用nextuse futures::stream::StreamExt;let mut sum = 0;// 迭代 stream 中的所有元素,并将它们相加// 每次迭代时调用 next 方法返回下一个元素,并将其存储在 item 的变量中// 如果 next 方法返回了 None,则循环将结束// 这是异步代码,必须使用 await 关键字来等待 next 方法的完成while let Some(item) = stream.next().await {sum += item;}sum
}

Futures 和 Streams 是 Rust 异步编程的础。
Futures 表示异步计算的结果,使用 async 和 await 将它们组合在一起,进行非阻塞的异步计算。
Streams 表示异步生成的一系值,使用 Stream trait 对它们进行操作,并逐渐生成结果。

使用try版本

// 该函数可能返回计算过程中的 IO 错误
async fn sum_with_try_next(mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<i32, io::Error> {use futures::stream::TryStreamExt; // 对于 `try_next`let mut sum = 0;// 在这里使用了 try_next 方法// 因为 stream 的元素类型是 Result<i32, io::Error>// 这个方法可以返回 Err,进而把错误传递到函数的调用方 //// 如果 try_next 方法返回一个 Ok 值// 则 item 变量会包含值// 如果返回值是一个 Err// 则该错误通过 ? 操作符传播,导致整个函数退出并将错误返回给调用方while let Some(item) = stream.try_next().await? {sum += item;}Ok(sum)
}

并发

如果每次只处理一个元素,如上文,就会失去并发的机会
为了并发处理一个 Stream 的多个值,使用 for_each_concurrent 或 try_for_each_concurrent 方法

使用异步并发处理繁重的、耗时的工作,可以更高效地处理大型计算,提高代码的可伸缩性。
但是过度并发也会导致程序性能下降或崩溃

// 参数指向流的可变引用,用于在异步流上提供异步迭代器功能
async fn jump_around(mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>) -> Result<(), io::Error> {// 对于 `try_for_each_concurrent`use futures::stream::TryStreamExt;// 限制最大并发队列长度const MAX_CONCURRENT_JUMPERS: usize = 100;// try_for_each_concurrent 方法需要一个函数作为参数// 该函数接受 num 参数,并返回一个 Future 对象// 使用 async move {...} // 来桥接 jump_n_times 和 report_n_jumps 两个异步函数stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {// 在异步代码中,必须使用异步执行// 因此这里也使用 await 关键字来异步等待该函数的执行结果// 模拟一个人跳跃 num 次jump_n_times(num).await?;// 报告一个人跳跃了 num 次。report_n_jumps(num).await?;// 没有IO错误Ok(())}).await?;Ok(())
}

附录

#[tokio::main]

Rust 宏(macro),用于将一个异步函数标记为使用 Tokio 运行时(runtime)执行。
Tokio 是一种支持异步 I/O 和非阻塞 IO 操作的 Rust 异步编程框架,可以轻松处理并发任务和事件驱动的编程模型。

async fn 指的是一个异步函数,它返回一个 Future 对象,可以在执行期间轻松地挂起和恢复。而异步 I/O 和非阻塞 IO 操作可以提高程序性能,因为它们可以在等待某些操作完成时,同时处理其他任务。

#[tokio::main] 宏可以用于标记一个异步函数作为程序的主函数,并在其中使用 Tokio runtime 来执行异步任务。这个宏用于创建一个可执行程序,就像普通的 main 函数一样,但在 Tokio 的异步环境中运行异步函数。

示例

#[tokio::main]
async fn main() {// 异步代码
}

定义了一个函数 main 并使用 #[tokio::main] 标记它
告诉编译器,这个函数应该被编译为一个异步函数,并使用 Tokio 运行时来执行异步任务。
在函数体内,可以包任意的异步代码,例如调用异步函数、执行异步 I/O 操作、进行并行或流式处理等等。

注意:
在一个 Rust 项目中,只允许一个 tokio::main 函数存在,且必须作为程序入口点使用。
如果编写库,可以使用 #[tokio::main(flavor = “current_thread”)] 注解
以确保库不会创建任何运行时。
此时,调用者代码将需要负责在适当的时间创建和启动任何需要的运行时。

Tokio 只是 Rust 异步编程的其中一种解决方案,并不能解决所有异步编程问题。
其他常见的方案包括 async-std 和 futures-rs。

std::mem::replace

Rust 的标准库 std 中的一个函数,用于在不丢失内存所有权的情况下更改变量的值。replace 函数通常用于对可变变量进行非常规的修改操作,例如交换变量值,插入或移除元素,或在特定条件下强制赋值

replace 函数采用两个参数:第一个参数是待修改的变量名,第二个参数是新值。该函数会返回旧值,同时将新值赋给变量。

pub fn replace<T>(dest: &mut T, mut src: T) -> T

交换两个变量的值:

let mut a = String::from("hello");
let mut b = String::from("world");// 把world赋值给a,然后将hello返回赋值给temp
let temp = std::mem::replace(&mut a, b);
// 然后b的值为temp
let b = temp;println!("a: {}", a);
println!("b: {}", b);

使用 std::mem::replace 函数交换 a 和 b 的值,将变量 b 中的值存储在一个临时变量 temp 中,再将 temp 中的值存储回变量 b 中。

replace 函数会将旧值从变量 dest 中取出,然后将新值 src 存储在该变量中。
因此,它需要完成变量类型之间的复制或移动操作。
在处理大型结构体或复杂数据类型时,这可能会导致性能和内存问题。
此外,replace 函数只适用于可变引用。
如果使用的是非可变引用,则需要使用其他方法来修改变量的值。

Rust 编程语言中的一个语法糖,用于简化错误处理代码的编写。
它一般用于函数的返回值类型为 Result 或 Option 的情况下,可以把一系列可能的错误和状态转换的判断语句,缩减为一行代码。

在 Rust 中,当遇到错误时,常规的处理方式是通过 match 或 if let 等语句进行模式匹配和错误处理。这样会导致每次检查错误的代码严重膨胀,并且会淹没主流程的本质。
同时 Rust 还提供了内置的 Error 类型 Result<T, E>,以及在标准库中为此类型提供的一些方法,例如 unwrap、unwrap_or、map、and_then、or 等。这些方法可以很好地处理错误,但其缺点是将错误的检查和处理条件与主流程代码混为一体。

在这种情况下,? 操作符就是一种更加简单和精简语法,它的作用是将当前函数返回值的 Result 或 Option 类型的内容进行检查,如果是 Ok 则返回 Ok 中的值。如果是 Err,则直接将这个 Err 返回出去,并中断当前函数的执行。

?操作符在某种程度上类似于其他编程语言中的异常的处理机制,但由于 Rust 非常强制性地提供了对错误的抽象和处理,使得最终 Rust 可以避免异常所导致的问题。

use std::fs::File;
use std::io::Read;fn read_file(path: &str) -> Result<String, std::io::Error> {let mut file = File::open(path)?;let mut contents = String::new();file.read_to_string(&mut contents)?;Ok(contents)
}

定义一个 read_file 函数,它打开指定路径的文件,并读取文件内容。

函数的返回类型为 Result<String, std::io::Error>,表示该函数可能会返回一个字符串,或是一个包含 IO 错误的 Err。

使用 ? 操作符,可以将每个 I/O 操作的返回结果都检查一遍,以确保函数总是在遇到 IO 错误时返回错误类型。

在 Rust 中,? 操作符只能在 Result 或 Option 类型中使用,不能在其他类型中使用。

如果错误类型不是这些类型的任何一种,可以将其包装到一个 Result 或 Option 中,然后再使用 ? 进行处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/30079.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【项目学习1】如何将java对象转化为XML字符串

如何将java对象转化为XML字符串 将java对象转化为XML字符串&#xff0c;可以使用Java的XML操作库JAXB&#xff0c;具体操作步骤如下&#xff1a; 主要分为以下几步&#xff1a; 1、创建JAXBContext对象&#xff0c;用于映射Java类和XML。 JAXBContext jaxbContext JAXBConte…

8.10CPI决战日来临,黄金会意外走高吗?

近期有哪些消息面影响黄金走势&#xff1f;黄金多空该如何研判&#xff1f; ​黄金消息面解析&#xff1a;周四(8月10日)亚市早盘&#xff0c;美元指数在102.50维持多头走势&#xff0c;黄金避险情绪消散&#xff0c;金价跌至1916美元&#xff0c;下破1900美元前景深化。周三黄…

分布式 - 服务器Nginx:一小时入门系列之静态网页配置

文章目录 1. 静态文件配置2. nginx listen 命令解析3. nginx server_name 命令解析4. nginx server 端口重复5. nginx location 命令 1. 静态文件配置 在 /home 文件下配置一个静态的AdminLTE后台管理系统&#xff1a; [rootnginx-dev conf.d]# cd /home [rootnginx-dev home…

「2024」预备研究生mem-论证推理强化:评价类

一、论证推理强化&#xff1a;评价类 二、课后题

0基础学习VR全景平台篇 第81篇:全景相机-临云镜如何直播推流

临云镜全景相机是阿里巴巴定制全景设备&#xff0c;实现空间三维信息的快速采集&#xff0c;与阿里云三维空间重建平台搭配&#xff0c;帮助品牌商与平台以较低的成本完成空间的快速采集&#xff0c;并支持对室内/室外空间的三维全景展示及空间漫游&#xff0c;同时支持VR浏览、…

jmeter创建一个压测项目

1.jemeter新建一个项目&#xff1a; 2.接下来对Thread进行描述&#xff0c;也可以先使用默认的Thread进行操作。 3.添加http请求头的信息。按照如图所示操作 4.在请求头里面添加必要的字段&#xff0c;可以只填必要字段就可以 5.添加Http请求信息&#xff0c;如下图&#xff…

LeetCode练习习题集【4月 - 7 月】

LEETCODE习题集【4月-7月总结】 简单 数组部分 1.重复数 题目&#xff1a; 在一个长度u为 n 的数组 nums 里的所有数字都在 0&#xff5e;n-1 的范围内。数组中某些数字是重复的&#xff0c;但不知道有几个数字重复了&#xff0c;也不知道每个数字重复了几次。请找出数组中…

【Java设计模式】建造者模式 注解@Builder

概念 将一个复杂对象的构造与它的表示分离&#xff0c;使同样的构建过程可以创建不同的表示。它使将一个复杂的对象分解成多个简单的对象&#xff0c;然后一步步构建而成。 每一个具体建造者都相对独立&#xff0c;而与其它的具体建造者无关&#xff0c;因此可以很方便地替换具…

nginx 负载均衡

1.环境准备 我使用的说centos7的系统 1.20版本的nginx 另外还有3台虚拟机 主机&#xff1a;192.168.163.142 两台服务器&#xff1a;服务器A--192.168.163.140 服务器B---192.168.163.141 2.配置服务器A和B 找到nginx下的html目录&#xff0c;编辑其中的index.html(在此…

篇五:原型模式:复制对象的秘密

篇五&#xff1a;"原型模式&#xff1a;复制对象的秘密" 设计模式是软件开发中的重要组成部分&#xff0c;原型模式是创建型设计模式中的一种。原型模式旨在通过复制现有对象来创建新的对象&#xff0c;而不是通过调用构造函数来创建。在C中&#xff0c;原型模式广泛…

nginx负载均衡

目录 负载均衡 nginx的七层代理和四层代理 四层代理与七层代理之间的区别 四层和七层谁的速度快&#xff1f; 正向代理与反向代理 负载均衡 upstream 算法 算法总结 stream 负载均衡 通过反向代理来实现 nginx的七层代理和四层代理 七层是最常用的反向代理方式&am…

实践指南-前端性能提升 270% | 京东云技术团队

一、背景 当我们疲于开发一个接一个的需求时&#xff0c;很容易忘记去关注网站的性能&#xff0c;到了某一个节点&#xff0c;猛地发现&#xff0c;随着越来越多代码的堆积&#xff0c;网站变得越来越慢。 本文就是从这样的一个背景出发&#xff0c;着手优化网站的前端性能&a…

Linux 文件与目录管理

nvLinux 文件与目录管理 我们知道 Linux 的目录结构为树状结构&#xff0c;最顶级的目录为根目录 /。 其他目录通过挂载可以将它们添加到树中&#xff0c;通过解除挂载可以移除它们。 在开始本教程前我们需要先知道什么是绝对路径与相对路径。 绝对路径&#xff1a; 路径的写…

JS逆向系列之猿人学爬虫第8题-验证码-图文点选

题目地址 https://match.yuanrenxue.cn/match/8本题的难点就在于验证码的识别,没啥js加密,只要识别对了携带坐标就给返回数据 回过头来看验证码 这里复杂的字体比较多,人看起来都有点费劲(感觉可能对红绿色盲朋友不太又好)&#x

log_softmax比softmax更好?

多类别分类的一个trick 探讨一下在多类别分类场景&#xff0c;如翻译、生成、目标检测等场景下&#xff0c;使用log_softmax的效果优于softmax的原因。 假设词典大小为10&#xff0c;一个词的ID为9&#xff08;即词典的最后一个词&#xff09;&#xff0c;使用交叉熵作为损失函…

PS 2023 安装选项页面显示不全

文章目录 PS 2023 安装选项页面显示不全解决办法 PS 2023 安装选项页面显示不全 解决办法 按住Tab键&#xff0c;点击该安装选项页面即可&#xff0c;如下如所示&#xff1a;

九耶|阁瑞钛伦特 神庙逃亡游戏代码

以下是一个简单的神庙逃亡&#xff08;Temple Run&#xff09;游戏的HTML代码示例&#xff1a; <!DOCTYPE html> <html> <head><title>神庙逃亡</title><style>#game-container {position: absolute;width: 800px;height: 600px;backgrou…

仓储10、20代电子标签接口文档

标签注册 仓储10代注册 右下角左下角组合按键触发注册 注册成功&#xff1a;右上角绿灯变红灯&#xff0c;并显示信号强度的数值 ​ 仓储20代注册 右下角左下角组合按键触发注册 注册成功&#xff1a;右上角绿灯变红灯&#xff0c;并显示信号强度的数值 ​ 查询电子标签信息…

7.Eclipse中改变编码方式及解决部分乱码问题

1、改变整个工作空间的编码方式&#xff1a; 点击Window->Preference->General->workplace&#xff0c;然后选择默认编码方式 2、改变某个项目的编码方式&#xff1a; 右键点击项目名->Properties>Resource&#xff0c;然后选择默认编码方式。 问题&#xff…

node笔记——调用免费qq的smtp发送html格式邮箱

文章目录 ⭐前言⭐smtp授权码获取⭐nodemailer⭐postman验证接口⭐结束 ⭐前言 大家好&#xff0c;我是yma16&#xff0c;本文分享关于node调用免费qq的smtp发送邮箱。 node系列往期文章 node_windows环境变量配置 node_npm发布包 linux_配置node node_nvm安装配置 node笔记_h…