I am trying to make a worker pool in rust. The design itself is simple, the idea is you add function pointers to a structure as tasks (assumed to be independent). Then that structure spawns a number of threads equal to the maximum amount of threads on the machine.
Each thread then acquires a task from the queue, executes it and when it is done acquires the next until they are all consumed.
In order to achieve it I tried the following:
use std::sync::{Arc, Mutex};
use std::thread;
pub struct ThreadPool<'a>
{
task_queue: Vec<Task<'a>>,
task_count: Arc<Mutex<usize>>,
}
pub struct Task<'a>(Box<dyn FnMut() + 'a>);
unsafe impl<'a> Send for Task<'a> {}
impl<'a> Task<'a>
{
fn call(&mut self) { self.0() }
}
impl<'a> ThreadPool<'a>
{
fn add_task<T>(&mut self, task: T)
where
T: 'a + FnMut() -> (),
{
self.task_queue.push(Task(Box::new(task)));
}
fn run(&mut self)
{
let thread_count = thread::available_parallelism().unwrap().get();
println!("{}", thread_count);
let mut handlers = Vec::with_capacity(thread_count);
for _ in 0..thread_count
{
unsafe {
let queue = &mut self.task_queue as *mut Vec<Task<'a>>;
let task_count = Arc::clone(&self.task_count);
handlers.push(thread::spawn(move || {
let index = task_count.lock().unwrap().overflowing_add(1).0 - 1;
(*queue)[index].call();
}));
}
}
}
}
This is not even compiling.
The kind of interface I would like would be something like:
let mut thread_pool = ThreadPool {
task_queue: Vec::new(),
task_count: Arc::new(Mutex::new(0)),
};
for i in 0..100
{
thread_pool.add_task(move || println!(r"ran {i} task"));
}
thread_pool.run();
i.e. you register the tasks you want, each tasks captures data outside of it in the surrounding scope then it all runs.
I tried searching examples of something like this but the thread pool example in the docs is very different.
error[E0277]: `*mut std::vec::Vec<Task<'a>>` cannot be sent between threads safely
--> examples/06_fluid/thread_pool.rs:38:45
|
38 | handlers.push(thread::spawn(move || {
| ------------- ^------
| | |
| _______________________________|_____________within this `[closure@examples/06_fluid/thread_pool.rs:38:45: 38:52]`
| | |
| | required by a bound introduced by this call
39 | | let index = task_count.lock().unwrap().overflowing_add(1).0 - 1;
40 | | (*queue)[index].call();
41 | | }));
| |_________________^ `*mut std::vec::Vec<Task<'a>>` cannot be sent between threads safely
|
= help: within `[closure@examples/06_fluid/thread_pool.rs:38:45: 38:52]`, the trait `Send` is not implemented for `*mut std::vec::Vec<Task<'a>>`
note: required because it's used within this closure
--> examples/06_fluid/thread_pool.rs:38:45
|
38 | handlers.push(thread::spawn(move || {
| ^^^^^^^
note: required by a bound in `spawn`
--> /home/makogan/.rustup/toolchains/nightly-2022-10-29-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:705:8
|
705 | F: Send + 'static,
| ^^^^ required by this bound in `spawn`
For more information about this error, try `rustc --explain E0277`.
error: could not compile `neverengine` due to previous error