2

I'm trying to make a thread pool that blocks the main thread until all it's children have completed. The real-world use-case for this is a "Controller" process that spawns independent processes for the user to interact with.

Unfortunately, when the main exits, a segmentation fault is encountered. I cannot figure out the cause of this segmentation fault.

I've authored a Process class which is little more than opening a shell script (called waiter.sh that contains a sleep 5) and waiting for the pid to exit. The Process class is initialized and then the Wait() method is placed in one of the threads in the thread pool.

The problem arises when ~thread_pool() is called. The std::queue cannot properly deallocate the boost::function passed to it, even though the reference to Process is still valid.

#include <sys/types.h>
#include <sys/wait.h>
#include <spawn.h>

#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

extern char **environ;

class Process {
private:
    pid_t pid;
    int status;
public:

    Process() : status(0), pid(-1) {
    }

    ~Process() {
        std::cout << "calling ~Process" << std::endl;
    }

    void Spawn(char **argv) {
        // want spawn posix and wait for th epid to return
        status = posix_spawn(&pid, "waiter.sh", NULL, NULL, argv, environ);
        if (status != 0) {
            perror("unable to spawn");
            return;
        }
    }

    void Wait() {
        std::cout << "spawned proc with " << pid << std::endl;
        waitpid(pid, &status, 0);
        //        wait(&pid);
        std::cout << "wait complete" << std::endl;
    }

};

Below is the thread_pool class. This is loosely adapted from the accepted answer for this question

class thread_pool {
private:
    std::queue<boost::function<void() >> tasks;
    boost::thread_group threads;
    std::size_t available;
    boost::mutex mutex;
    boost::condition_variable condition;
    bool running;
public:

thread_pool(std::size_t pool_size) : available(pool_size), running(true) {
    std::cout << "creating " << pool_size << " threads" << std::endl;
    for (std::size_t i = 0; i < available; ++i) {
        threads.create_thread(boost::bind(&thread_pool::pool_main, this));
    }
}

~thread_pool() {
    std::cout << "~thread_pool" << std::endl;
    {
        boost::unique_lock<boost::mutex> lock(mutex);
        running = false;
        condition.notify_all();
    }

    try {
        threads.join_all();
    } catch (const std::exception &) {
        // supress exceptions
    }
}

template <typename Task>
void run_task(Task task) {

    boost::unique_lock<boost::mutex> lock(mutex);
    if (0 == available) {
        return; //\todo err
    }

    --available;

    tasks.push(boost::function<void()>(task));
    condition.notify_one();
    return;
}

private:

void pool_main() {

    // wait on condition variable while the task is empty and the pool is still 
    // running
    boost::unique_lock<boost::mutex> lock(mutex);
    while (tasks.empty() && running) {
        condition.wait(lock);
    }

    // copy task locally and remove from the queue. this is
    // done within it's own scope so that the task object is destructed 
    // immediately after running the task. This is useful in the
    // event that the function contains shared_ptr arguments
    // bound via 'bind'
    {
        auto task = tasks.front();
        tasks.pop();

        lock.unlock();

        // run the task
        try {
            std::cout << "running task" << std::endl;
            task();
        } catch (const std::exception &) {
            // supress
        }
    }

    // task has finished so increment count of availabe threads
    lock.lock();
    ++available;

    }
};

Here is the main:

int main() {

    // input arguments are not required
    char *argv[] = {NULL};
    Process process;
    process.Spawn(argv);

    thread_pool pool(5);

    pool.run_task(boost::bind(&Process::Wait, &process));

    return 0;
}

The output for this is

creating 5 threads
~thread_pool
I am waiting... (from waiting.sh)
running task
spawned proc with 2573
running task
running task
running task
running task
wait complete
Segmentation fault (core dumped)

And here is the stack trace:

Starting program: /home/jandreau/NetBeansProjects/Controller/dist/Debug/GNU-    Linux/controller 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
creating 5 threads
[New Thread 0x7ffff691d700 (LWP 2600)]
[New Thread 0x7ffff611c700 (LWP 2601)]
[New Thread 0x7ffff591b700 (LWP 2602)]
[New Thread 0x7ffff511a700 (LWP 2603)]
[New Thread 0x7ffff4919700 (LWP 2604)]
~thread_pool
running task
running task
spawned proc with 2599
[Thread 0x7ffff611c700 (LWP 2601) exited]
running task
[Thread 0x7ffff591b700 (LWP 2602) exited]
running task
[Thread 0x7ffff511a700 (LWP 2603) exited]
running task
[Thread 0x7ffff4919700 (LWP 2604) exited]
I am waiting...
wait complete
[Thread 0x7ffff691d700 (LWP 2600) exited]

Thread 1 "controller" received signal SIGSEGV, Segmentation fault.
0x000000000040f482 in boost::detail::function::basic_vtable0<void>::clear (
this=0xa393935322068, functor=...)
at /usr/include/boost/function/function_template.hpp:509
509           if (base.manager)
(gdb) where
#0  0x000000000040f482 in boost::detail::function::basic_vtable0<void>::clear (
this=0xa393935322068, functor=...)
at /usr/include/boost/function/function_template.hpp:509
#1  0x000000000040e263 in boost::function0<void>::clear (this=0x62ef50)
at /usr/include/boost/function/function_template.hpp:883
#2  0x000000000040cf20 in boost::function0<void>::~function0 (this=0x62ef50, 
__in_chrg=<optimized out>)
at /usr/include/boost/function/function_template.hpp:765
#3  0x000000000040b28e in boost::function<void ()>::~function() (
this=0x62ef50, __in_chrg=<optimized out>)
at /usr/include/boost/function/function_template.hpp:1056
#4  0x000000000041193a in std::_Destroy<boost::function<void ()> >(boost::function<void ()>*) (__pointer=0x62ef50)
at /usr/include/c++/5/bits/stl_construct.h:93
#5  0x00000000004112df in  std::_Destroy_aux<false>::__destroy<boost::function<void ()>*>(boost::function<void ()>*, boost::function<void ()>*) (
__first=0x62ef50, __last=0x62ed50)
at /usr/include/c++/5/bits/stl_construct.h:103
#6  0x0000000000410d16 in std::_Destroy<boost::function<void ()>*>(boost::function<void ()>*, boost::function<void ()>*) (__first=0x62edd0, __last=0x62ed50)
at /usr/include/c++/5/bits/stl_construct.h:126
#7  0x0000000000410608 in std::_Destroy<boost::function<void ()>*, boost::function<void ()> >(boost::function<void ()>*, boost::function<void ()>*, std::allocat---Type <return> to continue, or q <return> to quit---
or<boost::function<void ()> >&) (__first=0x62edd0, __last=0x62ed50)
at /usr/include/c++/5/bits/stl_construct.h:151
#8  0x000000000040fac5 in std::deque<boost::function<void ()>, std::allocator<boost::function<void ()> > >::_M_destroy_data_aux(std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>, std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>) (this=0x7fffffffdaf0, __first=..., __last=...)
at /usr/include/c++/5/bits/deque.tcc:845
#9  0x000000000040e6e4 in std::deque<boost::function<void ()>,  std::allocator<boost::function<void ()> > >::_M_destroy_data(std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>, std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>, std::allocator<boost::function<void ()> > const&) (
this=0x7fffffffdaf0, __first=..., __last=...)
at /usr/include/c++/5/bits/stl_deque.h:2037
#10 0x000000000040d0c8 in std::deque<boost::function<void ()>, std::allocator<boost::function<void ()> > >::~deque() (this=0x7fffffffdaf0, 
__in_chrg=<optimized out>) at /usr/include/c++/5/bits/stl_deque.h:1039
#11 0x000000000040b3ce in std::queue<boost::function<void ()>, std::deque<boost::function<void ()>, std::allocator<boost::function<void ()> > > >::~queue() (
this=0x7fffffffdaf0, __in_chrg=<optimized out>)
at /usr/include/c++/5/bits/stl_queue.h:96
#12 0x000000000040b6c0 in thread_pool::~thread_pool (this=0x7fffffffdaf0, 
---Type <return> to continue, or q <return> to quit---
__in_chrg=<optimized out>) at main.cpp:63  
#13 0x0000000000408b60 in main () at main.cpp:140

