
I have a problem finding a memory leak in my program.

top reports increasing memory usage as the program runs. When profiling my program with valgrind, no memory leaks are reported.

The program consists of a "reader" thread and several "consumer" threads.

The "reader" thread loads data into one of several char** queues, one for every "consumer" thread.

Each "consumer" thread works on the data in its corresponding char** queue and frees it when done.

I have included some pseudocode that describes what my program is doing. I know the code provided might not be enough to describe the problem. I am happy to include the entire code project if that will help.

"reader" thread, condensed for brevity

//'nconsumers': number of consumer threads
char ***queue = malloc(nconsumers*sizeof(char **));
for (int i = 0; i < nconsumers; i++) {
    //'length': number of datapoints a 'consumer' works on at a time
    queue[i] = malloc(length*sizeof(char *));
}

data_t data;      //hypothetical struct type with 'content' and 'length' fields
int qtracker = 0; //tracks to which 'consumer' data should be assigned
int ltracker = 0; //tracks how many datapoints have been added to each 'consumer'
//loaddata reads data and stores it in the 'data' struct
while (loaddata(&data) >= 0) {
    char *datapoint = malloc(data.length);
    memcpy(datapoint, data.content, data.length);
    queue[qtracker][ltracker] = datapoint;
    qtracker++;
    if (nconsumers == qtracker) {
        qtracker = 0;
        ltracker++;
        if (length == ltracker) ltracker = 0;
    }
}
//NULL pointers are added to the end of each 'consumer' queues to indicate all data has been read

"consumer" thread

//Consumers are initialized and a queue is assigned to them
int qnum = assigned_queue; //hypothetical: some number between 0 and nconsumers-1
int datatracker = 0;
char **dataqueue = queue[qnum];

char *datapoint = dataqueue[datatracker];
datatracker++;
while (datapoint != NULL) {
    //Do work on data
    free(datapoint);
    datapoint = dataqueue[datatracker];
    datatracker++;

    //More synchronization code
}

"consumer" thread is correctly reading data and processing it as it should. Again, valgrind reports no memory leaks. When monitoring my process with top or htop, memory usage of this program keeps increasing to the point where my machine starts swapping.

EDIT

I have added a complete program that reproduces the error. This is not exactly the program where I encountered the problem, as that one contains additional dependencies. Again, this program spawns 1 "reader" thread and N consumer threads. When running on a large text file with hundreds of millions of lines (such as DNA sequencing files), htop steadily shows growing memory usage, while valgrind shows no memory leaks except for a pthreads-specific one.

Thanks again for all the help!!

Compile and run on any modern Linux box:

gcc -Wall -o <name> <program.c> -lm -lpthread
./<name> large_text_file.txt <num_threads> <max_lines>

Only this warning should show up, as I don't use the extracted pointer in this example:

<program>.c: In function ‘consumer’:
<program>.c:244:11: warning: variable ‘line’ set but not used [-Wunused-but-set-variable]
     char *line = NULL;

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <math.h>
#include <unistd.h>

// Data passed to threads
typedef struct {
    //Input file
    FILE *fp;
    //Number of threads
    int numt;
    //Synchronization data
    pthread_mutex_t mtx;
    pthread_cond_t workcond;
    pthread_cond_t readcond;
    int gowork;
    int goread;
    //Tracks how many threads are done analyzing data
    int doneq;
    /*
      Stores "data queues" (1 queue per thread)
      queue ->       [  [ char**    [ char**    [ char**    [ char**    [ char**
len(queue)=numt          [char*]     [char*]     [char*]     [char*]     [char*]
len(queue[n])=maxqueue   [char*]     [char*]     [char*]     [char*]     [char*]
len(queue[n][m])=data      ...         ...         ...         ...         ...
                         [char*]     [char*]     [char*]     [char*]     [char*]
                                 ]           ]           ]           ]          ]
                                                                                ]
    */
    char ***queue;
    //Internal thread ID
    int *threadidx;
    //Maximum number of lines to read
    int maxseqs;
    //Maximum number of lines per thread == maxseqs/numthreads
    int maxqueue;
} thread_t;

/*
Reads lines from text file, copies line content and length into a char * pointer
and adds it to an "analysis queue" to be processed by one of the "consumers"
*/
void *reader(void *threaddata);

/*
Extracts char * pointers from one of the "data queues". Does work with
the data and frees when done.
*/
void *consumer(void *threaddata);

/*
Initializes thread data
*/
int  threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs);

/*
Cleans thread data before exit
*/
void threadtkill(thread_t *threaddata);


