3

My goal is to run N functions concurrently, but I don't want to spawn more until all of them have finished. This is what I have so far:

extern crate tokio;
extern crate futures;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

/// Spawns a batch of five tasks on every tick of a one-second interval.
///
/// Each task is handed to `tokio::spawn` and then forgotten, so nothing in this
/// function waits for a batch to complete before the next tick spawns another one.
fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    // mock delay (something blocking)
                    // thread::sleep(time::Duration::from_secs(3));
                    //
                    // `Command` is referenced by its full path because the file's
                    // `use` lines do not import it. `output()` does not return
                    // until the child process has exited.
                    std::process::Command::new("sleep")
                        .arg("3")
                        .output()
                        .expect("failed to execute process");

                    Ok(())
                }));
            }
            // Returning `Ok(())` right away lets the interval continue with the next tick.
            Ok(())
        })
        .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}

Every second I am spawning 5 functions, but I would now like to wait until all of the functions have finished before spawning more.

From my understanding (I am probably getting the idea wrong), I am returning a Future within another future

task (Interval ----------------------+ (outer future)
    for i in 0..5 {                  |
        tokio::spawn(  ----+         | 
            // my function | (inner) |
            Ok(())         |         |
        )              ----+         |
    }                                |
    Ok(()) --------------------------+

I am stuck trying to wait for the inner future to finish.

nbari
  • 25,603
  • 10
  • 76
  • 131

2 Answers

6

You can achieve this by joining your worker futures such that they all run in parallel, but must all finish together. You can then join that with a delay of 1 second for the same rationale. Wrap that into a loop to run it forever (or 5 iterations, for the demo).

Tokio 1.3

use futures::{future, future::BoxFuture, stream, FutureExt, StreamExt}; // 0.3.13
use std::time::{Duration, Instant};
use tokio::time; // 1.3.0

/// Runs five rounds; each round starts every page concurrently and lasts at
/// least one second, then reports the total elapsed time.
#[tokio::main]
async fn main() {
    let started = Instant::now();

    // One stream item is produced per completed round.
    let rounds = stream::unfold((), |()| async {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Completes once every page future has completed.
        let pages = future::join_all(all_pages());
        // Lower bound on how long a single round may take.
        let pause = time::sleep(Duration::from_secs(1));

        // Completes only when both the pages and the pause are finished.
        future::join(pages, pause).await;

        Some(((), ()))
    });

    // Drive exactly five rounds, discarding the unit items they yield.
    rounds.take(5).for_each(|_| async {}).await;
    eprintln!("Took {:?}", started.elapsed());
}

/// Builds the boxed page futures that make up one batch: page "a" (100 ms)
/// followed by page "b" (200 ms).
fn all_pages() -> Vec<BoxFuture<'static, ()>> {
    // (name, delay in milliseconds) for every page in the batch, in order.
    let specs = [("a", 100), ("b", 200)];

    specs
        .iter()
        .map(|&(name, delay_ms)| page(name, delay_ms).boxed())
        .collect()
}

/// Simulates loading one page: logs the start, waits `time_ms` milliseconds
/// without blocking the thread, then logs completion.
async fn page(name: &'static str, time_ms: u64) {
    eprintln!("page {} starting", name);

    let nap = Duration::from_millis(time_ms);
    time::sleep(nap).await;

    eprintln!("page {} done", name);
}
Loop starting at Instant { t: 1022680437923626 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022681444390534 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022682453240399 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022683469924126 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022684493522592 }
page a starting
page b starting
page a done
page b done
Took 5.057315596s

Tokio 0.1

use futures::future::{self, Loop}; // 0.1.26
use std::time::{Duration, Instant};
use tokio::{prelude::*, timer::Delay};  // 0.1.18

// Repeatedly runs a batch of pages, making every batch last at least one second.
//
// NOTE(review): the counter is checked *after* a batch has run and the loop only
// breaks on `Some(0)`, so starting from `Some(5)` executes six batches (states
// 5, 4, 3, 2, 1, 0). Starting from `None` never matches `Some(0)` and therefore
// never breaks.
fn main() {
    let repeat_count = Some(5);

    let forever = future::loop_fn(repeat_count, |repeat_count| {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());

        // Resolves when both all pages and a delay of 1 second are done
        let wait = Future::join(batch_of_pages, ez_delay_ms(1000));

        // Once the batch is finished, decide whether to stop or run all this again
        wait.map(move |_| {
            if let Some(0) = repeat_count {
                Loop::Break(())
            } else {
                // `None` stays `None`; `Some(c)` becomes `Some(c - 1)`
                Loop::Continue(repeat_count.map(|c| c - 1))
            }
        })
    });

    // Discard the error value so the future has the `Error = ()` shape passed to `tokio::run`
    tokio::run(forever.map_err(drop));
}

