This MWE shows the use of tokio::spawn in a for loop. The commented-out line sleepy_futures.push(sleepy.sleep_n(2)); works fine, but does not run/poll the async function.

Basically, I would like to run a bunch of async functions at the same time. I am happy to change the implementation of Sleepy or use another library/technique.

pub struct Sleepy;
impl Sleepy {
    pub async fn sleep_n(self: &Self, n: u64) -> String {
        sleep(Duration::from_secs(n));
        "test".to_string()
    }
}

#[tokio::main(core_threads = 4)]
async fn main() {
    let sleepy = Sleepy{};

    let mut sleepy_futures = vec::Vec::new();
    for _ in 0..5 {
        // sleepy_futures.push(sleepy.sleep_n(2));
        sleepy_futures.push(tokio::task::spawn(sleepy.sleep_n(2)));
    }

    let results = futures::future::join_all(sleepy_futures).await;
    for result in results {
        println!("{}", result.unwrap())
    }
}
chmoder
  • This is a great suggestion, yes I can. But in the actual project, `Sleepy` has a connection pool so I would like to share it. – chmoder Aug 10 '20 at 20:26
  • It looks like since `sleep_n` references `&self` then it needs to have access to `sleepy` even once kicked off. If you get rid of that it should be fine. Otherwise it will need to have access to it. You can wrap it in an `Rc<_>` if you need to give access to multiple tasks. – tadman Aug 10 '20 at 20:27
  • Why aren't you declaring it conventionally like `pub async fn sleep_n(&self, ...)`? – tadman Aug 10 '20 at 20:28
  • You could also push the promise straight into your `sleepy_futures` and then [`join_all`](https://docs.rs/futures/0.3.5/futures/future/fn.join_all.html) on that. You'd only want to use `task::spawn` if you're doing some heavy lifting that potentially requires separate threads. – tadman Aug 10 '20 at 20:29
  • Thanks for all these suggestions. `join_all` seems to run them synchronously, which is when I found `task::spawn`. – chmoder Aug 10 '20 at 20:33
  • If you're using a hard `sleep` you're going to jam the reactor so it will appear to run them sequentially. Use Tokio sleep tools like [`delay_for`](https://docs.rs/tokio/0.2.22/tokio/time/fn.delay_for.html). Rule #1 in asynchronous code: **Never block the event loop**. This is an easy thing to forget as Rust won't warn you about it, even though it should. – tadman Aug 10 '20 at 20:34

1 Answer

Here's a rough stab at fixing it:

use std::time::Duration;
use tokio::time::delay_for;

pub struct Sleepy;
impl Sleepy {
    pub async fn sleep_n(n: u64) -> String {
        delay_for(Duration::from_secs(n)).await;
        "test".to_string()
    }
}

Now sleep_n is no longer anchored to any particular Sleepy instance, which eliminates the lifetime issue. You'd call it as Sleepy::sleep_n(2).
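To see why dropping the receiver helps, here is a minimal std-only sketch. It assumes nothing beyond the standard library: `require_spawnable` is a hypothetical helper that only mirrors the `Send + 'static` bounds `tokio::task::spawn` places on its argument, `block_on` is a toy executor included so the example runs without a runtime, and the body of `sleep_n` is a placeholder with no real delay.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

pub struct Sleepy;

impl Sleepy {
    // No `self` parameter, so the returned future captures only `n`
    // and borrows nothing from a `Sleepy` value.
    pub async fn sleep_n(n: u64) -> String {
        // Placeholder body: the real delay is omitted in this sketch.
        format!("slept {}", n)
    }
}

// Hypothetical helper: compiles only if `fut` meets the same bounds that
// `tokio::task::spawn` puts on its argument.
fn require_spawnable<F>(fut: F) -> F
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    fut
}

// Unparks the waiting thread when the future signals progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy single-future executor, present only so the sketch runs without a runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    // Accepted: the future owns everything it needs.
    let fut = require_spawnable(Sleepy::sleep_n(2));
    println!("{}", block_on(fut));
}
```

Passing a future created from a `&self` method on a local `Sleepy` to the same helper should be rejected by the compiler, which matches the "does not live long enough" error from the question.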

It takes a little more work if that &self is required:

use std::sync::Arc;
use std::time::Duration;
use std::vec;
use tokio;
use tokio::time::delay_for;

pub struct Sleepy;
impl Sleepy {
    pub async fn sleep_n(&self, n: u64) -> String {
        // Call .await here to delay properly
        delay_for(Duration::from_secs(n)).await;
        "test".to_string()
    }
}

#[tokio::main(core_threads = 4)]
async fn main() {
    // Wrap the shared instance in an Arc so each task can own a handle to it.
    let sleepy = Arc::new(Sleepy {});

    let mut sleepy_futures = Vec::new();
    for _ in 0..5 {
        let sleepy = sleepy.clone();

        // `async move` moves the cloned Arc into the task, so the task
        // owns its handle instead of borrowing `sleepy` from main.
        sleepy_futures.push(tokio::task::spawn(async move {
            sleepy.sleep_n(2).await
        }));
    }

    let results = futures::future::join_all(sleepy_futures).await;
    for result in results {
        println!("{}", result.unwrap())
    }
}

Here Arc wraps the object because spawned tasks may run on other threads: tokio::task::spawn requires a future that is Send + 'static, and Rc is not Send.
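The same ownership pattern can be tried without Tokio. The following is a rough sketch that uses `std::thread::spawn`, which has a comparable `Send + 'static` requirement on its closure. The `label` field and the `describe` and `run_shared` names are made up for illustration; `label` stands in for shared state such as the connection pool mentioned in the comments.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical shared state; `label` stands in for something like a pool.
struct Sleepy {
    label: String,
}

impl Sleepy {
    // Borrows `self`, like the `&self` version of `sleep_n`.
    fn describe(&self, n: u64) -> String {
        format!("{} {}", self.label, n)
    }
}

// Runs `workers` threads that all read the same `Sleepy` through an Arc.
fn run_shared(workers: u64) -> Vec<String> {
    let sleepy = Arc::new(Sleepy {
        label: "task".to_string(),
    });

    let handles: Vec<_> = (0..workers)
        .map(|i| {
            // Each thread gets its own handle; the data itself is not copied.
            let sleepy = Arc::clone(&sleepy);
            // `move` transfers that handle into the closure, so nothing
            // borrowed from this stack frame crosses the thread boundary.
            thread::spawn(move || sleepy.describe(i))
        })
        .collect();

    // Join in spawn order so the results come back in a stable order.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    for line in run_shared(3) {
        println!("{}", line);
    }
}
```

Swapping `Arc` for `Rc` here should fail to compile, because `Rc` is not `Send` and so cannot be moved into another thread.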

tadman
  • This is a tough one. [Here](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=d89fe431c24a082eb6dd1dbc7fcc2c4d) is a rust playground with the code sample. `sleepy` does not live long enough – chmoder Aug 10 '20 at 20:48
  • I've updated the code with a [working version](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=dc1049457c8c72d310dce5615497d77c). Thanks for making the sample. – tadman Aug 10 '20 at 20:59
  • Thank you, well done! I even verified it works with the _actual_ project. – chmoder Aug 10 '20 at 21:05