自己编写一个impl future来看一下Tokio的是如何实现的。
第一步:
代码:
struct TExecuteTask {count:u32
}impl Future for TExecuteTask {type Output = ();fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {println!("future require");if self.count == 2 {return Ready(());}unsafe {self.get_unchecked_mut().count += 1;}Pending}
}pub fn test1() {let v = TExecuteTask{count:0};let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();println!("test trace1");rt.block_on(v);println!("test trace2");
}
运行结果:
貌似调用block_on之后会回调一次future的poll,如果这个时候还没有计算出结果,那就直接卡死了,所以需要找个地方能唤醒。
第二步:
原来代码上添加上wake
impl Future for TExecuteTask {type Output = ();fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {println!("future require");let mut v = self.core.lock().unwrap();v.waker = Some(cx.waker().clone());Pending}
}pub fn test1() {let waker: Arc<Mutex<TExecuteTaskCore>> = Arc::new(Mutex::new(TExecuteTaskCore {waker:None}));let task: TExecuteTask = TExecuteTask {core:waker.clone()};let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();let closure1 = waker.clone();thread::spawn(move|| {thread::sleep(Duration::from_secs(1));let mut core: std::sync::MutexGuard<'_, TExecuteTaskCore> = closure1.lock().unwrap();if let Some(waker) = core.waker.take() {println!("detect future is ready, wakeup the future task to executor.");waker.wake() // wakeup the future task to executor.}});println!("test trace1");rt.block_on(task);println!("test trace2");
}
关键是在poll返回pending的时候,获取waker。然后等计算完成后再wake一下哈哈。
然后结果呢:
还是卡住了。。。这个是为啥呢?原来是我poll的代码有问题,无论如何处理都是返回pending..
所以从目前的试验来看有如下结论:
1.当调用block_on的时候,会直接调用一次future的poll确认是否运算结束。
2.当调用waker.wake的时候貌似还会调用一次poll确认是否运算结束。
所以我们现在要加一个标志位来表示运算结束,这样block_on应该就能正常走下去了吧。
第三步:
impl Future for TExecuteTask {type Output = ();fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {println!("future require");let mut v = self.core.lock().unwrap();if v.complete == true {return Ready(())}v.waker = Some(cx.waker().clone());Pending}
}pub fn test1() {let waker: Arc<Mutex<TExecuteTaskCore>> = Arc::new(Mutex::new(TExecuteTaskCore {waker:None,complete:false}));let task: TExecuteTask = TExecuteTask {core:waker.clone()};let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();let closure1 = waker.clone();thread::spawn(move|| {thread::sleep(Duration::from_secs(1));let mut core: std::sync::MutexGuard<'_, TExecuteTaskCore> = closure1.lock().unwrap();if let Some(waker) = core.waker.take() {println!("detect future is ready, wakeup the future task to executor.");core.complete = true;waker.wake() // wakeup the future task to executor.}});println!("test trace1");rt.block_on(task);println!("test trace2");
}
添加了一个complete标志运算是否结束。
运算完成后,complete赋值为true,再尝试wake一下等待线程(协程?)
poll中也添加了complete判断,如果为true的话,直接返回Ready,然后我们看一下运行结果:
运行结束~~。哈哈。