c++boost-threadboost-bindboost-function

boost::function deallocation segmentation fault in thread pool


I'm trying to make a thread pool that blocks the main thread until all it's children have completed. The real-world use-case for this is a "Controller" process that spawns independent processes for the user to interact with.

Unfortunately, when the main exits, a segmentation fault is encountered. I cannot figure out the cause of this segmentation fault.

I've authored a Process class which is little more than opening a shell script (called waiter.sh that contains a sleep 5) and waiting for the pid to exit. The Process class is initialized and then the Wait() method is placed in one of the threads in the thread pool.

The problem arises when ~thread_pool() is called. The std::queue cannot properly deallocate the boost::function passed to it, even though the reference to Process is still valid.

#include <sys/types.h>
#include <sys/wait.h>
#include <spawn.h>

#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

extern char **environ;

class Process {
private:
    pid_t pid;
    int status;
public:

    Process() : status(0), pid(-1) {
    }

    ~Process() {
        std::cout << "calling ~Process" << std::endl;
    }

    void Spawn(char **argv) {
        // want spawn posix and wait for th epid to return
        status = posix_spawn(&pid, "waiter.sh", NULL, NULL, argv, environ);
        if (status != 0) {
            perror("unable to spawn");
            return;
        }
    }

    void Wait() {
        std::cout << "spawned proc with " << pid << std::endl;
        waitpid(pid, &status, 0);
        //        wait(&pid);
        std::cout << "wait complete" << std::endl;
    }

};

Below is the thread_pool class. This is loosely adapted from the accepted answer for this question

class thread_pool {
private:
    std::queue<boost::function<void() >> tasks;
    boost::thread_group threads;
    std::size_t available;
    boost::mutex mutex;
    boost::condition_variable condition;
    bool running;
public:

thread_pool(std::size_t pool_size) : available(pool_size), running(true) {
    std::cout << "creating " << pool_size << " threads" << std::endl;
    for (std::size_t i = 0; i < available; ++i) {
        threads.create_thread(boost::bind(&thread_pool::pool_main, this));
    }
}

~thread_pool() {
    std::cout << "~thread_pool" << std::endl;
    {
        boost::unique_lock<boost::mutex> lock(mutex);
        running = false;
        condition.notify_all();
    }

    try {
        threads.join_all();
    } catch (const std::exception &) {
        // supress exceptions
    }
}

template <typename Task>
void run_task(Task task) {

    boost::unique_lock<boost::mutex> lock(mutex);
    if (0 == available) {
        return; //\todo err
    }

    --available;

    tasks.push(boost::function<void()>(task));
    condition.notify_one();
    return;
}

private:

void pool_main() {

    // wait on condition variable while the task is empty and the pool is still 
    // running
    boost::unique_lock<boost::mutex> lock(mutex);
    while (tasks.empty() && running) {
        condition.wait(lock);
    }

    // copy task locally and remove from the queue. this is
    // done within it's own scope so that the task object is destructed 
    // immediately after running the task. This is useful in the
    // event that the function contains shared_ptr arguments
    // bound via 'bind'
    {
        auto task = tasks.front();
        tasks.pop();

        lock.unlock();

        // run the task
        try {
            std::cout << "running task" << std::endl;
            task();
        } catch (const std::exception &) {
            // supress
        }
    }

    // task has finished so increment count of availabe threads
    lock.lock();
    ++available;

    }
};

Here is the main:

int main() {

    // input arguments are not required
    char *argv[] = {NULL};
    Process process;
    process.Spawn(argv);

    thread_pool pool(5);

    pool.run_task(boost::bind(&Process::Wait, &process));

    return 0;
}

The output for this is

creating 5 threads
~thread_pool
I am waiting... (from waiting.sh)
running task
spawned proc with 2573
running task
running task
running task
running task
wait complete
Segmentation fault (core dumped)

And here is the stack trace:

Starting program: /home/jandreau/NetBeansProjects/Controller/dist/Debug/GNU-    Linux/controller 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
creating 5 threads
[New Thread 0x7ffff691d700 (LWP 2600)]
[New Thread 0x7ffff611c700 (LWP 2601)]
[New Thread 0x7ffff591b700 (LWP 2602)]
[New Thread 0x7ffff511a700 (LWP 2603)]
[New Thread 0x7ffff4919700 (LWP 2604)]
~thread_pool
running task
running task
spawned proc with 2599
[Thread 0x7ffff611c700 (LWP 2601) exited]
running task
[Thread 0x7ffff591b700 (LWP 2602) exited]
running task
[Thread 0x7ffff511a700 (LWP 2603) exited]
running task
[Thread 0x7ffff4919700 (LWP 2604) exited]
I am waiting...
wait complete
[Thread 0x7ffff691d700 (LWP 2600) exited]

