
I'm beginning to learn how to use pthreads and I'm starting to get the concept. I've been using this example (written in C++) to run a merge sort with threads: https://www.geeksforgeeks.org/merge-sort-using-multi-threading/

Since I'm writing my own merge sort in C rather than C++, I rewrote the example to test it and noticed an issue. Instead of MAX 20 for an array of 20 elements, I went with 15. The sorted/merged output is then invalid (yet somewhat close), and I cannot figure out why. In addition, I'd like to use a different number of threads than THREAD_MAX 4, so I may change it to 5 or 10 threads.

Could it be the merge that's yielding invalid results? I've marked it with a comment inside main() below.

My C++ to C conversion is below:

#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <stdlib.h>

// number of elements in array
#define MAX 15

// number of threads
#define THREAD_MAX 4

//using namespace std;

// array of size MAX
int a[MAX];
int part = 0;

// merge function for merging two parts
void merge(int low, int mid, int high)
{   
    int* left = (int*) malloc( (mid - low + 1) * sizeof(int));
    int* right = (int*) malloc( (high - mid) * sizeof(int));


    // n1 is size of left part and n2 is size
    // of right part
    int n1 = mid - low + 1,
    n2 = high - mid,
    i, j;

    // storing values in left part
    for (i = 0; i < n1; i++)
        left[i] = a[i + low];

    // storing values in right part
    for (i = 0; i < n2; i++)
        right[i] = a[i + mid + 1];

    int k = low;
    i = j = 0;

    // merge left and right in ascending order
    while (i < n1 && j < n2) {
        if (left[i] <= right[j])
            a[k++] = left[i++];
        else
            a[k++] = right[j++];
    }

    // insert remaining values from left
    while (i < n1) {
        a[k++] = left[i++];
    }

    // insert remaining values from right
    while (j < n2) {
        a[k++] = right[j++];
    }

    free(left);
    free(right);
}

// merge sort function
void merge_sort(int low, int high)
{
    // calculating mid point of array
    int mid = low + (high - low) / 2;
    if (low < high) {

        // calling first half
        merge_sort(low, mid);

        // calling second half
        merge_sort(mid + 1, high);

        // merging the two halves
        merge(low, mid, high);
    }
}

// thread function for multi-threading
void* merge_sort123(void* arg)
{
    // which part out of 4 parts
    int thread_part = part++;

    // calculating low and high
    int low = thread_part * (MAX / THREAD_MAX);
    int high = (thread_part + 1) * (MAX / THREAD_MAX) - 1;

    // evaluating mid point
    int mid = low + (high - low) / 2;
    if (low < high) {
        merge_sort(low, mid);
        merge_sort(mid + 1, high);
        merge(low, mid, high);
    }
    return 0;
}