fn all_pages() -> Vec<Box<dyn Future<Item = (), Error = ()> + Send + 'static>> {
    vec![Box::new(page("a", 100)), Box::new(page("b", 200))]
}

/// Simulates loading one page: logs the start, waits `time_ms` milliseconds
/// via `ez_delay_ms`, then logs completion.
fn page(name: &'static str, time_ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    // Stage 1: an immediately-ready future whose only job is to log the start.
    let announced = future::ok(()).inspect(move |_| eprintln!("page {} starting", name));

    // Stage 2: the mock work, a timer-based delay.
    let delayed = announced.and_then(move |_| ez_delay_ms(time_ms));

    // Stage 3: log completion once the delay has resolved successfully.
    delayed.inspect(move |_| eprintln!("page {} done", name))
}

/// Returns a future that resolves `ms` milliseconds from now, with any timer
/// error replaced by `()`.
fn ez_delay_ms(ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    let deadline = Instant::now() + Duration::from_millis(ms);
    Delay::new(deadline).map_err(|_| ())
}
Loop starting at Instant { tv_sec: 4031391, tv_nsec: 806352322 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031392, tv_nsec: 807792559 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031393, tv_nsec: 809117958 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031394, tv_nsec: 813142458 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031395, tv_nsec: 814407116 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031396, tv_nsec: 815342642 }
page a starting
page b starting
page a done
page b done

See also:

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
  • Thanks but I still trying to get my head around the use of `tokio::spawn()` your answer helped me to get the idea about how to "delay" instead of using `thread::sleep` . But in the case I would like to call the system command `sleep N` , is there a way to "group & wait" for all the returns of multiple `tokio::spawn()` and once finished just proceed with another batch or wait until the defined interval time passed and then call again all the functions? – nbari May 23 '19 at 17:19
  • For example how to mock with something like `Command::new("sleep").arg("3").output().expect("failed to execute process");` – nbari May 23 '19 at 17:44
  • @nbari such a question doesn't make sense because the [returned value from `tokio::spawn` "doesn't actually provide any functionality"](https://docs.rs/tokio/0.1.20/tokio/executor/struct.Spawn.html). – Shepmaster May 23 '19 at 18:01
  • @nbari I've added links to existing Q&A about how to run generic blocking code (tl;dr: use a threadpool) as well as `Command` (tl;dr: use tokio-process). – Shepmaster May 23 '19 at 18:04
  • Thanks, however still not very clear how I could use Tokio to run something forever at a defined interval (scheduler) and concurrently spawn X blocking functions and wait for them to finish before calling them again, in the end, I just want to do something like this https://play.golang.org/p/ZLw6ESioqfu in Rust, I know is unfair to compare languages but It help to clarify exactly what I want to achieve. – nbari May 23 '19 at 18:54
  • @nbari Replace the calls to `page` with a call to tokio-threadpool, as described in the linked Q&A. – Shepmaster May 23 '19 at 19:01
1

From my understanding (I am probably getting the idea wrong), I am returning a Future within another future

You are not wrong, but in the code that you provided the only returned future is Ok(()) which implements IntoFuture. tokio::spawn just spawns the new task into the DefaultExecutor of Tokio.

If I understand from your question, you want to spawn the next batch when the previous one is done, but if the previous is done before 1 second you want to finish that 1 second before spawning the next batch.

Implementing your own future and handling the poll by yourself would be a better solution but this can be done roughly:

  • By using join_all to collect batch tasks. This is a new future which waits for the collected futures to complete.
  • For the 1 second wait you can use atomic state. If it is locked when the tick fires, that tick is skipped and the next batch waits until the state is released.

Here is the code (Playground):

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use std::time::{self, Duration, Instant};

use tokio::prelude::*;
use tokio::timer::{Delay, Interval};