int main(int argc, char *argv[])
{
    if (argc < 4) {
        fprintf(stderr, "ERROR: Not enough arguments.\n");
        exit(-1);
    }

    FILE *fp = fopen(argv[1], "r");
    if (!fp) {
        fprintf(stderr, "ERROR: Failed to open input file.\n");
        exit(-1);
    }

    int numt = atoi(argv[2]);
    if (!numt) {
        fprintf(stderr, "ERROR: Please specify number of threads.\n");
        fclose(fp);
        exit(-1);
    }

    int maxseqs = atoi(argv[3]);
    if (!maxseqs) {
        fprintf(stderr, "ERROR: Please specify max number of lines.\n");
        fclose(fp);
        exit(-1);
    }

    //Start data struct for threads
    thread_t threaddata;
    if (!threadtinit(fp, numt, &threaddata, maxseqs)) {
        fprintf(stderr, "ERROR: Could not initialize thread data.\n");
        fclose(fp);
        exit(-1);
    }
    fprintf(stderr, "Thread data initialized.\n");


    //return code
    int ret;

    //pthread creation
    pthread_t readerthread;
    pthread_t *consumerpool = NULL;
    consumerpool = malloc((numt)*sizeof(pthread_t));
    if (!consumerpool) {
        fprintf(stderr, "Failed to allocate threads.\n");
        ret = -1;
        goto exit;
    }

    // Initialize and set thread joinable attribute
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    //Consumer threads
    int thrc;
    for (int i = 0; i < numt; i++) {
        thrc = pthread_create(consumerpool + i,
                              &attr,
                              consumer,
                              (void *)&threaddata);
        if (thrc) {
            fprintf(stderr, "ERROR: Thread creation.\n");
            ret = -1;
            goto exit;
        }
    }

    //Couple of sleeps to keep track of stuff while running
    sleep(1);

    //Reader thread
    thrc = pthread_create(&readerthread,
                          &attr,
                          reader,
                          (void *)&threaddata);
    if (thrc) {
        fprintf(stderr, "ERROR: Thread creation.\n");
        ret = -1;
        goto exit;
    }


    // Free attribute and wait for the other threads
    pthread_attr_destroy(&attr);

    int jrc;
    jrc = pthread_join(readerthread, NULL);
    if (jrc) {
        fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
    }
    for (int i = 0; i < numt; i++) {
        jrc = pthread_join(*(consumerpool + i), NULL);
        if (jrc) {
            fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
            ret = -1;
            goto exit;
        }
    }
    ret = 0;
    exit:
        threadtkill(&threaddata);
        free(consumerpool);
        fprintf(stderr, "Finished.\n");
        return(ret);
}


void *reader(void *readt)
{
    fprintf(stderr, "Reader thread started.\n");
    thread_t *threaddata = readt;
    int numt = threaddata->numt;
    int maxqueue = threaddata->maxqueue;
    int maxseqs = threaddata->maxseqs;
    FILE *fp = threaddata->fp;

    // Array of queues, one per consumer thread
    char ***queue = threaddata->queue;

    // Number of bytes used to store length of line
    size_t bytes = sizeof(ssize_t);
    // Tracks number of lines loaded so far
    size_t nlines = 0;

    // Tracks to which queue data should be added to
    int qtracker = 0;
    // Tracks to which position in any particular queue, data should be added
    int ltracker = 0;

    // Holds read line
    char *line = NULL;
    ssize_t linelength = 0;
    size_t n;

    // Tracks how much data will be read
    size_t totallength = 0;
    size_t totallines = 0;
    while ( (linelength =  getline(&line, &n, fp)) != -1 ) {
        // enough data is used to hold line contents + line length
        char *data = malloc(bytes + linelength + 1);

        if (!data) {
            fprintf(stderr, "memerr\n");
            continue;
        }
        // copy the line length into the first 'bytes' bytes of data
        memcpy(data, &linelength, bytes);
        // copy the line, including getline's terminating NUL, after the length
        memcpy(data + bytes, line, linelength + 1);

        totallength += linelength;

        // Add newly allocated data to one of numt queues
        queue[qtracker][ltracker] = data;
        qtracker++;
        if (numt == qtracker) {
            // Loop around queue
            qtracker = 0;
            ltracker++;
            // Loop around positions in queue
            if (maxqueue == ltracker) ltracker = 0;
        }
        nlines++;
        // Stop reading thread and start consumer threads
        if (nlines == maxseqs) {
            fprintf(stderr, "%zu lines loaded\n", nlines);
            sleep(3);
            totallines += nlines;
            nlines = 0;
            fprintf(stderr, "Waking up consumers\n");
            pthread_mutex_lock(&(threaddata->mtx));
            //Wake consumer threads
            threaddata->gowork = 1;
            pthread_cond_broadcast(&(threaddata->workcond));
            //Wait for consumer threads to finish
            while ( !threaddata->goread ) {
                pthread_cond_wait(&(threaddata->readcond),
                                  &(threaddata->mtx));
            }
            fprintf(stderr, "Reader has awoken!!!!\n\n");
            sleep(3);
            threaddata->goread = 0;
            pthread_mutex_unlock(&(threaddata->mtx));
        }
    }

    //Add NULL pointers to the end of each queue to indicate reading is done
    pthread_mutex_lock(&(threaddata->mtx));
    for (int i = 0; i < numt; i++) {
        queue[i][ltracker] = NULL;
    }
    // Wake consumers for the last time
    threaddata->gowork = 1;
    pthread_cond_broadcast(&(threaddata->workcond));
    pthread_mutex_unlock(&(threaddata->mtx));

    // Log info
    fprintf(stderr, "%zu characters read.\n", totallength);
    if (line) free(line);
    pthread_exit(NULL);
}