// Driver Code
int main()
{
    // generating random values in array
    for (int i = 0; i < MAX; i++){
        a[i] = rand() % 100;
    //        printf("%d ", a[i]);
    }


    pthread_t threads[THREAD_MAX];

    // creating 4 threads
    for (int i = 0; i < THREAD_MAX; i++)
        pthread_create(&threads[i], NULL, merge_sort123,
                       (void*)NULL);

    // joining all 4 threads
    for (int i = 0; i < THREAD_MAX; i++)
        pthread_join(threads[i], NULL);


    ///////////////////////////////////////////////////////////////
    // --- THIS MAY BE THE PART WHERE THE MERGING IS INVALID --- //
    ///////////////////////////////////////////////////////////////
    // merging the final 4 parts
    merge(0, (MAX / 2 - 1) / 2, MAX / 2 - 1);
    merge(MAX / 2, MAX/2 + (MAX-1-MAX/2)/2, MAX - 1);
    merge(0, (MAX - 1)/2, MAX - 1);


    // displaying sorted array
    printf("\n\nSorted array: ");
    for (int i = 0; i < MAX; i++)
        printf ("%d ", a[i]);


    printf("\n");
    return 0;
}
theflarenet
  • To start, you have a race condition with `int thread_part = part++` between the threads (i.e. some threads may get the same number or there may be a gap, where a given segment is _not_ covered). Have the _main_ thread assign the thread number and pass it as an argument to the thread: replace the last argument to `pthread_create` with: `(void *) i` and in the thread function: `unsigned long thread_part = (unsigned long) arg;` I realize you're copying this part of the code, but the example you linked to has this--and it _is_ a bug. – Craig Estey Oct 11 '18 at 20:03
  • @CraigEstey Unfortunately, these changes output the same results. `pthread_create(&threads[i], NULL, merge_sort123, (void *) i);` instead of `pthread_create(&threads[i], NULL, merge_sort123, (void*)NULL);` and `unsigned long thread_part = (unsigned long) arg;` instead of `int thread_part = part++;` – theflarenet Oct 11 '18 at 20:12
  • Yes, it wouldn't fix a logic bug, just the race condition. The port to C is simply to replace the `new` statements with `malloc` calls in `merge`. And, change `delete` at the end to `free()` [which you've done]. AFAICT, in the linked example, there should be `delete` for each temp array [unless they are added by the compiler when the function goes out of scope], so I'm worried about the code quality. So, I'm wondering about the original--does it actually work? In fact, I pulled the original and ran it with 15 and it fails, so not your bug. – Craig Estey Oct 11 '18 at 20:43
  • It appears that `MAX` must be a multiple of `THREAD_MAX`? I'd find a better example to start from as a multithread version has to be able to handle an arbitrary length array, just as a single thread version can. To decide if the final merges in `main` are correct, before doing them, print out the individual per-thread sections first (i.e. they [all 4 of them] should all be in sort). Do this with `MAX` as (e.g. 15) and 20. Rather than `rand`, I'd create a reversed initial array: `n - 1, n - 2, ..., 3, 2, 1`. It's easier to identify what went wrong and if any thread treads on another's space – Craig Estey Oct 11 '18 at 20:52
  • @CraigEstey Thank you for the support. I hope there is a better example out there but it seems like they don't exist in C and the C++ ones utilize different methods which is overkill to convert back to C. Tinkering with this script is all I do to figure this out now. – theflarenet Oct 11 '18 at 21:05

2 Answers


As I mentioned in the top comments, there are a few issues with the original code:

1. The race condition on `part++` described in the comments.

2. The [seemingly] missing `delete` at the bottom of `merge`.

3. The number of array elements must be a multiple of the number of threads. If not, the last thread's range stops short and the trailing `MAX % THREAD_MAX` elements are never sorted by any thread.

4. The final merge in the main thread is hardwired for 4 threads.


A general solution is possible. However, unless the array is very large, multithreading saves little time here, so it's mostly for practice with multithreading [which is what you wanted, I believe]. See: Multithreaded quicksort or mergesort

It is easier to pass several parameters to a thread using a control struct. This is a good technique for multithreading in general.

The main thread can prepopulate this with the array ranges for each thread. It can later use these control structs to generalize the final merge.

Here's a cleaned-up version that works for an arbitrary array size and an arbitrary number of threads:

#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <stdlib.h>

int opt_a;
int opt_t;
int opt_r;

// number of elements in array
//#define MAX 15
//#define MAX 16
int MAX;

// number of threads
//#define THREAD_MAX 4
int THREAD_MAX;

//using namespace std;

// array of size MAX
int *a;

// thread control parameters
struct tsk {
    int tsk_no;
    int tsk_low;
    int tsk_high;
};

// merge function for merging two parts
void
merge(int low, int mid, int high)
{

    // n1 is size of left part and n2 is size of right part
    int n1 = mid - low + 1;
    int n2 = high - mid;

    int *left = malloc(n1 * sizeof(int));
    int *right = malloc(n2 * sizeof(int));

    int i;
    int j;

    // storing values in left part
    for (i = 0; i < n1; i++)
        left[i] = a[i + low];

    // storing values in right part
    for (i = 0; i < n2; i++)
        right[i] = a[i + mid + 1];

    int k = low;

    i = j = 0;

    // merge left and right in ascending order
    while (i < n1 && j < n2) {
        if (left[i] <= right[j])
            a[k++] = left[i++];
        else
            a[k++] = right[j++];
    }

    // insert remaining values from left
    while (i < n1)
        a[k++] = left[i++];

    // insert remaining values from right
    while (j < n2)
        a[k++] = right[j++];

    free(left);
    free(right);
}

// merge sort function
void
merge_sort(int low, int high)
{

    // calculating mid point of array
    int mid = low + (high - low) / 2;

    if (low < high) {
        // calling first half
        merge_sort(low, mid);

        // calling second half
        merge_sort(mid + 1, high);

        // merging the two halves
        merge(low, mid, high);
    }
}

