1

I am writing a program in ANSI C (1972) and I have to use a fixed number of threads. Basically, I read a big file of records, like a .csv with latitude and longitude data, and I have to process them. The problem is that I cannot wait 2 weeks for a 100,000,000-line file, so I need to use threads or forking.

Basically i read the .txt file like this

FILE *file2 = fopen ( lat_long_file, "r" );
if (file2 != NULL)
{
    char line2 [128];

    while (fgets(line2, sizeof line2, file2) != NULL)
    {
        //fputs(line2, stdout);

        char *this_record = trimqq(line2);

        // .....
        // ..... STUFF TO DO (here i must send data to thread function like in JAVA)
        // Thread temp_thread = new Thread(new ThreadClass(arguments ....));
        // temp_thread.start(); <- this is how i would do if i was programming in JAVA
        // .....

    }
}

main_1.c (threading with pthread.h)

#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS     10

static int current_threads = 0;


/*
 * Thread entry point: runs the shell command "sleep 3; date;", prints two
 * status lines and then terminates the calling thread.
 *
 * t: the thread's numeric id carried by value inside the pointer (it is cast
 *    back to long and never dereferenced).
 *
 * Does not return normally; the thread ends through pthread_exit(NULL), so a
 * joiner receives NULL as the exit status.
 *
 * NOTE(review): the name collides with POSIX wait() from <sys/wait.h>. It is
 * kept here only because main() refers to the function by this name.
 */
void *wait(void *t)
{
   long tid = (long)t;

   // system() returns only after the shell command has finished, so this
   // thread is held up for the duration of the sleep.
   system("sleep 3; date;");

   printf("Sleeping in thread\n");
   // %ld matches the signed long id (the original used %lu).
   printf("Thread with id %ld  ...exiting\n",tid);

   pthread_exit(NULL);
}

/*
 * Starts NUM_THREADS joinable threads that all run wait(), then joins them in
 * creation order and prints the exit status of each one.
 *
 * Returns 0 once every thread has been joined. Exits with EXIT_FAILURE if a
 * thread cannot be created or joined.
 */
int main (void)
{
   int rc;
   int i;
   pthread_t threads[NUM_THREADS];
   pthread_attr_t attr;
   void *status;

   // Initialize and set thread joinable
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

   for( i=0; i < NUM_THREADS; i++ )
   {
      // The attribute object prepared above is actually handed to
      // pthread_create() now (the original passed NULL and ignored it).
      // The id travels by value inside the pointer; the cast goes through
      // long because the thread function converts it back with (long) and
      // because intptr_t would need <stdint.h>, which is not included here.
      rc = pthread_create(&threads[i], &attr, wait, (void *)(long)i );

      if (rc)
      {
         fprintf(stderr, "Error:unable to create thread, %d\n", rc);
         exit(EXIT_FAILURE);
      }
   }

    // free attribute and wait for the other threads
    pthread_attr_destroy(&attr);
    for( i=0; i < NUM_THREADS; i++ )
    {
        rc = pthread_join(threads[i], &status);
        if (rc)
        {
            printf("Error:unable to join %d\n",rc);
            exit(EXIT_FAILURE);
        }

        printf("Main: completed thread id : %d",i);
        printf(" exiting with status : %p\n",status);
    }

    printf("Main: program exiting.\n");

    // Every thread has been joined, so a plain return ends the process cleanly.
    return 0;
}

The output I am getting from this is:

Sleeping in thread
Sleeping in thread
Thread with id 5  ...exiting
Sleeping in thread
Thread with id 0  ...exiting
Sleeping in thread
Sleeping in thread
Sleeping in thread
Thread with id 9  ...exiting
Thread with id 1  ...exiting
Sleeping in thread
Sleeping in thread
Thread with id 7  ...exiting
Thread with id 3  ...exiting
Thread with id 2  ...exiting
Thread with id 6  ...exiting
Sleeping in thread
Thread with id 4  ...exiting
Sleeping in thread
Thread with id 8  ...exiting
Main: completed thread id : 0 exiting with status : (nil)
Main: completed thread id : 1 exiting with status : (nil)
Main: completed thread id : 2 exiting with status : (nil)
Main: completed thread id : 3 exiting with status : (nil)
Main: completed thread id : 4 exiting with status : (nil)
Main: completed thread id : 5 exiting with status : (nil)
Main: completed thread id : 6 exiting with status : (nil)
Main: completed thread id : 7 exiting with status : (nil)
Main: completed thread id : 8 exiting with status : (nil)
Main: completed thread id : 9 exiting with status : (nil)
Main: program exiting.

And execution time is 3 seconds

If I change system("sleep 3; date;"); to system("sleep 10; date;");, the execution time becomes 10 seconds, whereas I expected the program to sleep separately at every call of the void *wait(void *t) function ...

main_2_fork (i also tried fork, but no use)

#include  <stdio.h>
#include  <stdlib.h>
#include  <string.h>
#include  <sys/types.h>
#include  <sys/wait.h>
#include  <time.h>
#include  <unistd.h>

#define   MAX_COUNT  200
#define   BUF_SIZE   100

int random_number(int min_num, int max_num);

/*
 * Runs 11 jobs (job 0..10), each inside its own forked child process, while
 * keeping at most `maxf` children alive at the same time.
 *
 * Every child builds a "sleep N; ps ax | wc -l" command, runs it through
 * popen(), prints the first line of its output and exits. The parent only
 * forks, throttles and reaps, so the jobs overlap in time. In the original
 * the child exited immediately and the parent ran every command itself,
 * which made the execution strictly sequential.
 *
 * Returns 0 after all children have been reaped. Exits with status 1 if
 * fork() fails.
 */
int main(void)
{
    int numforks = 0;   // children started and not yet reaped
    int maxf = 5;       // upper bound on concurrently running children
    char buf[BUF_SIZE];
    pid_t PID;
    int job;

    for (job = 0; job <= 10; job++)
    {
        // Pool is full: block until one child finishes before starting another.
        while (numforks >= maxf)
        {
            if (wait(NULL) < 0)
            {
                // Nothing left to reap; resynchronise the counter.
                numforks = 0;
                break;
            }
            numforks--;
        }

        // Tag used in this job's log lines.
        int fork_id = random_number(1000000,9999999);

        // Flush buffered stdio output so the child does not inherit it and
        // print it a second time.
        fflush(stdout);

        // fork() = make a copy of this program from this line to the bottom
        PID = fork();

        if (PID < 0)
        {
            // if -1 then couldn't fork ....
            fprintf(stderr, "[!] Couldn't fork!\n");
            exit(1);
        }

        if (PID == 0)
        {
            // Child process: the job itself runs here, concurrently with
            // its siblings.
            char str[300];
            char line[256];
            FILE *command_execute;
            int running = numforks + 1;   // live children, this one included, at fork time

            // Each message is emitted with a single write() call.
            snprintf(buf, sizeof buf, "FORK[#%d] BEGIN pid=%d num_forks=%d\n",fork_id,(int)getpid(),running);
            write(1, buf, strlen(buf));

            snprintf(str, sizeof str, "sleep %d; ps ax | wc -l",random_number(1,4));
            puts(str);
            fflush(stdout);

            // OUTPUT COMMAND BEGIN
            command_execute = popen(str, "r");
            if (command_execute == NULL)
            {
                fprintf(stderr, "[!] Couldn't run command!\n");
                exit(1);
            }
            // Only the first line of the command's output is reported.
            if (fgets(line, sizeof line, command_execute) != NULL)
            {
                printf("LINE[%d]:%s",0,line);
                // Keep the buffered line ahead of the unbuffered END message.
                fflush(stdout);
            }
            pclose(command_execute);
            // OUTPUT COMMAND END

            snprintf(buf, sizeof buf, "FORK[#%d] END pid=%d num_forks=%d\n",fork_id,(int)getpid(),running);
            write(1, buf, strlen(buf));

            exit(0);
        }

        // Parent process: just account for the new child and move on.
        numforks++;
    }

    // Reap the children that are still running before the parent exits.
    while (numforks > 0 && wait(NULL) > 0)
    {
        numforks--;
    }

    return 0;
}

/*
 * Returns a pseudo-random integer from the inclusive range spanned by the two
 * arguments. The bounds may be given in either order, and equal bounds return
 * that single value.
 *
 * min_num, max_num: the two ends of the range (both are possible results).
 *
 * The generator is seeded from the clock on the first call only. The original
 * re-seeded on every call, so all calls made within the same second returned
 * the same number. It also divided by zero when min_num == max_num + 1,
 * returned max_num + 1 for equal bounds, and excluded both endpoints when the
 * bounds were reversed.
 *
 * Results are limited by rand(): ranges wider than RAND_MAX are not covered
 * completely, and the function shares rand()'s global state.
 */
int random_number(int min_num, int max_num)
{
    static int seeded = 0;
    int low_num = min_num;
    int hi_num = max_num;
    long long span;

    // Normalise so that low_num <= hi_num regardless of argument order.
    if (low_num > hi_num)
    {
        int swap = low_num;
        low_num = hi_num;
        hi_num = swap;
    }

    if (!seeded)
    {
        srand((unsigned)time(NULL));
        seeded = 1;
    }

    // Computed in long long so that hi_num - low_num + 1 cannot overflow int.
    span = (long long)hi_num - low_num + 1;

    // The offset is < span, hence low_num + offset never exceeds hi_num.
    return low_num + (int)(rand() % span);
}

the output is :

FORK[#7495656] BEGIN pid=29291 num_forks=1
sleep 1; ps ax | wc -l
LINE[0]:312
FORK[#7495656] END pid=29291 num_forks=1
FORK[#9071759] BEGIN pid=29296 num_forks=2
sleep 4; ps ax | wc -l
LINE[0]:319
FORK[#9071759] END pid=29296 num_forks=2
FORK[#2236079] BEGIN pid=29330 num_forks=3
sleep 4; ps ax | wc -l

......

And the execution is not parallel ... rather, it is executing one by one, even though I see that the fork() function is creating child processes in ps ax | grep 'fork2.exe' ...

Here is an example with what i want : http://www.javacodegeeks.com/2013/01/java-thread-pool-example-using-executors-and-threadpoolexecutor.html

Where you put let's say 5 to be the maximum threads at a time.

QUESTIONS

  1. Why is the void *wait(void *t) function not sleeping properly? Why is pthread executing the threads one by one rather than in parallel?
  2. What should I do to make a thread pool with a fixed maximum number of threads in C?

Thank you very much.

Damian
  • 761
  • 1
  • 9
  • 26
  • 1
    There is no 1972 ANSI C. Do you mean C89 which was standardized 1989? – fuz Dec 13 '14 at 16:10
  • i don't know for sure, i am a young programmer, what can i tell you is that i made an `start.sh` that contains this : `clear; rm pthreads.exe; gcc -m64 -o pthreads.exe pthreads.c -lpthread; ./pthreads.exe $1 $2; echo "\n"; ls -al;` – Damian Dec 13 '14 at 16:11
  • Are you working with Cygwin? – fuz Dec 13 '14 at 16:12
  • i am working with `gcc` in `Ubuntu 14.04.1 LTS`, `gcc` is `gcc version 4.8.2 (Ubuntu 4.8.2-19ubuntu1)` installed with `apt-get install gcc`, i don't know for sure what `Cygwin` is – Damian Dec 13 '14 at 16:15
  • 1
    You do realize that programs on Linux do not have a `.exe` suffix? – fuz Dec 13 '14 at 16:21
  • Also, you might run into problems: `wait()` is a standard function of the operating system. You should not use that name for your own functions. – fuz Dec 13 '14 at 16:22
  • If your performance is constrained by I/O, then multithreading isn't necessarily going to help. Multithreading is most useful when you can do lots of expensive computations *while* also performing I/O at the same time. – Kerrek SB Dec 13 '14 at 16:22
  • 1
    @FUZxxl -> yes, i like tu use `.exe` prefix in linux, even though it is not necessary @Kerrek SB -> i am not is constrained by I/O, i just want to make multiple operations in the same time, i did this in `JAVA`, works perfectly, i can put 10, 100, 1000 fixed threads, how many i want, but in `C` i don't know how – Damian Dec 13 '14 at 16:28
  • In your example with `fork()`, you do realize that you do not actually increase the number of running programs? This is because you immediately kill the parent after forking. – fuz Dec 13 '14 at 16:54
  • What gives you the idea that your `wait()` is not sleeping properly? If you have 10 threads and they sleep 3 seconds each, that gives a total sleeping time of 3 seconds because they all sleep at the same time. – fuz Dec 13 '14 at 16:57
  • you are talking about `pthread.h` example right? Oh whell ... let me put it this way, if i had a function `wait()` in `JAVA` that inside that function i would write `Thread.sleep(random(1000,5000))` i will see on my screen that every thread started at the same time (1,2,3,4,5), but i will se how they stop at different time (4,3,1,5,2). How can i achieve that in `C`? The main reason is that instead of `system("sleep 3 & date;");` it will be a command with `curl.h` class, and i don't want to stop all the execution of the program for one thread, do you understand ? – Damian Dec 13 '14 at 17:13
  • for example i put under the `system("sleep ;")` a function `curlate_and_print("http://somesite.com/test.php");` witch gives the following output of `test_curl` and i get this output : http://pastebin.com/Hf5giHD5 , it is executing lie i put 10 lines with `curlate_and_print("http://somesite.com/test.php");`, why is that, should i see this : http://pastebin.com/GA6eS7vk ? thank you for your answers – Damian Dec 13 '14 at 17:31
  • Add at least your modified `wait()` function to your post or the pastebin; the exact code is needed to answer this question. – Armali Mar 16 '17 at 09:29

1 Answers1

1

I cannot comment yet so I'll reply here. Your threaded example takes exactly the amount of time one thread (your wait() function) sleeps. This said, it'd have been clearer if you wrote it this way:

/*
 * Thread entry point: announces itself, runs the shell command
 * "sleep 3; date;", reports completion and terminates the calling thread.
 *
 * t: the thread's numeric id carried by value inside the pointer (it is cast
 *    back to long and never dereferenced).
 *
 * Does not return normally; the thread ends through pthread_exit(NULL).
 */
void *some_running_task(void *t)
{
   long tid = (long)t;

   // %ld matches the signed long id (the original used %lu).
   printf("Sleeping in thread #%ld ...\n", tid);
   system("sleep 3; date;");

   printf("Thread with #%ld ... exiting\n", tid);
   pthread_exit(NULL);
}

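As @fuzxxl says, wait() is already a standard operating-system function (POSIX declares it in <sys/wait.h> for waiting on child processes), so you should not use that name for your own function.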

All your threads start at the same instant, to a few tens of microseconds maybe. They all start at the same moment hence all end 3 seconds later. Change the sleep instruction to 10 seconds and your program lasts 10 seconds.

What you probably want is a pool of threads that constantly keeps the same number of threads busy until the whole work is done: fire a thread until you reach the maximum pool count for as long as there is data to process. Synchronising a thread pool is prone to deadlocks though. You might as well have each thread process its own section of the file... unless what you want is dedicate a thread to a single line.

One issue I see with parallelism here is sequence. If you care about the sequence order, the threads will not necessarily yield data in the same order you read lines. So unless you put the processed data along with the line number in a database, you will lose the sequence order.

Another issue I see coming is outputting the processed data. It requires proper synchronisation so that one thread's output does not get mixed up with another's (if the threads are supposed to print out their data, of course).

It's a little unclear to me what you expect from parallelism here — apart from speeding up the global processing time. If you want a bunch of threads to process a bunch of lines you'll come up anyway with something similar and as simple as splitting your source data file... if it can be done at all of course. But at least you can control the sequence of data as you read each line and you can then fall back on firing long-running single-threaded processes instead of a long running multi-threaded application. Single-threaded applications are easier to program than multi-threaded ones.

Also, is the use of C so mandatory that you cannot use, say, Python or Cython? The biggest advantage would be sparing you the hassle of thread synchronisation.

Anyway, there is more than one way to speed up linear data processing. For instance, UNIX sed can be used to pipe a certain range of lines to a processing application. Run as many sed | <processing app> pipelines as you need. Or you might just pipe split portions of your data file into a processing application written in C or Python.

Just giving headlines.

  • First of all, thank you very much for your answers, i really appreciate them! `1`-> `fire a thread until you reach the maximum pool count for as long as there is data to process` yes, this is exacly i need, basically the function `void *some_running_task(void *t)` runs a `curlate_and_print("http://somesite.com/test.php");` with post parameters, and it must be something like in the background, and i need a max of 10 of those background function at a time, no more. `2-> requires proper synchronisation` as soon as a thread begins or finishes, `printf` status or `write()` in the same memory zone – Damian Dec 13 '14 at 17:36
  • `3-> use, say, Python or Cython?` trust me, if was up to me i would use `JAVA`, but the project must be done in `C` ...., `4->` `sed | `do you have an tutorial or something? i only used sed for string parsing or converting .... – Damian Dec 13 '14 at 17:37
  • `5->` this is the output i am getting : http://pastebin.com/Hf5giHD5 and this is the output i want http://pastebin.com/GA6eS7vk – Damian Dec 13 '14 at 17:40
  • I think you might be better off with some shell scripting instead of plain C. Take a look at [this bash trick to wait for multiple processes](http://jeremy.zawodny.com/blog/archives/010717.html). Hope it gives you a starting point. It was pointed to from a [question](http://stackoverflow.com/questions/356100/how-to-wait-in-bash-for-several-subprocesses-to-finish-and-return-exit-code-0) asked here. –  Dec 13 '14 at 17:47
  • -> trust me, i already tried with bash, like writing a file `sh/t.34325.sh` and after that running it from my `C` program with `sh sh/t.34325.sh &`, and it is not sitting in background, it is running one by one .... – Damian Dec 13 '14 at 17:52
  • Use sed to filter out lines: `sed -n 1,10000p` prints only the first 10,000th lines of a text file for instance. –  Dec 13 '14 at 17:53
  • -> do you say that i have to emulate threads and run 10 instances of my program on a splited big file? This is what you say? – Damian Dec 13 '14 at 17:56
  • You might indeed split your data source and have each part processed by a dedicated instance of your program, yes. It's not emulating threads rather than waiting for background processes from a parent script. All your C program needs to do is read and process every line from the standard input. But again, from what I've understood and unless there is some significant business logic to implement in C you don't need it to just spawn sub-processes as bash can indeed run and wait for background processes too. –  Dec 13 '14 at 18:18
  • `run and wait for background processes too` i tried that, it dosen't work, besides that i have i have to use functions from `curl.h` to send back responses to another server, and yes, it can actually be an solution to make another `create_threads.exe` running 10 times `main.exe`, but there is another problem, this program i am making it has to be compatible on both 64 and 32 systems, i am also created 2 versions of `main.exe` for 64 and 32 and a script in `shell` that decides witch of them will be executed, and i must do the same for `create_threads.exe` witch means that it is more complicated – Damian Dec 13 '14 at 18:27