
Editor's note: this example was created before Rust 1.0, and the specific types have changed or been removed since then. The general question and concept remain valid.

I have spawned a thread with an infinite loop and timer inside.

thread::spawn(|| {
    let mut timer = Timer::new().unwrap();
    let periodic = timer.periodic(Duration::milliseconds(200));
    loop {
        periodic.recv();

        // Do my work here
    }
});

After some time, based on certain conditions, I need to terminate this thread from another part of my program. In other words, I want to exit from the infinite loop. How can I do this correctly? Additionally, how could I suspend this thread and resume it later?

I tried to use a global unsafe flag to break the loop, but I think this solution does not look nice.

Shepmaster
fmvin

3 Answers


For both terminating and suspending a thread you can use channels.

Terminated externally

On each iteration of the worker loop, we check whether someone has notified us through the channel. If so, or if the other end of the channel has gone out of scope, we break out of the loop.

use std::io::{self, BufRead};
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

fn main() {
    println!("Press enter to terminate the child thread");
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || loop {
        println!("Working...");
        thread::sleep(Duration::from_millis(500));
        match rx.try_recv() {
            Ok(_) | Err(TryRecvError::Disconnected) => {
                println!("Terminating.");
                break;
            }
            Err(TryRecvError::Empty) => {}
        }
    });

    let mut line = String::new();
    let stdin = io::stdin();
    let _ = stdin.lock().read_line(&mut line);

    let _ = tx.send(());
}
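
The loop above sleeps first and only then polls the channel, so a stop request can wait for up to one full sleep. As a minimal sketch of a variant (not part of the original answer), `recv_timeout` from `std::sync::mpsc` can serve as both the timer and the stop check. The function name `worker` and the tick counter are made up for illustration.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical worker: does one unit of "work" every `period` until a
/// message arrives or the sender is dropped. Returns the number of ticks.
fn worker(rx: Receiver<()>, period: Duration) -> u32 {
    let mut ticks = 0;
    loop {
        match rx.recv_timeout(period) {
            // Nothing arrived within `period`: treat this as the timer tick.
            Err(RecvTimeoutError::Timeout) => {
                ticks += 1; // do the periodic work here
            }
            // A stop message arrived, or the sender went out of scope.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => return ticks,
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || worker(rx, Duration::from_millis(200)));

    // Let the worker run for a while, then ask it to stop.
    thread::sleep(Duration::from_secs(1));
    let _ = tx.send(());

    let ticks = handle.join().unwrap();
    println!("Worker stopped after {} ticks", ticks);
}
```

The worker wakes as soon as the message is sent instead of finishing its sleep. Note that the period is measured from the end of each piece of work, so the ticks drift if the work itself takes noticeable time.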

Suspending and resuming

We use recv(), which suspends the thread until something arrives on the channel. To resume the thread, you send something through the channel; the unit value () in this case. If the transmitting end of the channel is dropped, recv() returns Err(RecvError), which we use to exit the loop.

use std::io::{self, BufRead};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    println!("Press enter to wake up the child thread");
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || loop {
        println!("Suspending...");
        match rx.recv() {
            Ok(_) => {
                println!("Working...");
                thread::sleep(Duration::from_millis(500));
            }
            Err(_) => {
                println!("Terminating.");
                break;
            }
        }
    });

    let mut line = String::new();
    let stdin = io::stdin();
    for _ in 0..4 {
        let _ = stdin.lock().read_line(&mut line);
        let _ = tx.send(());
    }
}

Other tools

Channels are the easiest and the most natural (IMO) way to do these tasks, but not the most efficient one. There are other concurrency primitives, which you can find in the std::sync module. They are lower-level than channels but can be more efficient for particular tasks.
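
As one example of such a primitive, here is a minimal sketch of a stop flag built on `std::sync::atomic::AtomicBool` shared through an `Arc`. This is a safe counterpart to the global unsafe flag mentioned in the question. The helper name `spawn_worker` and the iteration counter are assumptions made for illustration, not part of the original answer.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: spawns a worker that loops until `stop` becomes true
/// and returns how many iterations it completed.
fn spawn_worker(stop: Arc<AtomicBool>) -> thread::JoinHandle<u32> {
    thread::spawn(move || {
        let mut iterations = 0;
        // The flag carries no other data, so relaxed ordering is enough here.
        while !stop.load(Ordering::Relaxed) {
            iterations += 1; // do the work here
            thread::sleep(Duration::from_millis(50));
        }
        iterations
    })
}

fn main() {
    let stop = Arc::new(AtomicBool::new(false));
    let handle = spawn_worker(Arc::clone(&stop));

    thread::sleep(Duration::from_millis(300));
    stop.store(true, Ordering::Relaxed);

    let iterations = handle.join().unwrap();
    println!("Worker ran {} iterations", iterations);
}
```

Unlike a channel, the flag cannot wake a sleeping thread, so the worker only notices the request the next time it checks.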

Vladimir Matveev
  • Don't see how that should work when your thread is running a blocking operation. – hasufell Mar 31 '20 at 21:17
  • @hasufell it won't, but then the answer does not state so anyway. It's not really possible to stop a thread in a safe and portable way without some cooperation from its side. Channels are one way of such cooperation. – Vladimir Matveev Apr 01 '20 at 07:14
  • What about in an unsafe and non-portable way? – cloudsurfin Aug 16 '23 at 00:05
  • @cloudsurfin the way to do it depends on the underlying threading API which is used for your system. For example, on systems where pthreads is used, this can be done via `pthread_cancel` and/or `pthread_kill` functions. I believe WinAPI provides similar functionality too. But killing a thread which shares memory with other threads is dangerous, because it may leave a lot of stuff hanging without cleanup. Also, not every system may even have a facility to kill a thread hanging in a blocking operation, similarly to `kill -9` not working in Linux if a process is hanging inside some kernel code. – Vladimir Matveev Aug 18 '23 at 22:23

The ideal solution would be a Condvar from the std::sync module, as pointed out by @Vladimir Matveev. Its wait_timeout method blocks until the condition variable is notified or the timeout elapses, whichever comes first.

This is the example from the documentation:

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::Duration;

let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();

thread::spawn(move|| {
    let &(ref lock, ref cvar) = &*pair2;
    let mut started = lock.lock().unwrap();
    *started = true;
    // We notify the condvar that the value has changed.
    cvar.notify_one();
});

// wait for the thread to start up
let &(ref lock, ref cvar) = &*pair;
let mut started = lock.lock().unwrap();
// as long as the value inside the `Mutex` is false, we wait
loop {
    let result = cvar.wait_timeout(started, Duration::from_millis(10)).unwrap();
    // 10 milliseconds have passed, or maybe the value changed!
    started = result.0;
    if *started == true {
        // We received the notification and the value has been updated, we can leave.
        break
    }
}
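
The documentation example only waits for a thread to start. As a rough sketch of how the same pieces could be applied to the original question (my adaptation, not from the documentation), `wait_timeout` can act as the periodic timer while the `bool` in the `Mutex` acts as the stop request. The names `StopSignal`, `spawn_worker` and `request_stop` are hypothetical.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical alias: the bool is `true` once a stop has been requested.
type StopSignal = Arc<(Mutex<bool>, Condvar)>;

/// Spawns a worker that ticks every `period` until a stop is requested.
/// Returns the number of ticks it performed.
fn spawn_worker(signal: StopSignal, period: Duration) -> thread::JoinHandle<u32> {
    thread::spawn(move || {
        let (lock, cvar) = &*signal;
        let mut ticks = 0;
        let mut stop = lock.lock().unwrap();
        while !*stop {
            // Sleeps for up to `period`, but wakes early when notified.
            let (guard, timeout) = cvar.wait_timeout(stop, period).unwrap();
            stop = guard;
            if timeout.timed_out() && !*stop {
                ticks += 1; // do the periodic work here
            }
        }
        ticks
    })
}

/// Sets the flag and wakes the worker if it is waiting.
fn request_stop(signal: &StopSignal) {
    let (lock, cvar) = &**signal;
    *lock.lock().unwrap() = true;
    cvar.notify_one();
}

fn main() {
    let signal: StopSignal = Arc::new((Mutex::new(false), Condvar::new()));
    let handle = spawn_worker(Arc::clone(&signal), Duration::from_millis(200));

    thread::sleep(Duration::from_secs(1));
    request_stop(&signal);
    println!("Worker stopped after {} ticks", handle.join().unwrap());
}
```

In this sketch the work runs while the mutex is held, so `request_stop` blocks until the current tick finishes. For long-running work it would be better to drop the guard before working and lock again afterwards.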
Rajeev Ranjan
  • Just to share with others, this solution suggested by @RajeevRanjan solved a big problem in a Rocket-based project I'm maintaining. The Rocket framework doesn't contain a shutdown feature, so I started a new thread to handle the server and wait for it using `Condvar`. I tried to use `mpsc`, but using `Condvar` was simpler. The code is available [here](https://github.com/risoflora/fiscalidade_server/blob/a7d7bbe5e899121f87f7ec3caeb3cc319a5dc896/src/service_windows.rs#L76). – silvioprog Feb 29 '20 at 21:53
  • @silvioprog I didn't provide this suggestion; I only edited it. RajeevRanjan is the solution author. – Shepmaster Feb 29 '20 at 21:55

Having come back to this question several times myself, here is what I think addresses the OP's intent while following the best practice others describe: getting the thread to stop itself. Building on the accepted answer, Crossbeam is a nice upgrade to mpsc because its receivers, not just its senders, can be cloned and moved between threads. It also has a convenient tick function. The key piece here is try_recv(), which is non-blocking.

I'm not sure how universally useful it'd be to put a message checker in the middle of an operational loop like this. I haven't found that Actix (or previously Akka) could really stop a thread without--as stated above--getting the thread to do it itself. So this is what I'm using for now (wide open to correction here, still learning myself).

// Cargo.toml:
// [dependencies]
// crossbeam-channel = "0.4.4"

use crossbeam_channel::{Sender, Receiver, unbounded, tick};
use std::time::{Duration, Instant};

fn main() {
    let (tx, rx):(Sender<String>, Receiver<String>) = unbounded();
    let rx2 = rx.clone();

    // crossbeam allows clone and move of receiver
    std::thread::spawn(move || {
        // OP:
        // let mut timer = Timer::new().unwrap();
        // let periodic = timer.periodic(Duration::milliseconds(200));

        let ticker: Receiver<Instant> = tick(std::time::Duration::from_millis(500));

        loop {
            // OP:
            // periodic.recv();
            crossbeam_channel::select! {
                recv(ticker) -> _ => {

                    // OP: Do my work here
                    println!("Hello, work.");

                    // Comms Check: keep doing work?
                    // try_recv is non-blocking
                    // rx, the single consumer is clone-able in crossbeam
                    // An Err means no message yet (or the sender is gone); keep working.
                    if let Ok(msg) = rx2.try_recv() {
                        if msg == "END_THE_WORLD" {
                            println!("Ending the world.");
                            break;
                        }
                    }
                }
            }
        }
    });

    // let work continue for 10 seconds then tell that thread to end.
    std::thread::sleep(std::time::Duration::from_secs(10));
    println!("Goodbye, world.");
    // send returns a Result; ignore it explicitly, as main exits right after.
    let _ = tx.send("END_THE_WORLD".to_string());
}

Using strings as messages is a tad cringeworthy, to me at least. The suspend and resume behavior could be handled there as well, with an enum as the message type.
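
A minimal sketch of that enum idea follows. To keep it free of dependencies it uses `std::sync::mpsc` rather than crossbeam, which is a swap on my part; the same enum should work as the message type of a crossbeam channel. The names `Control` and `worker` are hypothetical.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Hypothetical control messages replacing the "END_THE_WORLD" string.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Control {
    Suspend,
    Resume,
    Stop,
}

/// Ticks every `period` while running; returns the number of ticks done.
fn worker(rx: Receiver<Control>, period: Duration) -> u32 {
    let mut ticks = 0;
    let mut suspended = false;
    loop {
        let msg = if suspended {
            // Suspended: block until the next control message (None if the sender is gone).
            rx.recv().ok()
        } else {
            match rx.recv_timeout(period) {
                Ok(m) => Some(m),
                Err(RecvTimeoutError::Timeout) => {
                    ticks += 1; // do the periodic work here
                    continue;
                }
                Err(RecvTimeoutError::Disconnected) => None,
            }
        };
        match msg {
            Some(Control::Suspend) => suspended = true,
            Some(Control::Resume) => suspended = false,
            Some(Control::Stop) | None => return ticks,
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || worker(rx, Duration::from_millis(200)));

    thread::sleep(Duration::from_secs(1));
    let _ = tx.send(Control::Suspend);
    thread::sleep(Duration::from_secs(1)); // no work happens during this pause
    let _ = tx.send(Control::Resume);
    thread::sleep(Duration::from_secs(1));
    let _ = tx.send(Control::Stop);

    println!("Worker did {} ticks", handle.join().unwrap());
}
```

With an enum the compiler checks that every message is handled, which a string comparison cannot do.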

cloudsurfin