void *consumer(void *consumert)
{
    thread_t *threaddata = consumert;
    // Number of consumer threads
    int numt = threaddata->numt;
    // Max length of queue to extract data from
    int maxqueue = threaddata->maxqueue;

    // Holds data sent by reader thread
    char *data = NULL;
    // Holds the actual line read
    char *line = NULL;
    size_t linelength;
    size_t bytes = sizeof(ssize_t);

    // get queue number for corresponding thread
    int qnum = -1;
    pthread_mutex_lock(&(threaddata->mtx));
    int *tlist = threaddata->threadidx;
    while (qnum == -1) {
        qnum = *tlist;
        *tlist = -1;
        tlist++;
    }
    fprintf(stderr, "Thread got queueID: %d.\n", qnum);
    pthread_mutex_unlock(&(threaddata->mtx));
    // Any thread works on only one and one queue only
    char **queue = threaddata->queue[qnum];

    //After initializing, wait for reader to start working
    pthread_mutex_lock(&(threaddata->mtx));
    while ( !threaddata->gowork) {
        pthread_cond_wait(&(threaddata->workcond), &(threaddata->mtx));
    }
    fprintf(stderr, "Consumer thread started queueID %d.\n", qnum);
    pthread_mutex_unlock(&(threaddata->mtx));

    // Tracks number of characters this thread consumes
    size_t totallength = 0;
    // Tracks from which position in queue data should be taken from
    size_t queuecounter = 1;
    // Get first data point
    data = queue[0];

    while (data != NULL) {
        //get line length
        memcpy(&linelength, data, bytes);

        //get line
        line = data + bytes;

        //Do work
        totallength += linelength;
        free(data);

        //Check for number of sequences analyzed
        if (queuecounter == maxqueue) {
            // Wait for other threads to catchup
            sleep(1);
            queuecounter = 0;
            pthread_mutex_lock(&(threaddata->mtx));
            threaddata->doneq++;
            threaddata->gowork = 0;
            // If this thread is the last one to be done with its queue, wake
            // reader
            if (threaddata->doneq == numt) {
                threaddata->goread = 1;
                pthread_cond_signal(&(threaddata->readcond));
                threaddata->doneq = 0;
            }
            // When done consuming data, wait for reader to load more
            while (!threaddata->gowork) {
                pthread_cond_wait(&(threaddata->workcond),
                                  &(threaddata->mtx));
            }
            pthread_mutex_unlock(&(threaddata->mtx));
        }
        //Get next line
        data = queue[queuecounter];
        queuecounter++;
    }

    // Log and exit
    fprintf(stderr, "\tThread %d analyzed %zu characters.\n", qnum, totallength);
    pthread_exit(NULL);
}


int  threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs)
{
    threaddata->fp = fp;
    //Determine maximum thread queue length
    threaddata->maxqueue = ceil((float)maxseqs/numt);
    threaddata->maxseqs = threaddata->maxqueue*numt;
    fprintf(stderr, "max lines to load: %d\n", threaddata->maxseqs);
    fprintf(stderr, "max lines per thread: %d\n", threaddata->maxqueue);
    threaddata->numt = numt;
    //Allocate data for queues and initialize them
    threaddata->queue = malloc(numt*sizeof(char **));
    threaddata->threadidx = malloc(numt*sizeof(int));
    for (int i = 0; i < numt; i++) {
        threaddata->queue[i] = malloc(threaddata->maxqueue*sizeof(char *));
        threaddata->threadidx[i] = i;
    }
    //Initialize synchronization data
    pthread_mutex_init(&(threaddata->mtx), NULL);
    pthread_cond_init(&(threaddata->workcond), NULL);
    pthread_cond_init(&(threaddata->readcond), NULL);
    threaddata->gowork = 0;
    threaddata->goread = 0;
    threaddata->doneq = 0;
    return 1;
}


