
I want all threads to read from the same structure. In the past I did this by creating the threads inside the loop that reads from the structure, but this time the structure has to be read inside the thread function `dowork`, as my example shows.

I have the following code:

struct word_list {
    char word[20];
    struct word_list * next;
};

struct word_list * first_word = NULL;
//other function which loads into struct is missing cause it's not relevant


//in main()
pthread_t thread_id[MAX_THREADS];
int max_thread = 10;
for(t = 0 ; t < max_thread; t++)
{
    pthread_mutex_lock(&thrd_list);
    arg_struct *args = calloc(1, sizeof(*args));
    args->file = file;
    args->t = t;
    args->e = ex;
    pthread_mutex_unlock(&thrd_list);
    if(pthread_create(&thread_id[t],NULL,dowork,args) != 0)
    {
        t--;
        fprintf(stderr,RED "\nError in creating thread\n" NONE);
    }
}

for(t = 0 ; t < max_thread; t++)
    if(pthread_join(thread_id[t],NULL) != 0)
    {
        fprintf(stderr,RED "\nError in joining thread\n" NONE);
    }




void *dowork(void *arguments)
{
    struct word_list * curr_word = first_word;
    char myword[20];
    while( curr_word != NULL )
    {
        pthread_mutex_lock(&thrd_list);
        strncpy(myword,curr_word->word,sizeof(myword) - 1);
        pthread_mutex_unlock(&thrd_list);

        //some irrelevant code is missing

        pthread_mutex_lock(&thrd_list);
        curr_word = curr_word->next;
        pthread_mutex_unlock(&thrd_list);
    }

}

How can I read different elements from the same structure in all threads?

Sam Reina
  • What *exactly* are you trying to do? Have each thread walk a full-linked list? Have each thread process one node in the list? As-written you have an odd protection loop setup, as it is easily conceivable one or more threads will only process the partial node list because they read a null next-ptr before `main()` can append the next node. – WhozCraig Sep 13 '13 at 15:33
  • I need each thread to process only one node from the list and all threads process the same list – Sam Reina Sep 13 '13 at 15:36
  • I removed a lot of code. In the original one there is no null next error, there is a condition for that – Sam Reina Sep 13 '13 at 15:37
  • If each thread is to only process one node, what is the purpose of the while-loop in the thread proc? And the work-to-do is precisely the reason for a thread-param, is there some specific reason you're averse to *using* it? – WhozCraig Sep 13 '13 at 15:42
  • actually there is a reason. each thread is using a socket and on each socket I have to make the same operations but using different nodes from the same structure – Sam Reina Sep 13 '13 at 15:52
  • the code is more complex than that and I cannot put the threads inside the loop – Sam Reina Sep 13 '13 at 15:52
  • So again, *what are you trying to do?* Is each node to be processed *once* by one thread (like a work *queue*)? All nodes processed once *per* thread (like a work *crew*)? – WhozCraig Sep 13 '13 at 16:00
  • on each thread I open a different socket and I use a different node from the same structure (same structure on all threads, but they must process the nodes until no node is left). I tried to explain as accurately as I could – Sam Reina Sep 13 '13 at 16:25
  • Concentrate on the *list* and your thread-pool. The "structure" you refer to is a *list*. And I ask (again) is each thread tasked with processing *all* nodes in the list? *One* node in the list? Let me ask it this way: How many times should any single node in your linked list be "processed"? Once? Once per-thread? It *sounds* like you want a work *queue*. I.e. a thread pool of size N where each thread is responsible for pulling nodes off the list one at a time, handling it, and going back to the list for more, but no node is handled more than once *globally*. Is *that* accurate?? – WhozCraig Sep 13 '13 at 16:31
  • I want each node to be processed only once, it doesn't matter on which thread. – Sam Reina Sep 13 '13 at 16:34
  • OK. *Thank you*. Suddenly the curtain is lifted and we can see the show. You need a multi-threaded work queue (and for the record a work *crew* has each thread processing *all* nodes, a work *queue* processes each node only once). It isn't difficult to set up, but it is somewhat tedious. And obviously your current setup is incorrect. I'll try and set up a sample if I have the time, but there are plenty on the web. Search for "multi-threaded work queue pthread" and you'll likely get plenty of hits. I'll review your code now that I know what you're trying to do and see what needs to be fixed. – WhozCraig Sep 13 '13 at 16:38
  • OK. Posted. Hope you get something out of it. – WhozCraig Sep 13 '13 at 18:13

3 Answers


Let me see if I understand this correctly:

  • struct word_list describes some kind of linked list
  • You want to spread out the elements of that list among the threads.

If this is what you want then I would just pop the elements one by one from the list and write the pointer to the rest back:

volatile struct word_list * first_word = NULL; // important to make it volatile

void *dowork(void *arguments)
{
    struct word_list * curr_word;
    char myword[20];
    do {
        // gain exclusive access to the control structures
        pthread_mutex_lock(&thrd_list);

        // get the next element
        curr_word = first_word;
        if (curr_word == NULL) {
            pthread_mutex_unlock(&thrd_list);
            break;
        }

        // notify the remaining threads what the next element is
        first_word = curr_word->next;

        pthread_mutex_unlock(&thrd_list);

        // do whatever you have to do

    } while (1);

    return NULL;
}

Make an additional global volatile struct word_list * next_word if you don't want to modify first_word. Make sure to make it volatile, else the compiler may perform optimisations that lead to weird results.
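
A minimal sketch of that `next_word` variant, assuming the list is fully loaded before any worker starts. The helper names `reset_cursor` and `take_next_word` are hypothetical, not from the question. Every access to the shared cursor happens with `thrd_list` held, and `first_word` is left untouched.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

struct word_list {
    char word[20];
    struct word_list *next;
};

struct word_list *first_word = NULL;   /* head of the list, never modified here */
struct word_list *next_word = NULL;    /* hypothetical shared cursor */
pthread_mutex_t thrd_list = PTHREAD_MUTEX_INITIALIZER;

/* Call once, after the list is loaded and before the workers start. */
void reset_cursor(void)
{
    pthread_mutex_lock(&thrd_list);
    next_word = first_word;
    pthread_mutex_unlock(&thrd_list);
}

/* Hands out each node exactly once; returns NULL when the list is exhausted. */
struct word_list *take_next_word(void)
{
    int rc = pthread_mutex_lock(&thrd_list);
    assert(rc == 0);   /* locking a default mutex is not expected to fail */
    (void)rc;

    struct word_list *w = next_word;
    if (w != NULL)
        next_word = w->next;   /* advance the cursor for the other threads */

    pthread_mutex_unlock(&thrd_list);
    return w;
}
```

Inside `dowork` the loop would then read `while ((curr_word = take_next_word()) != NULL) { ... }`, and the list can still be walked from `first_word` afterwards.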

Sergey L.
  • `volatile` is not the silver-bullet for thread-synchronization; *synchronization objects*, properly used, are the proper ammunition for that weapon. – WhozCraig Sep 13 '13 at 15:48
  • @WhozCraig Not sure what you are implying. But the answer is right on. It is using a mutex properly and suggesting volatile where it is required. Are you implying that the proper use of sync objects renders the use of volatile unnecessary? – Ziffusion Sep 13 '13 at 16:00
  • @Ziffusion That is precisely what I'm implying. There are a plethora of questions and answers on `volatile` on SO, so I'm not going to run down that rabbit hole yet-again. This is not the intended use for it. Though a C++ related question, [this is one of my more favorite links](http://stackoverflow.com/questions/4557979/when-to-use-volatile-with-multi-threading), as it has *many* links and discussions. – WhozCraig Sep 13 '13 at 16:14
  • I cannot find where `volatile` is required for this example. – user7116 Sep 13 '13 at 16:14
  • @WhozCraig Well, that implication may be incorrect then. Volatile is *absolutely* required when a value can be changed in ways that are not apparent to the compiler from the flow of the code. This may be true even if sync objects are being used properly. Multithreaded applications and hardware DMA are good candidates for this. Volatile simply instructs the compiler to not do any caching at the compiler level (registers etc.), and *always* do a memory access for the value. – Ziffusion Sep 13 '13 at 16:35
  • @WhozCraig: I can see why people use volatile - there's no way to prevent the compiler from optimising the code to store the shared value in a register, which would break the synchronisation requirements. So they look for something in the language to fix the problem and find volatile. But then, prior to C++11, the language was never designed for multiple threads and you'd have to find a platform specific solution (some OS call perhaps). – Skizz Sep 13 '13 at 16:43
  • You have a deadlock in your code: when the break is executed, the mutex is not released so only the first thread to reach the end of the list will exit, the rest will wait forever. – Skizz Sep 13 '13 at 16:45
  • @Skizz I know. The compiler *cannot* optimize away that which violate observable preconception. Ex: it cannot "optimize out" a global variable eval after a non-inlined function call; it has *no way of knowing* whether the function changed the global. The purpose of `volatile` is to prevent the compiler from assuming the code *it generates* is ruler-and-lord for all *observable* changes to a variable. The roots of it (hardware overlay, interrupt handler, etc.) are *stellar* examples of such changes that are not observable by the compiled code. This question (and answer) has no such conditions. – WhozCraig Sep 13 '13 at 17:08
  • @WhozCraig You need to add "multi-threaded" applications to that list. The compiler generates code assuming a single thread. The variable "first_word" in the suggested solution can change value from an alternate thread. I think that qualifies for use of volatile. – Ziffusion Sep 13 '13 at 17:20
  • @Skizz You are right, I didn't notice the deadlock there. Corrected. About `volatile`: It is totally required here as the compiler needs to know to fetch `curr_word = first_word;` from memory on each access rather than relying on optimisations. – Sergey L. Sep 13 '13 at 17:52
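
For readers who want to test the point argued in these comments, here is a small sketch. It relies on POSIX listing `pthread_mutex_lock` and `pthread_mutex_unlock` among the functions that synchronize memory. The names `shared_counter`, `bump` and `run_counter_demo` are made up for this illustration. The counter is deliberately not `volatile`, yet every increment is made under the mutex.

```c
#include <assert.h>
#include <pthread.h>

#define DEMO_THREADS    4
#define DEMO_INCREMENTS 100000

long shared_counter = 0;   /* deliberately NOT volatile */
pthread_mutex_t counter_mtx = PTHREAD_MUTEX_INITIALIZER;

/* Each thread increments the shared counter under the mutex. */
static void *bump(void *unused)
{
    (void)unused;
    for (int i = 0; i < DEMO_INCREMENTS; ++i) {
        pthread_mutex_lock(&counter_mtx);
        ++shared_counter;
        pthread_mutex_unlock(&counter_mtx);
    }
    return NULL;
}

/* Starts the threads, waits for them, and returns the final count. */
long run_counter_demo(void)
{
    pthread_t tid[DEMO_THREADS];

    shared_counter = 0;
    for (int i = 0; i < DEMO_THREADS; ++i) {
        int rc = pthread_create(&tid[i], NULL, bump, NULL);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < DEMO_THREADS; ++i)
        pthread_join(tid[i], NULL);

    return shared_counter;
}
```

If the locking is correct, `run_counter_demo()` returns `DEMO_THREADS * DEMO_INCREMENTS` on every run, at any optimisation level.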

So you want to process lots of data by splitting the work across multiple threads. Your solution is not very efficient because your threads are going to be fighting a lot over who owns the mutex and you can't be sure the work is evenly spread across all your threads. So, for example, threads 0 and 1 could be getting all the work as they get first access to the mutex and all the other threads are just idle all the time.

If you want to improve the performance, you need to do the following:-

  • Make all the threads independent of each other, i.e. remove synchronised data
  • Ensure memory locality within each thread, i.e. make sure the data for item n+1 is next to the data for item n. This helps the CPU access memory better. Jumping around RAM a lot will generate lots of cache misses, which kill performance.

So, in your program, instead of a single linked list that is shared across all threads, have a linked list for each thread:-

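#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct word_list
{
  char word [20]; // data
  struct word_list *next;
} word_list;

// an enum constant, so that it can size a file-scope array in C
// (actually, setting this to number of CPUs at run time would be better)
enum { num_threads = 4 };

word_list
  *lists [num_threads] = {0};

void ReadWords (FILE *input)
{
  // current [i] points at the link where the next node of list i is attached
  word_list
    **current [num_threads];

  for (int i = 0 ; i < num_threads ; ++i)
  {
    current [i] = &lists [i];
  }

  int destination = 0;
  char buffer [20];

  while (fscanf (input, "%19s", buffer) == 1) // read some valid input
  {
    word_list *node = malloc (sizeof (word_list));
    if (node == NULL)
      break;
    strcpy (node->word, buffer); // set data
    node->next = NULL;

    *current [destination] = node;
    current [destination] = &node->next;

    destination = (destination + 1) % num_threads;
  }

  // data has now been read and stored into a series of linked lists, each list having
  // the same number of items (or one less)
}

void *do_work (void *arg)
{
  for (word_list *item = arg ; item ; item = item->next)
  {
    // process item->word
  }
  return NULL;
}

void create_threads (pthread_t threads [num_threads])
{
  for (int i = 0 ; i < num_threads ; ++i)
  {
    // create thread, and pass it the value of lists [i]
    pthread_create (&threads [i], NULL, do_work, lists [i]);
  }
}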

In this program (just made it up, haven't checked it) I create four linked lists and evenly assign data to the lists. Then I create the threads and give each thread one of the linked lists. Each thread then processes its own linked list (they are separate lists).
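
On sizing the pool to the number of CPUs at run time, as the comment in the code suggests: one possible sketch uses `sysconf`. Note that `_SC_NPROCESSORS_ONLN` is widely available (for example on Linux and macOS) but is an assumption here, not something every platform guarantees, and the helper name `pick_thread_count` is hypothetical.

```c
#include <assert.h>
#include <unistd.h>

/* Returns the number of CPUs currently online, or `fallback` if the
   system cannot report it. */
long pick_thread_count(long fallback)
{
    assert(fallback > 0);

    long n = sysconf(_SC_NPROCESSORS_ONLN);   /* -1 on error */
    return (n > 0) ? n : fallback;
}
```

With a run-time count, the per-thread list heads can no longer live in a fixed-size array; they would have to be allocated, for example with `calloc (count, sizeof (word_list *))`.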

Each thread can now run at full speed and never has to wait on a mutex to get data. Memory accesses are reasonable but dependent on the allocator to a large extent. Using an array rather than a linked list would improve this, but you'd need to know the number of data items before allocating the arrays which might not be possible.
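
If the item count is known, the array variant can be sketched as follows: one shared array, with each thread given a start index and an item count. The `slice` type and `make_slice` helper are hypothetical names for this illustration. Slices differ in size by at most one item.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical per-thread argument: a contiguous slice of one shared array. */
typedef struct {
    size_t start;   /* index of the first item for this thread */
    size_t count;   /* number of items for this thread */
} slice;

/* Computes slice `index` out of `parts` near-equal slices over `total` items. */
slice make_slice(size_t total, size_t parts, size_t index)
{
    assert(parts > 0 && index < parts);

    size_t base  = total / parts;
    size_t extra = total % parts;   /* the first `extra` slices get one more item */

    slice s;
    s.start = index * base + (index < extra ? index : extra);
    s.count = base + (index < extra ? 1 : 0);
    return s;
}
```

Each thread would receive a pointer to its own `slice` through the `void *` argument of `pthread_create` and loop over `items[s.start]` up to `items[s.start + s.count - 1]` without any locking.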

Skizz
  • is array faster than structure? should I change the way items are stored currently? – Sam Reina Sep 14 '13 at 15:48
  • I'm loading the items from a file so I can count them while storing. Is array the solution to my problem? I like the idea you posted and your arguments are very solid. Thanks – Sam Reina Sep 14 '13 at 15:49
  • @SamReina: Arrays provide good memory usage, but you need to know how many items there are before creating the array. If you know the number of items before you start reading from the file, then an array will work better than a linked list. You can have a single array and give each thread a start index and item count. – Skizz Sep 16 '13 at 09:20

If I understand your requirements now (and I think I finally do), you need to treat your word list as a work queue. To do that requires a notification mechanism that allows the "pusher" of items into the queue to inform the "pullers" that new data is available. Such a system does exist in pthreads: the marriage of a condition variable, a mutex, and the predicate(s) they manage for control flow.

This is one example of how to use it. I've tried to document what is going on at each step for you, and hopefully you will understand.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

// define the number of threads in our queue and the number
//  of test items for this demonstration.
#define MAX_THREADS  16
#define MAX_ITEMS    (128*1024)

typedef struct word_list
{
    char word[20];
    struct word_list * next;

} word_list;

// predicate values for the word list
struct word_list * first_word = NULL;   // current word.
int word_shutdown = 0;                  // shutdown state

// used for protecting our list.
pthread_mutex_t wq_mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t wq_cv = PTHREAD_COND_INITIALIZER;

// worker proc
void *dowork(void*);

int main()
{
    pthread_t thread_id[MAX_THREADS];
    int i=0;

    // start thread pool
    for(i=0; i < MAX_THREADS; ++i)
        pthread_create(thread_id+i, NULL, dowork, NULL);

    // add MAX_ITEMS more entries, we need to latch since the
    //  work threads are actively processing the queue as we go.
    for (i=0;i<MAX_ITEMS;++i)
    {
        word_list *node = malloc(sizeof(*node));
        sprintf(node->word, "Word-%d", i);

        // latch before updating the queue head.
        pthread_mutex_lock(&wq_mtx);
        node->next = first_word;
        first_word = node;

        // no longer need the latch. unlock and inform any
        // potential waiter.
        pthread_mutex_unlock(&wq_mtx);
        pthread_cond_signal(&wq_cv);
    }

    // wait for the condition that the queue is empty
    pthread_mutex_lock(&wq_mtx);
    while (first_word != NULL)
        pthread_cond_wait(&wq_cv, &wq_mtx);
    pthread_mutex_unlock(&wq_mtx);

    // queue is empty, but threads are all still there waiting. So
    //  do it again, just to prove the pool is still intact.
    for (i=0;i<MAX_ITEMS;++i)
    {
        word_list *node = malloc(sizeof(*node));
        sprintf(node->word, "Word-%d", i);

        // latch before updating the queue head.
        pthread_mutex_lock(&wq_mtx);
        node->next = first_word;
        first_word = node;

        // no longer need the latch. unlock and inform any
        // potential waiter.
        pthread_mutex_unlock(&wq_mtx);
        pthread_cond_signal(&wq_cv);
    }

    // again wait for the condition that the queue is empty
    pthread_mutex_lock(&wq_mtx);
    while (first_word != NULL)
        pthread_cond_wait(&wq_cv, &wq_mtx);

    // queue is empty, and we're not adding anything else. latch
    //  the mutex, set the shutdown flag, and tell all the threads.
    //  they need to terminate.
    word_shutdown = 1;
    pthread_mutex_unlock(&wq_mtx);
    pthread_cond_broadcast(&wq_cv);

    for (i=0;i<MAX_THREADS; ++i)
        pthread_join(thread_id[i], NULL);

    return EXIT_SUCCESS;
}


// each worker thread will start by locking the mutex, then entering the
//  work loop, looking for entries or a shutdown state
void *dowork(void *arguments)
{
    int n_processed = 0;
    while (1)
    {
        pthread_mutex_lock(&wq_mtx);
        while (first_word == NULL && word_shutdown == 0)
            pthread_cond_wait(&wq_cv, &wq_mtx);

        // we own the mutex, and thus current access to the predicate
        //  values it protects.
        if (first_word != NULL)
        {
            // pull the item off the queue. once we do that we own the
            //  item, so we can unlatch and let another waiter know there
            //  may be more data on the queue.
            word_list *p = first_word;
            first_word = p->next;
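            // wake another worker if items remain; otherwise wake all
            //  waiters so main(), waiting for an empty queue, sees it.
            if (first_word != NULL)
                pthread_cond_signal(&wq_cv);
            else
                pthread_cond_broadcast(&wq_cv);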
            pthread_mutex_unlock(&wq_mtx);

            //
            // TODO: process item here.
            //
            ++n_processed;
            free(p);
        }
        else if (word_shutdown != 0)
            break;
    }

    // we still own the mutex. report on how many items we received, then
    //  one more signal to let someone (anyone, actually) know we're done.
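    // pthread_t is an opaque type; the cast for %p works where it is a
    //  pointer or integer type (e.g. macOS, Linux) but is not portable.
    pthread_t self = pthread_self();
    printf("%p : processed %d items.\n", (void *)self, n_processed);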
    pthread_mutex_unlock(&wq_mtx);
    pthread_cond_signal(&wq_cv);
    return NULL;
}

Sample Output: MAX_THREADS = 4 (your output will vary)

0x100387000 : processed 64909 items.
0x100304000 : processed 64966 items.
0x1000b5000 : processed 64275 items.
0x100281000 : processed 67994 items.

Sample Output: MAX_THREADS = 8

0x100304000 : processed 31595 items.
0x1000b5000 : processed 33663 items.
0x100593000 : processed 34298 items.
0x10040a000 : processed 32304 items.
0x10048d000 : processed 32406 items.
0x100387000 : processed 31878 items.
0x100281000 : processed 32317 items.
0x100510000 : processed 33683 items.

Sample Output: MAX_THREADS = 16

0x10079f000 : processed 17239 items.
0x101081000 : processed 16530 items.
0x101104000 : processed 16662 items.
0x100699000 : processed 16562 items.
0x10040a000 : processed 16672 items.
0x100593000 : processed 15158 items.
0x10120a000 : processed 17365 items.
0x101187000 : processed 14184 items.
0x100387000 : processed 16332 items.
0x100616000 : processed 16497 items.
0x100281000 : processed 16632 items.
0x100304000 : processed 16222 items.
0x100510000 : processed 17188 items.
0x10048d000 : processed 15367 items.
0x1000b5000 : processed 16912 items.
0x10071c000 : processed 16622 items.

And just because we can, with full global optimization enabled

Sample Output: MAX_THREADS = 32, MAX_ITEMS = 4194304

0x109c58000 : processed 260000 items.
0x109634000 : processed 263433 items.
0x10973a000 : processed 262125 items.
0x10921c000 : processed 261201 items.
0x108d81000 : processed 262325 items.
0x109a4c000 : processed 262318 items.
0x108f8d000 : processed 263107 items.
0x109010000 : processed 261382 items.
0x109946000 : processed 262299 items.
0x109199000 : processed 261930 items.
0x10929f000 : processed 263506 items.
0x109093000 : processed 262362 items.
0x108e87000 : processed 262069 items.
0x108e04000 : processed 261890 items.
0x109acf000 : processed 261875 items.
0x1097bd000 : processed 262040 items.
0x109840000 : processed 261686 items.
0x1093a5000 : processed 262547 items.
0x109b52000 : processed 261980 items.
0x109428000 : processed 264259 items.
0x108f0a000 : processed 261620 items.
0x1095b1000 : processed 263062 items.
0x1094ab000 : processed 261811 items.
0x1099c9000 : processed 262709 items.
0x109116000 : processed 261628 items.
0x109bd5000 : processed 260905 items.
0x10952e000 : processed 262741 items.
0x1098c3000 : processed 260608 items.
0x109322000 : processed 261970 items.
0x1000b8000 : processed 262061 items.
0x100781000 : processed 262669 items.
0x1096b7000 : processed 262490 items.

Hmmm. and I didn't use volatile in any of this. Must be time to buy a lotto ticket.

Anyway, I suggest doing some research on pthreads, particularly on mutex and condition variable control and their interactions. I hope this helps you out.

WhozCraig
  • thank you very much! I will try it in a couple of hours after I come back home. I will post an update – Sam Reina Sep 13 '13 at 18:30
  • +1 for cheeky "and I didn't use `volatile` in any of this" remark (and the answer of course ;-) –  Sep 13 '13 at 22:47
  • @WhozCraig: I have one more question. How is your example affected by the fact that I don't know how large the structure is, and that I want the records split by the number of threads rather than by MAX_ITEMS? – Sam Reina Sep 14 '13 at 04:47
  • @SamReina They already are split (relatively) evenly among threads. The counts you see above should be evidence of that. The output you see is the number of items that thread handled during its lifetime before I shut the queue down. Regarding not knowing the number of items, that's entirely up to you. The sample above had to stop somewhere, but so long as the thread pool is continually extracting items there is no reason your feeder side couldn't push items on indefinitely. Perhaps I didn't understand the context of your question. – WhozCraig Sep 14 '13 at 04:51
  • no, you did perfectly and thanks again for your answer. I just have to figure out how to implement your solution to my code. It's kinda hard especially that I'm still learning and some of your code is kinda SF for me :) – Sam Reina Sep 14 '13 at 04:54
  • and I believe your example will work with thread args too, right ? – Sam Reina Sep 14 '13 at 04:55
  • @SamReina I can't stress this enough. Get a good source on POSIX thread programming. There are many books out there. O'Reilly has a good one. Addison-Wesley does too. It is super important you learn about the APIs you see in this sample. This is actually a pretty simple pthread code block. Some can get pretty nasty. – WhozCraig Sep 14 '13 at 04:56
  • Yes, it will work with thread args. Nothing stopping you from sending thread-specific data to each thread on startup using the `void*` param. I didn't use it here, but you certainly could. Do you now see how this is a fixed pool of threads that handle an arbitrarily unknown queue size? Notice the number of items in the first three outputs all add up to 2 × MAX_ITEMS (262144), since the sample fills the queue twice. This was on purpose, to show you that the thread count can be big or small for the pool, depending on your need. The last test run was just a huge pool + huge item stack (over 4-million). Took a little time, but it worked, no problem. – WhozCraig Sep 14 '13 at 04:57
  • thanks for the tips but I don't intend to start coding in the future. I ordered a while ago a program, got the source and recently I wanted some additional modifications like instead doing one thread/one socket/one ip, I wanted multiple threads/multiple sockets/same ip (Just to finish the data quicker). But I couldn't get in touch with the coder who made my program so all I have to do is trying to understand your code so I can modify mine to work. Sorry again if I asked too many n00b questions. – Sam Reina Sep 14 '13 at 05:06
  • @SamReina No worries. Hopefully you got something out of it. This is a fairly textbook work queue example. There are better ones, even ones that use intrinsic atomic operations to avoid locks, but if you think this is a challenge to follow, those would turn you pale. Take your time. It's worth it, and a helluva lot of fun. Glad to help. – WhozCraig Sep 14 '13 at 05:09
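
On the thread-args question raised in these comments, here is a simplified sketch of how per-thread data could ride along with a shared queue. It assumes the list is fully loaded before the workers start, so it uses only a mutex and no condition variable, unlike the answer above. The names `worker_args`, `socket_fd`, `worker` and `run_workers` are hypothetical, and the socket field is only a placeholder that is never opened.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef struct word_node {
    char word[20];
    struct word_node *next;
} word_node;

/* Hypothetical per-thread data, e.g. the socket this worker owns. */
typedef struct {
    int socket_fd;     /* placeholder; not opened or used in this sketch */
    int n_processed;   /* filled in by the worker */
} worker_args;

word_node *queue_head = NULL;
pthread_mutex_t queue_mtx = PTHREAD_MUTEX_INITIALIZER;

static void *worker(void *arguments)
{
    worker_args *args = arguments;   /* this thread's private data */

    for (;;) {
        /* pop one node under the lock; each node goes to exactly one thread */
        pthread_mutex_lock(&queue_mtx);
        word_node *p = queue_head;
        if (p != NULL)
            queue_head = p->next;
        pthread_mutex_unlock(&queue_mtx);

        if (p == NULL)
            break;                   /* nothing left to do */

        /* process p->word using args->socket_fd here */
        ++args->n_processed;
    }
    return NULL;
}

/* Runs n workers over a preloaded list; returns the total items processed. */
int run_workers(word_node *list, worker_args *args, pthread_t *tids, int n)
{
    queue_head = list;
    for (int i = 0; i < n; ++i) {
        args[i].n_processed = 0;
        int rc = pthread_create(&tids[i], NULL, worker, &args[i]);
        assert(rc == 0);
        (void)rc;
    }

    int total = 0;
    for (int i = 0; i < n; ++i) {
        pthread_join(tids[i], NULL);
        total += args[i].n_processed;
    }
    return total;
}
```

Each `args[i]` must stay alive until its thread has been joined, which is why the caller owns the array here rather than the loop body.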