// thread function for multi-threading
void *
merge_sort123(void *arg)
{
    struct tsk *tsk = arg;
    int low;
    int high;

    // low and high were precomputed by the main thread
    low = tsk->tsk_low;
    high = tsk->tsk_high;

    // evaluating mid point
    int mid = low + (high - low) / 2;

    if (low < high) {
        merge_sort(low, mid);
        merge_sort(mid + 1, high);
        merge(low, mid, high);
    }

    return 0;
}

// Driver Code
int
main(int argc, char **argv)
{
    char *cp;
    struct tsk *tsk;

    --argc;
    ++argv;

    MAX = 15;
    THREAD_MAX = 4;

    // use new/general algorithm by default
    opt_a = 1;

    for (; argc > 0; --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'M':  // array count
            MAX = atoi(cp + 2);
            break;

        case 'T':  // thread count
            THREAD_MAX = atoi(cp + 2);
            break;

        case 'a':  // change algorithm
            opt_a = !opt_a;
            break;

        case 'r':  // do _not_ use rand -- use linear increment
            opt_r = !opt_r;
            break;

        case 't':  // tracing
            opt_t = !opt_t;
            break;

        default:
            break;
        }
    }

    // allocate the array
    a = malloc(sizeof(int) * MAX);

    // generating random values in array
    if (opt_t)
        printf("ORIG:");
    for (int i = 0; i < MAX; i++) {
        if (opt_r)
            a[i] = MAX - i;
        else
            a[i] = rand() % 100;
        if (opt_t)
            printf(" %d", a[i]);
    }
    if (opt_t)
        printf("\n");

    pthread_t threads[THREAD_MAX];
    struct tsk tsklist[THREAD_MAX];

    int len = MAX / THREAD_MAX;

    if (opt_t)
        printf("THREADS:%d MAX:%d LEN:%d\n", THREAD_MAX, MAX, len);

    int low = 0;

    for (int i = 0; i < THREAD_MAX; i++, low += len) {
        tsk = &tsklist[i];
        tsk->tsk_no = i;

        if (opt_a) {
            tsk->tsk_low = low;
            tsk->tsk_high = low + len - 1;
            if (i == (THREAD_MAX - 1))
                tsk->tsk_high = MAX - 1;
        }

        else {
            tsk->tsk_low = i * (MAX / THREAD_MAX);
            tsk->tsk_high = (i + 1) * (MAX / THREAD_MAX) - 1;
        }

        if (opt_t)
            printf("RANGE %d: %d %d\n", i, tsk->tsk_low, tsk->tsk_high);
    }

    // creating THREAD_MAX threads
    for (int i = 0; i < THREAD_MAX; i++) {
        tsk = &tsklist[i];
        pthread_create(&threads[i], NULL, merge_sort123, tsk);
    }

    // joining all THREAD_MAX threads
    for (int i = 0; i < THREAD_MAX; i++)
        pthread_join(threads[i], NULL);

    // show the array values for each thread
    if (opt_t) {
        for (int i = 0; i < THREAD_MAX; i++) {
            tsk = &tsklist[i];
            printf("SUB %d:", tsk->tsk_no);
            for (int j = tsk->tsk_low; j <= tsk->tsk_high; ++j)
                printf(" %d", a[j]);
            printf("\n");
        }
    }

    // merging the per-thread parts (general fold, or the original 4-part merge)
    if (opt_a) {
        struct tsk *tskm = &tsklist[0];
        for (int i = 1; i < THREAD_MAX; i++) {
            struct tsk *tsk = &tsklist[i];
            merge(tskm->tsk_low, tsk->tsk_low - 1, tsk->tsk_high);
        }
    }
    else {
        merge(0, (MAX / 2 - 1) / 2, MAX / 2 - 1);
        merge(MAX / 2, MAX / 2 + (MAX - 1 - MAX / 2) / 2, MAX - 1);
        merge(0, (MAX - 1) / 2, MAX - 1);
    }

    // displaying sorted array
    printf("\n\nSorted array:");
    for (int i = 0; i < MAX; i++)
        printf(" %d", a[i]);
    printf("\n");

    return 0;
}
Craig Estey