Thread 1 "controller" received signal SIGSEGV, Segmentation fault.
0x000000000040f482 in boost::detail::function::basic_vtable0<void>::clear (
this=0xa393935322068, functor=...)
at /usr/include/boost/function/function_template.hpp:509
509           if (base.manager)
(gdb) where
#0  0x000000000040f482 in boost::detail::function::basic_vtable0<void>::clear (
this=0xa393935322068, functor=...)
at /usr/include/boost/function/function_template.hpp:509
#1  0x000000000040e263 in boost::function0<void>::clear (this=0x62ef50)
at /usr/include/boost/function/function_template.hpp:883
#2  0x000000000040cf20 in boost::function0<void>::~function0 (this=0x62ef50, 
__in_chrg=<optimized out>)
at /usr/include/boost/function/function_template.hpp:765
#3  0x000000000040b28e in boost::function<void ()>::~function() (
this=0x62ef50, __in_chrg=<optimized out>)
at /usr/include/boost/function/function_template.hpp:1056
#4  0x000000000041193a in std::_Destroy<boost::function<void ()> >(boost::function<void ()>*) (__pointer=0x62ef50)
at /usr/include/c++/5/bits/stl_construct.h:93
#5  0x00000000004112df in  std::_Destroy_aux<false>::__destroy<boost::function<void ()>*>(boost::function<void ()>*, boost::function<void ()>*) (
__first=0x62ef50, __last=0x62ed50)
at /usr/include/c++/5/bits/stl_construct.h:103
#6  0x0000000000410d16 in std::_Destroy<boost::function<void ()>*>(boost::function<void ()>*, boost::function<void ()>*) (__first=0x62edd0, __last=0x62ed50)
at /usr/include/c++/5/bits/stl_construct.h:126
#7  0x0000000000410608 in std::_Destroy<boost::function<void ()>*, boost::function<void ()> >(boost::function<void ()>*, boost::function<void ()>*, std::allocat---Type <return> to continue, or q <return> to quit---
or<boost::function<void ()> >&) (__first=0x62edd0, __last=0x62ed50)
at /usr/include/c++/5/bits/stl_construct.h:151
#8  0x000000000040fac5 in std::deque<boost::function<void ()>, std::allocator<boost::function<void ()> > >::_M_destroy_data_aux(std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>, std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>) (this=0x7fffffffdaf0, __first=..., __last=...)
at /usr/include/c++/5/bits/deque.tcc:845
#9  0x000000000040e6e4 in std::deque<boost::function<void ()>,  std::allocator<boost::function<void ()> > >::_M_destroy_data(std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>, std::_Deque_iterator<boost::function<void ()>, boost::function<void ()>&, boost::function<void ()>*>, std::allocator<boost::function<void ()> > const&) (
this=0x7fffffffdaf0, __first=..., __last=...)
at /usr/include/c++/5/bits/stl_deque.h:2037
#10 0x000000000040d0c8 in std::deque<boost::function<void ()>, std::allocator<boost::function<void ()> > >::~deque() (this=0x7fffffffdaf0, 
__in_chrg=<optimized out>) at /usr/include/c++/5/bits/stl_deque.h:1039
#11 0x000000000040b3ce in std::queue<boost::function<void ()>, std::deque<boost::function<void ()>, std::allocator<boost::function<void ()> > > >::~queue() (
this=0x7fffffffdaf0, __in_chrg=<optimized out>)
at /usr/include/c++/5/bits/stl_queue.h:96
#12 0x000000000040b6c0 in thread_pool::~thread_pool (this=0x7fffffffdaf0, 
---Type <return> to continue, or q <return> to quit---
__in_chrg=<optimized out>) at main.cpp:63  
#13 0x0000000000408b60 in main () at main.cpp:140

I'm puzzled by this because the Process hasn't yet gone out of scope and I'm passing a copy of the boost::function<void()> to the thread pool for processing.

Any ideas?


Solution

  • The stack trace indicates that you are destroying a std::function that has not been properly initialized (e.g. some random memory location that is treated as being a std::function) or that you are destroying a std::function twice.

    The problem is that your program pushes to tasks only once, but pops five times, hence you remove elements from an empty deque, which is undefined behaviour.

    The while loop in pool_main terminates if running is false, and running may be false even if the deque is empty. Then you pop unconditionally. You might consider correcting pool_main as follows:

    void pool_main() {
    
        // wait on condition variable
        // while the task is empty and the pool is still 
        // running
        boost::unique_lock<boost::mutex> lock(mutex);
        while (tasks.empty() && running) {
            condition.wait(lock);
        }
    
        // copy task locally and remove from the queue. this is
        // done within it's own scope so that the task object is destructed 
        // immediately after running the task. This is useful in the
        // event that the function contains shared_ptr arguments
        // bound via 'bind'
        if (!tasks.empty ()) {  // <--- !!!!!!!!!!!!!!!!!!!!!!!!
            auto task = tasks.front();
            tasks.pop();
    
            lock.unlock();
    
            // run the task
            try {
                std::cout << "running task" << std::endl;
                task();
            } catch (const std::exception &) {
                // supress
            }
        }
    
        // task has finished so increment count of availabe threads
        lock.lock();
        ++available;
    };
    

    I am, however, not sure whether the logic regarding available is correct. Shouldn't available be decremented on starting the processing of a task and be incremented when it is finished (hence be changed within pool_main only and only within the newly introduced if clause)?