1

I am building own webserver and I want it to be multithreaded. I don't know how start new thread, store it in some list, detect when thread is done and add some new from waiting queue. Can anyone give me simple example how to do this ? Now I am running threads in this way:

boost::thread t(app, client_fd); // but it's not good way because I can't control it

Below is pseucode that's illustrate what I mean:

class worker
{
    public:
    void run(int cfd)
    {
    // do something
    }
}

std::vector<int> waitQueue;
std::vector<worker> runningQueue;

onAcceptClient(int client_fd)
{
    waitQueue.insert(client_fd);
}

while(1) // this must run in single thread
{
    client_fd = accept(...);
    onAcceptClient(client_fd);
}

while(1) // this must run in single thread
{
    if (runningQueue.size() < 128)
    {
       int diff = 128 - runningQueue.size() ;
       for (int a = 0; a < diff; a++)
       {
        int cfc = waitQueue.pop();
        worker w;
        w.run(cfc);
        runningQueue.insert(w);
       }
     }
}
mitch
  • 2,235
  • 3
  • 27
  • 46
  • Not a direct answer to you question, but you should definitly have a look at Boost::Asio (http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio.html), especially if you are working on a webserver. It allows you to divide the work on different threads (See e.g. http://stackoverflow.com/q/14265676/991425) and ist made specificly for network IO, which is what you are planning to do a lot, when programming a webserver, I suppose. – Haatschii Mar 06 '14 at 21:26
  • You need some kind of mutex for the queue. When using windows the easiest is CriticalSection. On Unix, mutex. For the thread to signal when it is finished, I use a semaphore and wait on it. – Ian Thompson Mar 06 '14 at 21:31

2 Answers2

0

This is how I implement this.

First, I have a base ThreadWorker class that my actual processing tasks can descend from. Note that this is Windows specific, but you can substitute boost mutex for the CriticalSection, and the POSIX implementation of semaphores is here: http://linux.die.net/man/7/sem_overview

class ThreadWorker
{
public:
ThreadWorker(void)
{
    signalfinished = NULL;
    forcesignal = false;
}
virtual ~ThreadWorker(void)
{
    if (signalfinished!=NULL)   ReleaseSemaphore(signalfinished,1,NULL);
}

DWORD ThreadId;
HANDLE threadhandle;
HANDLE signalfinished;
bool forcesignal;

static DWORD WINAPI workerthread(void *param)
{
    ThreadWorker *worker = (ThreadWorker*)param;
    worker->signalfinished = CreateSemaphore(NULL,0,1,NULL);
    worker->RunTask();
    ReleaseSemaphore(worker->signalfinished,1,NULL);
}

void StartWorker()
{
    CreateThread(NULL,NULL,ThreadWorker::workerthread,this,0,&ThreadId);
}

void WaitUntilWorkerFinished()
{
    DWORD waitresult;
    do
    {
        waitresult = WaitForSingleObject(signalfinished,1000);
    } while (waitresult!=WAIT_OBJECT_0 && !forcesignal);

}
virtual void RunTask()=0;
};

Then, I have a ThreadManager that manages a queue of ThreadWorker objects. Again, you can substitute mutex for CriticalSection. I prefer std::list to std::vector because vector is contiguous.

    class ThreadManager
{
    CRITICAL_SECTION critsec;
    std::list<ThreadWorker*> taskqueue;
public:
    ThreadManager(void)
    {
        InitializeCriticalSection(&critsec);
    }
    void AddTaskToQueue(ThreadWorker *task)
    {
        EnterCriticalSection(&critsec);
        taskqueue.push_back(task);
        LeaveCriticalSection(&critsec);
    }
    void ProcessTaskQueue()
    {
        while (true)
        {
            EnterCriticalSection(&critsec);
            ThreadWorker *thistask = taskqueue.front();
            taskqueue.pop_front();
            LeaveCriticalSection(&critsec);
            thistask->StartWorker();
        }
    }
    ~ThreadManager(void)
    {
        DeleteCriticalSection(&critsec);
    }
};

To add a task to a queue, we need a subclass that implements ThreadWorker.

class SomeWorkerTask : public ThreadWorker
{
public:
SomeWorkerTask(void);
virtual ~SomeWorkerTask(void);

void RunTask()
{
    std::cout << "Hello, I am a worker task runing on thread id " << ThreadId << std::endl;
}
};

Create a new instance of SomeWorkerTask and add it to the queue, then process the queue. In your case you will have different threads adding tasks to the queue and processing the queue, but I assume you will get the idea.

SomeWorkerTask *atask = new SomeWorkerTask();
ThreadManager manager;
manager.AddTaskToQueue(atask);
manager.ProcessTaskQueue();

If you want to know when the task has completed processing, you can call ThreadWorker::WaitUntilWorkerFinished from another thread or add the call to ProcessTaskQueue. You can modify the ThreadManager so you have one queue of waiting tasks, one queue of running tasks and a third queue of finished tasks. After you pop the task off the waiting queue, you add it to the running queue, and use the task's semaphore to determine when it has completed, then add it to the finished tasks/remove it from running tasks. Note that standard containers such as vector,map and list are not thread safe so you should always surround operations that insert/remove from the container with a mutual exclusion lock such as a critical section or mutex.

Hope that helps.

Ian Thompson
  • 644
  • 1
  • 6
  • 11
0

you need a mutex for the queue which is accessed concurrently from several threads, and since you are testing if runningQueue.size() < 128, you also need a condition variable. If your compiler supports c++ 11, std::mutex and std::condition_variable will do the job, or boost::mutex and boost::condition_variable are OK.
So it's kind of like this:

onAcceptClient(int client_fd)
{
    boost::mutex::scoped_lock waitLock(mWaitMutex);
    waitQueue.insert(client_fd);
}  

while(1) // this must run in single thread
{
    boost::mutex::scoped_lock lock(mRunningMutex);
    // wait if the runningQueue.size >= 128
    while(runningQueue.size() >= 128)
        mRunningCond.wait(lock);

    if (runningQueue.size() < 128)
    {
        int diff = 128 - runningQueue.size() ;
        for (int a = 0; a < diff; a++)
        {
            boost::mutex::scoped_lock waitLock(mWaitMutex);
            int cfc = waitQueue.pop();
            worker w;
            w.run(cfc);
            runningQueue.insert(w);
        }
    }
} 
jfly
  • 7,715
  • 3
  • 35
  • 65