i implemented a thread pool as described in this discussion Boost group_threads Maximal number of parallel thread
with one change i have a function which wait till all threads are finished:
class thread_pool
{
private:
mutex mx;
condition_variable cv;
typedef function<void()> job_t;
std::deque<job_t> _queue;
thread_group pool;
boost::atomic_bool shutdown;
static void worker_thread(thread_pool& q)
{
while (optional<job_t> job = q.dequeue())
(*job)();
}
public:
thread_pool() : shutdown(false) {
//LOG_INFO_MESSAGE << "Number of possible Threads: " << boost::thread::hardware_concurrency() << std::endl;
for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i){
pool.create_thread(bind(worker_thread, ref(*this)));
}
}
void enqueue(job_t job)
{
lock_guard<mutex> lk(mx);
_queue.push_back(job);
cv.notify_one();
}
optional<job_t> dequeue()
{
unique_lock<mutex> lk(mx);
namespace phx = boost::phoenix;
cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
if (_queue.empty())
return none;
job_t job = _queue.front();
_queue.pop_front();
return job;
}
void WaitTillAllJobsAreDone(){
shutdown = true;
{
lock_guard<mutex> lk(mx);
cv.notify_all();
}
pool.join_all();
}
~thread_pool()
{
shutdown = true;
{
lock_guard<mutex> lk(mx);
cv.notify_all();
}
pool.join_all();
}
};
My usage:
class Foo{
public:
Foo(std::vector<boost::shared_ptr<Class B> > data):m_data(data),m_maxDepth(5)
{
}
void initializateThreads(){
thread_pool threadPool;
std::vector<std::vector<double> > result(m_data.size());
std::vector<std::vector<double> >::iterator it;=result.begin();
for(auto d:m_data){
threadPool.enqueue(boost::bind(&Foo::Work,this,d,boost::ref(*it))):
++it;
}
threadPool.WaitTillAllJobsAreDone();
//do something with results;
}
void Work(boost::shared_ptr<Class B> ptr,std::vector<double>& resultThread,int currentDepth){
if(currentDepth>m_maxDepth) return;
//do some calculation with ptr and add it to resultThread
resultThread.push_back((double) some result of B);
Work(ptr,resultThread,(currentDepth+1));
}
}
When I use my thread pool my program use a lot of memory after this section and never free it. Without thread pool i have no problems with the same function. Is there any error in the thread pool? I have to free the created threads ?
--------- EDIT --------------
int main()
{
{
std::vector<Class B> data; //filled
Foo foo(data);
foo.LoadSomedata();
foo.initializateThreads();
} //< all memory should be freed or ?
while(1){}//< let process alive but memory in ressource manager should be very small or?
}
i wrote this test programm. Valgrind also says no memory leaks. When my program is inside the while loop the memory of my process should be very small or? But on the system monitor it has 3GB for this process. Do i have a mind error ?
VAlGrind output:
==24210== HEAP SUMMARY:
==24210== in use at exit: 0 bytes in 0 blocks
==24210== total heap usage: 2,055,546 allocs, 2,055,546 frees, 220,359,375 bytes allocated
==24210==
==24210== All heap blocks were freed -- no leaks are possible
==24210==
==24210== For counts of detected and suppressed errors, rerun with: -v
==24210== Use --track-origins=yes to see where uninitialised values come from
==24210== ERROR SUMMARY: 8964228 errors from 69 contexts (suppressed: 2 from 2)
I have a few known uninitialised pointer that are the errors.