I want to build a program that collects weather updates and represents them as a stream. I want to call get_weather() in an infinite loop, with a 60-second delay between the finish of one call and the start of the next.

A simplified version would look like this:

async fn get_weather() -> Weather { /* ... */ }

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    loop {
        tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
        let weather = get_weather().await;
        yield weather; // This is not supported
        // Note: waiting for get_weather() stops the timer and avoids overflows.
    }
}

Is there any way to do this easily?

Using tokio::timer::Interval will not work when get_weather() takes more than 60 seconds:

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
        .then(|_| get_weather()) // the interval yields a value on every tick; ignore it
}

If that happens, the next call will start immediately. I want to keep exactly 60 seconds between the finish of the previous get_weather() and the start of the next get_weather().
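
To make the difference concrete, here is a small timing model. It is only a sketch, not tokio code: it assumes a ticker that fires at fixed multiples of the period and a consumer that cannot start a new call until the previous one has finished. The function names interval_starts and delay_starts are hypothetical, and all times are whole seconds.

```rust
/// Start times when each call waits for the next tick of a fixed-period ticker.
/// Assumption: tick k fires at k * period, whether or not anyone is ready for it.
fn interval_starts(period: u64, durations: &[u64]) -> Vec<u64> {
    let mut starts = Vec::with_capacity(durations.len());
    let mut previous_finish = 0;
    for (i, &duration) in durations.iter().enumerate() {
        let tick = (i as u64 + 1) * period;
        // If the previous call overran the tick, the next call starts right away.
        let start = tick.max(previous_finish);
        starts.push(start);
        previous_finish = start + duration;
    }
    starts
}

/// Start times when each call is preceded by a fresh delay that begins
/// only after the previous call has finished.
fn delay_starts(delay: u64, durations: &[u64]) -> Vec<u64> {
    let mut starts = Vec::with_capacity(durations.len());
    let mut previous_finish = 0;
    for &duration in durations {
        let start = previous_finish + delay;
        starts.push(start);
        previous_finish = start + duration;
    }
    starts
}
```

For a 60-second period and calls lasting 5, 90, and 5 seconds, the ticker model gives starts at 60, 120, and 210 seconds: the third call begins the instant the slow second call finishes. The delay-then-call model gives 60, 125, and 275, so there are always 60 seconds after each finish.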

Shepmaster
peku33

1 Answer

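Use stream::unfold to go from the "world of futures" to the "world of streams". It calls the closure repeatedly with the current state, and each returned future resolves to Some((item, next_state)) to emit an item, or None to end the stream. We don't need any extra state, so we use the empty tuple: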

use futures::StreamExt; // 0.3.4
use std::time::Duration;
use tokio::time; // 0.2.11

struct Weather;

async fn get_weather() -> Weather {
    Weather
}

const BETWEEN: Duration = Duration::from_secs(1);

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    futures::stream::unfold((), |_| async {
        time::delay_for(BETWEEN).await;
        let weather = get_weather().await;
        Some((weather, ()))
    })
}

#[tokio::main]
async fn main() {
    get_weather_stream()
        .take(3)
        .for_each(|_v| async {
            println!("Got the weather");
        })
        .await;
}

% time ./target/debug/example

Got the weather
Got the weather
Got the weather

real    3.085   3085495us
user    0.004   3928us
sys     0.003   3151us

Shepmaster
  • Is there any way to allow passing a variable to `get_weather()` by reference? I want to pass it to `get_weather_stream()` and then to `get_weather()` in each iteration. Of course the resulting Stream should be lifetime dependent on that variable. – peku33 Nov 05 '19 at 19:41
  • @peku33 I don't see immediately why it wouldn't work. Perhaps you should try it and report back! – Shepmaster Nov 05 '19 at 19:47
  • As a first step, I added `<'a>(arg: &'a i32)` to both functions. I also added `+ 'a` to the return type of `get_weather_stream`. However, I get `closure may outlive current function`. I would like to avoid Arc, Rc, etc. if possible. – peku33 Nov 06 '19 at 18:49
  • @peku33 [seems to work](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=c1bc55fa074cd53da526172c72afc733) – Shepmaster Nov 07 '19 at 02:33