This code runs to completion and prints out the values of my_data
if I uncomment the sleep
line in the do_work
function. If I leave it commented out, my executable hangs every time.
Why does a Condvar not wake the last thread? mentions collecting the handles and waiting for them to join in the main thread, but that should be taken care of by the rayon scope, correct?
How can I have this code complete without the sleep
statement in do_work()
?
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Barrier, Condvar, Mutex,
},
thread,
time::Duration,
};
fn do_work(
mtx: Arc<Mutex<bool>>,
cond_var: Arc<Condvar>,
barrier: Arc<Barrier>,
quitting: &AtomicBool,
range: &mut [f32],
) {
while !quitting.load(Ordering::SeqCst) {
barrier.wait();
//thread::sleep(Duration::from_micros(1));
let mut started = mtx.lock().unwrap();
while !*started && !quitting.load(Ordering::SeqCst) {
started = cond_var.wait(started).unwrap();
}
if quitting.load(Ordering::SeqCst) {
break;
} else {
range.iter_mut().for_each(|i| *i += 1.0);
}
}
println!("{:?} Joining", thread::current().id());
}
fn start_work(mtx: Arc<Mutex<bool>>, cond_var: Arc<Condvar>) {
let mut started = mtx.lock().unwrap();
*started = true;
cond_var.notify_all();
}
fn reset_work(mtx: Arc<Mutex<bool>>) {
let mut started = mtx.lock().unwrap();
*started = false;
}
fn main() {
let num_threads = 4;
let test_barrier = Arc::new(Barrier::new(num_threads + 1));
let test_mutex = Arc::new(Mutex::new(false));
let test_cond_var = Arc::new(Condvar::new());
let mut my_data = vec![0.0; 1024];
my_data
.iter_mut()
.enumerate()
.for_each(|(i, iter)| *iter = i as f32);
let chunk_size = my_data.len() / num_threads;
let quitting = AtomicBool::new(false);
rayon::scope(|s| {
for chunk in my_data.chunks_mut(chunk_size) {
let thread_mtx = test_mutex.clone();
let thread_cond_var = test_cond_var.clone();
let thread_barrier = Arc::clone(&test_barrier);
let temp = &quitting;
s.spawn(move |_| do_work(thread_mtx, thread_cond_var, thread_barrier, &temp, chunk));
}
test_barrier.wait();
let _upper_bound = 1024 / num_threads;
for _i in 0..10 {
start_work(test_mutex.clone(), test_cond_var.clone());
test_barrier.wait();
reset_work(test_mutex.clone());
}
quitting.store(true, Ordering::SeqCst);
});
println!("my_data is: {:?}", my_data);
}
Cargo.toml dependencies:
rayon = "*"
This is a test for more complicated math that do_work
will do later on, but I am trying to just get a series of threads that successfully modify a piece of a larger Vec
.