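You can achieve this by joining your worker futures so that they all run concurrently and the combined future completes only once every one of them has finished. You can then join that combined future with a delay of 1 second for the same reason: the result resolves only when both the pages and the delay are done, so each round takes at least one second. Wrap that in a loop to run it forever (or just a few iterations for the demo: five in the Tokio 1.3 version, and six in the Tokio 0.1 version, whose counter is checked after each round).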
Tokio 1.3
use futures::{future, future::BoxFuture, stream, FutureExt, StreamExt}; // 0.3.13
use std::time::{Duration, Instant};
use tokio::time; // 1.3.0
/// Runs five rounds of "fetch every page, but never faster than once per second"
/// and reports the total elapsed time on stderr.
#[tokio::main]
async fn main() {
    let started = Instant::now();

    // An endless stream with no state: every item it yields marks one finished round.
    let rounds = stream::unfold((), |()| async {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Completes once every page future has completed.
        let pages = future::join_all(all_pages());
        // Lower bound on the round's duration.
        let min_period = time::sleep(Duration::from_secs(1));
        // Completes only when both the pages and the one-second delay are done.
        future::join(pages, min_period).await;

        // Always yield another item, so the stream by itself never ends.
        Some(((), ()))
    });

    // Cap the endless stream at five rounds and drive it to completion.
    rounds.take(5).for_each(|_| future::ready(())).await;

    eprintln!("Took {:?}", started.elapsed());
}
/// Builds one boxed future per page for a single round: page "a" (100 ms)
/// followed by page "b" (200 ms).
fn all_pages() -> Vec<BoxFuture<'static, ()>> {
    let specs: [(&'static str, u64); 2] = [("a", 100), ("b", 200)];

    specs
        .iter()
        .map(|&(name, delay_ms)| page(name, delay_ms).boxed())
        .collect()
}
/// Stand-in for processing one page: logs a start line to stderr, waits
/// `time_ms` milliseconds without blocking the thread, then logs a done line.
async fn page(name: &'static str, time_ms: u64) {
    let simulated_work = Duration::from_millis(time_ms);

    eprintln!("page {} starting", name);
    time::sleep(simulated_work).await;
    eprintln!("page {} done", name);
}
Loop starting at Instant { t: 1022680437923626 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022681444390534 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022682453240399 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022683469924126 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022684493522592 }
page a starting
page b starting
page a done
page b done
Took 5.057315596s
Tokio 0.1
use futures::future::{self, Loop}; // 0.1.26
use std::time::{Duration, Instant};
use tokio::{prelude::*, timer::Delay}; // 0.1.18
/// Repeatedly runs "fetch every page, but never faster than once per second"
/// on the Tokio runtime.
///
/// The loop state is an optional countdown. It is inspected *after* a round
/// has finished, so starting from `Some(5)` the body runs six times
/// (5, 4, 3, 2, 1, 0). A `None` state never breaks, i.e. loops forever.
fn main() {
    let rounds = future::loop_fn(Some(5), |remaining| {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Completes once every page future has completed.
        let pages = future::join_all(all_pages());
        // Lower bound on the round's duration.
        let min_period = ez_delay_ms(1000);

        // Completes only when both the pages and the one-second delay are
        // done, then decides whether to go around again.
        pages.join(min_period).map(move |_| match remaining {
            Some(0) => Loop::Break(()),
            Some(left) => Loop::Continue(Some(left - 1)),
            None => Loop::Continue(None),
        })
    });

    // `tokio::run` wants an error type of `()`; discard whatever error occurred.
    tokio::run(rounds.map_err(drop));
}
fn all_pages() -> Vec<Box<dyn Future<Item = (), Error = ()> + Send + 'static>> {
vec![Box::new(page("a", 100)), Box::new(page("b", 200))]
}
/// Stand-in for processing one page: logs a start line to stderr, waits
/// `time_ms` milliseconds via `ez_delay_ms`, then logs a done line.
///
/// Nothing is printed and no timer is created until the returned future is polled.
fn page(name: &'static str, time_ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    // An already-successful future serves as the hook for the "starting" log line.
    let announce_start = future::ok(()).inspect(move |_| eprintln!("page {} starting", name));

    // The delay is only constructed once the start line has been emitted.
    let simulated_work = announce_start.and_then(move |_| ez_delay_ms(time_ms));

    simulated_work.inspect(move |_| eprintln!("page {} done", name))
}
/// Returns a future that completes `ms` milliseconds after this function is
/// called. Any error from the underlying `Delay` is discarded and reported as `()`.
fn ez_delay_ms(ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    // The deadline is fixed here, at construction time, not at first poll.
    let deadline = Instant::now() + Duration::from_millis(ms);

    Delay::new(deadline).map_err(|_timer_error| ())
}
Loop starting at Instant { tv_sec: 4031391, tv_nsec: 806352322 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031392, tv_nsec: 807792559 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031393, tv_nsec: 809117958 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031394, tv_nsec: 813142458 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031395, tv_nsec: 814407116 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031396, tv_nsec: 815342642 }
page a starting
page b starting
page a done
page b done
See also: