I have a problem finding a memory leak in my program.
`top` reports steadily increasing memory usage as the program runs. When profiling the program with valgrind, no memory leaks are reported.
The program consists of one "reader" thread and several "consumer" threads.
The "reader" thread loads data into one of several char** queues, one for each "consumer" thread.
Each "consumer" thread works on the data in its corresponding char** queue and frees the memory.
I have included some pseudocode that describes what my program is doing. I know the code provided might not be enough to describe the problem. I am happy to include the entire code project if that will help.
"reader" thread, condensed for brevity
//'nconsumers': number of consumer threads
char ***queue = malloc(nconsumers*sizeof(char **));
for (int i = 0; i < nconsumers; i++) {
    //'length' number of datapoints a 'consumer' works on at a time
    queue[i] = malloc(length*sizeof(char *));
}
char *data = NULL;
int qtracker = 0; //tracks to which 'consumer' data should be assigned
int ltracker = 0; //tracks how many datapoints have been added to each 'consumer'
//loaddata reads data and stores it in 'data' struct
while (loaddata(data) >= 0) {
    char *datapoint = malloc(data->length);
    memcpy(datapoint, data->content, data->length);
    queue[qtracker][ltracker] = datapoint;
    qtracker++;
    if (nconsumers == qtracker) {
        qtracker = 0;
        ltracker++;
        if (length == ltracker) ltracker = 0;
    }
}
//NULL pointers are added to the end of each 'consumer' queue to indicate all data has been read
"consumer" thread
//Consumers are initialized and a queue is assigned to them
int qnum = "some number between 0 and nconsumers";
int datatracker = 0;
char **dataqueue = queue[qnum];
datapoint = dataqueue[datatracker]
datatracker++;
while (datapoint != NULL) {
//Do work on data
free(datapoint);
datapoint = dataqueue[datatracker];
datatracker++;
//More synchronization code
}
"consumer" thread is correctly reading data and processing it as it should. Again, valgrind reports no memory leaks. When monitoring my process with top or htop, memory usage to this program keeps increasing to the point where my machine starts swapping.
EDIT
I have added a complete program that reproduces the error. This is not exactly the program where I encountered the problem, as that one has additional dependencies. Again, this program spawns 1 "reader" thread and N "consumer" threads. When run on a large text file with hundreds of millions of lines (such as DNA sequencing files), htop shows steadily growing memory usage, while valgrind reports no memory leaks except for a pthreads-specific one.
Thanks again for all the help!!
Compile and run on any modern Linux box:
gcc -Wall -o <name> <program.c> -lm -lpthread
./<name> large_text_file.txt <num_threads> <max_lines>
Only this warning should show up, as I don't use the extracted pointer in this example:
<program>.c: In function ‘consumer’:
<program>.c:244:11: warning: variable ‘line’ set but not used [-Wunused-but-set-variable]
char *line = NULL;
#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
// Data shared by all threads: main() creates one instance and passes a
// pointer to it to the reader thread and to every consumer thread.
typedef struct {
//Input file (read by the reader thread, closed by threadtkill)
FILE *fp;
//Number of consumer threads; also the number of queues
int numt;
//Synchronization data. gowork, goread and doneq are only read or written
//with mtx held.
pthread_mutex_t mtx;
//Broadcast by the reader when consumers may (re)start working
pthread_cond_t workcond;
//Signalled by the last consumer to finish a batch, to wake the reader
pthread_cond_t readcond;
//Set to 1 by the reader to release consumers; cleared by consumers as they finish a batch
int gowork;
//Set to 1 by the last consumer of a batch; cleared again by the reader
int goread;
//Tracks how many consumer threads are done with the current batch
int doneq;
/*
Stores "data queues" (1 queue per consumer thread)
queue -> [ [ char** [ char** [ char** [ char** [ char**
len(queue)=numt [char*] [char*] [char*] [char*] [char*]
len(queue[n])=maxqueue [char*] [char*] [char*] [char*] [char*]
len(queue[n][m])=data ... ... ... ... ...
[char*] [char*] [char*] [char*] [char*]
] ] ] ] ]
]
Each non-NULL entry is a malloc'd record: sizeof(ssize_t) bytes holding the
line length, followed by the line bytes and their terminating NUL. The
consumer owning the queue frees it. A NULL entry marks the end of input.
The reader fills entries without holding mtx; consumers only read them after
being released through gowork.
*/
char ***queue;
//Queue indices 0..numt-1 to hand out to consumers; a claimed entry becomes -1
int *threadidx;
//Number of lines loaded per batch (requested value rounded up to a multiple of numt)
int maxseqs;
//Number of slots per queue == ceil(requested maxseqs / numt)
int maxqueue;
} thread_t;
/*
Reader thread: reads lines from the text file, copies each line's length and
content into a malloc'd char * record and adds it, round-robin, to one of the
"data queues" to be processed by one of the "consumers". After every maxseqs
lines it wakes the consumers and waits for them to drain the batch; at end of
input it terminates the queues with NULL entries.
*/
void *reader(void *threaddata);
/*
Consumer thread: extracts char * records from its own "data queue", does work
with the data and frees each record when done. Stops at a NULL entry.
*/
void *consumer(void *threaddata);
/*
Initializes thread data (queues, queue-index list, mutex and condition
variables). main() treats a zero return value as failure.
*/
int threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs);
/*
Cleans thread data before exit: closes the input file, frees the queues and
the queue-index list, and destroys the mutex.
*/
void threadtkill(thread_t *threaddata);
/*
Usage: <program> <input file> <num consumer threads> <lines per batch>

Starts numt consumer threads plus one reader thread that share a single
thread_t, waits for all of them to finish, then releases the shared state.
Returns 0 on success and -1 on any error.
*/
int main(int argc, char *argv[])
{
    if (argc < 4) {
        fprintf(stderr, "ERROR: Not enough arguments.\n");
        exit(-1);
    }
    FILE *fp = fopen(argv[1], "r");
    if (!fp) {
        fprintf(stderr, "ERROR: Failed to open input file.\n");
        exit(-1);
    }
    // atoi() yields 0 for non-numeric text. Negative values are rejected too,
    // because they would turn into huge allocation sizes further down.
    int numt = atoi(argv[2]);
    if (numt <= 0) {
        fprintf(stderr, "ERROR: Please specify number of threads.\n");
        fclose(fp);
        exit(-1);
    }
    int maxseqs = atoi(argv[3]);
    if (maxseqs <= 0) {
        fprintf(stderr, "ERROR: Please specify max number of lines.\n");
        fclose(fp);
        exit(-1);
    }
    //Start data struct for threads
    thread_t threaddata;
    if (!threadtinit(fp, numt, &threaddata, maxseqs)) {
        fprintf(stderr, "ERROR: Could not initialize thread data.\n");
        fclose(fp);
        exit(-1);
    }
    fprintf(stderr, "Thread data initialized.\n");

    int ret = -1;
    int thrc;
    int jrc;
    pthread_t readerthread;
    pthread_attr_t attr;
    pthread_t *consumerpool = malloc((size_t)numt * sizeof *consumerpool);
    if (!consumerpool) {
        fprintf(stderr, "Failed to allocate threads.\n");
        goto cleanup;
    }

    // Threads are created joinable so main() can wait for them below.
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    //Consumer threads
    for (int i = 0; i < numt; i++) {
        thrc = pthread_create(consumerpool + i, &attr, consumer,
                              (void *)&threaddata);
        if (thrc) {
            fprintf(stderr, "ERROR: Thread creation.\n");
            pthread_attr_destroy(&attr);
            if (i == 0) {
                goto cleanup;   // no thread touches threaddata yet
            }
            goto abandon;
        }
    }
    //Couple of sleeps to keep track of stuff while running
    sleep(1);
    //Reader thread
    thrc = pthread_create(&readerthread, &attr, reader, (void *)&threaddata);
    pthread_attr_destroy(&attr);
    if (thrc) {
        fprintf(stderr, "ERROR: Thread creation.\n");
        goto abandon;
    }

    ret = 0;
    jrc = pthread_join(readerthread, NULL);
    if (jrc) {
        fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
        ret = -1;
    }
    // Keep joining after a failure so every remaining consumer is waited for
    // before the shared state is destroyed.
    for (int i = 0; i < numt; i++) {
        jrc = pthread_join(consumerpool[i], NULL);
        if (jrc) {
            fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
            ret = -1;
        }
    }

cleanup:
    threadtkill(&threaddata);
    free(consumerpool);
    fprintf(stderr, "Finished.\n");
    return ret;

abandon:
    // Consumers that were already started are blocked on workcond and will
    // never return, so destroying the mutex/queues under them is unsafe.
    // Returning from main() terminates the process, which reclaims everything.
    fprintf(stderr, "Finished.\n");
    return ret;
}
void *reader(void *readt)
{
fprintf(stderr, "Reader thread started.\n");
thread_t *threaddata = readt;
int numt = threaddata->numt;
int maxqueue = threaddata->maxqueue;
int maxseqs = threaddata->maxseqs;
FILE *fp = threaddata->fp;
// Array of queues, one per consumer thread
char ***queue = threaddata->queue;
// Number of bytes used to store length of line
size_t bytes = sizeof(ssize_t);
// Tracks number of lines loaded so far
size_t nlines = 0;
// Tracks to which queue data should be added to
int qtracker = 0;
// Tracks to which position in any particular queue, data should be added
int ltracker = 0;
// Holds read line
char *line = NULL;
ssize_t linelength = 0;
size_t n;
// Tracks how much data will be read
size_t totallength = 0;
size_t totallines = 0;
while ( (linelength = getline(&line, &n, fp)) != -1 ) {
// enough data is used to hold line contents + line length
char *data = malloc(bytes + linelength + 1);
if (!data) {
fprintf(stderr, "memerr\n");
continue;
}
// move line lenght bytes to data
memcpy(data, &linelength, bytes);
//move line bytes to data
memcpy(data + bytes, line, linelength + 1);
totallength += linelength;
// Add newly allocated data to one of numt queues
queue[qtracker][ltracker] = data;
qtracker++;
if (numt == qtracker) {
// Loop around queue
qtracker = 0;
ltracker++;
// Loop around positions in queue
if (maxqueue == ltracker) ltracker = 0;
}
nlines++;
// Stop reading thread and start consumer threads
if (nlines == maxseqs) {
fprintf(stderr, "%lu lines loaded\n", nlines);
sleep(3);
totallines += nlines;
nlines = 0;
fprintf(stderr, "Waking up consumers\n");
pthread_mutex_lock(&(threaddata->mtx));
//Wake consumer threads
threaddata->gowork = 1;
pthread_cond_broadcast(&(threaddata->workcond));
//Wait for consumer threads to finish
while ( !threaddata->goread ) {
pthread_cond_wait(&(threaddata->readcond),
&(threaddata->mtx));
}
fprintf(stderr, "Reader has awoken!!!!\n\n");
sleep(3);
threaddata->goread = 0;
pthread_mutex_unlock(&(threaddata->mtx));
}
}
//Add NULL pointers to the end of each queue to indicate reading is done
pthread_mutex_lock(&(threaddata->mtx));
for (int i = 0; i < numt; i++) {
queue[i][ltracker] = NULL;
}
// Wake consumers for the last time
threaddata->gowork = 1;
pthread_cond_broadcast(&(threaddata->workcond));
pthread_mutex_unlock(&(threaddata->mtx));
// Log info
fprintf(stderr, "%lu characters read.\n", totallength);
if (line) free(line);
pthread_exit(NULL);
}
/*
Consumer thread body.

Claims the first unclaimed index in threaddata->threadidx and from then on
works on that queue only. It takes records from the queue, does work with
them and frees each one. After every maxqueue records it reports to the
reader and waits for the next batch; the last consumer to report wakes the
reader. It exits when it reads a NULL entry.

Each record holds sizeof(ssize_t) bytes of line length, followed by the line
bytes and a NUL, as built by reader().
*/
void *consumer(void *consumert)
{
    thread_t *threaddata = consumert;
    // Number of consumer threads (== number of queues)
    int numt = threaddata->numt;
    // Records per queue per batch
    size_t maxqueue = (size_t)threaddata->maxqueue;
    // Size of the length prefix of each record
    size_t bytes = sizeof(ssize_t);

    // Claim a queue: first index not yet handed out (-1 marks a taken index).
    // Bounded by numt so surplus threads cannot read past threadidx.
    int qnum = -1;
    pthread_mutex_lock(&(threaddata->mtx));
    for (int i = 0; i < numt && qnum == -1; i++) {
        if (threaddata->threadidx[i] != -1) {
            qnum = threaddata->threadidx[i];
            threaddata->threadidx[i] = -1;
        }
    }
    if (qnum != -1) {
        fprintf(stderr, "Thread got queueID: %d.\n", qnum);
    }
    pthread_mutex_unlock(&(threaddata->mtx));
    if (qnum == -1) {
        fprintf(stderr, "ERROR: No free queue for consumer thread.\n");
        pthread_exit(NULL);
    }

    // Any thread works on one queue only
    char **queue = threaddata->queue[qnum];

    // After initializing, wait for the reader to release the first batch
    pthread_mutex_lock(&(threaddata->mtx));
    while (!threaddata->gowork) {
        pthread_cond_wait(&(threaddata->workcond), &(threaddata->mtx));
    }
    fprintf(stderr, "Consumer thread started queueID %d.\n", qnum);
    pthread_mutex_unlock(&(threaddata->mtx));

    // Characters this thread consumed
    size_t totallength = 0;
    // Slot the next record is taken from
    size_t queuecounter = 1;
    char *data = queue[0];
    while (data != NULL) {
        // Same type the reader stored in the prefix
        ssize_t linelength;
        memcpy(&linelength, data, bytes);
        const char *line = data + bytes;
        // Do work with line / linelength here (placeholder in this example)
        (void)line;
        totallength += (size_t)linelength;
        free(data);

        if (queuecounter == maxqueue) {
            // Wait for other threads to catch up
            sleep(1);
            queuecounter = 0;
            pthread_mutex_lock(&(threaddata->mtx));
            threaddata->doneq++;
            // NOTE(review): the first consumer to finish clears gowork. A
            // consumer that was broadcast-woken but has not yet reacquired mtx
            // then sees gowork == 0 and goes back to sleep. The sleep(1) above
            // makes this unlikely but does not rule it out; a per-batch
            // generation counter in thread_t would.
            threaddata->gowork = 0;
            // Last consumer done with its queue wakes the reader
            if (threaddata->doneq == numt) {
                threaddata->goread = 1;
                pthread_cond_signal(&(threaddata->readcond));
                threaddata->doneq = 0;
            }
            // Wait for the reader to load more data
            while (!threaddata->gowork) {
                pthread_cond_wait(&(threaddata->workcond),
                                  &(threaddata->mtx));
            }
            pthread_mutex_unlock(&(threaddata->mtx));
        }
        data = queue[queuecounter];
        queuecounter++;
    }
    fprintf(stderr, "\tThread %d analyzed %zu characters.\n", qnum, totallength);
    pthread_exit(NULL);
}
/*
Initializes *threaddata for numt consumer threads reading from fp.

The requested maxseqs is rounded up to a multiple of numt, so every queue
receives exactly maxqueue records per batch.

Returns 1 on success. Returns 0 on invalid arguments or allocation/pthread
initialization failure; in that case everything allocated here has been
released, and fp is left open (the caller still owns it).
*/
int threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs)
{
    if (numt <= 0 || maxseqs <= 0) {
        return 0;
    }
    // Integer ceiling of maxseqs / numt. The former ceil((float)maxseqs/numt)
    // loses precision once maxseqs exceeds 2^24.
    int maxqueue = maxseqs / numt + (maxseqs % numt != 0);
    if (maxqueue > INT_MAX / numt) {
        return 0;   // maxqueue * numt would overflow int
    }
    threaddata->fp = fp;
    threaddata->numt = numt;
    threaddata->maxqueue = maxqueue;
    threaddata->maxseqs = maxqueue * numt;
    fprintf(stderr, "max lines to load: %d\n", threaddata->maxseqs);
    fprintf(stderr, "max lines per thread: %d\n", threaddata->maxqueue);

    // calloc so that a partially built queue array can be freed uniformly
    threaddata->queue = calloc((size_t)numt, sizeof *threaddata->queue);
    threaddata->threadidx = malloc((size_t)numt * sizeof *threaddata->threadidx);
    if (!threaddata->queue || !threaddata->threadidx) {
        goto fail_alloc;
    }
    for (int i = 0; i < numt; i++) {
        threaddata->queue[i] =
            malloc((size_t)maxqueue * sizeof *threaddata->queue[i]);
        if (!threaddata->queue[i]) {
            goto fail_alloc;
        }
        threaddata->threadidx[i] = i;
    }

    //Initialize synchronization data
    if (pthread_mutex_init(&(threaddata->mtx), NULL) != 0) {
        goto fail_alloc;
    }
    if (pthread_cond_init(&(threaddata->workcond), NULL) != 0) {
        goto fail_mutex;
    }
    if (pthread_cond_init(&(threaddata->readcond), NULL) != 0) {
        goto fail_workcond;
    }
    threaddata->gowork = 0;
    threaddata->goread = 0;
    threaddata->doneq = 0;
    return 1;

fail_workcond:
    pthread_cond_destroy(&(threaddata->workcond));
fail_mutex:
    pthread_mutex_destroy(&(threaddata->mtx));
fail_alloc:
    if (threaddata->queue) {
        for (int i = 0; i < numt; i++) {
            free(threaddata->queue[i]);
        }
    }
    free(threaddata->queue);
    free(threaddata->threadidx);
    threaddata->queue = NULL;
    threaddata->threadidx = NULL;
    return 0;
}
/*
Tears down what threadtinit() built: closes the input file, releases every
per-consumer queue array, the queue array itself and the queue-index list,
and destroys the mutex.

NOTE(review): workcond and readcond are not destroyed here. If that is added,
make sure no thread can still be blocked on them: destroying a condition
variable with waiters is undefined.
*/
void threadtkill(thread_t *threaddata)
{
    FILE *input = threaddata->fp;
    char ***slots = threaddata->queue;

    fclose(input);

    int idx = 0;
    while (idx < threaddata->numt) {
        free(slots[idx]);
        ++idx;
    }
    free(slots);

    free(threaddata->threadidx);
    pthread_mutex_destroy(&threaddata->mtx);
}