I would like to use Rust to create a simple scheduler that runs multiple concurrent functions at a defined interval but does not start them again until the previous calls have finished.

For example, if the defined interval is one second, the scheduler should run the functions and not start them again if the previous calls have not returned. The goal is to prevent the same function from running multiple times concurrently.

I created a working example with Go like this:

package main

import (
    "fmt"
    "sync"
    "time"
)

func myFunc(wg *sync.WaitGroup) {
    fmt.Printf("now: %+s\n", time.Now())
    time.Sleep(3 * time.Second)
    wg.Done()
}

func main() {
    quit := make(chan bool)

    t := time.NewTicker(time.Second)
    go func() {
        for {
            select {
            case <-t.C:
                var wg sync.WaitGroup
                for i := 0; i <= 4; i++ {
                    wg.Add(1)
                    go myFunc(&wg)
                }
                wg.Wait()
                fmt.Printf("--- done ---\n\n")
            case <-quit:
                return
            }
        }
    }()

    <-time.After(time.Minute)
    close(quit)
}

Since I didn't find anything like Go's NewTicker in the Rust standard library, I used Tokio and came up with this:

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("I am i: {}", i);
                    thread::sleep(time::Duration::from_secs(3));
                    Ok(())
                }));
            }
            Ok(())
        })
        .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}

The problem I have with this approach is that the scheduler doesn't wait for the previously spawned functions to finish, so the functions start again regardless of whether the earlier ones are still running. I am missing something like Go's sync.WaitGroup here. What could be used to achieve the same results as in the working example?

Is it possible to achieve this using only the standard library? This is mainly for learning purposes; there is probably a straightforward way of doing it that would let me avoid the extra complexity.

In the end, I would like to periodically monitor some sites via HTTP (just getting the returned status code), but not query them all again until I have all the responses.

Shepmaster
nbari
  • Do *not* use `thread::sleep` in a future: [Why does Future::select choose the future with a longer sleep period first?](https://stackoverflow.com/q/48735952/155423) – Shepmaster May 22 '19 at 14:48
  • If you are running every second and a function takes 10 seconds, should there be a break of 1 second before the function runs again, or will it run again immediately? – Shepmaster May 22 '19 at 14:59
  • @Shepmaster thanks for all your comments. My lack of understanding and experience with RUST is why I came up with what you describe as an inconsistent question. If it helps, in the end I would just like to know what is the RUST way of doing something like the working example https://play.golang.org/p/ZLw6ESioqfu. My point in asking how to do it with only the std lib is that this is so simple in other programming languages, and that could probably be considered a "best practice", but it is totally fine to use a crate. I'm just trying to learn how to do things right :-) – nbari May 22 '19 at 15:41
  • *what is the RUST way* — why do you believe that there is **one** right way? You can probably do it with threads and you can probably do it with futures (and probably other ways). Neither of these is "the right way"; that's why this question feels overly broad. If you can narrow it to something like "how can I do using " then it seems more on-topic for Stack Overflow. (And it's just "Rust", not all caps, not all lowercase). – Shepmaster May 22 '19 at 16:05
  • @Shepmaster got it, thanks for the heads up. I would appreciate a constructive answer, with an example, that could help me achieve what is described in the question. – nbari May 22 '19 at 16:52

1 Answer

Since you want concurrency and will only use the standard library, you basically must use threads.

Here, we start a thread for every function on every iteration of the scheduler loop, allowing them to run in parallel. We then wait for all functions to finish, which prevents the same function from ever running twice concurrently.

use std::{
    thread,
    time::{Duration, Instant},
};

fn main() {
    let scheduler = thread::spawn(|| {
        let wait_time = Duration::from_millis(500);

        // Make this an infinite loop
        // Or some control path to exit the loop
        for _ in 0..5 {
            let start = Instant::now();
            eprintln!("Scheduler starting at {:?}", start);

            let thread_a = thread::spawn(a);
            let thread_b = thread::spawn(b);

            thread_a.join().expect("Thread A panicked");
            thread_b.join().expect("Thread B panicked");

            let runtime = start.elapsed();

            if let Some(remaining) = wait_time.checked_sub(runtime) {
                eprintln!(
                    "schedule slice has time left over; sleeping for {:?}",
                    remaining
                );
                thread::sleep(remaining);
            }
        }
    });

    scheduler.join().expect("Scheduler panicked");
}

fn a() {
    eprintln!("a");
    thread::sleep(Duration::from_millis(100))
}
fn b() {
    eprintln!("b");
    thread::sleep(Duration::from_millis(200))
}
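
As a hedged variation on the example above (not part of the original answer), scoped threads from the standard library can play the role of Go's sync.WaitGroup for any number of functions: `thread::scope` (stable since Rust 1.63) returns only after every thread spawned inside it has finished. The names `run_round` and `CALLS`, and the shortened sleep times, are assumptions made for illustration.

```rust
use std::{
    sync::atomic::{AtomicUsize, Ordering},
    thread,
    time::{Duration, Instant},
};

// Hypothetical counter, used only to observe how many calls have run.
static CALLS: AtomicUsize = AtomicUsize::new(0);

/// Runs every job on its own thread, returns only after all of them have
/// finished, and reports how long the whole round took.
fn run_round(jobs: &[fn()]) -> Duration {
    let start = Instant::now();
    // `thread::scope` joins every thread spawned inside it before returning.
    thread::scope(|s| {
        for &job in jobs {
            s.spawn(job);
        }
    });
    start.elapsed()
}

fn a() {
    CALLS.fetch_add(1, Ordering::SeqCst);
    thread::sleep(Duration::from_millis(10));
}

fn b() {
    CALLS.fetch_add(1, Ordering::SeqCst);
    thread::sleep(Duration::from_millis(20));
}

fn main() {
    let interval = Duration::from_millis(50);

    // Three rounds only, to keep the sketch short.
    for _ in 0..3 {
        let took = run_round(&[a, b]);
        // Sleep for whatever is left of the interval, if anything.
        if let Some(remaining) = interval.checked_sub(took) {
            thread::sleep(remaining);
        }
    }

    eprintln!("calls made: {}", CALLS.load(Ordering::SeqCst));
}
```

If any job panics, `thread::scope` itself panics after the remaining threads have been joined, so a real scheduler would probably want to handle that case explicitly.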

You could also make use of a Barrier to start each function in its own thread once and then synchronize all of the threads at the end of each iteration:

use std::{
    sync::{Arc, Barrier},
    thread,
    time::Duration,
};

fn main() {
    let scheduler = thread::spawn(|| {
        let barrier = Arc::new(Barrier::new(2));

        fn with_barrier(barrier: Arc<Barrier>, f: impl Fn()) -> impl Fn() {
            move || {
                // Make this an infinite loop
                // Or some control path to exit the loop
                for _ in 0..5 {
                    f();
                    barrier.wait();
                }
            }
        }

        let thread_a = thread::spawn(with_barrier(barrier.clone(), a));
        let thread_b = thread::spawn(with_barrier(barrier.clone(), b));

        thread_a.join().expect("Thread A panicked");
        thread_b.join().expect("Thread B panicked");
    });

    scheduler.join().expect("Scheduler panicked");
}

fn a() {
    eprintln!("a");
    thread::sleep(Duration::from_millis(100))
}
fn b() {
    eprintln!("b");
    thread::sleep(Duration::from_millis(200))
}
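
Both examples stop after a fixed `for _ in 0..5`. One possible "control path to exit the loop", loosely mirroring the `quit` channel in the Go program from the question, is a standard library channel polled with `recv_timeout`, which doubles as the pause between rounds. This is only a sketch: `run_until_quit` is a hypothetical helper, and unlike the first example it pauses for the full interval after each round instead of subtracting the round's runtime.

```rust
use std::{
    sync::mpsc::{self, Receiver, RecvTimeoutError},
    thread,
    time::Duration,
};

/// Calls `round` repeatedly, pausing for `interval` after each call, until a
/// message arrives on `quit` or its sender is dropped.
/// Returns the number of rounds that were run.
fn run_until_quit(quit: Receiver<()>, interval: Duration, mut round: impl FnMut()) -> usize {
    let mut rounds = 0;
    loop {
        round();
        rounds += 1;
        // Waiting on the channel acts as both the sleep and the stop check.
        match quit.recv_timeout(interval) {
            Err(RecvTimeoutError::Timeout) => continue,
            Ok(()) | Err(RecvTimeoutError::Disconnected) => return rounds,
        }
    }
}

fn main() {
    let (quit_tx, quit_rx) = mpsc::channel();

    let scheduler = thread::spawn(move || {
        run_until_quit(quit_rx, Duration::from_millis(50), || eprintln!("round"))
    });

    // Let the scheduler run for a while, then ask it to stop.
    thread::sleep(Duration::from_millis(120));
    quit_tx.send(()).expect("Scheduler hung up");

    let rounds = scheduler.join().expect("Scheduler panicked");
    eprintln!("ran {} rounds", rounds);
}
```

The `round` closure is where the thread spawning and joining from either example would go, so a new round still cannot start before the previous one has finished.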

I wouldn't use either of these solutions, personally. I'd find a crate where someone else has written and tested the code I need.

Shepmaster
  • Much appreciated. Following your advice, I will post another question regarding the usage of Tokio – nbari May 23 '19 at 09:59