1

This code runs to completion and prints out the values of my_data if I uncomment the sleep line in the do_work function. If I leave it commented out, my executable hangs every time.

Why does a Condvar not wake the last thread? mentions collecting the handles and waiting for them to join in the main thread, but that should be taken care of by the rayon scope, correct?

How can I have this code complete without the sleep statement in do_work()?

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Barrier, Condvar, Mutex,
    },
    thread,
    time::Duration,
};

fn do_work(
    mtx: Arc<Mutex<bool>>,
    cond_var: Arc<Condvar>,
    barrier: Arc<Barrier>,
    quitting: &AtomicBool,
    range: &mut [f32],
) {
    while !quitting.load(Ordering::SeqCst) {
        barrier.wait();
        //thread::sleep(Duration::from_micros(1));
        let mut started = mtx.lock().unwrap();
        while !*started && !quitting.load(Ordering::SeqCst) {
            started = cond_var.wait(started).unwrap();
        }
        if quitting.load(Ordering::SeqCst) {
            break;
        } else {
            range.iter_mut().for_each(|i| *i += 1.0);
        }
    }
    println!("{:?} Joining", thread::current().id());
}

fn start_work(mtx: Arc<Mutex<bool>>, cond_var: Arc<Condvar>) {
    let mut started = mtx.lock().unwrap();
    *started = true;
    cond_var.notify_all();
}

fn reset_work(mtx: Arc<Mutex<bool>>) {
    let mut started = mtx.lock().unwrap();
    *started = false;
}
fn main() {
    let num_threads = 4;
    let test_barrier = Arc::new(Barrier::new(num_threads + 1));

    let test_mutex = Arc::new(Mutex::new(false));
    let test_cond_var = Arc::new(Condvar::new());

    let mut my_data = vec![0.0; 1024];
    my_data
        .iter_mut()
        .enumerate()
        .for_each(|(i, iter)| *iter = i as f32);
    let chunk_size = my_data.len() / num_threads;
    let quitting = AtomicBool::new(false);
    rayon::scope(|s| {
        for chunk in my_data.chunks_mut(chunk_size) {
            let thread_mtx = test_mutex.clone();
            let thread_cond_var = test_cond_var.clone();
            let thread_barrier = Arc::clone(&test_barrier);
            let temp = &quitting;
            s.spawn(move |_| do_work(thread_mtx, thread_cond_var, thread_barrier, &temp, chunk));
        }
        test_barrier.wait();
        let _upper_bound = 1024 / num_threads;
        for _i in 0..10 {
            start_work(test_mutex.clone(), test_cond_var.clone());
            test_barrier.wait();
            reset_work(test_mutex.clone());
        }
        quitting.store(true, Ordering::SeqCst);
    });
    println!("my_data is: {:?}", my_data);
}

Cargo.toml dependencies:

rayon = "*"

This is a test for more complicated math that do_work will do later on, but I am trying to just get a series of threads that successfully modify a piece of a larger Vec.

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
  • *modify a piece of a larger Vec* — there's frequently easier ways to do that with Rayon, like using [`ParallelSliceMut::par_chunks_mut`](https://docs.rs/rayon/latest/rayon/slice/trait.ParallelSliceMut.html#method.par_chunks_mut). See also [How do I pass disjoint slices from a vector to different threads?](https://stackoverflow.com/q/33818141/155423); [How can I pass a reference to a stack variable to a thread?](https://stackoverflow.com/q/32750829/155423). – Shepmaster Feb 15 '22 at 21:06
  • I tried that first. Unfortunately the latency in spinning everything up was too much. I implemented things this way so that the threads are already spun up and ready to number crunch whenever the condition variable wakes up. – Matthew Pittenger Feb 15 '22 at 21:07
  • *the latency in spinning everything up was too much* — It's unrelated to this question, but that seems pretty questionable. I'd want to see benchmarks before I fully believed it. – Shepmaster Feb 15 '22 at 21:10
  • I did benchmark it, there was a measurable difference between the condition variable waking up and par_chunks_mut. Which is what moved me to keep my threads alive and ready for data the entire time. I am also attempting to avoid a spinlock, which would be much simpler, but needlessly tie resources down when the threads could be sleeping. Thank you for providing the other references, I had read those which is what lead me to the scope layout and the chunks_mut. I didn't reference those as they seem to be working and aren't in question at the moment. – Matthew Pittenger Feb 15 '22 at 21:14
  • Unrelated: none of your `Arc`s are needed. Since you are using Rayon, you may pass in references (as you already read in "How can I pass a reference to a stack variable to a thread?") – Shepmaster Feb 15 '22 at 21:38
  • I think your race condition is that one or more threads starts waiting for the barrier and then the main thread sets the `quitting` variable. Nothing will get the thread out of the `barrier.wait()` call. You can add many prints to the program. When I did so, I see the final output of one thread is `[ThreadId(8)] barrier waiting...` – Shepmaster Feb 15 '22 at 21:40
  • And running the non-sleep code does occasionally succeed — it's all up to the whims of the OS scheduler. – Shepmaster Feb 15 '22 at 21:48
  • Yes, what I am wondering is how to deterministically solve the problem. Adding a sleep is only a probable solution. But it seems like there should be a way with this architecture to make things work without sleep statements. – Matthew Pittenger Feb 16 '22 at 04:16

