3

I have some async function

async fn get_player(name: String, i: Instant) -> Option<Player> {
// some code here that returns a player structs
}

in my main function i want to run the above function concurrently in a loop, this function takes about 1sec to complete and I need to run it at least 50 times, hence I would like to make it concurrently run this function 50 times. in my main function i have a lazy_static custom Client struct which should not be created more than once.

main function

#[tokio::main]
async fn main() {
    client.init().await;

    println!("start");
    for i in 0..10 {
        println!("{}", i);
        let now = Instant::now();

        tokio::spawn(async move  {
            client.get_player("jay".to_string(), now).await;
        });
    }
    loop {}
}

the reason I am passing instant is because in my get_player function i have a println!() that just prints the execution time.

the above main method takes around 500ms for each function call, however the below code only takes 100ms.

#[tokio::main]
async fn maain(){
    client.init().await;

    for i in 0..10 {
        let now = Instant::now();
        client.get_player("jay".to_string(), now).await.expect("panic");
    }
}

but this function is still synchronous code, how do I actually run the async function concurrently without the time cost?

  • To better understand what am after is an implemention similar to this (its in java btw),
     CompleteableFuture.thenAccept(x -> x.SayHello(););

or in Js its something like .then after an async function.

is there any similar implementation in rust?

SomeOnionGamer
  • 203
  • 1
  • 4
  • 8
  • The 500ms for each function call — are those happening concurrently or serially? Can one function call start during another function's 500ms? – BallpointBen Jul 20 '22 at 16:55
  • If you want concurrency, I don't think `async` is the way to go. [Here is a description of what the differences between these approaches are](https://stackoverflow.com/a/4844774/13843935). In Rust, futures don't make progress unless they are actively polled. `tokio` (or other async runtimes) abstract and manage that for you, so the best you can do is store the future in a variable to `await` it later. For real concurrency, you should use threads. – Jeremy Meadows Jul 20 '22 at 16:55
  • @PitaJ Parallelism implies concurrency, but you can have multiple (concurrent) threads of execution running on a single processor utilizing context switching, never running in parallel. – Jeremy Meadows Jul 20 '22 at 19:44
  • 2
    Does this answer your question? [tokio join multiple tasks in rust](https://stackoverflow.com/questions/63589668/tokio-join-multiple-tasks-in-rust) – PitaJ Jul 20 '22 at 19:54
  • Does [this playground](https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=2c2989b344a7c5f5a30401a401c55266) help? Whether you need to `spawn` will depend on whether you need parallelism from the default multi-threaded runtime. – PitaJ Jul 20 '22 at 19:58
  • Whether async in the correct abstraction also depends on whether you're trying to execute IO operations or long-running calculations. The first case could be a good fit for async. The second is a better fit for something like [rayon](https://docs.rs/rayon/latest/rayon/) – PitaJ Jul 20 '22 at 20:00
  • Note that if `get_player` is doing any kind of blocking (non-`await`) operations that it will block an entire tokio worker thread, which could be part of the problem. This includes e.g. `std::thread::sleep()`. If it's doing blocking operations it should _not_ be declared `async`, but you can make it "safe" to use from `async` blocks by calling it from `tokio::task::spawn_blocking()`. – cdhowie Jul 20 '22 at 21:53
  • How are you able to move a single `client` object into an async block repeatedly like that? Is it wrapped in `Arc>` or something like that? Maybe you are seeing lock contention? – harmic Jul 21 '22 at 02:06
  • I'm a little confused. You say `async fn get_player(name: String, i: Instant)`, but then you call `client.get_player(...)`, which isn't possible with the function signature you gave before. Are you sure it's not `async fn get_player(&mut self, name: String, i: Instant)` instead? If it is, then there is no way of doing this asynchronously, because you can only ever have one mutable borrow. If it is `async fn get_player(&self, name: String, i: Instant)`, however, it should be no problem. – Finomnis Jul 21 '22 at 09:12
  • With other words: please show us the code of the `Client` in more detail, it's very confusing right now. – Finomnis Jul 21 '22 at 09:16

1 Answers1

6

I assume that your get_player function takes one second because it waits for a network interaction, and not because some computation takes that long. If it's compute-bound instead, asynchronism is the wrong approach and you want to go with parallelism instead.

Further, I assume that the function signature of get_player is actually async fn get_player(&self, name: String, i: Instant) -> Option<Player> instead, because otherwise none of your main code samples would make any sense. Although I'm confused why it would be &self and not &mut self.

With those assumptions, I tried to reproduce your minimal reproducible example:

use std::time::{Duration, Instant};

#[derive(Debug)]
struct Player {
    name: String,
}

struct Client {}

impl Client {
    async fn init(&self) {}

    async fn get_player(&self, name: String, _now: Instant) -> Option<Player> {
        // Dummy code that simulates a delay of 1 second
        tokio::time::sleep(Duration::from_millis(1000)).await;
        Some(Player { name })
    }
}

static client: Client = Client {};

#[tokio::main]
async fn main() {
    let begin = Instant::now();
    client.init().await;

    for i in 0..10 {
        let now = Instant::now();
        let player = client
            .get_player(format!("Player #{}", i), now)
            .await
            .expect("panic");
        println!(
            "[{} ms] Retreived player: {:?}",
            begin.elapsed().as_millis(),
            player.name
        );
    }
}
[1002 ms] Retreived player: "Player #0"
[2004 ms] Retreived player: "Player #1"
[3005 ms] Retreived player: "Player #2"
[4008 ms] Retreived player: "Player #3"
[5010 ms] Retreived player: "Player #4"
[6011 ms] Retreived player: "Player #5"
[7013 ms] Retreived player: "Player #6"
[8014 ms] Retreived player: "Player #7"
[9016 ms] Retreived player: "Player #8"
[10018 ms] Retreived player: "Player #9"

This is based on your last main example. As you can see, it takes 10 seconds to retrieve all players, because they all run in sequence.

Now let's run them all asynchronously. The problem here is joining them all simultaneously. Tokio sadly doesn't offer an easy way for that; you could tokio::spawn all of them, collect the JoinHandles and then join them one by one. The crate futures, however, offers exactly what you want:

use std::time::{Duration, Instant};

#[derive(Debug)]
struct Player {
    name: String,
}

struct Client {}

impl Client {
    async fn init(&self) {}

    async fn get_player(&self, name: String, _now: Instant) -> Option<Player> {
        // Dummy code her that simulates a delay of 1 second
        tokio::time::sleep(Duration::from_millis(1000)).await;
        Some(Player { name })
    }
}

static client: Client = Client {};

#[tokio::main]
async fn main() {
    let begin = Instant::now();
    client.init().await;

    let get_player_futures = (0..10).into_iter().map(|i| async move {
        let now = Instant::now();
        let player = client
            .get_player(format!("Player #{}", i), now)
            .await
            .expect("panic");
        println!(
            "[{} ms] Retreived player: {:?}",
            begin.elapsed().as_millis(),
            player.name
        );
    });

    futures::future::join_all(get_player_futures).await;
}
[1002 ms] Retreived player: "Player #0"
[1002 ms] Retreived player: "Player #1"
[1002 ms] Retreived player: "Player #2"
[1002 ms] Retreived player: "Player #3"
[1002 ms] Retreived player: "Player #4"
[1002 ms] Retreived player: "Player #5"
[1002 ms] Retreived player: "Player #6"
[1002 ms] Retreived player: "Player #7"
[1003 ms] Retreived player: "Player #8"
[1003 ms] Retreived player: "Player #9"

As you can see, the entire program only took one second, and all of them got retrieved simultaneously.

get_player_futures here is an iterator over all the futures that need to be awaited for in order to retrieve the players. futures::future::join_all then awaits all of them simultaneously. You can even use join_all's return value to retrieve the values of the futures, but we don't use that here.

I hope that helped somehow; it was hard to create an answer as parts of your question were incoherent.

Finomnis
  • 18,094
  • 1
  • 20
  • 27
  • Thank you that seems to have work! Is there a way to limit the number of future happen at once? since the operation is quite cpu heavily my vps is not able to keep up. @Finommis – SomeOnionGamer Jul 21 '22 at 14:04
  • 1
    If it is CPU heavy you probably want to use threads instead. Async is primarily for IO limited problems. – Finomnis Jul 22 '22 at 06:36