I am trying to make a thread pool class which receives several functions and put them into the queue until they finish and then I can add another function to take advantage of the created threads instead of creating them when I want to run other functions. That's why I include a conditional variable to synchonize all the threads.
However, the code is not working properly because somehow when the function is invoked the object makes a copy. After several tries, I can not figure out what I am missing!
What I expect is that member function greetings
of the hw
object execute in parallel whit his index. But when the line (o.*f)(std::forward<Args>(args)...);
is executed the object is copied, although the copy constructor is deleted. So when it enters into the greetings
member it produces a SEGMENTATION FAULT
.
CMakeLists.txt
cmake_minimum_required(VERSION 3.5)
project(boost_asyo LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
add_executable(boost_asyo main.cpp)
target_link_libraries(${PROJECT_NAME} boost_thread boost_system)
main.cpp
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <mutex>
#include <condition_variable>
class Semaphore
{
std::mutex lock;
std::condition_variable cond;
int count;
public:
Semaphore()
{
count = 0;
}
void wait()
{
std::unique_lock<std::mutex> m(lock);
while(count > 0)
cond.wait(m, [this]{ return count == 0; });
}
void take()
{
std::unique_lock m(lock);
count++;
}
void give()
{
std::unique_lock m(lock);
count--;
if(count == 0)
{
cond.notify_one();
}
}
};
class ThreadPool
{
private:
boost::asio::io_service m_io_service;
std::unique_ptr<boost::asio::io_service::work> m_work;
boost::thread_group m_threads;
Semaphore m_sem;
public:
ThreadPool(size_t n)
{
this->m_work = std::make_unique<boost::asio::io_service::work>(m_io_service);
for (size_t ii = 0; ii < n; ii++)
{
m_threads.create_thread(boost::bind(&boost::asio::io_service::run, &this->m_io_service));
}
}
ThreadPool(const ThreadPool & v) = delete;
ThreadPool(ThreadPool && v) = delete;
~ThreadPool()
{
m_io_service.stop();
}
template<class type, class T, class T1, class... Args>
auto post(type T::*f, T1 &obj, Args... args)
{
this->m_sem.take();
this->m_io_service.post([&] ()
{
T o = static_cast<T&&>(obj);
(o.*f)(std::forward<Args>(args)...);
this->m_sem.give();
});
}
void wait()
{
this->m_sem.wait();
}
};
class HelloWorld
{
private:
public:
std::string m_str;
HelloWorld(std::string str) : m_str(str) {};
HelloWorld(const HelloWorld& v) = delete;
HelloWorld(HelloWorld&& v) = default;
~HelloWorld() = default;
void greetings(int ii)
{
for (int jj = 0; jj < 5; jj++)
{
std::cout << this->m_str << " " << ii << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
}
};
int main()
{
ThreadPool tp(8);
HelloWorld hw("Hola mundo");
for (int ii = 0; ii < 5; ii++)
{
tp.post(&HelloWorld::greetings, hw, ii);
}
tp.wait();
return 0;
}
This code is based on this one, which it works properly, and this is something similar to what I want to do with classes and members.
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <mutex>
#include <condition_variable>
class Semaphore
{
std::mutex lock;
std::condition_variable cond;
int count;
public:
Semaphore()
{
count = 0;
}
void wait()
{
std::unique_lock<std::mutex> m(lock);
while(count > 0)
cond.wait(m, [this]{ return count == 0; });
}
void take()
{
std::unique_lock m(lock);
count++;
}
void give()
{
std::unique_lock m(lock);
count--;
if(count == 0)
{
cond.notify_one();
}
}
};
int main()
{
boost::asio::io_service io_service;
std::unique_ptr<boost::asio::io_service::work> work = std::make_unique<boost::asio::io_service::work>(io_service);
boost::thread_group threads;
for (size_t ii = 0; ii < 2; ii++)
{
std::cout << "id: " << ii << std::endl;
threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
}
Semaphore sem;
for (size_t ii = 0; ii < 3; ii++)
{
//Take
sem.take();
io_service.post([ii, &sem] ()
{
int id = 0;
while(id < 5)
{
id++;
printf("hello world %i\n", static_cast<int>(ii));
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
//Give
sem.give();
});
}
sem.wait();
for (size_t ii = 0; ii < 3; ii++)
{
sem.take();
io_service.post([ii, &sem] ()
{
int id = 0;
while(id < 5)
{
id++;
printf("bye world %i\n", static_cast<int>(ii));
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
sem.give();
});
}
sem.wait();
io_service.stop();
return 0;
}