1 Answers1

0

I got the intended behavior working as desired. This seems especially convoluted and like there should be a better way. I would happily accept an answer that is less convoluted than what I have, but it at least produces the desired behavior, a threadpool with prespun threads that produce work in bursts whenever it receives the proper signal from the master thread and can shutdown in a deterministic way. The extra starting and finishing threads provide a handshaking mechanism to ensure there are no race conditions coming up to the barriers.

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Barrier,
    },
    thread,
};

fn do_work(
    start_barrier: &Barrier,
    finish_barrier: &Barrier,
    quitting: &AtomicBool,
    starting: &AtomicBool,
    finishing: &AtomicBool,
    range: &mut [f32],
) {
    while !quitting.load(Ordering::SeqCst) {
        start_barrier.wait();
        while !starting.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {}
        {
            // let mut started = mtx.lock().unwrap();
            // while !*started && !quitting.load(Ordering::SeqCst) {
            //     started = cond_var.wait(started).unwrap();
            // }
        }
        if quitting.load(Ordering::SeqCst) {
            break;
        } else {
            range.iter_mut().for_each(|i| *i += 1.0);
        }
        finish_barrier.wait();
        while !finishing.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {}
    }
    println!("{:?} Joining", thread::current().id());
}

fn main() {
    let num_threads = 4;
    let start_barrier = Barrier::new(num_threads + 1);
    let finish_barrier = Barrier::new(num_threads + 1);
    let mut my_data = vec![0.0; 1024];
    my_data
        .iter_mut()
        .enumerate()
        .for_each(|(i, iter)| *iter = i as f32);
    let chunk_size = my_data.len() / num_threads;
    let quitting = AtomicBool::new(false);
    let starting = AtomicBool::new(false);
    let finishing = AtomicBool::new(false);


    rayon::scope(|s| {
        for chunk in my_data.chunks_mut(chunk_size) {
            let thread_start_barrier = &start_barrier;
            let thread_finish_barrier = &finish_barrier;

            let thread_quitting = &quitting;
            let thread_starting = &starting;
            let thread_finishing = &finishing;

            s.spawn(move |_| do_work(   thread_start_barrier,
                                        thread_finish_barrier,
                                        thread_quitting, 
                                        thread_starting,
                                        thread_finishing,
                                        chunk));
        }
        let num_rounds = 10;
        for i in 0..num_rounds {
            let start = std::time::Instant::now();
            start_barrier.wait();
            finishing.store(false, Ordering::SeqCst);
            starting.store(true, Ordering::SeqCst);
            finish_barrier.wait();
            if i == num_rounds-1 {
                quitting.store(true, Ordering::SeqCst);
            }
            finishing.store(true, Ordering::SeqCst);
            starting.store(false, Ordering::SeqCst);
            println!("Round {} took: {:?}", i, std::time::Instant::now() - start);
        }
    });
    println!("my_data is: {:?}", my_data);
}
  • I'm not sure I need both the barrier and the condition variables. The starting and finishing atomic books are necessary to prevent a race condition to the barrier/condition variable. Tomorrow I will attempt to remove either the barrier or the condition variable to confirm this. – Matthew Pittenger Feb 16 '22 at 21:53
  • It appears that the condition_variable is redundant with the barrier. I will edit the above and to remove the condition_variable and mark as answer unless a better response comes along. – Matthew Pittenger Feb 17 '22 at 16:37