use futures::future::join_all;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Ticks once per second and starts a batch of five tasks on a tick only when
/// the previous batch has finished.
///
/// An `AtomicBool` acts as the "batch in progress" flag: it is set when a batch
/// is spawned and cleared by a continuation that runs after every task in the
/// batch has completed.
fn main() {
    let locker = Arc::new(AtomicBool::new(false));

    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .map_err(|e| panic!("interval errored; err={:?}", e))
        .for_each(move |interval| {
            // Claim the flag and learn its previous value in one atomic step,
            // instead of a separate load followed by a store. If the flag was
            // already set, this swap leaves it unchanged.
            let is_locked = locker.swap(true, Ordering::SeqCst);
            println!("Interval: {:?} --- {:?}", interval, is_locked);

            if !is_locked {
                println!("locked");

                let futures: Vec<_> = (0..5)
                    .map(|i| {
                        lazy(move || {
                            println!("Running Task-{}", i);
                            // mock delay; later tasks get slightly shorter delays
                            Delay::new(Instant::now() + Duration::from_millis(100 - i))
                                .then(|_| Ok(()))
                        })
                        .and_then(move |_| {
                            println!("Task-{} is done", i);
                            Ok(())
                        })
                    })
                    .collect();

                // The spawned task needs its own handle to the flag so it can
                // release it once the whole batch is done.
                let unlocker = locker.clone();
                tokio::spawn(join_all(futures).and_then(move |_| {
                    unlocker.store(false, Ordering::SeqCst);
                    println!("unlocked");

                    Ok(())
                }));
            }

            Ok(())
        });

    tokio::run(task.then(|_| Ok(())));
}

Output :

Interval: Instant { tv_sec: 4036783, tv_nsec: 211837425 } --- false
locked
Running Task-0
Running Task-1
Running Task-2
Running Task-3
Running Task-4
Task-4 is done
Task-3 is done
Task-2 is done
Task-1 is done
Task-0 is done
unlocked
Interval: Instant { tv_sec: 4036784, tv_nsec: 211837425 } --- false
locked
Running Task-0
Running Task-1
Running Task-2
Running Task-3
Running Task-4
Task-3 is done
Task-4 is done
Task-0 is done
Task-1 is done
Task-2 is done
unlocked

Warning ! : Please check Shepmaster's comment

Even for demonstration, you should not use thread::sleep in futures. There are better alternatives.

Ömer Erden
  • 7,680
  • 5
  • 36
  • 45
  • Even for demonstration, you should [not use `thread:sleep` in futures](https://stackoverflow.com/q/48735952/155423). There are better alternatives – Shepmaster May 23 '19 at 13:32
  • @Shepmaster it was in the question, i copied it directly. And it was marked as mock delay so i haven't mentioned it in the answer. – Ömer Erden May 23 '19 at 13:33
  • 1
    The original question also had other problems that you *didn't* copy directly (rather the *point* of Stack Overflow). Realize that most people are going to copy-paste your code with little to no thought applied to the answer. It's better to encourage good practices everywhere. – Shepmaster May 23 '19 at 13:35
  • @ÖmerErden thanks, but how to spawn concurrently all the functions, from your example they run one after the other, so if each function is delayed 5 seconds, the next batch will be run `N functions * 5 seconds` instead of only 5 seconds the whole. – nbari May 23 '19 at 14:20
  • @nbari Yes it wasn't executing in parallel because of the usage of `thread::sleep`, if you read the [post in comments](https://stackoverflow.com/a/48736148/1601571) you can see the reason. I updated the code with proper mock delaying and delayed for Duration::from_millis(100 - i) to see the tasks completed in arbitrary order. – Ömer Erden May 23 '19 at 16:26
  • If I mock the delay with something like `Command::new("sleep").arg("3").output().expect("failed to execute process");` still not concurrent therefore wondering how to call "anything" concurrent ? – nbari May 23 '19 at 17:43
  • @nbari maybe i am missing something but looks like tokio is running our futures in same thread, i guess thread sleep prevents running other futures, please check this [code](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=8b41ef4090a572ed80e7a96308658243). – Ömer Erden May 23 '19 at 18:45
  • @ÖmerErden Could you give me a hint about how to use `tokio_threadpool` within your last example, probably that could do the trick, thanks in advance – nbari May 25 '19 at 05:54
  • @nbari with sleep or with tokio-timer ? – Ömer Erden May 25 '19 at 15:06
  • @ÖmerErden based on Shepmaster answer there seems to be a way of doing it with `tokio_threadpool` keeping the `Interval` to be running every X seconds but spawning processes without blocking with the `threadpool` – nbari May 25 '19 at 18:12