void threadtkill(thread_t *threaddata)
{
    fclose(threaddata->fp);
    for (int i = 0; i < threaddata->numt; i++) {
        free(threaddata->queue[i]);
    }
    free(threaddata->queue);
    free(threaddata->threadidx);
    pthread_mutex_destroy(&(threaddata->mtx));
    pthread_cond_destroy(&(threaddata->workcond));
    pthread_cond_destroy(&(threaddata->readcond));
}
jregalad
  • That was my first thought, but it's the same pointer as in the reader thread. Really breaking my head as to what is happening. – jregalad Apr 22 '20 at 16:42
  • Although pointers to different things are [usually] the same size, because you have `char ***queue`, change: `queue[i] = malloc(length*sizeof(char *));` into `queue[i] = malloc(length*sizeof(char **));` If you're doing the former, it may indicate you have a level issue in your understanding. [Almost] _never_ use `char ***`, a triple level pointer. It's easy to mess it up, and I've never [in 40+ years of programming in C] had to use one. See: http://wiki.c2.com/?ThreeStarProgrammer – Craig Estey Apr 22 '20 at 16:46
  • @CraigEstey - `malloc(length*sizeof(char *));` and `malloc(length*sizeof(char **));` I believe have the same end result since `sizeof(char **) == sizeof(char *)`, always. – ryyker Apr 22 '20 at 17:03
  • When code uses `queue[qtracker][ltracker]` - a pointer, How does code determine how much data is reference-able? As in the `//Do work on data` code? Its length is lost. – chux - Reinstate Monica Apr 22 '20 at 17:05
  • @ryyker Yes, they are the same size (which I mentioned) (i.e. calculated result is the same). But, I was talking about logical correctness. This would [more clearly] manifest itself with: `struct foo { int bar[100]; };` Then, `struct foo *arr = malloc(25 * sizeof(struct foo *));` should be `struct foo *arr = malloc(25 * sizeof(struct foo));`, an OP problem we see here on SO all the time. Here, it was a similar issue, but, we "got lucky" that `char *` and `char **` are the same size – Craig Estey Apr 22 '20 at 17:17
  • @chux-ReinstateMonica Decided to skip that part for the sake of brevity. In summary, the first 4 bytes of data encode the length of the entire datapoint. – jregalad Apr 22 '20 at 17:18
  • @jregalad Is `data->legth` the same as `data->length`? Consider posting true compilable code. – chux - Reinstate Monica Apr 22 '20 at 19:14
  • If you would take a stab at editing this post to make it into a [mcve], it would help to figure out what is wrong. Without that, there are too many gaps in the code to go any further than guessing. By the way, if you paid any attention to my answer before I deleted it, don't. It made incorrect assertions. – ryyker Apr 22 '20 at 19:18
  • @ryyker updated post with code to reproduce error, thanks again for the help – jregalad Apr 23 '20 at 12:34
  • Look at updates to bottom of my answer for what I hope will be answers to some of your questions in comments. Good luck. – ryyker Apr 23 '20 at 15:13

3 Answers


This line looks suspicious:

if (length == ltracker) ltracker++;

I would normally expect to see:

if (length == ltracker) ltracker = 0; /* wrap */

but without the entire context, it is a bit fuzzy. Also, it seems pretty clear that you are creating a race between the producer and consumer with all of this, which might prove even harder to debug than your current issue.

Since you went triple level, you do recognize that your buffer space is O(n^3), and that free() seldom shrinks your process memory? free() generally just lets you recycle previously allocated heap, so your program will grow until it no longer needs to ask the system for more memory, and then stay at that size.

