I discovered that OpenMP doesn't support while loops (or at least doesn't like them much). It also doesn't like the != operator.

I have this bit of code.

int count = 1;
#pragma omp parallel for
while ( fgets(buff, BUFF_SIZE, f) != NULL )
{
    len = strlen(buff);
    int sequence_counter = segment_read(buff,len,count);
    if (sequence_counter == 1)
    {
        count_of_reads++;
        printf("\n Total No. of reads: %d \n",count_of_reads);
    }
    count++;
}

Any clues on how to handle this? I read somewhere (including another post on Stack Overflow) that I can use a pipeline. What is that, and how would I implement it?

Massimiliano
Sid5427
  • Please provide a link to the Stack Overflow post where you read that. – Shahbaz May 29 '13 at 15:14
  • @Shahbaz, I think he may be referring to this SO post: http://stackoverflow.com/questions/8121077/fread-slow-performance-in-openmp-threads –  May 30 '13 at 08:29
  • Actually, it was this one: http://stackoverflow.com/questions/7532067/parallelize-while-loop-with-openmp but that one is also relevant! – Sid5427 May 30 '13 at 09:48

3 Answers

14

It's too bad people are so quick to select the best answer. Here is my answer.
First, you should read the file into a buffer with something like fread. This is very quick. An example of how to do this can be found at http://www.cplusplus.com/reference/cstdio/fread/

Then you can operate on the buffer in parallel with OpenMP. I have implemented most of this for you. Below is the code. You did not provide the segment_read function so I created a dummy one. I used a few functions from C++ such as std::vector and std::sort but with a little more work you could do this in pure C as well.

Edit: I edited this code and was able to remove the sorting and critical section.

I compiled with g++ foo.cpp -o foo -fopenmp -O3

#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
#include <vector>

using namespace std;

// dummy stand-in: the question did not provide segment_read
int segment_read(char *buff, const int len, const int count) {
  return 1;
}

void foo(char* buffer, size_t size) {
    int count_of_reads = 0;
    std::vector<long> *posa;  // posa[t]: line-start offsets found by thread t
    std::vector<long> pos;    // all line-start offsets, in file order
    int nthreads;

    #pragma omp parallel
    {
        const int ithread = omp_get_thread_num();
        #pragma omp single
        {
            nthreads = omp_get_num_threads();
            posa = new vector<long>[nthreads];
            posa[0].push_back(0);  // the first line starts at offset 0
        }

        // record where each line starts (one past every '\n');
        // schedule(static) gives each thread one contiguous chunk in thread
        // order, so posa[0], posa[1], ... are already sorted by position
        #pragma omp for schedule(static)
        for(long i=0; i<(long)size; i++) {
            if(buffer[i] == '\n') {
                posa[ithread].push_back(i+1);
            }
        }

        // merge the per-thread lists so lines can be indexed globally
        #pragma omp single
        {
            for(int t=0; t<nthreads; t++)
                pos.insert(pos.end(), posa[t].begin(), posa[t].end());
            // a last line without a trailing '\n' ends at the end of the buffer
            if(pos.back() != (long)size) pos.push_back((long)size);
        }

        #pragma omp for
        for(long i=1; i<(long)pos.size(); i++) {
            const int len = (int)(pos[i] - pos[i-1]);
            char* buff = &buffer[pos[i-1]];
            const int sequence_counter = segment_read(buff,len,(int)i);
            if (sequence_counter == 1) {
                int n;
                // increment and read back in one atomic step
                #pragma omp atomic capture
                n = ++count_of_reads;
                printf("\n Total No. of reads: %d \n",n);
            }
        }
    }
    delete[] posa;
}

int main () {
  FILE * pFile;
  long lSize;
  char * buffer;
  size_t result;

  pFile = fopen ( "myfile.txt" , "rb" );
  if (pFile==NULL) {fputs ("File error",stderr); exit (1);}

  // obtain file size:
  fseek (pFile , 0 , SEEK_END);
  lSize = ftell (pFile);
  rewind (pFile);

  // allocate memory to contain the whole file:
  buffer = (char*) malloc (sizeof(char)*lSize);
  if (buffer == NULL) {fputs ("Memory error",stderr); exit (2);}

  // copy the file into the buffer:
  result = fread (buffer,1,lSize,pFile);
  if (result != (size_t)lSize) {fputs ("Reading error",stderr); exit (3);}

  /* the whole file is now loaded in the memory buffer. */
  foo(buffer, result);

  // terminate
  fclose (pFile);
  free (buffer);
  return 0;
}
  • I absolutely adore this answer. Sorry for the quick "best answer"; most questions seem to get one as soon as possible. – Sid5427 May 30 '13 at 09:57
  • I edited the code (again); the inner loop over the threads was the wrong thing to do. The main problem I was struggling with is that the threads come in randomly, so segment_read is likely not being called in order. That may not be a problem. The good news is that the variable with the positions is filled in order. In other words, posa[0] is the vector of the lowest positions and posa[7] (using 8 threads) is the vector of the highest positions. So if you need the positions in order, you have them. Originally, I used sort() and a critical section to do this, but the latest code does not need that. –  May 30 '13 at 13:57
  • That's been my problem mostly. I wanted the lines to be read sequentially, but some clauses in segment_read might terminate that thread early. Would that have any implications? My whole idea was for the entire segment_read function to run in parallel. I have an 8-core machine, so you can assume 8 segment_read calls running on 8 different lines of the file. – Sid5427 Jun 02 '13 at 20:34
  • The lines are read sequentially within a thread. The threads, however, run in no particular order. But as long as you save the results for each thread (like I did in posa), you can loop over them at the end and get the results sequentially. Threads terminating early won't make any difference, except that you may be able to make further optimizations (such as trying schedule(dynamic)). –  Jun 03 '13 at 07:43
  • Any clues for replacing this part? posa = new vector[nthreads]; posa[0].push_back(0); – Sid5427 Jun 11 '13 at 12:19
  • Why not create a new question with what you're trying to do? You can reference this question and say you have made more progress but have some new problems. That way others can help you as well. –  Jun 11 '13 at 12:29
  • This solution may not be practical if the file is large (multiple GB) and you are trying to minimize the memory impact of your program. – lynxoid Oct 09 '14 at 15:57
  • it seems to me that lines that happen to straddle the portion of buffer each thread looks at are not processed correctly, no? – ddevienne Nov 04 '15 at 14:59
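
On the ordering point raised in the comments above, here is a minimal sketch in plain C of storing one result per line so the outcomes can be read back in file order no matter which thread handled which line. It assumes the line-start offsets have already been collected into an array; process_line and process_in_order are made-up names, and process_line is only a placeholder for segment_read, which the question does not show.

```c
#include <stddef.h>

/* Hypothetical stand-in for segment_read: 1 if the line has content
   besides its trailing newline. */
static int process_line(const char *line, size_t len)
{
    (void)line;
    return len > 1;
}

/* starts has nlines + 1 entries: line i spans [starts[i], starts[i+1]).
   results[i] receives the outcome for line i, whichever thread ran it.
   Returns how many lines produced 1. */
int process_in_order(const char *buffer, const size_t *starts,
                     long nlines, int *results)
{
    int count_of_reads = 0;

    /* schedule(dynamic) hands out lines one at a time, which may help
       when some calls finish much earlier than others. */
    #pragma omp parallel for schedule(dynamic) reduction(+: count_of_reads)
    for (long i = 0; i < nlines; i++) {
        results[i] = process_line(buffer + starts[i],
                                  starts[i + 1] - starts[i]);
        if (results[i] == 1)
            count_of_reads++;
    }
    return count_of_reads;
}
```

After the call, a plain sequential loop over results reports the lines in file order. Whether schedule(dynamic) actually pays off depends on how uneven the per-line work is.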
4

One way to implement "parallel while" in OpenMP is to use a while loop that creates tasks. Here is a general sketch:

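void foo() {
    // Call this from one thread inside a parallel region
    // (e.g. under "#pragma omp parallel" + "#pragma omp single");
    // otherwise the tasks have no other threads to run on.
    while( Foo* f = get_next_thing() ) {
#pragma omp task firstprivate(f)
        bar(f);
    }
#pragma omp taskwait
}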

For the specific case of looping over fgets, note that fgets has inherently sequential semantics (it gets the "next" line), so it would need to be called before launching the task. It would also be important for each task to operate on its own copy of the data returned by fgets, so that a call to fgets does not overwrite the buffer being operated on by a previous task.
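
As a rough sketch of how that could look for the fgets loop in the question: one thread reads the lines sequentially and hands each task its own heap copy of the line. The names count_reads and process_line and the value of BUFF_SIZE are assumptions made for illustration, and process_line is only a placeholder for segment_read, which the question does not show.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define BUFF_SIZE 4096  /* assumed maximum line length */

/* Hypothetical stand-in for segment_read: returns 1 for non-empty lines. */
static int process_line(const char *line, size_t len, int line_no)
{
    (void)line_no;
    return len > 0 && line[0] != '\n';
}

/* Reads f line by line on one thread and processes each line in its own
   task. Returns the number of lines for which process_line returned 1. */
int count_reads(FILE *f)
{
    int count_of_reads = 0;

    #pragma omp parallel
    #pragma omp single
    {
        char buff[BUFF_SIZE];
        int line_no = 0;

        /* fgets is sequential, so only the single thread calls it. */
        while (fgets(buff, BUFF_SIZE, f) != NULL) {
            size_t len = strlen(buff);
            char *copy = malloc(len + 1);  /* private copy for the task */
            if (copy == NULL)
                break;
            memcpy(copy, buff, len + 1);
            line_no++;

            #pragma omp task firstprivate(copy, len, line_no) shared(count_of_reads)
            {
                if (process_line(copy, len, line_no) == 1) {
                    #pragma omp atomic
                    count_of_reads++;
                }
                free(copy);  /* the task owns its copy */
            }
        }
        #pragma omp taskwait
    }
    return count_of_reads;
}
```

With GCC this would be built with -fopenmp; without that flag the pragmas are ignored and the same code runs serially. The tasks may finish in any order, so anything order-sensitive would have to be keyed on line_no.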

Arch D. Robison
  • Could those tasks invoke functions? That is, could a task call a function, run it on the char array, and return a value? – Sid5427 May 29 '13 at 18:22
  • Yes, the tasks can invoke functions. Where you have to be careful is in making sure no two concurrent tasks interfere with each other. For example, if I forgot to write "firstprivate(f)" in my example, then by the time bar(f) got around to actually launching, the value of f might have already disappeared or been overwritten by the next iteration. – Arch D. Robison May 29 '13 at 23:16
  • The way `f` is declared, it is firstprivate by default and the `firstprivate` clause in the `task` construct is redundant. – Hristo Iliev May 30 '13 at 11:55
1

First, although it comes very close, OpenMP doesn't magically make your code parallel. It works with for loops because a for loop has lower and upper bounds that OpenMP can understand. OpenMP uses those bounds to divide the work among different threads.

Nothing like that is possible with a while loop.
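
To make the point about bounds concrete, here is a small sketch of the kind of split that becomes possible once the iteration count n is known up front. chunk_bounds is a hypothetical helper written for illustration, not part of OpenMP, and the exact division an OpenMP runtime uses is up to the implementation.

```c
#include <stddef.h>

/* Computes the half-open range [*lo, *hi) that thread tid of nthreads
   would handle if n iterations were split into near-equal contiguous
   blocks, with any remainder going to the lowest-numbered threads. */
void chunk_bounds(size_t n, int nthreads, int tid, size_t *lo, size_t *hi)
{
    size_t base = n / (size_t)nthreads;   /* iterations every thread gets */
    size_t extra = n % (size_t)nthreads;  /* first `extra` threads get one more */
    size_t t = (size_t)tid;

    *lo = t * base + (t < extra ? t : extra);
    *hi = *lo + base + (t < extra ? 1 : 0);
}
```

For 10 iterations on 4 threads this gives the ranges [0,3), [3,6), [6,8), and [8,10). A while loop over fgets has no n to plug in, since the number of lines is only known once the file has been read to the end.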

Second, how do you expect your task to be parallelized? You are reading from a file, where sequential access is probably going to give you better performance than parallel access. You might be able to parallelize segment_read, depending on its implementation.

Alternatively, you may want to overlap the file reads with processing. For that, you need to use lower-level functions such as Unix's open and read. Then do asynchronous reads: send a read request, process the previously read block, and then wait for the read request to finish. Search for "linux asynchronous io", for example, to read more on this.

Using a pipe might not actually help you much. That would depend on many internals of the pipe that I'm not very familiar with. However, if you have enough memory, you may also want to consider loading all the data first and then processing it. That way, loading the data is done as fast as possible (sequentially), and then you can parallelize the processing.

Shahbaz