0

I implemented a worker pool based on the rust book, but when I tried to use criterion to benchmark it, I found the parallel version is very slow then sequential version, Is there any logical error in my worker pool implement?

The most part of my worker pool is just following the rust book.

use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

pub type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct Woker {
    pub id: usize,
    pub thread: Option<JoinHandle<()>>,
}

impl Woker {
    pub fn new(
        id: usize, 
        job_receiver: Arc<Mutex<Receiver<Job>>>,
    ) -> Self {
        Self {
            id,
            thread: Some(thread::spawn(move || {
                loop {
                    let result = job_receiver.lock().unwrap().recv();
                    match result {
                        Ok(job) => {
                            job();
                        }
                        Err(_) => {
                            break;
                        }
                    }
                }
            }))
        }
    }
}

pub struct WokerPool {
    workers: Vec<Woker>,
    job_sender: Option<Sender<Job>>,
}

pub type ThreadSafeWorkPool = Arc<Mutex<WokerPool>>;

impl WokerPool {
    pub fn new(size: usize) -> Self {
        let (job_sender, job_receiver) = channel::<Job>();
        let thread_safe_job_receiver = Arc::new(Mutex::new(job_receiver));
        let mut workers = Vec::with_capacity(size);
        for i in 0..size {
            workers.push(Woker::new(i,Arc::clone(&thread_safe_job_receiver)))
        }
        Self {
            job_sender: Some(job_sender),
            workers,
        }
    }
    pub fn execute<F>(&mut self, f: F) 
    where
        F: FnOnce() + Send + 'static
    {
        self.job_sender.as_mut().unwrap().send(Box::new(f)).unwrap();
    }
}
impl Drop for WokerPool {
    fn drop(&mut self) {
        drop(self.job_sender.take());
        for worker in &mut self.workers {
            // println!("Work id {} stop", worker.id);
            if let Some(thread) = worker.thread.take() {
                 match thread.join() {
                    Ok(_) => {},
                    Err(_) => { println!("Error when join {}", worker.id) }
                 }
            }
        }
    }
}

My benchmark code uses the example from criterion :

use criterion::{criterion_group, criterion_main, Criterion};
use learning_rust_parallel::work_pool::WokerPool;
use std::sync::{Arc, Mutex};

fn criterion_benchmark(c: &mut Criterion) {
    let iter_time = 500000000;
    let size = 10;
    c.bench_function("parallel sum", |b| {
        b.iter(|| {
            let mut worker_pool_for_parallel = WokerPool::new(size);
            let sum = Arc::new(Mutex::new(0 as u128));
            for _i in 0..size {
                let sum_clone = Arc::clone(&sum);
                worker_pool_for_parallel.execute(move || {
                        let mut innner_sum = 0 ;
                        for _index in 0..iter_time/ size  {
                            innner_sum += 1;
                        }
                        *sum_clone.lock().unwrap() += innner_sum
                })
            }
        })
    });
    c.bench_function("sequential sum", |b| {
        b.iter(|| { 
            // create same size pool to make sequential version contain time of create threads.
            let _worker_pool_for_parallel = WokerPool::new(5);
            let mut sum: u128 = 0;
            for _i in 0..iter_time {
                sum += 1;
            }
        })
    });

}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
steven-lie
  • 75
  • 2
  • 7
  • You may want to look at [MPMC channels](https://docs.rs/crossbeam-channel). – Chayim Friedman Aug 30 '23 at 07:24
  • 1
    The compiler probably optimizes the loop to one addition. Try with `black_box()`. – Chayim Friedman Aug 30 '23 at 07:28
  • @ChayimFriedman thanks for your reply, I changed loop into recursive function and used ```black_box```, the worker pool version now is faster than the sequential, but when I increase the size of the worker pool, the performance decreases, do you have any clue about this behavior ? – steven-lie Aug 30 '23 at 07:58
  • 1
    @steven-lie Using more threads than you have CPU cores will slow down your computation. That's because more threads mean more synchronization overhead, but each CPU core can only execute one thread at a time, so the extra threads will have to wait while the CPU cores are busy and won't improve performance. – Jmb Aug 30 '23 at 09:21
  • This is expected and described by the [Amdahl's law](https://en.wikipedia.org/wiki/Amdahl%27s_law). Sequential overheads can be the thread creation (due to the OS data structures), atomics (serialized by the hardware itself), locks (same thing) or thread synchronization (as pointed out by Jmb). By the way, `u128` is apparently [deprecated](https://doc.rust-lang.org/std/u128/index.html) and not efficient since [it is not a native integer](https://stackoverflow.com/questions/57340308/how-does-rusts-128-bit-integer-i128-work-on-a-64-bit-system) on most systems. – Jérôme Richard Aug 30 '23 at 19:15
  • 2
    @JérômeRichard the `u128` type is not deprecated. The `u128` _module_ is deprecated because everything it contained is now available directly on the type (same as [other](https://doc.rust-lang.org/std/i8/index.html) [integer](https://doc.rust-lang.org/std/i32/index.html) [types](https://doc.rust-lang.org/std/i64/index.html)). – Jmb Aug 31 '23 at 13:22

1 Answers1

0

There is nothing to benchmark here, as the code

pub fn foo(iter_time: usize) -> usize{
    let mut sum = 0;
    for _i in 0..iter_time {
       sum += 1;
    }
    sum
}

compiles down to single (de-facto) no-op: The sum of (1+1+1+1..n) for any n is n, and the compiler smart enough to figure that out. So the "sequential" version is actually of complexity O(1), independent of n.

In the more general case: Be aware that CPUs love to read memory sequentially, because latency can be hidden very effectively. More than one thread may yield more instructions per wall-clock-second, but if addition is the only operation to be performed anyway, the memory latency introduced due to varying access patterns destroys any benefit.

user2722968
  • 13,636
  • 2
  • 46
  • 67
  • For proof: https://rust.godbolt.org/z/o1daqMEPT the `foo` function does indeed compile to a simple `mov rax, rdi`, as in "copy the argument to the return value" – Colonel Thirty Two Aug 31 '23 at 19:01