打开tokio-timer可以看到timout
#[must_use = "futures do nothing unless polled"] #[derive(Debug)] pub struct Timeout<T> { value: T, delay: Delay, }
很简单的结构,value就是我们要加timeout的future或者stream,delay就是多久之后就timeout(:
看看他实现的Future和Stream trait
impl<T> Future for Timeout<T> where T: Future, { type Item = T::Item; type Error = Error<T::Error>; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { // First, try polling the future match self.value.poll() { Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), Ok(Async::NotReady) => {} Err(e) => return Err(Error::inner(e)), } // Now check the timer match self.delay.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(_)) => Err(Error::elapsed()), Err(e) => Err(Error::timer(e)), } } } impl<T> Stream for Timeout<T> where T: Stream, { type Item = T::Item; type Error = Error<T::Error>; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { // First, try polling the future match self.value.poll() { Ok(Async::Ready(v)) => { if v.is_some() { self.delay.reset_timeout(); } return Ok(Async::Ready(v)); } Ok(Async::NotReady) => {} Err(e) => return Err(Error::inner(e)), } // Now check the timer match self.delay.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(_)) => { self.delay.reset_timeout(); Err(Error::elapsed()) } Err(e) => Err(Error::timer(e)), } }
都是正常去poll value,如果ready了就直接返回,如果没有ready,那么再去poll dealy看看是否timeout,如果没有timeout返回notready,如果timeout了那就返回time elapsed error.
非常简单的逻辑实现了timeout(:
如果我要方便的使用timeout可能需要use tokio::prelude::FutureExt;
fn timeout(self, timeout: Duration) -> Timeout<Self> where Self: Sized, { Timeout::new(self, timeout) }
其实自己用Timeout::new也可以,但是xxx.timeout看着舒服一些
Top comments (0)