Skip to content
7 changes: 2 additions & 5 deletions futures-util/src/future/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures_sink::Sink;

/// Combines two different futures, streams, or sinks having the same associated types into a single
/// type.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either<A, B> {
/// First branch of the type
Left(A),
Expand Down Expand Up @@ -280,10 +280,7 @@ mod if_std {
A: AsyncBufRead,
B: AsyncBufRead,
{
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<&[u8]>> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
unsafe {
match self.get_unchecked_mut() {
Either::Left(x) => Pin::new_unchecked(x).poll_fill_buf(cx),
Expand Down
83 changes: 83 additions & 0 deletions futures-util/src/future/first.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use crate::future::Either;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;

/// Future for the [`first()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug, Clone, Default)]
pub struct First<F1, F2> {
future1: F1,
future2: F2,
}

impl<F1: Unpin, F2: Unpin> Unpin for First<F1, F2> {}

impl<F1, F2> First<F1, F2> {
unsafe_pinned!(future1: F1);
unsafe_pinned!(future2: F2);
Comment on lines +18 to +19
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use pin_project instead of pin_utils::unsafe_pinned. (Since #2128, we have been using it.)

}

impl<F1: Future, F2: Future> Future for First<F1, F2> {
type Output = Either<F1::Output, F2::Output>;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().future1().poll(cx) {
Poll::Ready(out) => Poll::Ready(Either::Left(out)),
Poll::Pending => match self.future2().poll(cx) {
Poll::Ready(out) => Poll::Ready(Either::Right(out)),
Poll::Pending => Poll::Pending,
},
}
}
}

// We don't provide FusedFuture, because the overhead of implementing it (
// which requires a separate bool or Option field) is precisely the same as
// calling .fuse()

/// Waits for either one of two differently-typed futures to complete.
///
/// This function will return a new future which awaits for either one of both
/// futures to complete. The returned future will finish with the value of
/// whichever future finishes first.
///
/// The future will discard the future that didn't complete; see `select` for
/// a future that will instead return the incomplete future.
///
/// Note that this function consumes the receiving futures and returns a
/// wrapped version of them.
///
/// Also note that if both this and the second future have the same
/// output type you can use the `Either::into_immer` method to
/// conveniently extract out the value at the end.
pub fn first<F1, F2>(future1: F1, future2: F2) -> First<F1, F2> {
First { future1, future2 }
}

#[test]
fn test_first() {
use crate::future::{pending, ready, FutureExt};
use crate::task::noop_waker_ref;

let mut context = Context::from_waker(noop_waker_ref());

assert_eq!(
first(ready(10), ready(20)).poll_unpin(&mut context),
Poll::Ready(Either::Left(10))
);
assert_eq!(
first(ready(10), pending::<()>()).poll_unpin(&mut context),
Poll::Ready(Either::Left(10))
);
assert_eq!(
first(pending::<()>(), ready(20)).poll_unpin(&mut context),
Poll::Ready(Either::Right(20))
);
assert_eq!(
first(pending::<()>(), pending::<()>()).poll_unpin(&mut context),
Poll::Pending
);
}
111 changes: 111 additions & 0 deletions futures-util/src/future/first_all.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use alloc::vec::Vec;
use core::iter::FromIterator;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};

/// Future for the [`first_all()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug, Clone)]
pub struct FirstAll<F> {
// Critical safety invariant: after FirstAll is created, this vector can
// never be reallocated, in order to ensure that Pin is upheld.
futures: Vec<F>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend to use Pin<Box<[F]>> + this helper function, instead of Vec<F> + Pin::new_unchecked

}

// Safety: once created, the contents of the vector don't change, and they'll
// remain in place permanently.
impl<F> Unpin for FirstAll<F> {}

impl<F: Future> Future for FirstAll<F> {
type Output = F::Output;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
match this.futures.iter_mut().find_map(move |fut| {
// Safety: we promise that the future is never moved out of the vec,
// and that the vec never reallocates once FirstAll has been created
// (specifically after the first poll)
let pinned = unsafe { Pin::new_unchecked(fut) };
match pinned.poll(cx) {
Poll::Ready(out) => Some(out),
Poll::Pending => None,
}
}) {
Some(out) => Poll::Ready(out),
None => Poll::Pending,
}
}
}

// We don't provide FusedFuture, because the overhead of implementing it (
// which requires clearing the vector after Ready is returned) is precisely
// the same as using .fuse()

impl<Fut: Future> FromIterator<Fut> for FirstAll<Fut> {
fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self {
first_all(iter)
}
}

/// Creates a new future which will return the result of the first completed
/// future out of a list.
///
/// The returned future will wait for any future within `futures` to be ready.
/// Upon completion the item resolved will be returned.
///
/// The remaining futures will be discarded when the returned future is
/// dropped; see `select_all` for a version that returns the incomplete
/// futures if you need to poll over them further.
///
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Panics
///
/// This function will panic if the iterator specified contains no items.
pub fn first_all<I>(futures: I) -> FirstAll<I::Item>
where
I: IntoIterator,
I::Item: Future,
{
let futures = Vec::from_iter(futures);
assert!(!futures.is_empty(), "Need at least 1 future for first_any");
FirstAll { futures }
}

#[test]
fn test_first_all() {
use crate::future::FutureExt;
use crate::task::noop_waker_ref;
use futures_channel::oneshot::channel;

let mut futures = vec![];
let mut senders = vec![];

for _ in 0..10 {
let (send, recv) = channel();
futures.push(recv);
senders.push(send);
}

let (send, recv) = channel();
futures.push(recv);

for _ in 0..10 {
let (send, recv) = channel();
futures.push(recv);
senders.push(send);
}

let mut fut = first_all(futures);
let mut context = Context::from_waker(noop_waker_ref());

let poll = fut.poll_unpin(&mut context);
assert_eq!(poll, Poll::Pending);

send.send(10).unwrap();
let poll = fut.poll_unpin(&mut context);
assert_eq!(poll, Poll::Ready(Ok(10)));
}
Loading