mevets
  • Fixed the "if (length == ltracker) ltracker++;" typo on my part. I understand my triple-level pointer is rather complicated, but that allocation happens only once. Most memory is allocated within a loop. That memory is freed and valgrind detects no leaks, but when running with large datasets, memory usage keeps growing. Working on a minimal reproducible example that can be posted here. – jregalad Apr 23 '20 at 07:42
  • Yes, my point is that you assert that you have a leak, but I don't think you do -- you just use a lot of data. You do have a sync problem -- the sleep at 327 is to mitigate a race condition; remove it and your program stops working. The problem is in how you start the consumer threads. – mevets Apr 23 '20 at 14:23
  • @jregalad: valgrind can report the memory allocated and not freed. Use --show-reachable=yes or --show-leak-kinds=all. You can also search for leaks (or growing memory) on demand e.g. from the shell, using vgdb. See http://www.valgrind.org/docs/manual/mc-manual.html#mc-manual.monitor-commands for more details – phd Apr 23 '20 at 19:58

Note, the following focuses on only the code snippets you refer to as reader and consumer threads, although as pointed out in comments, and the other answer, there are other potential sources that should be reviewed for problems...

In your reader thread:

while (loaddata(data) >= 0) {
    char *datapoint = malloc(data->legth); 
    ...
    // Note: there are no free(datapoint); calls in this loop
}

Clearly, datapoint is created within this block, but is not freed within this block.

Following are likely contributing factors to memory leaks:

  • Because the instance of datapoint in the reader thread is created within a block, its lifetime exists only within that block. The memory allocated at that address continues to be owned by the process that created it, but outside that block the pointer variable pointing to that memory no longer lives, and thus cannot be used to free it outside that block. And because I see no calls to free(datapoint) inside that block, it is never freed.

  • Compounding this, char *datapoint = malloc(data->legth); is called in a loop (without calling free in between), each time creating new memory at a new address while overwriting the address that referred to its predecessor, thus making it impossible to free all of the previous allocations.

  • The instance of datapoint in the consumer thread, although it has the same name as the one in the reader thread, is not pointing to the same memory space. So even though that variable is being freed, it is not freeing the instance of datapoint that exists in the reader thread.

Excerpt of code from consumer thread

datapoint = dataqueue[datatracker]  //Note no ";" making this code uncompilable
                                    //forcing the conclusion that code posted
                                    //is not code actually used, 
                                    //Also: where is this instance of datapoint
                                    //created ?
datatracker++;
while (datapoint != NULL) {
    //Do work on data
    free(datapoint);
    datapoint = dataqueue[datatracker];
    datatracker++;

    //More synchronization code
}

Per questions in comments, and general Linux threading info:
Why doesn't Valgrind detect memory leaks, SO question
passing data between threads question
Creating threads in Linux tutorial
Linux Tutorial: POSIX Threads

ryyker
  • I see, it seems I have quite a misunderstanding about how to pass data between threads. What I don't fully understand is why valgrind does not show any kind of memory leak. – jregalad Apr 23 '20 at 14:15
  • @jregalad - Good question about `valgrind`. I do not have experience with it. In this case it is simply a scope problem. The `{...}` braces delineate the life span for any variable created within them. When execution flow exits, everything created within ceases to exist, unless created with the `static` modifier, in which case it continues to exist until the end of the process, but is only accessible within the `{...}`. – ryyker Apr 23 '20 at 14:54
  • ...Also, a good rule of thumb about dynamically allocated memory is to match one-for-one calls to `free()` for each and every call to `[m][c][re]alloc()`. And passing pointer variables (pointers to Memory) as function arguments is commonly done and in `C` is idiomatic. Regarding passing data between threads, [maybe this will help](https://stackoverflow.com/questions/13741243/passing-data-between-thread-using-c-issue). – ryyker Apr 23 '20 at 14:54

Turns out there is nothing wrong with my code per se. Calling free() after a malloc() releases memory on the heap to be reused by the program, but that does not mean it goes back to the system. The reason for this is still a bit beyond my understanding.

Valgrind was not reporting memory leaks because there are none.

After doing some research, reading more about the nature of dynamic memory allocation, and landing here:

Force free() to return malloc memory back to OS

Why does the free() function not return memory to the operating system?

Will malloc implementations return free-ed memory back to the system?

Memory not freed after calling free()

Calling malloc_trim() after each free was enough to make the system reclaim the allocated memory.

For example, without calling malloc_trim(), the CPU and memory usage of my program looks like this:

[Plot: CPU and memory usage over time, without malloc_trim()]

On each call to my "reader" thread (the first peak in CPU usage) some memory is allocated. My "consumer" threads free the requested memory, but the memory is not always returned to the system, as the blue line in the plot shows.

With malloc_trim() after each free(), memory usage looks the way I was expecting it to look:

[Plot: CPU and memory usage over time, with malloc_trim() after each free()]

When the "reader" thread is executing, memory associated with the process increases. When the "consumers" are running, memory is freed and returned to the OS.

jregalad