
I have a vector of objects that have a resolve() method that uses reqwest to query an external web API. After I call the resolve() method on each object, I want to print the result of every request.

Here's my code, which compiles and works but is not really asynchronous, because it waits for each request to finish before starting the next one:

for mut item in items {
    item.resolve().await;

    item.print_result();
}

I've tried to use tokio::join! to spawn all async calls and wait for them to finish, but I'm probably doing something wrong:

tokio::join!(items.iter_mut().for_each(|item| item.resolve()));

Here's the error I'm getting:

error[E0308]: mismatched types
  --> src\main.rs:25:51
   |
25 |     tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
   |                                                   ^^^^^^^^^^^^^^ expected `()`, found opaque type
   | 
  ::: src\redirect_definition.rs:32:37
   |
32 |     pub async fn resolve(&mut self) {
   |                                     - the `Output` of this `async fn`'s found opaque type
   |
   = note: expected unit type `()`
            found opaque type `impl std::future::Future`

How can I call the resolve() methods for all instances at once?


This code reflects the answer. Now I'm dealing with borrow checker errors that I don't really understand. Should I annotate some of my variables with 'static?

let mut items = get_from_csv(path);

let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();

for task in tasks {
    task.await;
}

for item in items {
    item.print_result();
}

Here are the errors:

error[E0597]: `items` does not live long enough
  --> src\main.rs:18:25
   |
18 |       let tasks: Vec<_> = items
   |                           -^^^^
   |                           |
   |  _________________________borrowed value does not live long enough
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
31 |   }
   |   - `items` dropped here while still borrowed

error[E0505]: cannot move out of `items` because it is borrowed
  --> src\main.rs:27:17
   |
18 |       let tasks: Vec<_> = items
   |                           -----
   |                           |
   |  _________________________borrow of `items` occurs here
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
27 |       for item in items {
   |                   ^^^^^ move out of `items` occurs here
Shepmaster
Djent
  • `for_each` is intended to execute in place, without returning anything. What you want is to `map` each item to a call to `resolve`, which yields `Future`s, then collect them in a vector. From there you need to wait for all of them to complete. – Alexey S. Larionov Aug 16 '20 at 09:58
  • The last part is not as easy, though: with `join` you can achieve concurrency, but not necessarily parallelism. Some workarounds can be found [there](https://www.reddit.com/r/rust/comments/dt6u0s/joined_futures_will_run_concurrently_not_in/f6w5my2/?context=8&depth=9) – Alexey S. Larionov Aug 16 '20 at 09:59
  • It's hard to answer your question because it doesn't include a [MRE]. We can't tell what crates (and their versions), types, traits, fields, etc. are present in the code. It would make it easier for us to help you if you try to reproduce your error on the [Rust Playground](https://play.rust-lang.org) if possible, otherwise in a brand new Cargo project, then [edit] your question to include the additional info. There are [Rust-specific MRE tips](//stackoverflow.com/tags/rust/info) you can use to reduce your original code for posting here. Thanks! – Shepmaster Aug 18 '20 at 15:09
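The first comment's point can be seen without any runtime at all. This is a minimal sketch assuming a hypothetical `Item` whose async `resolve()` only sets a flag (the real one awaits a reqwest call): `map` keeps each closure's return value, so you get one future per item, whereas `for_each` insists on `()` and produces the E0308 error above. Because Rust futures are lazy, merely collecting them runs none of their code; something still has to await them.

```rust
pub struct Item {
    pub resolved: bool,
}

impl Item {
    // Hypothetical stand-in for the question's `resolve()`; it only flips a flag.
    pub async fn resolve(&mut self) {
        self.resolved = true;
    }
}

// `map` collects one future per item; nothing is awaited here.
pub fn build_futures(items: &mut [Item]) -> usize {
    let futures: Vec<_> = items.iter_mut().map(|item| item.resolve()).collect();
    // Futures are lazy: these are dropped at the end of this function without
    // ever being polled, so none of the items gets resolved.
    futures.len()
}
```

After calling `build_futures`, every item still has `resolved == false`, which is why collecting the futures is only half of the job.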

1 Answer


Since you want to await the futures in parallel, you can spawn each of them as an individual task. Spawned tasks run independently of each other and of the code that spawned them, so you can await their handles in any order.
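The same property holds for OS threads, which gives a runtime-free way to see it. In this sketch, std threads stand in for tokio tasks and the squaring workload is made up; every thread starts working as soon as it is spawned, so joining the handles in reverse order still yields every result.

```rust
use std::thread;
use std::time::Duration;

// Spawn one thread per input; each sleeps briefly and returns the square of
// its input (a made-up workload standing in for a web request).
pub fn run_and_join_in_reverse(inputs: Vec<u64>) -> Vec<u64> {
    let handles: Vec<thread::JoinHandle<u64>> = inputs
        .into_iter()
        .map(|n| {
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10 * n));
                n * n
            })
        })
        .collect();
    // Every thread is already running at this point. Joining in reverse order
    // only changes when each result is collected, not when the work happens.
    let mut results: Vec<u64> = handles
        .into_iter()
        .rev()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect();
    results.reverse(); // restore input order
    results
}
```

With tokio the shape is the same, except that awaiting a `JoinHandle` yields a `Result` whose error case reports a panicked or cancelled task.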

Ideally you'd write something like this:

// spawn tasks that run in parallel
let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();
// now await them so that the resolve() calls complete
for task in tasks {
    task.await.unwrap();
}
// and we're done
for item in &items {
    item.print_result();
}

But this will be rejected by the borrow checker, because the future returned by item.resolve() holds a borrowed reference to item. That future is passed to tokio::spawn(), which requires it to be 'static because the spawned task may run on another thread and may outlive the current function, and the compiler cannot prove that item will live as long as the task. (The same kind of problem is encountered when you want to send a reference to local data to a thread.)
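The thread version of this problem can be reproduced with std alone. `std::thread::spawn`, like `tokio::spawn()`, demands a `'static` closure, so it rejects borrows of a local vector; `std::thread::scope` (Rust 1.63+) accepts them because it joins every thread it spawned before returning. This sketch uses a hypothetical synchronous `Item::resolve` and illustrates the borrow checker's reasoning for threads only; it is not a drop-in fix for the tokio code.

```rust
use std::thread;

pub struct Item {
    pub value: u32,
}

impl Item {
    // Hypothetical synchronous stand-in for `resolve()`: mutate the item in place.
    pub fn resolve(&mut self) {
        self.value *= 10;
    }
}

// Rejected by the borrow checker: `thread::spawn` needs a `'static` closure,
// but `item` borrows from the caller's `items`.
//
//     for item in items.iter_mut() {
//         thread::spawn(move || item.resolve());
//     }

// Accepted: `thread::scope` joins every thread spawned on `s` before it
// returns, so the borrows of `items` cannot outlive the slice.
pub fn resolve_all_scoped(items: &mut [Item]) {
    thread::scope(|s| {
        for item in items.iter_mut() {
            s.spawn(move || item.resolve());
        }
    });
}
```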

There are several possible solutions to this; the one I find most elegant is to move each item into the async block passed to tokio::spawn(), and have the task hand it back to you once it's done. Basically, you consume the items vector to create the tasks and immediately reconstitute it from the awaited results:
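The same ownership hand-off works with plain threads, which may make the pattern easier to see in isolation. This sketch assumes a hypothetical synchronous `Item::resolve` (the real one is async and uses reqwest): each thread takes ownership of one item, mutates it, and returns it through its `JoinHandle`, so no borrow of the original vector is ever needed.

```rust
use std::thread;

pub struct Item {
    pub value: u32,
}

impl Item {
    // Hypothetical synchronous stand-in for the question's async `resolve()`.
    pub fn resolve(&mut self) {
        self.value += 1;
    }
}

// Consume `items`, move each item into its own thread, and rebuild the vector
// from the items the threads hand back.
pub fn resolve_all(items: Vec<Item>) -> Vec<Item> {
    let handles: Vec<thread::JoinHandle<Item>> = items
        .into_iter()
        .map(|mut item| {
            thread::spawn(move || {
                item.resolve();
                item // ownership returns to the caller via the JoinHandle
            })
        })
        .collect();
    // Joining in spawn order keeps the items in their original order.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}
```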

// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
    .into_iter()
    .map(|mut item| {
        tokio::spawn(async move {
            item.resolve().await;
            item
        })
    })
    .collect();
// await the tasks so the resolve() calls complete and give back our items
let mut items = vec![];
for task in tasks {
    items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
    item.print_result();
}

Runnable code in the playground.

Note that the futures crate contains a join_all function which is similar to what you need, except it polls the individual futures without ensuring that they run in parallel. We can write a generic join_parallel that uses join_all, but also uses tokio::spawn to get parallel execution:

use std::future::Future;

async fn join_parallel<T: Send + 'static>(
    futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
    let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
    // unwrap the Result because it is introduced by tokio::spawn()
    // and isn't something our caller can handle
    futures::future::join_all(tasks)
        .await
        .into_iter()
        .map(Result::unwrap)
        .collect()
}

Using this function the code needed to answer the question boils down to just:

let items = join_parallel(items.into_iter().map(|mut item| async move {
    item.resolve().await;
    item
})).await;
for item in &items {
    item.print_result();
}

Again, runnable code in the playground.

user4815162342
  • I don't really care about the results, as the `resolve()` method doesn't return anything but changes the internal value in the object instead. I'm trying to use spawn, but since I need the object to be mutable I have some issues with the borrow checker. – Djent Aug 16 '20 at 18:23
  • @Djent You can safely ignore the results as long as you await the handles after they've all been created (see edited answer). Can you edit the question to include the borrow checker error? – user4815162342 Aug 16 '20 at 18:52
  • I've updated the question. I'm still quite new to Rust and don't really understand all the stuff with the borrow checker (good knowledge sources other than the book are welcome). – Djent Aug 16 '20 at 19:05
  • @Djent Try `for item in &items` or `for item in items.iter()` when printing them. As written, the `for` loop is consuming `items` whose elements are still borrowed by the tasks. – user4815162342 Aug 16 '20 at 19:10
  • I'm more concerned about the `borrowed value does not live long enough` error. It's as if `tokio::spawn()` passes the reference to the item into a scope where `items` has already been freed. Also, using `items.iter()` or `&items` changes the error to "cannot borrow `items` as immutable because it is also borrowed as mutable", which confuses me even more when combined with the `value does not live long enough` error. – Djent Aug 18 '20 at 14:33
  • @Djent I believe I've now figured out the borrow checker issue: the problem is that Rust couldn't prove that the tasks won't outlive `items`. See edited answer for a solution. – user4815162342 Aug 18 '20 at 14:58
  • Thank you for your patience. Now I understand more of what is happening. The solution is very clever; I like the fact that I don't have to change the return type of the `resolve()` method, as that has been handled by an anonymous function. – Djent Aug 18 '20 at 18:03