Given a collection of futures, say a `Vec<impl Future<..>>`, how can I run all of them concurrently and block until the first one is ready?

The closest feature I can find is the `select!` macro (which is also available in Tokio). Unfortunately, it only works with a fixed number of futures written out explicitly, rather than a collection of them.

JavaScript has an equivalent of this feature, called `Promise.race`. Is there a way to do this in Rust?

Or is there another pattern that covers this use case, perhaps using channels?

modulitos
  • I can't write a proper answer right now, but check whether [`FuturesUnordered`](https://docs.rs/futures/0.3.5/futures/stream/struct.FuturesUnordered.html) is what you want. – Cerberus Sep 03 '20 at 03:12
  • I think you might want [`futures::future::select_all`](https://docs.rs/futures/0.3.5/futures/future/fn.select_all.html) – Herohtar Sep 03 '20 at 04:22
  • Does this answer your question? [How can I spawn asynchronous methods in a loop?](https://stackoverflow.com/questions/63434977/how-can-i-spawn-asynchronous-methods-in-a-loop) In particular, see the `join_parallel` function defined in the answer. – user4815162342 Sep 03 '20 at 08:06
  • @user4815162342 I don't think that suggestion answers the question, because `join_parallel` requires waiting for *all* of the futures to finish executing. Instead, I only want to wait until the fastest future is finished, then unblock the thread. Also, executing the futures in parallel is not a requirement for this question. But thanks for your comment! – modulitos Sep 08 '20 at 06:00

1 Answer

I figured out a solution using the `select_all` function from the `futures` crate.

Here is a simple example to demonstrate how it can be used to race a collection of futures:

use futures::future::select_all;
use futures::FutureExt;
use tokio::time::{delay_for, Duration};

async fn get_async_task(task_id: &str, seconds: u64) -> &'_ str {
    println!("starting {}", task_id);
    let duration = Duration::from_secs(seconds);

    // `delay_for` is the Tokio 0.2 name; Tokio 0.3 and later call it `tokio::time::sleep`.
    delay_for(duration).await;

    println!("{} complete!", task_id);
    task_id
}

#[tokio::main]
async fn main() {
    let futures = vec![

        // `select_all` requires each future to implement `Unpin`, so we use `boxed` here to
        // pin each one on the heap:
        // https://users.rust-lang.org/t/the-trait-unpin-is-not-implemented-for-genfuture-error-when-using-join-all/23612/3
        // https://docs.rs/futures/0.3.5/futures/future/trait.FutureExt.html#method.boxed

        get_async_task("task 1", 5).boxed(),
        get_async_task("task 2", 4).boxed(),
        get_async_task("task 3", 1).boxed(),
        get_async_task("task 4", 2).boxed(),
        get_async_task("task 5", 3).boxed(),
    ];

    let (item_resolved, ready_future_index, _remaining_futures) =
        select_all(futures).await;

    assert_eq!("task 3", item_resolved);
    assert_eq!(2, ready_future_index);
}

Here's a link to the code above: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=f32b2ed404624c1b0abe284914f8658d
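
For intuition about the `(value, index, remaining)` tuple that `select_all` hands back, here is a rough std-only sketch of the same idea: poll every boxed future in turn and return as soon as one is ready. This is not how the `futures` crate implements it; `ReadyAfter`, `NoopWaker`, `race_blocking`, and `race_demo` are hypothetical names made up for illustration, and the loop busy-polls instead of sleeping until a waker fires, which a real executor would never do.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Boxing and pinning gives every future in the Vec the same type.
type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;

// Hypothetical test future: reports `Pending` for `polls_left` polls,
// then yields `value`. It stands in for a timer so no runtime is needed.
struct ReadyAfter<T> {
    polls_left: u32,
    value: Option<T>,
}

impl<T: Unpin> Future for ReadyAfter<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        if self.polls_left == 0 {
            Poll::Ready(self.value.take().expect("polled after completion"))
        } else {
            self.polls_left -= 1;
            // Ask to be polled again (a no-op with the waker used below).
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// A waker that does nothing; enough for a loop that re-polls everything anyway.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Blocks the current thread until the first future completes.
// Returns its output, its index in the input Vec, and the unfinished futures
// (their order is not preserved, because of `swap_remove`).
fn race_blocking<T>(mut futures: Vec<BoxFuture<T>>) -> (T, usize, Vec<BoxFuture<T>>) {
    assert!(!futures.is_empty(), "need at least one future to race");
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    loop {
        for index in 0..futures.len() {
            let polled = futures[index].as_mut().poll(&mut cx);
            if let Poll::Ready(value) = polled {
                futures.swap_remove(index);
                return (value, index, futures);
            }
        }
        // Busy-polling sketch: give other threads a chance before the next round.
        std::thread::yield_now();
    }
}

// Mirrors the example above: the third future needs the fewest polls, so it wins.
fn race_demo() -> (&'static str, usize, usize) {
    let futures: Vec<BoxFuture<&'static str>> = vec![
        Box::pin(ReadyAfter { polls_left: 5, value: Some("task 1") }),
        Box::pin(ReadyAfter { polls_left: 4, value: Some("task 2") }),
        Box::pin(ReadyAfter { polls_left: 1, value: Some("task 3") }),
        Box::pin(ReadyAfter { polls_left: 2, value: Some("task 4") }),
    ];
    let (value, index, remaining) = race_blocking(futures);
    (value, index, remaining.len())
}
```

Calling `race_demo()` from `main` exercises the sketch. In real code I would still reach for `select_all`, since it cooperates with the executor's wakers instead of spinning.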

Thanks to @Herohtar for suggesting select_all in the comments above!

modulitos