I'm puzzled by this because the Process hasn't yet gone out of scope and I'm passing a copy of the boost::function<void()> to the thread pool for processing.

Any ideas?

Community
  • 1
  • 1
Tyler Jandreau
  • 4,245
  • 1
  • 22
  • 47
  • Shouldn't you synchronize access to `tasks` in `pool_main`? – JohnB Oct 15 '16 at 17:32
  • @JohnB How would I do that? I'm a threading neophyte. – Tyler Jandreau Oct 15 '16 at 17:40
  • Call `lock.lock()` before accessing `tasks`, I suppose. – JohnB Oct 15 '16 at 18:09
  • No, did not read your code correctly. Locking seems fine. However, if I understand your code correctly, you add 1 task to the queue, yet 'running task' is printed 5 times? So you are accessing an empty queue? – JohnB Oct 15 '16 at 19:40
  • It's a thread pool. They immediately exit since they have nothing to do. The one that fails is the one that has a bound member function. – Tyler Jandreau Oct 15 '16 at 21:29
  • Yet count the pushes and pops to member variable `tasks`. There is one `push` happening throughout the program, but five `pops`. Replace `{ auto task = tasks.front(); tasks.pop(); ... ` by `if (!tasks.empty ()) { auto task = tasks.front(); tasks.pop(); ... ` and look what happens. – JohnB Oct 16 '16 at 16:29
  • @JohnB that was the problem. Thank you. Make that an answer and I'll accept it. – Tyler Jandreau Oct 17 '16 at 13:23

2 Answers2

1

The stack trace indicates that you are destroying a std::function that has not been properly initialized (e.g. some random memory location that is treated as being a std::function) or that you are destroying a std::function twice.

The problem is that your program pushes to tasks only once, but pops five times, hence you remove elements from an empty deque, which is undefined behaviour.

The while loop in pool_main terminates if running is false, and running may be false even if the deque is empty. Then you pop unconditionally. You might consider correcting pool_main as follows:

void pool_main() {

    // wait on condition variable
    // while the task is empty and the pool is still 
    // running
    boost::unique_lock<boost::mutex> lock(mutex);
    while (tasks.empty() && running) {
        condition.wait(lock);
    }

    // copy task locally and remove from the queue. this is
    // done within it's own scope so that the task object is destructed 
    // immediately after running the task. This is useful in the
    // event that the function contains shared_ptr arguments
    // bound via 'bind'
    if (!tasks.empty ()) {  // <--- !!!!!!!!!!!!!!!!!!!!!!!!
        auto task = tasks.front();
        tasks.pop();

        lock.unlock();

        // run the task
        try {
            std::cout << "running task" << std::endl;
            task();
        } catch (const std::exception &) {
            // supress
        }
    }

    // task has finished so increment count of availabe threads
    lock.lock();
    ++available;
};

I am, however, not sure whether the logic regarding available is correct. Shouldn't available be decremented on starting the processing of a task and be incremented when it is finished (hence be changed within pool_main only and only within the newly introduced if clause)?

JohnB
  • 13,315
  • 4
  • 38
  • 65
  • Available, in this case, is the amount of threads available to the pool. When one thread is run to completion, it is popped from the task queue and then made available once again. Your solution with the `tasks.empty()` properly solved the problem. Thank you for your help. – Tyler Jandreau Oct 19 '16 at 12:23
0

You don't seem to be allocating memory for

extern char **environ;

anywhere. Though wouldn't that be a link error?

Cutting this back to be a minimal reproduction case would help a lot. There's a lot of code here that's presumably not necessary to reproduce the problem.

Also, what is this:

    // supress exceptions

If you are getting exceptions while joining your threads, then you presumably haven't joined them all and cleaning up the threads without joining them will cause an error after main exits.

xaxxon
  • 19,189
  • 5
  • 50
  • 80