12

I've got a Vec of futures created from calling an async function. After adding all the futures to the vector, I'd like to wait on the whole set, getting either a list of results, or a callback for each one that finishes.

I could simply loop or iterate over the vector of futures and call .await on each future and that would allow me to handle errors correctly and not have futures::future::join_all cancel the others, but I'm sure there's a more idiomatic way to accomplish this task.

I also want to be able to handle the futures as they complete so if I get enough information from the first few, I can cancel the remaining incomplete futures and not wait for them and discard their results, error or not. This would not be possible if I iterated over the vector in order.

What I'm looking for is a callback (closure, etc) that lets me accumulate the results as they come in so that I can handle errors appropriately or cancel the remainder of the futures (from within the callback) if I determine I don't need the rest of them.

I can tell that's asking for a headache from the borrow checker: trying to modify a future in a Vec in a callback from the async engine.

There's a number of Stack Overflow questions and Reddit posts that explain how join_all joins on a list of futures but cancels the rest if one fails, and how async engines may spawn threads or they may not or they're bad design if they do.

John Kugelman
  • 349,597
  • 67
  • 533
  • 578
stu
  • 8,461
  • 18
  • 74
  • 112
  • 1
    It's hard to answer your question because it doesn't include a [MRE]. We can't tell what crates (and their versions), types, traits, fields, etc. are present in the code. It would make it easier for us to help you if you try to reproduce your error on the [Rust Playground](https://play.rust-lang.org) if possible, otherwise in a brand new Cargo project, then [edit] your question to include the additional info. There are [Rust-specific MRE tips](//stackoverflow.com/tags/rust/info) you can use to reduce your original code for posting here. Thanks! – Shepmaster Apr 28 '20 at 13:21
  • There's no error. I didn't supply a code sample because I don't have one yet. I don't know how to solve this problem, so that's why I'm asking. I am not asking for 'how to fix an error', I'm asking how to approach the problem since I can't find any tools that seem to do what I want. – stu Apr 28 '20 at 13:29
  • That suggests that you are still on the research phase. The other problem is that it is unclear which `join_all` you speak of. For what it's worth, the `join_all` I found is the one in the `futures` crate, but since [`future::join_all`](https://docs.rs/futures/0.3.4/futures/future/fn.join_all.html) is agnostic to the future's item, it won't cancel existing futures if one of them yields an error. – E_net4 Apr 28 '20 at 13:34
  • 1
    *I didn't supply a code sample because I don't have one yet.* vs *I've got a `Vec` of futures created from calling an `async` function* — the latter suggests that you've got code that you've chosen to not share. – Shepmaster Apr 28 '20 at 13:48
  • 1
    @E_net4willhelpyouout An [older version of `future::join_all`](https://docs.rs/futures/0.2.0/futures/future/fn.join_all.html) canceled the other futures on failure. – John Kugelman Apr 28 '20 at 15:03
  • perhaps I was reading old documentation. – stu Apr 28 '20 at 16:18
  • `the latter suggests that you've got code that you've chosen to not share` So... I have a program yes, but it doesn't yet do what I'm trying to do because I don't know how. It did not occur to me that that code might be useful in answering the question because it doesn't exhibit any problem. I'm planning on trying something new, so I was asking about the new thing. With all that, somebody was able to answer apparently. – stu Apr 28 '20 at 16:20
  • actually to be more specific, I have a program that doesn't use futures at all, and I am PLANNING on having a vector of futures. – stu Apr 28 '20 at 16:46
  • Shepmaster: `It's hard to answer your question because it doesn't include a minimal reproducible example` Code is not required to ask a question on Stack Overflow. `That suggests that you are still on the research phase.` Not being on the "research phase" is also not a requirement to post to Stack Overflow. This sort of attitude is why SO is declining. Please be more welcoming to posters in the future. – Qix - MONICA WAS MISTREATED Jul 11 '21 at 03:16

4 Answers4

12

Use futures::select_all:

use futures::future; // 0.3.4

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn might_fail(fails: bool) -> Result<i32> {
    if fails {
        Ok(42)
    } else {
        Err("boom".into())
    }
}

async fn many() -> Result<i32> {
    let raw_futs = vec![might_fail(true), might_fail(false), might_fail(true)];
    let unpin_futs: Vec<_> = raw_futs.into_iter().map(Box::pin).collect();
    let mut futs = unpin_futs;

    let mut sum = 0;

    while !futs.is_empty() {
        match future::select_all(futs).await {
            (Ok(val), _index, remaining) => {
                sum += val;
                futs = remaining;
            }
            (Err(_e), _index, remaining) => {
                // Ignoring all errors
                futs = remaining;
            }
        }

        if sum > 42 {
            // Early exit
            return Ok(sum);
        }
    }

    Ok(sum)
}

This will poll all the futures in the collection, returning the first one that is not pending, its index, and the remaining pending or unpolled futures. You can then match on the Result and handle the success or failure case.

Call select_all inside of a loop. This gives you the ability to exit early from the function. When you exit, the futs vector is dropped and dropping a future cancels it.

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
1

As mentioned in comments by @kmdreko, @John Kugelman and @e-net4-the-comment-flagger, use join_all from the futures crate version 0.3. The join_all in futures version 0.2 has the cancel behavior described in the question, but join_all in futures crate version 0.3 does not.

Bryan Larsen
  • 9,468
  • 8
  • 56
  • 46
0

For a quick/hacky solution without having to mess with select_all. You can use join_all, but instead of returning Result<T> return Result<Result<T>> - always returning Ok for the top-level result

Sam Denty
  • 3,693
  • 3
  • 30
  • 43
-3

Or you can use join_all

The example from the documentation:

use futures::future::join_all;

async fn foo(i: u32) -> u32 { i }

let futures = vec![foo(1), foo(2), foo(3)];

assert_eq!(join_all(futures).await, [1, 2, 3]);
iovanom
  • 110
  • 2
  • 3
    The original question specifically complains about `join_all` saying that it stops at the first error. This is untrue in the current version of the function (and even when posted judging by the comments), but you should at least remark that the premise is not true. – kmdreko Feb 02 '22 at 08:34