rust tokio select!宏详解

rust tokio select!宏详解

简介

本文介绍Tokioselect!的用法,重点是使用过程中可能遇到的问题,比如阻塞问题、优先级问题、cancel safe问题。在Tokio 中,select! 是一个宏,用于同时等待多个异步任务,并在其中任意一个任务完成时执行相应的逻辑。

基本用法

如下代码演示了如何使用 Tokio 库实现一个异步的消息传递系统,其中包括三个无限通道和一个关闭通道。程序使用了 select! 宏来等待通道和关闭通道的事件,并在事件发生时执行相应的操作。

程序的主要步骤如下:

  1. 创建三个无限通道和一个用于传递关闭信号的通道。
  2. 向三个通道中发送一些数据。
  3. 开启一个异步任务并在两秒后发送关闭信号。
  4. 在主循环中使用 select! 宏等待通道和关闭通道的事件。
  5. 当一个通道接收到数据时,打印出数据。
  6. 当关闭通道接收到信号时,退出循环。

程序中的 select! 宏使用了类似于 match 的语法,但是它可以同时等待多个异步事件。当其中一个事件发生时,宏将执行相应的代码块,并跳出循环。在本例中,当一个通道接收到数据时,打印出数据;当关闭通道接收到信号时,退出循环。
select!经常与loop搭配使用,循环地从多个通道中接收事件并处理。

