I'm playing with cellular automata and trying to boost my application's performance with multithreading, but I'm getting some interesting results. I don't really know why this is happening or what I'm missing...
My aim is to process a big buffer of data as fast as I can. In my example I have a big (20000 x 20000) bool array and transform it into an image (one bool value to one pixel). This process can be done in parallel, since there isn't any dependency between the pixels. I divide the bool array into threadCount blocks of rows, start a new thread for each block, let them run, and wait for them to finish.
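To make the split explicit, this is the arithmetic I had in mind (a sketch; blockRows is only an illustration and not part of the program below):

#include <cstddef>
#include <utility>
#include <vector>

// Every block gets rows / threadCount rows; the last block also takes the
// remainder. E.g. 20000 rows on 3 threads -> (0, 6666), (6666, 6666), (13332, 6668).
std::vector<std::pair<std::size_t, std::size_t>> // (firstRow, rowCount) per thread
blockRows(const std::size_t rows, const std::size_t threadCount)
{
    const std::size_t count(rows / threadCount);
    const std::size_t rem(rows % threadCount);
    std::vector<std::pair<std::size_t, std::size_t>> blocks;
    for(std::size_t i(0); i < threadCount; ++i)
    {
        blocks.push_back(std::make_pair(i * count,
                                        i == threadCount - 1 ? count + rem : count));
    }
    return blocks;
}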
I assumed that with more threads I'd get a somewhat better runtime. (I'm not using an unrealistic thread count, just between one and the number of logical cores.)
So I wrote this:
#include <cstddef>
#include <iostream>
#include <vector>

#include <boost/asio/io_service.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

typedef std::size_t Size;
typedef std::vector<bool> Data;
typedef std::vector<Data> History;

class RenderTask
{
public:
    typedef void result_type;

public:
    RenderTask(Ppm& ppm,
               const Ppm::Pixel& fColor)
        : mPpm(ppm),
          mForegroundColor(fColor)
    {
    }

    void operator()(const History& history,
                    const Size minIdxX,
                    const Size countX,
                    const Size minIdxY,
                    const Size countY)
    {
        const Size maxIdxX(minIdxX + countX);
        const Size maxIdxY(minIdxY + countY);
        for(Size y(minIdxY); y < maxIdxY; ++y)
        {
            for(Size x(minIdxX); x < maxIdxX; ++x)
            {
                if(history[y][x])
                {
                    mPpm.setPixel(x, y, mForegroundColor);
                }
            }
        }
    }

private:
    Ppm& mPpm;
    const Ppm::Pixel mForegroundColor;
};
void render(const History& history,
            Ppm& ppm,
            const Ppm::Pixel& fColor,
            const Size threadCount)
{
    boost::asio::io_service io_service;
    boost::thread_group threads;
    for(Size i(0); i < threadCount; ++i)
    {
        threads.create_thread(boost::bind(&boost::asio::io_service::run,
                                          &io_service));
    }

    RenderTask task(ppm, fColor);
    io_service.reset();

    const Size count(history.size() / threadCount);
    const Size rem(history.size() % threadCount);
    Size minIdxY(0);
    for(Size i(0); i < threadCount; ++i)
    {
        const bool addRemainders(rem && i == threadCount - 1);
        io_service.post(boost::bind(task,
                                    boost::cref(history),
                                    0,
                                    history.front().size(),
                                    minIdxY,
                                    addRemainders ? count + rem : count));
        minIdxY += count;
    }
    threads.join_all();
}
int main(int argc, char* argv[])
{
    const Size rule(parseNumber<Size>(argv[1]));
    const Size size(parseNumber<Size>(argv[2]));
    const Size iteration(parseNumber<Size>(argv[3]));
    const Size threadCount(clamp(1,
                                 static_cast<Size>(boost::thread::physical_concurrency()),
                                 parseNumber<Size>(argv[4])));
    ...
    History history(iteration, Data(size, false));
    history.front()[size / 2] = true;
    ...
    process(history, rule, threadCount);
    ...
    Ppm ppm(history.front().size(), history.size(), Ppm::Pixel(30, 30, 30));
    std::cout << "rendering... ";
    t.start();
    render(history, ppm, Ppm::Pixel(200, 200, 200), threadCount);
    t.stop();
    std::cout << t.ms() << " ms" << std::endl;
}
But when I ran the program with different numbers of threads, I got the following results. I have no idea why more cores do not give better performance: with two cores it's a bit better, but interestingly, with three cores it's almost the same as with one core... These values are averages:
ThreadingTest.exe 110 20000 20000 1 test.ppm
rendering... 554.95 ms
ThreadingTest.exe 110 20000 20000 2 test.ppm
rendering... 289.75 ms
ThreadingTest.exe 110 20000 20000 3 test.ppm
rendering... 555.37 ms
ThreadingTest.exe 110 20000 20000 4 test.ppm
rendering... 554.23 ms
ThreadingTest.exe 110 20000 20000 5 test.ppm
rendering... 564.23 ms
ThreadingTest.exe 110 20000 20000 6 test.ppm
rendering... 551.82 ms
ThreadingTest.exe 110 20000 20000 7 test.ppm
rendering... 555.22 ms
ThreadingTest.exe 110 20000 20000 8 test.ppm
rendering... 510.12 ms
What can cause this? Am I using the io_service in a wrong fashion? There is no I/O operation involved, just pure memory.
My machine has eight cores and 16 GB of RAM.
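For comparison, the pattern I've seen in the Boost.Asio documentation keeps an io_service::work object alive so that run() cannot return while the task queue is still empty (a minimal sketch; runTasks is just an illustrative name):

#include <boost/asio/io_service.hpp>
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>

void runTasks(const Size threadCount)
{
    boost::asio::io_service io_service;
    // The work object keeps run() from returning while the queue is empty.
    boost::scoped_ptr<boost::asio::io_service::work> work(
        new boost::asio::io_service::work(io_service));
    boost::thread_group threads;
    for(Size i(0); i < threadCount; ++i)
    {
        threads.create_thread(boost::bind(&boost::asio::io_service::run,
                                          &io_service));
    }
    // ... post the tasks with io_service.post(...) here ...
    work.reset();       // let run() return once the queue is drained
    threads.join_all();
}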
For additional details, here is the outline of the Ppm class:
class Ppm
{
public:
    struct Pixel
    {
        typedef unsigned char ChannelType;
        ChannelType mRed, mGreen, mBlue;
        ...
    };
    typedef std::vector<Pixel> ImageData;

    Ppm(const SizeType width,
        const SizeType height,
        const Pixel& color = Pixel())
        : mWidth(width),
          mHeight(height),
          mImageData(mWidth * mHeight, color)
    {
    }

    void setPixel(SizeType x, SizeType y, const Pixel& p)
    {
        mImageData[x + y * mWidth] = p;
    }
    ...

private:
    const SizeType mWidth;
    const SizeType mHeight;
    ImageData mImageData;
};
UPDATE
After your valuable comments I changed the approach a lot and rewrote it. Now I'm using pure C++11 and there is no Boost involved anymore:
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class ThreadPool
{
public:
    ThreadPool(const Size threadCount);
    ~ThreadPool();

public:
    template<class T>
    void addTask(T task);
    void wait();

private:
    bool mStopped;
    Size mRunningCount;
    std::vector<std::thread> mWorkers;
    std::deque<std::function<void()>> mTasks;
    std::mutex mMutex;
    std::condition_variable mCondition;
    std::condition_variable mFinishCondition;
};
ThreadPool::ThreadPool(const Size threadCount)
    : mStopped(false),
      mRunningCount(0)
{
    for(Size i(0); i < threadCount; ++i)
    {
        mWorkers.push_back(std::thread([this]()
        {
            std::function<void()> task;
            while(true)
            {
                {
                    // Sleep until there is a task to run or the pool is stopped.
                    std::unique_lock<std::mutex> lock(this->mMutex);
                    this->mCondition.wait(lock, [this]
                    {
                        return this->mStopped || !this->mTasks.empty();
                    });
                    if(this->mStopped)
                    {
                        return;
                    }
                    // Mark this worker busy and dequeue a task under the same lock.
                    ++this->mRunningCount;
                    task = this->mTasks.front();
                    this->mTasks.pop_front();
                }
                task();
                {
                    std::unique_lock<std::mutex> lock(this->mMutex);
                    --this->mRunningCount;
                }
                this->mFinishCondition.notify_all();
            }
        }));
    }
}
ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(mMutex);
        mStopped = true;
        mCondition.notify_all();
    }
    for(auto& worker : mWorkers)
    {
        worker.join();
    }
}

template<class T>
void ThreadPool::addTask(T task)
{
    {
        std::unique_lock<std::mutex> lock(mMutex);
        mTasks.push_back(std::function<void()>(task));
    }
    mCondition.notify_one();
}

void ThreadPool::wait()
{
    // Block until the queue is empty and no worker is executing a task.
    std::unique_lock<std::mutex> lock(mMutex);
    mFinishCondition.wait(lock, [this]()
    {
        return mTasks.empty() && mRunningCount == 0;
    });
}
Now the performance is OK: with more threads, the runtime gets shorter. But something is still not right with the wait method. I'm using it like this:
ThreadPool pool(threadCount);
for(Size i(1); i < iteration; ++i)
{
    Size count(history.front().size() / threadCount);
    Size rem(history.front().size() % threadCount);
    Size minIdx(0);
    for(Size n(0); n < threadCount; ++n)
    {
        pool.addTask(std::bind(ECATask(rule),
                               std::cref(history[i-1]),
                               std::ref(history[i]),
                               minIdx,
                               (rem && n == threadCount - 1) ? count + rem : count));
        minIdx += count;
    }
    pool.wait();
}
The problem is not clear to me yet, but it seems that pool.wait() sometimes does not wait for all of the current tasks to finish, and the code starts a new iteration... Can you please do a code review for me? :)
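To pin the symptom down, I've also put together a small harness (a sketch; testWait, the task count, and the sleep length are arbitrary and just for illustration):

#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Post some deliberately slow tasks, then check whether wait() really saw
// all of them finish before it returned.
void testWait(const Size threadCount, const Size taskCount)
{
    ThreadPool pool(threadCount);
    std::atomic<Size> finished(0);
    for(Size i(0); i < taskCount; ++i)
    {
        pool.addTask([&finished]()
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            ++finished;
        });
    }
    pool.wait();
    assert(finished == taskCount); // fires if wait() returned too early
}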