I implemented a worker pool based on the rust book, but when I tried to use criterion to benchmark it, I found the parallel version is very slow then sequential version, Is there any logical error in my worker pool implement?
The most part of my worker pool is just following the rust book.
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};
pub type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct Woker {
pub id: usize,
pub thread: Option<JoinHandle<()>>,
}
impl Woker {
pub fn new(
id: usize,
job_receiver: Arc<Mutex<Receiver<Job>>>,
) -> Self {
Self {
id,
thread: Some(thread::spawn(move || {
loop {
let result = job_receiver.lock().unwrap().recv();
match result {
Ok(job) => {
job();
}
Err(_) => {
break;
}
}
}
}))
}
}
}
pub struct WokerPool {
workers: Vec<Woker>,
job_sender: Option<Sender<Job>>,
}
pub type ThreadSafeWorkPool = Arc<Mutex<WokerPool>>;
impl WokerPool {
pub fn new(size: usize) -> Self {
let (job_sender, job_receiver) = channel::<Job>();
let thread_safe_job_receiver = Arc::new(Mutex::new(job_receiver));
let mut workers = Vec::with_capacity(size);
for i in 0..size {
workers.push(Woker::new(i,Arc::clone(&thread_safe_job_receiver)))
}
Self {
job_sender: Some(job_sender),
workers,
}
}
pub fn execute<F>(&mut self, f: F)
where
F: FnOnce() + Send + 'static
{
self.job_sender.as_mut().unwrap().send(Box::new(f)).unwrap();
}
}
impl Drop for WokerPool {
fn drop(&mut self) {
drop(self.job_sender.take());
for worker in &mut self.workers {
// println!("Work id {} stop", worker.id);
if let Some(thread) = worker.thread.take() {
match thread.join() {
Ok(_) => {},
Err(_) => { println!("Error when join {}", worker.id) }
}
}
}
}
}
My benchmark code uses the example from criterion :
use criterion::{criterion_group, criterion_main, Criterion};
use learning_rust_parallel::work_pool::WokerPool;
use std::sync::{Arc, Mutex};
fn criterion_benchmark(c: &mut Criterion) {
let iter_time = 500000000;
let size = 10;
c.bench_function("parallel sum", |b| {
b.iter(|| {
let mut worker_pool_for_parallel = WokerPool::new(size);
let sum = Arc::new(Mutex::new(0 as u128));
for _i in 0..size {
let sum_clone = Arc::clone(&sum);
worker_pool_for_parallel.execute(move || {
let mut innner_sum = 0 ;
for _index in 0..iter_time/ size {
innner_sum += 1;
}
*sum_clone.lock().unwrap() += innner_sum
})
}
})
});
c.bench_function("sequential sum", |b| {
b.iter(|| {
// create same size pool to make sequential version contain time of create threads.
let _worker_pool_for_parallel = WokerPool::new(5);
let mut sum: u128 = 0;
for _i in 0..iter_time {
sum += 1;
}
})
});
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);