This answer is probably going to need a little refinement from the community, since it's been a long while since I worked in this environment, but here's a start -
Since you're new to multi-threading in C++, start with a simple project to create a bunch of pthreads doing a simple task.
Here's a quick and small example of creating pthreads:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
// Thread entry point. It is defined after main, so it is declared here to
// let main pass its address to pthread_create.
void* ThreadStart(void* arg);
// Starts two threads running ThreadStart, gives each one a heap-allocated
// int holding its thread number (1 and 2), waits for both to finish, and
// then frees the two ints.
//
// The command-line parameters are not used.
//
// Returns EXIT_SUCCESS when both threads were created and joined, or
// EXIT_FAILURE if an allocation or a pthread_create call failed.
int main( int count, char** argv) {
    (void)count;
    (void)argv;

    // The cast keeps the example compilable as C++ as well as C.
    int* threadArg1 = (int*)malloc(sizeof *threadArg1);
    int* threadArg2 = (int*)malloc(sizeof *threadArg2);
    if (threadArg1 == NULL || threadArg2 == NULL) {
        fprintf(stderr, "failed to allocate thread arguments\n");
        free(threadArg1);   // free(NULL) is a no-op, so no guard is needed
        free(threadArg2);
        return EXIT_FAILURE;
    }
    *threadArg1 = 1;
    *threadArg2 = 2;

    pthread_t thread1, thread2;
    int status = EXIT_SUCCESS;

    // pthread_create returns an error number instead of setting errno.
    int err1 = pthread_create(&thread1, NULL, &ThreadStart, threadArg1);
    if (err1 != 0) {
        fprintf(stderr, "pthread_create failed for thread 1 (error %d)\n", err1);
        status = EXIT_FAILURE;
    }
    int err2 = pthread_create(&thread2, NULL, &ThreadStart, threadArg2);
    if (err2 != 0) {
        fprintf(stderr, "pthread_create failed for thread 2 (error %d)\n", err2);
        status = EXIT_FAILURE;
    }

    // Only join threads that were actually created; a pthread_t left
    // unset by a failed pthread_create must not be joined.
    if (err1 == 0) {
        pthread_join(thread1, NULL);
    }
    if (err2 == 0) {
        pthread_join(thread2, NULL);
    }

    // The arguments are released only after the joins, so no thread can
    // still be reading them.
    free(threadArg1);
    free(threadArg2);
    return status;
}
// Thread entry point: prints a greeting that includes this thread's number.
//
// arg must point to an int holding the thread number; the int is only read.
// Always returns NULL.
void* ThreadStart(void* arg) {
    const int* idPtr = (const int*)arg;
    printf("hello world from thread %d\n", *idPtr);
    return NULL;
}
Next, you're going to be using multiple opus decoders. Opus appears to be thread safe, so long as you create separate OpusDecoder objects for each thread.
To feed jobs to your threads, you'll need a list of pending work units that can be accessed in a thread-safe manner. You can use `std::vector` or `std::queue`, but you'll have to hold a lock around it whenever you add to it or remove from it, and you'll want to use a counting semaphore so that the threads will block, but stay alive, while you slowly add work units to the queue (say, buffers of files read from disk).
Here's some example code, similar to the example above, that shows how to use a shared queue and how to make the threads wait while you fill the queue:
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <queue>
// Worker thread entry point. It is defined after main, so it is declared
// here to let main pass its address to pthread_create.
void* ThreadStart(void* arg);
// Queue of pending workunits. main pushes onto it and the worker threads
// pop from it.
static std::queue<int> workunits;
// Mutex held around every access to workunits, by main and by the workers.
static pthread_mutex_t workunitLock;
// Counting semaphore the workers block on. main posts it once for each
// workunit it queues, and again at shutdown to wake the waiting workers.
static sem_t workunitCount;
// Producer side of the example. Initializes the shared lock and semaphore,
// starts two worker threads, queues 200 integer workunits while they run,
// then wakes the workers one last time so they can see the empty queue and
// return.
//
// The command-line parameters are not used.
//
// Returns EXIT_SUCCESS when everything was set up and both workers were
// joined, or EXIT_FAILURE if the mutex, the semaphore, or either thread
// could not be created.
int main( int count, char** argv) {
    (void)count;
    (void)argv;

    // pthread functions report failure through their return value.
    const int lockErr = pthread_mutex_init(&workunitLock, NULL);
    if (lockErr != 0) {
        fprintf(stderr, "pthread_mutex_init failed (error %d)\n", lockErr);
        return EXIT_FAILURE;
    }
    // sem_init reports failure by returning -1 and setting errno. Without
    // a usable semaphore the workers would have nothing valid to block on,
    // so stop before starting them.
    if (sem_init(&workunitCount, 0, 0) != 0) {
        perror("sem_init");
        pthread_mutex_destroy(&workunitLock);
        return EXIT_FAILURE;
    }

    pthread_t thread1, thread2;
    const int err1 = pthread_create(&thread1, NULL, &ThreadStart, NULL);
    const int err2 = pthread_create(&thread2, NULL, &ThreadStart, NULL);
    const bool started1 = (err1 == 0);
    const bool started2 = (err2 == 0);
    if (!started1) {
        fprintf(stderr, "pthread_create failed for thread 1 (error %d)\n", err1);
    }
    if (!started2) {
        fprintf(stderr, "pthread_create failed for thread 2 (error %d)\n", err2);
    }

    // Make a bunch of workunits while the threads are running. There is
    // no point queuing anything if no worker exists to consume it.
    if (started1 || started2) {
        for (int i = 0; i < 200; i++) {
            pthread_mutex_lock(&workunitLock);
            workunits.push(i);
            sem_post(&workunitCount);
            pthread_mutex_unlock(&workunitLock);
            // Pretend that it takes some effort to create work units;
            // this shows that the threads really do block patiently
            // while we generate workunits.
            usleep(5000);
        }
    }

    // Once the queue drains, the workers end up blocked in sem_wait
    // waiting for more workunits. None of them has quit, because none has
    // seen an empty queue yet. Post the semaphore once per running worker
    // so each one wakes up, sees the empty queue, and returns.
    if (started1) {
        sem_post(&workunitCount);
    }
    if (started2) {
        sem_post(&workunitCount);
    }

    // Only join threads that were actually created; a pthread_t left
    // unset by a failed pthread_create must not be joined.
    if (started1) {
        pthread_join(thread1, NULL);
    }
    if (started2) {
        pthread_join(thread2, NULL);
    }

    pthread_mutex_destroy(&workunitLock);
    sem_destroy(&workunitCount);
    return (started1 && started2) ? EXIT_SUCCESS : EXIT_FAILURE;
}
// Worker thread entry point. Repeatedly waits on workunitCount, takes one
// workunit off the shared queue while holding workunitLock, and "processes"
// it by printing it. The first time the worker wakes up and finds the
// queue empty, it treats that as the signal to stop.
//
// arg is unused. Always returns NULL.
void* ThreadStart(void* arg) {
    (void)arg;

    for (;;) {
        // sem_wait can return early with EINTR when a signal handler runs.
        // Going on without a token could show us an empty queue while
        // workunits are still being produced, and we would quit too soon,
        // so retry until the wait really succeeds.
        int waitResult;
        do {
            waitResult = sem_wait(&workunitCount);
        } while (waitResult != 0 && errno == EINTR);
        if (waitResult != 0) {
            // Any other failure means the semaphore is unusable.
            perror("sem_wait");
            return NULL;
        }

        int workUnit = 0;
        bool haveUnit = false;

        // Figure out if there's a unit and grab it under the lock, then
        // release the lock as soon as we can so that processing the unit
        // does not block everybody else.
        pthread_mutex_lock(&workunitLock);
        if (!workunits.empty()) {
            workUnit = workunits.front();
            workunits.pop();
            haveUnit = true;
        }
        pthread_mutex_unlock(&workunitLock);

        if (!haveUnit) {
            // Woken with nothing queued: time to stop.
            return NULL;
        }

        // Now that we're not under the lock, we can spend as much time as
        // we want processing the workunit.
        printf("Got workunit %d\n", workUnit);
    }
}