
Given several threads that complete with an Output value, how do I get the first Output that's produced? Ideally while still being able to get the remaining Outputs later in the order they're produced, and bearing in mind that some threads may or may not terminate.

Example:

struct Output(i32);

fn main() {
    let mut spawned_threads = Vec::new();

    for i in 0..10 {
        let join_handle: ::std::thread::JoinHandle<Output> = ::std::thread::spawn(move || {
            // pretend to do some work that takes some amount of time
            ::std::thread::sleep(::std::time::Duration::from_millis(
                (1000 - (100 * i)) as u64,
            ));
            Output(i) // then pretend to return the `Output` of that work
        });
        spawned_threads.push(join_handle);
    }

    // I can do this to wait for each thread to finish and collect all `Output`s
    let outputs_in_order_of_thread_spawning = spawned_threads
        .into_iter()
        .map(::std::thread::JoinHandle::join)
        .collect::<Vec<::std::thread::Result<Output>>>();

    // but how would I get the `Output`s in order of completed threads?
}

I could solve the problem myself using a shared queue/channels/similar, but are there built-in APIs or existing libraries which could solve this use case for me more elegantly?

I'm looking for an API like:

fn race_threads<A: Send>(
    threads: Vec<::std::thread::JoinHandle<A>>
) -> (::std::thread::Result<A>, Vec<::std::thread::JoinHandle<A>>) {
    unimplemented!("so far this doesn't seem to exist")
}

(Rayon's join is the closest I could find, but a) it only races 2 closures rather than an arbitrary number of closures, and b) the thread pool w/ work stealing approach doesn't make sense for my use case of having some closures that might run forever.)

It is possible to solve this use case using the pointers from "How to check if a thread has finished in Rust?", just as it is possible to solve it using an MPSC channel. However, here I'm after a clean API to race n threads (or, failing that, n closures on n threads).

Shepmaster
Caspar
  • You could just pass mpsc senders to each thread and consume only one result on the receiver side. – the8472 Apr 15 '18 at 20:09
  • I believe your question is already answered by [How to check if a thread has finished in Rust?](https://stackoverflow.com/q/35883390/155423). If you disagree, please [edit] your question to explain how this question is not answered by those answers. – Shepmaster Apr 15 '18 at 20:26
  • @the8472 yes, that is what I meant by "using a shared queue/channels/similar" - I'm after a neater approach than that. – Caspar Apr 17 '18 at 04:07
  • @Shepmaster I have edited the question to clarify that I'm after an existing solution for collecting results; I think with your linked Q I would still need to roll "collecting the results in a queue" myself (thanks for the pointer though!). – Caspar Apr 17 '18 at 04:09
  • A side note on idiomatic Rust: you really should import some types. Having so many fully-qualified things is just ugly. – Shepmaster Apr 17 '18 at 12:47
  • @Shepmaster yep, am aware. This was meant to be a cut down example with some types fully qualified for clarity & ease of copy-pasting, but at some point I went a bit overboard :) – Caspar Apr 18 '18 at 04:15

2 Answers


These problems can be solved by using a condition variable:

use std::sync::{Arc, Condvar, Mutex};

#[derive(Debug)]
struct Output(i32);

enum State {
    Starting,
    Joinable,
    Joined,
}

fn main() {
    let pair = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
    let mut spawned_threads = Vec::new();

    let &(ref lock, ref cvar) = &*pair;
    for i in 0..10 {
        let my_pair = pair.clone();
        let join_handle: ::std::thread::JoinHandle<Output> = ::std::thread::spawn(move || {
            // pretend to do some work that takes some amount of time
            ::std::thread::sleep(::std::time::Duration::from_millis(
                (1000 - (100 * i)) as u64,
            ));

            let &(ref lock, ref cvar) = &*my_pair;
            let mut joinable = lock.lock().unwrap();
            joinable[i] = State::Joinable;
            cvar.notify_one();
            Output(i as i32) // then pretend to return the `Output` of that work
        });
        lock.lock().unwrap().push(State::Starting);
        spawned_threads.push(Some(join_handle));
    }

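    // Hold the lock while scanning, and scan before every wait, so that a
    // notification sent while the main thread is not waiting cannot be lost.
    let mut locked = lock.lock().unwrap();
    loop {
        let mut all_joined = true;
        for (i, state) in locked.iter_mut().enumerate() {
            match *state {
                State::Starting => {
                    all_joined = false;
                }
                State::Joinable => {
                    *state = State::Joined;
                    println!("{:?}", spawned_threads[i].take().unwrap().join());
                }
                State::Joined => (),
            }
        }
        if all_joined {
            break;
        }
        // `wait` releases the lock while blocked and reacquires it on wake-up.
        locked = cvar.wait(locked).unwrap();
    }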
}


I'm not claiming this is the simplest way to do it. The condition variable wakes the main thread every time a child thread is done. The list holds the state of each thread: once a thread is finished (or about to finish), it is marked as joinable and the main thread can join it.
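
For comparison, here is a minimal sketch of a variant that is not part of the answer above: each thread reports its index over an `mpsc` channel and the main thread joins the handles in the order the indices arrive. The names `NotifyOnDrop` and `join_in_completion_order` are hypothetical, chosen only for illustration. The sketch assumes panics unwind (with `panic=abort` the guard never runs), and it blocks until every thread has finished, so it is not suitable as written for threads that may never terminate.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug)]
struct Output(i32);

/// Hypothetical guard: sends the thread's index when dropped, so the
/// notification goes out on a normal return and on an unwinding panic.
struct NotifyOnDrop {
    index: usize,
    tx: mpsc::Sender<usize>,
}

impl Drop for NotifyOnDrop {
    fn drop(&mut self) {
        // Ignore the error: the receiver may already have been dropped.
        let _ = self.tx.send(self.index);
    }
}

/// Spawns one thread per delay (in milliseconds) and joins the threads in
/// the order in which they report completion.
fn join_in_completion_order(delays_ms: Vec<u64>) -> Vec<thread::Result<Output>> {
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();

    for (index, delay) in delays_ms.into_iter().enumerate() {
        let guard = NotifyOnDrop { index, tx: tx.clone() };
        handles.push(Some(thread::spawn(move || {
            let _guard = guard; // dropped when the closure exits
            thread::sleep(Duration::from_millis(delay)); // pretend to work
            Output(index as i32)
        })));
    }
    // Drop our own sender so the receiver ends once every thread is done.
    drop(tx);

    let mut results = Vec::new();
    for index in rx {
        // The thread is finished or about to finish, so `join` returns quickly.
        results.push(handles[index].take().unwrap().join());
    }
    results
}
```

Because the results come from `JoinHandle::join`, a panicking thread shows up as an `Err` in the returned vector rather than being silently skipped. Threads that finish at nearly the same time are ordered by whichever sends on the channel first.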

mcarton
  • This does not, in any way, guarantee that the outputs will be shown in order though. There is a race condition between setting the variable and reading it: multiple threads could be "ready" when the main thread acts, and it will never know which was first. – Matthieu M. Apr 16 '18 at 15:27
  • The concept of "first" thread finishing doesn't really make sense anyway. This will join the threads roughly in the order they are finished, with threads that finish around the same time treated in creation order, which is sufficient in most cases. This is for example what the `WaitAny` method guarantees in C#: "If more than one object becomes signaled during the call, the return value is the array index of the signaled object with the smallest index value of all the signaled objects." – mcarton Apr 16 '18 at 16:04
  • I *fully* agree. However, I think this (inherent) limitation should be mentioned in the answer itself. For example, a MPSC queue to communicate the index of the thread (posted from the thread) would be more precise... and move the race to whoever locks the queue first. – Matthieu M. Apr 16 '18 at 16:16
  • Is there any advantage to using a `Condvar` over using an `mpsc::channel`? I appreciate the pointer in that direction (especially since Condvar's API is a bit tricky) but it seems like it has more moving parts than a channel-based solution, and I haven't spotted any upside yet. – Caspar Apr 17 '18 at 04:19

No, there is no such API.

You've already been presented with multiple options to solve your problem, including channels and the approaches in the linked question.

Sometimes when programming, you have to go beyond sticking pre-made blocks together. This is supposed to be a fun part of programming. I encourage you to embrace it. Go create your ideal API using the components available and publish it to crates.io.


I really don't see what's so terrible about the channels version:

use std::{sync::mpsc, thread, time::Duration};

#[derive(Debug)]
struct Output(i32);

fn main() {
    let (tx, rx) = mpsc::channel();

    for i in 0..10 {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis((1000 - (100 * i)) as u64));
            tx.send(Output(i)).unwrap();
        });
    }
    // Don't hold on to the sender ourselves
    // Otherwise the loop would never terminate
    drop(tx);

    for r in rx {
        println!("{:?}", r);
    }
}
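
As a rough illustration of how the channel version could be wrapped into the kind of API the question asks for, here is a sketch under a few assumptions. The names `race` and `next_within` are hypothetical and do not exist in any library. The sketch assumes all closures have the same type (box them otherwise) and that panics unwind, since `catch_unwind` cannot catch anything under `panic=abort`.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs each job on its own thread and returns a
/// receiver that yields `(job index, result)` pairs in completion order.
fn race<T, F>(jobs: Vec<F>) -> mpsc::Receiver<(usize, thread::Result<T>)>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    for (index, job) in jobs.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            // Turn an unwinding panic into an `Err`, as `JoinHandle::join` would.
            let result = panic::catch_unwind(AssertUnwindSafe(job));
            // Ignore the error: the receiver may already have been dropped.
            let _ = tx.send((index, result));
        });
    }
    // The original `tx` is dropped here, so the receiver ends once every
    // spawned thread has sent its result.
    rx
}

/// Hypothetical helper: waits up to `timeout` for the next result. Returns
/// `None` if nothing arrived in time or if all threads are already done.
fn next_within<T>(
    rx: &mpsc::Receiver<(usize, thread::Result<T>)>,
    timeout: Duration,
) -> Option<(usize, thread::Result<T>)> {
    rx.recv_timeout(timeout).ok()
}
```

With this shape, `rx.recv()` gives the first result and the same receiver can be polled later for the remaining ones. For threads that might never terminate, `next_within` (built on `Receiver::recv_timeout`) avoids blocking forever, at the cost of not distinguishing a timeout from a closed channel.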
Shepmaster
  • re not just sticking blocks together - with respect, it's not necessary to talk down to me :) I was asking with the idea in mind that, if it didn't already exist, I could write that crate, or if someone else writes it later then I'd like to make it easy to find via this question. But if it did already exist, then I would much rather not reinvent the wheel. – Caspar Apr 18 '18 at 03:55
  • To answer your implied question, nothing terrible. But to give an idea of why I'd rather reuse an existing solution than roll my own, your snippet doesn't handle panics in spawned threads, so maybe `Output` needs to be `Result`, but what if `panic=abort` so that catching panics doesn't work, maybe set a panic hook instead, except then I need to preserve any existing panic hook.. maybe instead use a shared `Arc`'s decreased reference count to detect the panic, hmm - etc. All solvable, but it's easier to reuse a good solution. After all, I wouldn't reinvent `lazy_static` either! – Caspar Apr 18 '18 at 04:11