rust里mp5a4_Rust源码分析:channel内部mpsc队列

首先,之前的upgrade过程中内存的回收要稍微注意下。因为Receiver现在指向shared::Packet之后,那个new_port需要被析构,也就是调用drop函数,我们看下drop的实现:

implDropforReceiver{fn drop(&mutself){match*unsafe{self.inner()}{Flavor::Oneshot(refp)=>p.drop_port(),Flavor::Stream(refp)=>p.drop_port(),Flavor::Shared(refp)=>p.drop_port(),Flavor::Sync(refp)=>p.drop_port(),}}}

由于之前的swap操作,走Flavor::Oneshot路径:

pubfn drop_port(&self){matchself.state.swap(DISCONNECTED,Ordering::SeqCst){// An empty channel has nothing to do, and a remotely disconnected// channel also has nothing to do b/c we're about to run the drop// glueDISCONNECTED|EMPTY=>{}// There's data on the channel, so make sure we destroy it promptly.// This is why not using an arc is a little difficult (need the box// to stay valid while we take the data).DATA=>unsafe{(&mut*self.data.get()).take().unwrap();},// We're the only ones that can block on this port_=>unreachable!()}}

同样是DISCONNECTED替换DISCONNECTED而已,没有过多操作。

同时不再需要的oneshot::Packet也要被析构:

implDropforPacket{fn drop(&mutself){assert_eq!(self.state.load(Ordering::SeqCst),DISCONNECTED);}}

只是个DISCONNECTED的检验操作。

所以现在Sender/Receiver都存放了Flavor::Shared(Arc<:packet>>),之前的Flavor::Oneshot(Arc<:packet>>>和临时产生的Sender/Receiver都不存在了。

并发队列

所以我们接着关注内在的数据结构,通过跟踪以下函数来分析:Sender::send(&self, t: T)

Receiver::recv(&self)

Receiver::recv_timeout(&self, timeout: Duration)

Sender::send(&self, t: T):

pubfn send(&self,t: T)-> Result>{let(new_inner,ret)=match*unsafe{self.inner()}{Flavor::Oneshot(refp)=>{if!p.sent(){returnp.send(t).map_err(SendError);}else{leta=Arc::new(stream::Packet::new());letrx=Receiver::new(Flavor::Stream(a.clone()));matchp.upgrade(rx){oneshot::UpSuccess=>{letret=a.send(t);(a,ret)}oneshot::UpDisconnected=>(a,Err(t)),oneshot::UpWoke(token)=>{// This send cannot panic because the thread is// asleep (we're looking at it), so the receiver// can't go away.a.send(t).ok().unwrap();token.signal();(a,Ok(()))}}}}Flavor::Stream(refp)=>returnp.send(t).map_err(SendError),Flavor::Shared(refp)=>returnp.send(t).map_err(SendError),Flavor::Sync(..)=>unreachable!(),};unsafe{lettmp=Sender::new(Flavor::Stream(new_inner));mem::swap(self.inner_mut(),tmp.inner_mut());}ret.map_err(SendError)}

事实上,对于我们的case,只有需要关注一句代码即可:

Flavor::Shared(refp)=>returnp.send(t).map_err(SendError),

这里的p是Arc<:packet>>的一个引用。我们继续看p.send(t):

pubfn send(&self,t: T)-> Result{// See Port::drop for what's going onifself.port_dropped.load(Ordering::SeqCst){returnErr(t)}ifself.cnt.load(Ordering::SeqCst){self.take_to_wake().signal();}nifn{// see the comment in 'try' for a shared channel for why this// window of "not disconnected" is ok.self.cnt.store(DISCONNECTED,Ordering::SeqCst);ifself.sender_drain.fetch_add(1,Ordering::SeqCst)==0{loop{// drain the queue, for info on the thread yield see the// discussion in try_recvloop{matchself.queue.pop(){mpsc::Data(..)=>{}mpsc::Empty=>break,mpsc::Inconsistent=>thread::yield_now(),}}ifself.sender_drain.fetch_sub(1,Ordering::SeqCst)==1{break}}}}// Can't make any assumptions about this case like in the SPSC case._=>{}}Ok(())}

同时,我们再看下shared::Packet的数据结构跟初始化信息:

constDISCONNECTED: isize =isize::MIN;constFUDGE: isize =1024;pubstruct Packet{queue: mpsc::Queue,cnt: AtomicIsize,// How many items are on this channelsteals: UnsafeCell,// How many times has a port received without blocking?to_wake: AtomicUsize,// SignalToken for wake upchannels: AtomicUsize,port_dropped: AtomicBool,sender_drain: AtomicIsize,select_lock: Mutex,}pubfn new()-> Packet{Packet{queue: mpsc::Queue::new(),cnt: AtomicIsize::new(0),steals: UnsafeCell::new(0),to_wake: AtomicUsize::new(0),channels: AtomicUsize::new(2),port_dropped: AtomicBool::new(false),sender_drain: AtomicIsize::new(0),select_lock: Mutex::new(()),}}

我们发现:port_dropped用于标记接收端是否已经drop。

cnt会计数当前存入多少个数据。同时cnt通过跟DISCONNECTED的比较来判断消费者是否已断开。

如果send中发现消费的一方已经断开,则会自己尝试pop所有的数据,将他们清理掉。

主要的操作是通过self.queue.push(t)来完成。

那这个self.queue是怎么实现的呢?看下它的代码,位于文件sync/mpsc/mpsc_queue.rs:

pubstruct Queue{head: AtomicPtr>,tail: UnsafeCell>,}unsafeimplSendforQueue{}unsafeimplSyncforQueue{}implQueue{pubfn new()-> Queue{letstub=unsafe{Node::new(None)};Queue{head: AtomicPtr::new(stub),tail: UnsafeCell::new(stub),}}pubfn push(&self,t: T){unsafe{letn=Node::new(Some(t));letprev=self.head.swap(n,Ordering::AcqRel);(*prev).next.store(n,Ordering::Release);}}pubfn pop(&self)-> PopResult{unsafe{lettail=*self.tail.get();letnext=(*tail).next.load(Ordering::Acquire);if!next.is_null(){*self.tail.get()=next;assert!((*tail).value.is_none());assert!((*next).value.is_some());letret=(*next).value.take().unwrap();let_: Box>=Box::from_raw(tail);returnData(ret);}ifself.head.load(Ordering::Acquire)==tail{Empty}else{Inconsistent}}}............}

事实上,它采用了Non-intrusive MPSC node-based queue的算法,构造了一个mpsc的单向链表,感兴趣的可以通过这个链接详细了解。

这个算法的优点是:push:并发特别快,无等待并且几乎仅仅一个swap(XCHG指令)操作,通过不断地先swap成为head,然后再链接prev_head.next = head来构造链表。

缺点是:non-Linearability:不具备线性一致性,push操作会阻塞pop操作,pop操作中如果发现head != tail 同时 tail.next还没来得变为非null,那么就观察到整个队列处于不一致的状态,这种情况下这里的实现返回Inconsistent。

同时我们看一下Node的代码:

struct Node{next: AtomicPtr>,value: Option,}implNode{unsafefn new(v: Option)-> *mutNode{Box::into_raw(boxNode{next: AtomicPtr::new(ptr::null_mut()),value: v,})}}

相对以往不同的是new操作返回的是*mut Node,这里通过Box::into_raw让使用者自己负责Node的内存释放。

另一方面,当我们Receiver.recv()时假如channel中没有数据,那么就需要等待,所以我们再看下相关的代码:

pubfn recv(&self)-> Result{loop{letnew_port=match*unsafe{self.inner()}{Flavor::Oneshot(refp)=>{matchp.recv(None){Ok(t)=>returnOk(t),Err(oneshot::Disconnected)=>returnErr(RecvError),Err(oneshot::Upgraded(rx))=>rx,Err(oneshot::Empty)=>unreachable!(),}}Flavor::Stream(refp)=>{matchp.recv(None){Ok(t)=>returnOk(t),Err(stream::Disconnected)=>returnErr(RecvError),Err(stream::Upgraded(rx))=>rx,Err(stream::Empty)=>unreachable!(),}}Flavor::Shared(refp)=>{matchp.recv(None){Ok(t)=>returnOk(t),Err(shared::Disconnected)=>returnErr(RecvError),Err(shared::Empty)=>unreachable!(),}}Flavor::Sync(refp)=>returnp.recv(None).map_err(|_|RecvError),};unsafe{mem::swap(self.inner_mut(),new_port.inner_mut());}}}

只要看:

pubfn recv(&self)-> Result{loop{letnew_port=match*unsafe{self.inner()}{.........Flavor::Shared(refp)=>{matchp.recv(None){Ok(t)=>returnOk(t),Err(shared::Disconnected)=>returnErr(RecvError),Err(shared::Empty)=>unreachable!(),}}};...........}}

接着看p.recv(),它的返回值决定了调用结果:

pubfn recv(&self,deadline: Option)-> Result{// This code is essentially the exact same as that found in the stream// case (see stream.rs)matchself.try_recv(){Err(Empty)=>{}data=>returndata,}let(wait_token,signal_token)=blocking::tokens();ifself.decrement(signal_token)==Installed{ifletSome(deadline)=deadline{lettimed_out=!wait_token.wait_max_until(deadline);iftimed_out{self.abort_selection(false);}}else{wait_token.wait();}}matchself.try_recv(){data@Ok(..)=>unsafe{*self.steals.get()-=1;data},data=>data,}}

这里的逻辑是,前面的self.try_recv假如返回了数据,那么直接返回数据即可。否则很可能channel为空,所以通过blocking::tokens()为Receiver准备阻塞相关的数据,然后通过decrement方法再次判断是否有数据,从而进入阻塞状态,decrement代码:

fn decrement(&self,token: SignalToken)-> StartResult{unsafe{assert_eq!(self.to_wake.load(Ordering::SeqCst),0);letptr=token.cast_to_usize();self.to_wake.store(ptr,Ordering::SeqCst);letsteals=ptr::replace(self.steals.get(),0);matchself.cnt.fetch_sub(1+steals,Ordering::SeqCst){DISCONNECTED=>{self.cnt.store(DISCONNECTED,Ordering::SeqCst);}n=>{assert!(n>=0);ifn-steals<=0{returnInstalled}}}self.to_wake.store(0,Ordering::SeqCst);drop(SignalToken::cast_from_usize(ptr));Abort}}

如上所示,将token: SignalToken的指针放入to_wake中,等待将来被唤醒。

所以这里通过self.cnt字段减除1+ steals来判断队列是否为空,原因在于这里的计数方式并不是每次pop一个数据就将cnt-1,也许是为了性能考虑,我们将pop的数据个数汇总在了steals字段中,然后等到steals足够大或者发现channel为空了才去修改cnt的值。所以这里通过self.cnt - (1+ steals) 与 0 比较来判断是否已有数据,如果没有则返回Installed,否则清理数据再返回Abort。

我们先看下Installed之后的逻辑:

ifself.decrement(signal_token)==Installed{ifletSome(deadline)=deadline{lettimed_out=!wait_token.wait_max_until(deadline);iftimed_out{self.abort_selection(false);}}else{wait_token.wait();}}

对于我们的情况它只是调用 wait_token.wait(),代码为:

implWaitToken{pubfn wait(self){while!self.inner.woken.load(Ordering::SeqCst){thread::park()}}...........

先检查woken再调用park(),注意这里是与之前Send的send操作相匹配的:

pubfn send(&self,t: T)-> Result{.............self.queue.push(t);matchself.cnt.fetch_add(1,Ordering::SeqCst){-1=>{self.take_to_wake().signal();}..........

我们看下相关的代码:

fn take_to_wake(&self)-> SignalToken{letptr=self.to_wake.load(Ordering::SeqCst);self.to_wake.store(0,Ordering::SeqCst);assert!(ptr!=0);unsafe{SignalToken::cast_from_usize(ptr)}implSignalToken{pubfn signal(&self)-> bool {letwake=!self.inner.woken.compare_and_swap(false,true,Ordering::SeqCst);ifwake{self.inner.thread.unpark();}wake}....}

先设置woken再调用unpark()。如此一来确保等待的Receiver不会永远睡眠。

我们再看下decrement返回Abort的情况:

pubfn recv(&self,deadline: Option)-> Result{matchself.try_recv(){Err(Empty)=>{}data=>returndata,}let(wait_token,signal_token)=blocking::tokens();ifself.decrement(signal_token)==Installed{.............}matchself.try_recv(){data@Ok(..)=>unsafe{*self.steals.get()-=1;data},data=>data,}}

只是再次调用self.try_recv()而已,至于这里为什么会有*self.steals.get()-=1的操作,那是要看try_recv操作本身了,它有一个默认steals+1的操作,但是这里的第二个self.try_recv()的计数已经cnt汇总了,所以这个不需要steals+1,我们通过-1来平衡:

pubfn try_recv(&self)-> Result{letret=matchself.queue.pop(){mpsc::Data(t)=>Some(t),mpsc::Empty=>None,mpsc::Inconsistent=>{letdata;loop{thread::yield_now();matchself.queue.pop(){mpsc::Data(t)=>{data=t;break}mpsc::Empty=>panic!("inconsistent => empty"),mpsc::Inconsistent=>{}}}Some(data)}};matchret{Some(data)=>unsafe{if*self.steals.get()>MAX_STEALS{matchself.cnt.swap(0,Ordering::SeqCst){DISCONNECTED=>{self.cnt.store(DISCONNECTED,Ordering::SeqCst);}n=>{letm=cmp::min(n,*self.steals.get());*self.steals.get()-=m;self.bump(n-m);}}assert!(*self.steals.get()>=0);}*self.steals.get()+=1;Ok(data)},None=>{matchself.cnt.load(Ordering::SeqCst){nifn!=DISCONNECTED=>Err(Empty),_=>{matchself.queue.pop(){mpsc::Data(t)=>Ok(t),mpsc::Empty=>Err(Disconnected),// with no senders, an inconsistency is impossible.mpsc::Inconsistent=>unreachable!(),}}}}}}

从代码中可以看到,如果pop()取得数据则直接返回;如果Empty则返回None,从而让Receiver可以陷入等待;如果Inconsistent 则说明队列处于push操作稍慢的不一致状态,我们的办法就是通过thread::yield_now(),一直调用pop()直到返回数据或者None。

另外,的确是通过MAX_STEALS 这个字段先汇总steals的值:

matchret{Some(data)=>unsafe{if*self.steals.get()>MAX_STEALS{matchself.cnt.swap(0,Ordering::SeqCst){DISCONNECTED=>{self.cnt.store(DISCONNECTED,Ordering::SeqCst);}n=>{letm=cmp::min(n,*self.steals.get());*self.steals.get()-=m;self.bump(n-m);}}assert!(*self.steals.get()>=0);}*self.steals.get()+=1;Ok(data)},...............}

假如steals足够大,大于MAX_STEALS 我们才通过与cnt比较,然后从cnt中减除它。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/505963.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

android settext 参数,Android TextView.setTextColor()的参数设置方式

摘要&#xff1a;Android TextView.setTextColor()的参数设置方式查了下资料发现setTextColor()的参数应该写成以下的这种形式&#xff1a;setTextColor(0xFF0000FF);//0xFF0000FF是int类型的数据&#xff0c;分组一下0x|FF|0000FF&#xff0c;0x是代表颜色整数的标记&#xff…

vscode angular智能提示_【线下活动】手把手教你玩转 VS Code 插件开发

感谢 Google Developer Group 的邀请&#xff0c;3 月 30 号下午&#xff0c;韩老师将手把手带你玩转 VS Code 插件开发。 Angular 使用了 TypeScript&#xff0c;VS Code 使用了 Chromium。感谢这个开放与包容的时代&#xff0c;技术无界&#xff0c;正是大家对技术有着执着的…

ext js如何动态更改xtype_K8S ConfigMap 用于动态应用程序的实践

编辑&#xff1a;小君君技术校对&#xff1a;星空下的文仔、bot在 Kubernetes 中&#xff0c;ConfigMap 是允许管理员将配置组件与镜像内容解耦&#xff0c;使容器化应用程序产生可移植性的一种资源。ConfigMap 可以与 Kubernetes Pod 一起使用&#xff0c;用于动态添加或更改容…

android contacts 编辑,如何在Android中的.csv文件中逐行编写contactn...

编辑.import java.io.File;import java.io.FileWriter;import java.io.IOException;import android.app.Activity;import android.content.Intent;import android.database.Cursor;import android.net.Uri;import android.os.Bundle;import android.os.Environment;import andr…

python教材答案第六章_python第六章{输入和输出}

输出 用print加上字符串&#xff0c;就可以向屏幕上输出指定的文字。比如输出hello, world&#xff0c;用代码实现如下&#xff1a; >>>print hello, world print语句也可以跟上多个字符串&#xff0c;用逗号“,”隔开&#xff0c;就可以连成一串输出&#xff1a; >…

字长16位的计算机表示最大整数_废话不多说跪送计算机选择8前十题

1.字长是CPU的主要性能指标之一,它表示(a)a.CPU—一次能处理二进制数据的位数b.最长的十进制整数的位数c.最大的有效数字位数d.计算结果的有效数字长度答案解析【解析】字长是指计算机运算部件一次能同时处理的二进制数据的位数。2.字长为7位的无符号二进制整数能表示的十进制整…

python程序结构框架_Python——Flask框架——程序的基本结构

一、安装 pip install flask 二、初始化 from flask importFlask app Flash(__name__) 三、路由&#xff1a;处理URL和函数之间的关系的程序称为路由 &#xff08;1&#xff09;路由装饰器 app.route(/)defindex():return ( Hello World) &#xff08;2&#xff09;动态路由 ap…

阿里云python服务器_Python服务器

{"moduleinfo":{"card_count":[{"count_phone":1,"count":1}],"search_count":[{"count_phone":6,"count":6}]},"card":[{"des":"云服务器 ECS(Elastic Compute Service)是一…

单片机传输浮点数给android,请问单片机怎么接收从串口发送过来的浮点数?

如题&#xff0c;单片机接收串口发送的浮点数&#xff0c;然后进行处理&#xff0c;我想的是建立一个二维数组&#xff0c;想把浮点数一位一位的存到数组里&#xff0c;因为要接收多个浮点数所以用了二维数组&#xff0c;可是实际发现是不可行的&#xff0c;请问到底应该怎么接…

节点name在graph中无法展示_图节点分类与消息传递

Message passing and node classification本文主要解决的问题&#xff1a;给定一个网络&#xff0c; 其中部分节点有label&#xff0c; 如何能将其他的节点分配对应的节点label呢&#xff1f; &#xff08;在生活中有很多这样的例子&#xff0c; 比如通过交互行为来判断用户是否…

iview 输入框_使用iview框架,如何进行输入框或者按钮的关联验证

iview框架的Form 组件基于 async-validator 实现数据验证,给 Form 设置属性 rules&#xff0c;同时给需要验证的 FormItem 设置属性 prop 指向对应字段即可。简单的验证北京上海深圳男女提交重置export default{data () {return{formValidate: {name:,mail:,city:,gender:,inte…

华为荣耀v20是android10,荣耀V20和荣耀V10买哪个好

随着华为NOVA 4的发布&#xff0c;接下来要期待的就是荣耀V20了。从目前曝光的信息来看&#xff0c;这款手机确实憋了不少大招&#xff0c;而且比华为NOVA 4更好的一点是它搭载的是麒麟980处理器&#xff0c;至于它的价格应该会与华为NOVA 4相近。虽然荣耀V20很诱人&#xff0c…

git配置全局用户名和密码_还在手动打包,手动传jar包?那你确实应该学一下jekins配置了...

本文为小编原创文章&#xff0c;首发于Java识堂微信公众号&#xff0c;一个高原创&#xff0c;高收藏的公众号&#xff0c;转载请联系作者先说jekins能干啥&#xff0c;你把代码放到git上&#xff0c;jekins就能帮你编译代码&#xff0c;并且把jar包放到相应的服务器上&#xf…

data 谷歌浏览器更改user 路径_Chrome浏览器自定义设置个人信息存储路径

序言Chrome浏览器很好用&#xff0c;感觉也很快&#xff0c;但是&#xff0c;也是有那么几个小瑕疵的。例如&#xff0c;Chrome浏览器无法设置安装路径&#xff0c;只能安装在默认的C盘&#xff0c;个人信息默认放在C盘&#xff0c;详细路径如下&#xff1a;对于我这种对C盘有洁…

android progressdialog 样式,android之修改系统自带ProgressDialog样式

1.ProgressDialog默认样式是当前Activity的theme所定义的ProgressDialog样式。继承自父类AlertDialog的style.AlertDialog的源码static int resolveDialogTheme(Context context, int resid) {if (resid THEME_TRADITIONAL) {return com.android.internal.R.style.Theme_Dialo…

python图片横向合并_[宜配屋]听图阁

起因&#xff1a; 有一批数据需要每个月进行分析&#xff0c;数据存储在excel中&#xff0c;行标题一致&#xff0c;需要横向合并进行分析。 数据示意&#xff1a;具有多个代码&#xff1a; # -*- coding: utf-8 -*- """ Created on Sun Nov 12 11:19:03 2017 a…

微信小程序是否有路由拦截_微信小程序--路由拦截器

背景由于最近公司要求开发小程序,一直很感兴趣,也是趁着这次机会,边文档边开发,遇到一些好玩的东西,留此作为笔记介绍在开发小程序,构建路由时,因为需要权限判断,最常见的就是在路由跳转时判断用户是否有权限访问或使用该功能,而在官网找了很久也没有看到,诸如 vue-router 的 b…

vue项目android,Android与Vue项目交互

1. Android代码class MainActivity : AppCompatActivity() {private lateinit var callJSBtn: Buttonprivate lateinit var webView: WebViewprivate var ajObject: AjObject AjObject()override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceSt…

updatebyprimarykeyselective返回什么是成功_嫦娥五号发射升空成功!!!

嫦娥五号探测器发射成功1聚焦发射当时2020年11月24日4时30分&#xff0c;我国在中国文昌航天发射场&#xff0c;用长征五号遥五运载火箭成功发射探月工程嫦娥五号探测器&#xff0c;火箭飞行约2200秒后&#xff0c;顺利将探测器送入预定轨道&#xff0c;开启我国首次地外天体采…

基于php的外卖订餐系统开题报告_订餐系统开题报告.doc

订餐系统开题报告订餐系统开题报告附件6&#xff1a;广东工商职业学院毕业设计(论 文)开题报告题目校园订餐系统设计与实现系 (部)计算机应用技术系专业班级姓名学号指导老师2015年10月30日毕业设计(论文)开题报告题目校园订餐系统设计与实现时间2015年10月30日至2015年11月10日…