
I implemented a thread pool as described in the discussion "Boost group_threads Maximal number of parallel thread",

with one change: I added a function that waits until all threads have finished:

class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (optional<job_t> job = q.dequeue())
              (*job)();
      }

  public:
      thread_pool() : shutdown(false) {
          //LOG_INFO_MESSAGE << "Number of possible Threads: " << boost::thread::hardware_concurrency() << std::endl;
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i){
              pool.create_thread(bind(worker_thread, ref(*this)));
          }
      }

      void enqueue(job_t job)
      {

          lock_guard<mutex> lk(mx);
          _queue.push_back(job);

          cv.notify_one();
      }

      optional<job_t> dequeue()
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          job_t job = _queue.front();
          _queue.pop_front();
          return job;
      }

      void WaitTillAllJobsAreDone(){
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
      }


      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
       }
}; 

My usage:

class Foo{
public: 
  Foo(std::vector<boost::shared_ptr<Class B> > data):m_data(data),m_maxDepth(5)
{

}
void initializateThreads(){
  thread_pool threadPool;
  std::vector<std::vector<double> > result(m_data.size());
  std::vector<std::vector<double> >::iterator it = result.begin();
  for(auto d:m_data){
    threadPool.enqueue(boost::bind(&Foo::Work,this,d,boost::ref(*it),0));
    ++it;
  }
  threadPool.WaitTillAllJobsAreDone();
  //do something with results;
} 
  void Work(boost::shared_ptr<Class B> ptr,std::vector<double>& resultThread,int currentDepth){
    if(currentDepth>m_maxDepth) return;
    //do some calculation with ptr and add it to resultThread
    resultThread.push_back((double) some result of B);


    Work(ptr,resultThread,(currentDepth+1));
  }
}

When I use my thread pool, my program uses a lot of memory after this section and never frees it. Without the thread pool I have no problems with the same function. Is there an error in the thread pool? Do I have to free the created threads?

--------- EDIT --------------

int main()
{
  {
    std::vector<Class B> data; //filled
    Foo foo(data);
    foo.LoadSomedata();
    foo.initializateThreads();
  } //< all memory should be freed here, shouldn't it?
  while(1){}//< keep the process alive; its memory in the resource manager should be very small, shouldn't it?
}

I wrote this test program. Valgrind also reports no memory leaks. While my program is inside the while loop, the memory of my process should be very small, shouldn't it? But the system monitor shows 3 GB for this process. Am I misunderstanding something?

Valgrind output:

 ==24210== HEAP SUMMARY:
 ==24210==     in use at exit: 0 bytes in 0 blocks
 ==24210==   total heap usage: 2,055,546 allocs, 2,055,546 frees, 220,359,375 bytes allocated
 ==24210== 
 ==24210== All heap blocks were freed -- no leaks are possible
 ==24210== 
 ==24210== For counts of detected and suppressed errors, rerun with: -v
 ==24210== Use --track-origins=yes to see where uninitialised values come from
 ==24210== ERROR SUMMARY: 8964228 errors from 69 contexts (suppressed: 2 from 2)

The reported errors come from a few known uninitialised pointers.

Hunk
  • Make it a self-contained example. Your "my usage" code makes zero sense, full stop. What _is_ this: _`this,vector,boost sharedPtr,boost::ref(vector[i]::iterator), 0)`_? – sehe Nov 03 '14 at 15:59
  • Sorry, I thought the parameters of the function were not important. I edited this; I hope it is easier to understand now. I have a result vector for every worker, which gets modified – Hunk Nov 03 '14 at 16:28
  • Of course it's not unimportant if the arguments hold _resources_ and you are complaining about leaked _resources_. – sehe Nov 03 '14 at 16:41
  • That is true, but if I don't use the thread pool, this works well with the same functions and parameters – Hunk Nov 03 '14 at 16:53
  • We cannot reproduce the issue. You keep posting invalid code. There's no such thing as `Class B` in c++. I don't know what you expect us to do. You'll have to figure out until you can ask a valid question. – sehe Nov 04 '14 at 15:49

1 Answer


Well, the leak is not in the code you showed.

See it Live On Coliru.

Valgrind says: All heap blocks were freed -- no leaks are possible:

==14398== Memcheck, a memory error detector
==14398== Copyright (C) 2002-2013, and GNU GPL'd, by Julian Seward et al.
==14398== Using Valgrind-3.10.0.SVN and LibVEX; rerun with -h for copyright info
==14398== Command: ./test
==14398== 
................................................................................ [...]
==14398== 
==14398== HEAP SUMMARY:
==14398==     in use at exit: 0 bytes in 0 blocks
==14398==   total heap usage: 5,093 allocs, 5,093 frees, 772,512 bytes allocated
==14398== 
==14398== All heap blocks were freed -- no leaks are possible
==14398== 
==14398== For counts of detected and suppressed errors, rerun with: -v
==14398== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
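
For readers who want something to run under Valgrind themselves, here is a minimal self-contained sketch of the same setup. It is not the code behind the Coliru link: it swaps Boost for the C++11 standard library, and the names `run_jobs` and `wait_till_all_jobs_are_done` are made up for illustration. The job body only mimics what `Work` appears to do (one value per depth level, pushed into a per-job result vector).

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical std-only stand-in for the Boost-based thread_pool in the question.
class thread_pool {
    std::mutex mx;
    std::condition_variable cv;
    std::deque<std::function<void()>> queue_;
    std::vector<std::thread> workers;
    bool shutdown = false; // guarded by mx

    void worker() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lk(mx);
                cv.wait(lk, [this] { return shutdown || !queue_.empty(); });
                if (queue_.empty()) return; // shutdown requested and nothing left to do
                job = std::move(queue_.front());
                queue_.pop_front();
            }
            job(); // run the job without holding the lock
        }
    }

public:
    thread_pool() {
        unsigned n = std::thread::hardware_concurrency();
        if (n == 0) n = 2; // hardware_concurrency() is allowed to return 0
        for (unsigned i = 0; i < n; ++i)
            workers.emplace_back([this] { worker(); });
    }

    void enqueue(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lk(mx);
            assert(!shutdown && "enqueue after wait_till_all_jobs_are_done()");
            queue_.push_back(std::move(job));
        }
        cv.notify_one();
    }

    // Drains the queue, then joins the workers; the pool cannot be reused afterwards.
    void wait_till_all_jobs_are_done() {
        {
            std::lock_guard<std::mutex> lk(mx);
            shutdown = true;
        }
        cv.notify_all();
        for (auto& t : workers)
            if (t.joinable()) t.join();
    }

    ~thread_pool() { wait_till_all_jobs_are_done(); }
};

// One result vector per job; each job pushes one value for depth 0..max_depth.
std::vector<std::vector<double>> run_jobs(std::size_t jobs, int max_depth) {
    std::vector<std::vector<double>> result(jobs);
    thread_pool pool;
    for (std::size_t i = 0; i < jobs; ++i)
        pool.enqueue([&result, i, max_depth] {
            for (int depth = 0; depth <= max_depth; ++depth)
                result[i].push_back(static_cast<double>(i) + depth);
        });
    pool.wait_till_all_jobs_are_done();
    return result;
}
```

Calling `run_jobs` from a small `main` and running the binary under Valgrind should give you a baseline to compare against before you add your real `ClassB` back in.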

Likely Culprits:

Just making things up so you might get ideas:

  • ClassB (or other involved parties) might violate the Rule of Three. Demo: here's the result with a suitably broken ClassB implementation:

    struct ClassB {
        ClassB() : stuff(new int[256]) { }
        int* stuff; // could be other resources, such as threads, filehandles etc.
    };
    

    You can see the leakage under valgrind:

    ==14458== LEAK SUMMARY:
    ==14458==    definitely lost: 510,976 bytes in 499 blocks
    ==14458==    indirectly lost: 0 bytes in 0 blocks
    ==14458==      possibly lost: 1,024 bytes in 1 blocks
    
  • Between invocations of initializateThreads [sic] you might be holding on to the data vectors.

  • You might be appending to the same data vectors (or output stream buffers) when you were expecting to overwrite
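
To illustrate the first bullet: the broken ClassB above leaks because it never frees `stuff`, and adding only a destructor would turn copies into double deletes. A sketch of a Rule-of-Three-compliant version follows. The `live_buffers` counter and the `buffers_after_copying` helper are hypothetical additions, there only so the effect is visible without Valgrind; your real class will of course look different.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Illustration only: counts buffers currently allocated by ClassB objects.
static int live_buffers = 0;

const std::size_t kStuffSize = 256;

struct ClassB {
    ClassB() : stuff(new int[kStuffSize]()) { ++live_buffers; }

    // Copy constructor: allocate a separate buffer and copy the contents.
    ClassB(const ClassB& other) : stuff(new int[kStuffSize]) {
        std::copy(other.stuff, other.stuff + kStuffSize, stuff);
        ++live_buffers;
    }

    // Copy assignment: both buffers have the same size, so copy in place.
    ClassB& operator=(const ClassB& other) {
        if (this != &other)
            std::copy(other.stuff, other.stuff + kStuffSize, stuff);
        return *this;
    }

    // Destructor: release the buffer this object owns.
    ~ClassB() {
        delete[] stuff;
        --live_buffers;
    }

    int* stuff; // could be other resources, such as threads, filehandles etc.
};

// Creates, copies and assigns a few objects, then reports what is still allocated.
int buffers_after_copying() {
    {
        ClassB a;
        ClassB b(a); // copy construction
        ClassB c;
        c = a;       // copy assignment
        assert(live_buffers == 3);
    }
    return live_buffers; // 0 when every buffer was released
}
```

If the resource is just an array, holding a `std::vector<int>` instead of a raw pointer avoids writing any of these special members by hand.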

I recommend eliminating possible sources of the leak one by one until you have localized it.

sehe
  • Hello, thank you for your answer. My Valgrind run gave the same result. I added my test case to my question. I don't know whether I'm misunderstanding something at the moment – Hunk Nov 04 '14 at 15:38
  • @Hunk the system monitor is _not_ a precise measuring device. Use `valgrind --tool=massif` to get some statistics (or google perftools heap-profiler based on libtcmalloc) – sehe Nov 04 '14 at 15:52