Rust实现基于Tokio的限制内存占用的channel

Rust实现基于Tokio的限制内存占用的channel

简介

本文介绍如何基于tokio的channel实现一个限制内存占用的channel。

Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。
常用的channel有两种:boundedunbounded,其中ubbounded的channel可以无限的发送数据,而bounded的channel则有限的发送数据。两种channel都没有对自身的内存占用做出限制。

异步网络编程中常用一个channel连接两个task,其中业务task与业务交互:将要发送的数据发送到channel,而网络task与操作系统交互:从channel中接收数据并写入socket。单有时候带宽有限或者对端接收速率过慢时,而网络task从channel中接收的速度小于业务task向channel中发送的速度时,会造成大量的数据阻塞在channel中,如果不对channel的占用内存做限制,则会造成内存占用过多甚至进程被OOM

实现

  1. 获取数据大小

    要想限制channel总的内存占用,必须要直到每个数据的大小。比较常见的作法是所有需要发送到channel的内容都必须实现一个Trait,此Trait中定义了一个get_size方法,用于获取数据的大小。

    pub trait GetSize {/// get total sizefn get_size(&self) -> usize;
    }
    

    要发送的内容必须实现GetSize的Trait,并实现get_size方法。注意:get_size方法获取到的大小需包括栈空间和堆空间,例如:

     struct MyData {data: Vec<u8>,}impl GetSize for MyData {fn get_size(&self) -> usize {return std::mem::size_of::<MyData>() + self.data.len();//stack size + heap size}}
    
  2. 创建SizedSenderSizedReceiver

    SizedSenderSizedReceiver都可以基于tokio的UnboundedSenderUnboundedReceiver实现。在tokio的基础上,需要共享一个条件变量用于在sender和receiver之间同步当前是否还有可用空间。

       
    pub struct SizedSender<T: GetSize> {inner: mpsc::UnboundedSender<T>,size_semaphore: Arc<(Semaphore, usize)>,
    }   pub struct SizedReceiver<T: GetSize> {inner: mpsc::UnboundedReceiver<T>,size_semaphore: Arc<(Semaphore, usize)>,
    }/// Limit space usage but not limit the number of messages, bytes_size must bigger than 0.
    pub fn sized_channel<T: GetSize>(bytes_size: usize) -> (SizedSender<T>, SizedReceiver<T>) {let (tx, rx) = mpsc::unbounded_channel::<T>();let semaphore = Arc::new((Semaphore::new(bytes_size), bytes_size));(SizedSender::new(tx, semaphore.clone()),SizedReceiver::new(rx, semaphore),)
    }          
  3. SizedSender实现

    发送端发送时需要调用get_size方法获取数据的大小,然后调用Semaphore::available_permits方法获取可用空间,如果可用空间大于数据大小,则发送成功,否则发送失败。

    impl<T: GetSize> SizedSender<T> {pub fn new(inner: mpsc::UnboundedSender<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,}}fn do_send(&self,message: T,permits: Option<SemaphorePermit<'_>>,) -> Result<(), SendError<T>> {match self.inner.send(message) {Ok(r) => {if let Some(permits) = permits {permits.forget();}Ok(r)}Err(e) => {log::debug!("send value error!");Err(e)}}}pub async fn send(&self, message: T) -> Result<(), SendError<T>> {let message_size = message.get_size();if message_size > self.size_semaphore.1 {return Err(SendError(message));}let size = match u32::try_from(message_size) {Ok(size) => size,Err(_) => {return Err(SendError(message));}};if self.size_semaphore.0.available_permits() < size as usize {// The buffer is about to be depleted, sending may be blocked.}let permits = match self.size_semaphore.0.acquire_many(size).await {Ok(perimits) => Some(perimits),Err(_) => {return Err(SendError(message));}};self.do_send(message, permits)}}
    
  4. SizedReceiver的实现

    接收端接收时需要调用get_size方法获取数据的大小,然后将相应大小的permits还给信号量即可。

    impl<T: GetSize> SizedReceiver<T> {
    pub fn new(inner: mpsc::UnboundedReceiver<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,}
    }pub async fn recv(&mut self) -> Option<T> {self.inner.recv().await.map(|r| {let message_size = r.get_size();self.size_semaphore.0.add_permits(message_size);r})
    }
    }
  5. 其他

    在上述实现的基础上,还可以实现更多方法,比如try_sendtry_recv等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/116905.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络基础知识100问

1.什么是链接? 链接是指两个设备之间的连接。它包括用于一个设备能够与另一个设备通信的电缆类型和协议。 2.OSI 参考模型的层次是什么? 有 7 个 OSI 层&#xff1a;物理层&#xff0c;数据链路层&#xff0c;网络层&#xff0c;传输层&#xff0c;会话层&#xff0c;表示…

记一次gitlab平台任意用户注册引发的源代码泄漏

文章目录 一、漏洞原因二、漏洞利用1、任意用户注册2、成功进入后台3、越权查看其他用户的仓库源代码4、发现源代码仓库泄漏5、通讯录的地方,发现账号泄漏泄漏三、漏洞进一步利用四、总结五、免责声明一、漏洞原因 可以任意注册账号通过越权,查看其他用户仓库内的源代码造成源…

小白必看,手把手教你重装系统

一&#xff0c;安装步骤 二&#xff0c;重装之前需要做的准备 1、重装之前请大家务必注意备份重要资料。电脑有价&#xff0c;数据无价——重要数据备份之后&#xff0c;电脑随便折腾都没问题。大不了就是重装不成功。系统软件问题多试几次总能解决的&#xff0c;但重要数据一…

前端性能优化 - 虚拟滚动

一 需求背景 需求&#xff1a;在一个表格里面一次性渲染全部数据&#xff0c;不采用分页形式&#xff0c;每行数据都有Echart图插入。 问题&#xff1a;图表渲染卡顿 技术栈&#xff1a;Vue、Element UI 卡顿原因&#xff1a;页面渲染时大量的元素参与到了重排的动作中&#x…

flutter复制口令返回app监听粘贴板

overridevoid didChangeAppLifecycleState(AppLifecycleState state) {switch (state) {case AppLifecycleState.inactive: // 处于这种状态的应用程序应该假设它们可能在任何时候暂停。break;case AppLifecycleState.resumed: //从后台切换前台&#xff0c;界面可见handle();b…

MySQL数据库(三)

文章目录 MySQL数据库一、约束条件二、约束条件之主键三、补充一些其它SQL语句四、表查找关键字Select与from五、查询关键字之where筛选六、查询关键字之group by分组七、分组补充函数八、关键字之having过滤九、关键字之distinct去重十、关键字之order by排序十一、关键字之li…

苹果开发者 Xcode发布TestFlight全流程

打包前注意事项 使用Xcode导出安装包之前&#xff0c;必须先确认账户的所有合约是否全部同意&#xff0c;如果有不同意的&#xff0c;在出包的时候会弹出报错 点击前往苹果开发者官网https://appstoreconnect.apple.com/agreements/ 登录自己的开发者账户后&#xff0c;可以看…

react项目实现文件预览,比如PDF、txt、word、Excel、ppt等常见文件(腾讯云cos)

使用腾讯云文档预览&#xff0c;需要开通文档预览功能&#xff0c;该功能需要收费的。 使用限制 如果需要图片预览、视频或音频可以使用获取下载链接。 页面代码 <button onClick() > {handleClick(myself/文档.xlsx)}>预览</button><div style{{ height:…

谈谈你对spring boot 3.0的理解

谈谈你对spring boot 3.0的理解 一&#xff0c;Spring Boot 3.0 的兼容性 Spring Boot 3.0 在兼容性方面做出了很大的努力&#xff0c;以支持存量项目和老项目。尽管如此&#xff0c;仍需注意以下几点&#xff1a; Java 版本要求&#xff1a;Spring Boot 3.0 要求使用 Java 1…

Boundary-Aware RGBD Salient Object Detection With Cross-Modal Feature Sampling

方法 体会 实验做得比较详细&#xff0c;但未公布代码

六、Python类的高级知识

一、类 情景&#xff1a; 如果不实例化一个类能否调用其中的函数类里面的函数如何减弱跟类的关系&#xff0c;在一起只是为了代码方便整洁 1.实例方法 示例&#xff1a; class TEST():def __init__(self):self.hellonihaodef printf(self):for i in range(1,5):print(str(…

浏览器标签上添加icon图标;html引用ico文件

实例 <link rel"shortcut icon" href"./XXX.ico" type"image/x-icon">页面和图标在同一目录内 则 <link rel"shortcut icon" type"text/css" href"study.ico"/>可以阿里矢量图库关键字搜索下载自己…

H3C SecParh堡垒机 data_provider.php 远程命令执行漏洞

构造poc执行远程命令&#xff1a; /audit/data_provider.php?ds_y2019&ds_m04&ds_d02&ds_hour09&ds_min40&server_cond&service$(id)&identity_cond&query_typeall&formatjson&browsetrue漏洞证明&#xff1a; 文笔生疏&#xff0c…

windows开机自启动和忘记密码-备忘

windows开机自启动和忘记密码-备忘 文章目录 windows开机自启动和忘记密码-备忘1.自启动网址定时任务方式 2.忘记windows用户密码 1.自启动 网址 参考博文&#xff1a;https://blog.csdn.net/wwzmvp/article/details/113656544&#xff0c;感谢博主。 定时任务方式 如图&#…

spring boot项目运行jar包读取包内resources目录下的文件

spring boot项目运行jar包读取包内resources目录下的文件 摘要码代码相关文章 摘要 Spring Boot 项目打包成 jar 包后&#xff0c;resources 目录下的文件将会被打包到 jar 包中。如果需要在 Spring Boot 项目运行 jar 包后读取 resources 目录下的文件&#xff0c;可以使用 t…

使用链表实现栈操作

源码如下&#xff1a; #include <stdio.h> #include <stdlib.h> struct node {int info;struct node *link; }; struct node *top NULL, *temp; void push(struct node *); void pop(struct node *); void display(struct node *);int main() {int x 0, item;pr…

【记录】1024徽章

对spring的理解 1、Spring是什么? Spring是一个轻量级的IoC和AOP容器框架。是为Java应用程序提供基础性服务的一套框架&#xff0c;目的是用于简化企业应用程序的开发&#xff0c;它使得开发者只需要关心业务需求。常见的配置方式有三种&#xff1a;基于XML的配置、基于注解…

小游戏外包开发流程及费用

小游戏的开发流程和费用会因项目的规模、复杂性和所选技术平台而有所不同。以下是一般的小游戏开发流程和可能的费用因素&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 开发流程&#xff1a; 概念和…

云原生微服务实战 Spring Cloud Alibaba 之 Nacos

系列文章目录 第一章 Java线程池技术应用 第二章 CountDownLatch和Semaphone的应用 第三章 Spring Cloud 简介 第四章 Spring Cloud Netflix 之 Eureka 第五章 Spring Cloud Netflix 之 Ribbon 第六章 Spring Cloud 之 OpenFeign 第七章 Spring Cloud 之 GateWay 第八章 Sprin…

ES 8.x 向量检索性能测试 把向量检索性能提升100倍!

向量检索不仅在的跨模态检索场景中应用广泛&#xff0c;随着chat gpt的或者&#xff0c;利用es的向量检索&#xff0c;在Ai领域发挥着越来越大的作用。 本文&#xff0c;主要测试es的向量检索性能。我从8.x就开始关注ES的向量检索了。当前ES已经发布到 8.10 版本。以下是官方文…