I found this Q&A useful, so I am reporting back my solution to the pthread sort problem. I used @CraigEstey's solution, but changed it to read ints from stdin and write them sorted to stdout, the way sort -n does.

Here is my nsort.c

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>

typedef size_t I;
#if 1
typedef int T;
#define CMP(x,y)  ((x)<(y))
#define CPY(x)    (x)
#define PRINT(x)  printf("%d\n",x)
#define PARSE(x)  atoi(x)
#else
typedef char *T;
#define CMP(x,y) (strcmp(x,y)<0)
#define CPY(x)    strdup(x)
#define PRINT(x)  printf("%s",x)
#define PARSE(x)  strdup(x)
#endif
#define SIZE      sizeof(T)

T *a;
void merge(I l, I m, I h)
{
    I n1 = m - l + 1, n2 = h - m;
    T *L = malloc(n1 * SIZE);
    T *R = malloc(n2 * SIZE);
    I i, j, k = l;
    for (i = 0; i < n1; i++)
        L[i] = a[i + l];
    for (i = 0; i < n2; i++)
        R[i] = a[i + m + 1];
    i = j = 0;
    while (i < n1 && j < n2) {
        if (CMP(L[i], R[j]))
            a[k++] = L[i++];
        else
            a[k++] = R[j++];
    }
    while (i < n1)
        a[k++] = L[i++];
    while (j < n2)
        a[k++] = R[j++];
    free(L);
    free(R);
}

void merge_sort(I l, I h)
{
    I m = (l + h) / 2;
    if (l < h) {
      merge_sort(l, m);
      merge_sort(m + 1, h);
      merge(l, m, h);
    }
}

FILE *in = 0;
I MAX;
I readi()
{
    char b[99];  // LIMITATION: reads at most 98 chars per line
    MAX = 0;
    I m = 1024;
    a = malloc(SIZE * m);
    assert(a);   // no side effects inside assert(): it compiles away under NDEBUG
    while (fgets(b, sizeof b, in)) {
      MAX++;
      if (MAX > m) {
          a = realloc(a, SIZE * (m <<= 1));
          assert(a);
      }
      a[MAX - 1] = PARSE(b);
    }
    return MAX;
}

I writei()
{
    for (I i = 0; i < MAX; i++)
      PRINT(a[i]);
    return MAX;
}

struct tsk {
    I l, h;
};
void *merge_sort_thread(void *arg)
{
    struct tsk tsk = *(struct tsk *) arg;
    merge_sort(tsk.l, tsk.h);
    return 0;
}

int main(int c, char **v)
{
    struct tsk *tsk;
    in = c == 2 ? fopen(v[1], "r") : stdin;
    assert(in);
    I N = sysconf(_SC_NPROCESSORS_ONLN);
    readi();
    assert(MAX);  // need at least one input line
    if (MAX < N)
      N = 1;
    pthread_t threads[N];
    struct tsk tsklist[N];
    I p = MAX / N, l = 0;
    for (I i = 0; i < N; i++, l += p) {
      tsk = &tsklist[i];
      tsk->l = l;
      tsk->h = l + p - 1;
      if (i == (N - 1))
        tsk->h = MAX - 1;
    }
    for (I i = 0; i < N; i++)
      pthread_create(&threads[i], NULL, merge_sort_thread, &tsklist[i]);
    for (I i = 0; i < N; i++)
      pthread_join(threads[i], NULL);

    struct tsk *tskm = &tsklist[0];
    for (I i = 1; i < N; i++) {
      struct tsk *tsk = &tsklist[i];
      merge(tskm->l, tsk->l - 1, tsk->h);
    }
    writei();
    return 0;
}

I wrote a test and put it in a Makefile:

test: nsort
        MAXNUMBER=1100100 ./gen >in
        ./nsort <in >nsort.out
        sort -n  in >sort.out
        cmp -s sort.out nsort.out

nsort: nsort.c
        gcc -g nsort.c -o nsort -pthread

With a script called gen:

#!/usr/bin/awk -f
BEGIN {
  srand();
  i=1;
  n=ENVIRON["MAXNUMBER"]
  while (i++<=n) {
    printf("%d\n",int(rand()*n + 0.5));
  }
  exit;
}

I found this nsort to be faster than sort -n, but it didn't use much CPU.

effbiae