use std::time::Duration;use tokio::select;#[tokio::main]
async fn main() {let (sender1, mut receiver1) = tokio::sync::mpsc::unbounded_channel::<String>();let (sender2, mut receiver2) = tokio::sync::mpsc::unbounded_channel::<String>();let (sender3, mut receiver3) = tokio::sync::mpsc::unbounded_channel::<String>();let (shutdown_sender, mut shutdown_receiver) = tokio::sync::watch::channel(());for i in 0..3 {sender1.send(i.to_string()).unwrap();sender2.send(i.to_string()).unwrap();sender3.send(i.to_string()).unwrap();}tokio::spawn(async move {tokio::time::sleep(Duration::from_secs(2)).await;shutdown_sender.send(()).unwrap(); //两秒后关闭});loop {select! {ret = receiver1.recv() => {println!("channel 1 received: {:?}", ret);},ret = receiver2.recv() => {println!("channel 2 received: {:?}", ret);},ret = receiver3.recv() => {println!("channel 3 received: {:?}", ret);},_ = shutdown_receiver.changed() => {println!("shutdown received");break;}};}
}

可能遇到的坑

阻塞

select中的各个分支是并行执行的,这里的并行是指分支中的各个future在并行执行。不过一旦某个分支的future完成并进入了分支代码块,如果在分支代码中有一些阻塞的操作,则其他分支是没有机会执行的。
比如下面代码,在receiver1.recv()完成时,sleep了10s,sleep期间其他的分支是不会执行的。即使在2s后发送了shutdown信号,select!因为无法及时处理此信号,实际上循环也无法退出。

 loop {select! {ret = receiver1.recv() => {println!("channel 1 received: {:?}", ret);tokio::time::sleep(Duration::from_secs(10)).await;//这里等待期间,其他的分支是无法被执行的},ret = receiver2.recv() => {println!("channel 2 received: {:?}", ret);},ret = receiver3.recv() => {println!("channel 3 received: {:?}", ret);},_ = shutdown_receiver.changed() => {println!("shutdown received");break;}};}

这个坑在网络编程中比较容易踩到,比如select这里是从channel中取出上层应用传来的数据,并将其写入到socket中,而写socket的操作是有可能阻塞的,阻塞期间其他的分支是无法执行的。

顺序

1、默认情况下select中的各个分支执行顺序是随机的,比如上面例子中三个channel都有消息的情况下,具体去执行哪个分支是随机的。执行结果如下:
在这里插入图片描述
2、如果想要区分优先级,可以加标志biased,这样每次select将会按照从上到下的顺序去poll每个future,也就是说优先级顺序是从上往下的。比如某些场景下需要按优先级处理各个channel中的数据时这个特性就很有用。代码如下:

    loop {select! {biased;//按顺序优先执行ret = receiver1.recv() => {println!("channel 1 received: {:?}", ret);},ret = receiver2.recv() => {println!("channel 2 received: {:?}", ret);},ret = receiver3.recv() => {println!("channel 3 received: {:?}", ret);},_ = shutdown_receiver.changed() => {println!("shutdown received");break;}};}

运行结果如下:
在这里插入图片描述
3、顺序执行时注意饿死问题
添加了biased标志后,顺序靠前的future总是先被执行,在上述例子中,极端情况下如果靠前的channel总是有数据,那后面的channel就没有机会被执行。比如例子中如果前三个channel中一直有数据,那shutdown_receiver就无法收到shutdown信号,导致程序功能不符合预期。
解决这个问题很简单,就是把更关键的控制性的future放在最前方。

关于cancel safe

select!中如果某个分支future completed了,会将其他分支的future cancel掉,这个cancel操作要格外小心,因为如果future不是cancel safe的可能会丢数据。tokio的官方文档中给出了常见的cancel safe和不safefuture
那么如何判断自己实现的future是否是cancel safe的呢? 很简单、只需要思考如果future中的代码执行到.await时被cancel了,是否是安全的。我们来看下cancel unsafe的代码长啥样:

pub async fn read_and_write(mut message_recevier: UnboundedReceiver<Bytes>, mut file: File) {let message = message_recevier.recv().await.unwrap();file.write(&message).await.unwrap();
}

该方法从一个channel中读取消息,并将此消息写入到文件中,这个future就明显不是cancel safe的。为啥呢?试想一下,此futurechannel中读到消息之后,在写文件时被cancel掉了,那message岂不是就丢了。
实际项目中一定要格外小心这个cancel safe问题,很容易造成丢数据或者数据重复等不良反应,而且一旦出现了还很难复现、不太容易想到是这里的问题。网络编程中尤其要注意tokio::io::AsyncWriteExt::write_all不是cancel safe的,因为它内部可能是多次调用write操作才将所有缓冲区写入。

数量

1、首先select!中的分支仅支持显式地用代码书写,无法动态增减。就是说在写代码时select中的futures数量就固定了,程序运行过程中无法动态删减。
2、目前最多支持64个分支。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/172888.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探索 Vue 中的 bus.$emit:实现组件通信的强大工具

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

运维高级--centos7源码安装Apache

安装必要的依赖项&#xff1a; sudo yum groupinstall "Development Tools" sudo yum install pcre pcre-devel zlib zlib-devel openssl openssl-devel这将安装编译和构建所需的基本工具&#xff0c;以及 Apache HTTP Server 所需的一些依赖项。 下载 Apache HTT…

Pycharm Available Packages显示Noting to show

使用Pycharm安装依赖包时Available packages 页面点击添加按钮后,没有任何包显示,并且无法搜索安装. 在各种网站查看到的方法如下: 1.网络问题,需要添加镜像源 点击Manage Repositories 添加一个可用的镜像源地址即可 2.打开了anaconda(那个绿色圈圈小图标),再点一下把它点…

如何在 Vim 中剪切、复制和粘贴

目录 前言 如何在 Vim 编辑器中复制文本 如何在 Vim 编辑器中剪切文本 如何在 Vim 编辑器中粘贴文本 如何通过选择文本来剪切和复制文本 通过选择文本复制 在 Vim 中选择文本来剪切文本 前言 在本篇 Vim 快速技巧中&#xff0c;你将学习到剪切和复制粘贴的相关知识。 剪…

PgSQL技术内幕-Analyze做的那些事-pg_stat_all_tables

PgSQL技术内幕-Analyze做的那些事-pg_stat_all_tables pg_stat_all_tables视图中记录有analyze信息&#xff0c;比如何时做的analyze、表元组个数&#xff08;活元组、死元组&#xff09;等。重启后发现该视图中表的统计信息重置不见了&#xff0c;发生了什么&#xff1f; 1、p…

HarmonyOS开发者工具DevEco Studio-汉化

HarmonyOS DevEco Studio 简介 下载安装及汉化 打开开发者工具 安装语言包重启 然后设置页搜索“chinese”&#xff0c;选中中文语言包&#xff0c;点击后面的install&#xff1b; 或者 汉化按照IDEA的汉法风格&#xff0c;需要安装插件重启就可以汉化&#xff0c;步骤为&…

在云服务器上搭建个人版chatGPT及后端Spring Boot集成chat GPT

原创/朱季谦 本文分成两部分&#xff0c;包括【国内服务器上搭建chat GPT】和【后端Spring Boot集成chat GPT】。 无论是在【国内服务器上搭建chat GPT】和【后端Spring Boot集成chat GPT】&#xff0c;两个方式都需要魔法访问&#xff0c;否则是无法正常使用的&#xff0c;即…

Linux uname命令教程:如何打印linux操作系统名称和硬件的基本信息(附实例教程和注意事项)

Linux uname命令介绍 uname命令是一个在Linux中常用的命令行工具&#xff0c;用于打印有关操作系统名称和系统硬件的基本信息。uname这个名字来源于"UNIX name"。它最常用于确定处理器架构&#xff0c;系统主机名和系统上运行的内核版本。 Linux uname命令适用的Li…

基于SSM的企业订单跟踪管理系统(有报告)。Javaee项目

演示视频&#xff1a; 基于SSM的企业订单跟踪管理系统&#xff08;有报告&#xff09;。Javaee项目 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring SpringM…

Python---函数的数据---拆包的应用案例(两个变量值互换,*args, **kwargs调用时传递参数用法)

案例&#xff1a; 使用至少3种方式交换两个变量的值 第一种方式&#xff1a;引入一个临时变量 c1 10 c2 2# 引入临时变量temp temp c2 c2 c1 c1 tempprint(c1, c2) 第二种方式&#xff1a;使用加法与减法运算交换两个变量的值&#xff08;不需要引入临时变量&#xff09…

python--获取每张切片的不同PEF区间值的百分比

在全直径数字岩心中&#xff0c;如何获取每张切片的不同PEF区间值的百分比&#xff1f; import os import datetime from PIL import Image import numpy as np import csv import easygui as gclass Table(object):def __init__(self, table_data_path):self.table_data_path…

ClickHouse中的物化视图

技术主题 技术原理 物化视图&#xff08;Materialized View&#xff09;是一种预先计算并缓存结果的视图&#xff0c;存储在磁盘上自动更新&#xff0c;空间换时间的思路。物化视图是一种优化技术&#xff0c;本质上就是为了加速查询操作&#xff0c;降低系统负载&#xff0c…

5、Qt:项目中包含多个子项目(.pro)/子模块(.pri)

一、说明&#xff1a; 在进行项目开发过程中&#xff0c;会涉及子项目/子模块的问题 Qt中使用TEMPLATE subdirs添加多个子项目&#xff1b;子项目可以单独编译生成可执行文件&#xff08;exe&#xff09;或者动态链接库&#xff08;dll&#xff09;等&#xff0c;供其他模块…

C#学习-9课时

P11 IF判断(上) P11 IF判断(中 ) bool→true or false&#xff1b; 为&#xff1a;变量赋值 为&#xff1a;等于(判断) !为&#xff1a;≠ 优先级&#xff1a;大于 using System; using System.Collections.Generic; using System.Linq; using System.Text; usin…

论文笔记——FasterNet

为了设计快速神经网络,许多工作都集中在减少浮点运算(FLOPs)的数量上。然而,作者观察到FLOPs的这种减少不一定会带来延迟的类似程度的减少。这主要源于每秒低浮点运算(FLOPS)效率低下。 为了实现更快的网络,作者重新回顾了FLOPs的运算符,并证明了如此低的FLOPS主要是由…

路径规划之D*算法

系列文章目录 路径规划之Dijkstra算法 路径规划之Best-First Search算法 路径规划之A*算法 路径规划之D *算法 路径规划之D*算法 系列文章目录前言一、D*算法1.1 起源1.2 思想1.3 阶段1.4 个人理解1.5 应用 前言 之前说过A是目前应用最广泛的寻路算法&#xff0c;但是A算法存…

深度学习第2天:RNN循环神经网络

☁️主页 Nowl &#x1f525;专栏《机器学习实战》 《机器学习》 &#x1f4d1;君子坐而论道&#xff0c;少年起而行之 文章目录 介绍 记忆功能对比展现 任务描述 导入库 处理数据 前馈神经网络 循环神经网络 编译与训练模型 模型预测 可能的问题 梯度消失 梯…

【古诗生成AI实战】之一——实战项目总览

[1] 总览 【古诗生成AI实战】系列共五篇文章&#xff1a; 【古诗生成AI实战】之一——实战项目总览   【古诗生成AI实战】之二——项目架构设计   【古诗生成AI实战】之三——任务加载器与预处理器   【古诗生成AI实战】之四——模型包装器与模型的训练   【古诗生成AI…

【双指针】三数之和

三数之和 在做这道题之前&#xff0c;建议建议先将两数之和做完再做&#xff0c;提升更大~ 文章目录 三数之和题目描述算法原理解法一解法二思路如下&#xff1a;处理细节问题&#xff1a; 代码编写Java代码编写C代码编写 15. 三数之和 - 力扣&#xff08;LeetCode&#xff0…

knife4j集合化postman

knife4j集合化postman 01 knife4j的介绍 基于 JavaMVC的集成框架swagger的进一步强化&#xff0c;在原有通过注释就能生成文档的前身swagger-bootstrap-ui之上&#xff0c;增加了postman的测试功能&#xff0c;优化了文档的UI界面&#xff0c;在测试api接口的方面有了极大的进…