
In Node.js I can set the interval at which a certain function should be triggered:

function intervalFunc() {
  console.log('whelp, triggered again!');
}

setInterval(intervalFunc, 1500);

However, the interface for Tokio's interval is a bit more complex. It seems to follow a much more literal definition of an interval: rather than calling a function at an interval, it simply stalls the thread until the time passes (with .await).

Is there a primitive in Tokio that calls a function "every x seconds" or the like? If not, is there an idiom that has emerged to do this?

I only need to run one function on a recurring interval... I don't care about other threads either. It's just one function on Tokio's event loop.

kmdreko
Evan Carroll

2 Answers


Spawn a Tokio task to do something forever:

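use std::time::Duration;
use tokio::{task, time}; // 1.3.0

#[tokio::main]
async fn main() {
    // The spawned task owns the interval and loops for as long as the runtime lives.
    let forever = task::spawn(async {
        let mut interval = time::interval(Duration::from_millis(10));

        loop {
            // The first tick completes immediately; later ticks complete once per period.
            interval.tick().await;
            do_something().await;
        }
    });

    // Wait on the task so that `main` does not return and shut the runtime down.
    let _ = forever.await;
}

async fn do_something() {
    eprintln!("do_something");
}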

You can also use tokio::time::interval to create a value that you can tick repeatedly. Perform the tick and call your function inside of the body of stream::unfold to create a stream:

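use futures::{stream, StreamExt}; // 0.3.13
use std::time::Duration;
use tokio::time; // 1.3.0

#[tokio::main]
async fn main() {
    let interval = time::interval(Duration::from_millis(10));

    // `unfold` threads the interval through as state: each step waits for a tick,
    // runs the function, then yields `()` and hands the interval to the next step.
    let forever = stream::unfold(interval, |mut interval| async {
        interval.tick().await;
        do_something().await;
        Some(((), interval))
    });

    // Streams are lazy; `for_each` polls the stream, here discarding every item.
    forever.for_each(|_| async {}).await;
}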

async fn do_something() {
    eprintln!("do_something");
}


Shepmaster
  • What does this even do: `for_each(|_| async {}).await;` – Evan Carroll Mar 30 '21 at 01:16
  • @EvanCarroll https://docs.rs/futures/0.3.13/futures/stream/trait.StreamExt.html#method.for_each – Shepmaster Mar 30 '21 at 01:19
  • man that's confusing.. you can't do this with `.repeat`? – Evan Carroll Mar 30 '21 at 01:42
  • [`repeat`](https://docs.rs/futures/0.3.13/futures/stream/fn.repeat.html) says: *Create a stream which produces the same item repeatedly*, which doesn't do what you want. – Shepmaster Mar 30 '21 at 01:52
  • Well, it would seem like I could create a stream of function pointers which sleeps for the interval and runs them, or create a stream of function pointers and a stream of intervals and combine them and await or something. – Evan Carroll Mar 30 '21 at 01:59
  • Your second example does not run for ever. Can you modify this example to do that? – Evan Carroll Mar 30 '21 at 07:45
  • @EvanCarroll `forever.await` instead of the `timeout`. – Shepmaster Mar 30 '21 at 12:13
  • I think we could demonstrate these two things substantially more clearly if we reduced both to looping infinitely rather than 10 times and such, and dropped the elapsed calculation, which isn't really relevant to what you're trying to show. Just some points (I upvoted and marked this as chosen though). I also asked a follow-up, https://stackoverflow.com/q/66898839/124486 – Evan Carroll Apr 01 '21 at 05:32

I am still a Rust/Tokio beginner, but I found this solution helpful myself:

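use std::time::Duration;
use tokio::time;
// `StreamExt` must be in scope for `stream.next()` to resolve.
use tokio_stream::{wrappers::IntervalStream, StreamExt};

#[tokio::main]
async fn main() {
    // Wrap the interval so that it can be consumed as a stream of ticks.
    let mut stream = IntervalStream::new(time::interval(Duration::from_secs(1)));

    while let Some(_ts) = stream.next().await {
        println!("whelp, triggered again!");
    }
}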

Please note that _ts holds the timestamp of the tick, as a tokio::time::Instant.

  • albeit this is a less general solution, it is more suitable for most cases of time-repeated streams in our codebase. – sukhmel May 01 '23 at 16:09