0

I tried to speed up my PyPy code by adding some C functions. The problem is that the memory usage keeps increasing! I saw a few posts on this subject and tried to write a simple test to illustrate it. In my test below, I am able to release the memory as described in this post (Python CFFI memory management issues), but my code crashes like this: free(): invalid next size (normal). It seems I am freeing the memory twice...

Can anyone help me to solve my issue?

import gc
from cffi import FFI
from time import sleep
import execnet
ffibuilder = FFI()


# C declaration of the record type. The same text is handed to cdef() (so the
# Python side knows the layout) and prepended to the compiled C source.
Create_TypeStructure = """
typedef struct _STRUCT1
    {
        int Data1;
        int Data2;
        int Data3;
        double Data4;
        double Data5;
        double Data6;
        double Data7;
        double Data8;

    }STRUCT1, *PSTRUCT1;
"""

# C source: malloc() an array of 6000 STRUCT1 records and return the pointer.
# NOTE(review): SetDummyValues is called with `&pStruct1` (the address of the
# pointer) although it is defined below as taking a PSTRUCT1 -- confirm intent.
# NOTE(review): the printf format strings end in "/n"; presumably a newline
# escape was intended.
Create_DataStructure = """
PSTRUCT1 CreateDataStructure()
 {
     PSTRUCT1 pStruct1 = ( PSTRUCT1 ) malloc( sizeof( STRUCT1 )*6000 );
     SetDummyValues(&pStruct1);
     if(pStruct1 != NULL) printf("SECOND TEST: ptr is not null/n");
     else printf("SECOND TEST: ptr is null/n");
     return pStruct1;
 }
"""

# C source: assigns NULL to its own by-value parameter; nothing is written
# through the pointer, so the assignment is not visible to the caller.
Set_DummyValues = """
void SetDummyValues( PSTRUCT1 ptr )
    {
        ptr = NULL;
    }
"""

# C source: thin wrapper around free(); used from Python as the ffi.gc destructor
# and also called explicitly.
Free_DataStructure = """
 void FreeDataStructure(PSTRUCT1 ptr) 
    {
        free(ptr);
    }  
"""


# C source: writes fixed dummy values into elements 0..5999 of the array `p`
# (tmp starts at -1 and is incremented before each write) and returns `p`.
Some_Function = """
PSTRUCT1 SomeCFunction(STRUCT1 *p) 
    {

        int tmp=-1;
        int numline = 5999;

        while (tmp < numline) 
        {
        tmp++ ;
        {
            p[tmp].Data1 = 1000000;
            p[tmp].Data2 = 1000000;
            p[tmp].Data3 = 1000000;
            p[tmp].Data4 = 2125585.265;
            p[tmp].Data5 = 2125585.265;
            p[tmp].Data6 = 2125585.265;
            p[tmp].Data7 = 2125585.265;
            p[tmp].Data8 = 2125585.265;
        }

        }

        return p;
    }


"""

# Declare the struct and the four C functions to CFFI.
ffibuilder.cdef(Create_TypeStructure) #declare structures
ffibuilder.cdef('PSTRUCT1 CreateDataStructure();') #declare function
ffibuilder.cdef('void FreeDataStructure(PSTRUCT1 ptr);') #declare function
ffibuilder.cdef('PSTRUCT1 SomeCFunction(PSTRUCT1 ptr);') #declare function
ffibuilder.cdef('void SetDummyValues(PSTRUCT1 pStruct1);') #declare function

# Build the extension module named "c". Set_DummyValues is concatenated before
# Create_DataStructure, so SetDummyValues is defined before it is called.
ffibuilder.set_source("c", Create_TypeStructure + Set_DummyValues + Create_DataStructure + Free_DataStructure + Some_Function )
ffibuilder.compile(verbose=True)
from c import ffi, lib


def worker_process(channel):
    """Task processor run through ``group.remote_exec``; talks over *channel*.

    Sends "ready" once, then handles every item received on *channel*:
    allocates 15000 C arrays with ``lib.CreateDataStructure()``, fills each
    one with ``lib.SomeCFunction()``, frees them explicitly and replies
    "Ok!". Receiving ``None`` ends the loop.

    NOTE(review): every pointer is wrapped with
    ``ffi.gc(..., lib.FreeDataStructure)`` and is later also passed to
    ``lib.FreeDataStructure`` by hand, so the same allocation is presumably
    freed a second time when the wrapper is garbage collected. This matches
    the reported ``free(): invalid next size`` crash.
    """
    # Import modules
    import gc
    from time import sleep
    import execnet
    from c import ffi, lib
    # task processor, sits on each CPU
    channel.send("ready")

    for x in channel:
        if x is None:  # we can shutdown
                break
        # Maps the loop index to the GC-managed pointer, keeping it referenced.
        data_list = {}
        for i in range(15000):  
            # Create a pointer to the data structure and tell the garbage collector how to destroy it
            gc_c_pDataStructure = ffi.gc( lib.CreateDataStructure(), lib.FreeDataStructure )
            lib.SomeCFunction( gc_c_pDataStructure )  # Populate the data structure
            data_list[i]= gc_c_pDataStructure # Store some data
            #lib.FreeDataStructure(data_list[i])

        # Explicit free of pointers that already have FreeDataStructure
        # registered as their ffi.gc destructor (see the NOTE in the docstring).
        for i in range(15000):
            lib.FreeDataStructure(data_list[i])

        #sleep(15)
        channel.send("Ok!")


numberOfTasks = 500  # simulations to launch
workerCount = 1  # CPUs

# Start one execnet gateway per worker.
group = execnet.Group()
for i in range(workerCount):  # CPUs
    group.makegateway()

# execute taskprocessor everywhere
mch = group.remote_exec(worker_process)

# get a queue that gives us results; "Stop" is delivered when a channel closes
q = mch.make_receive_queue(endmarker="Stop")

# Tasks are plain integers. An explicit list is needed because pop() is used.
tasks = list(range(numberOfTasks))

terminated = 0  # number of channels that have delivered their end marker

while 1:
    channel, item = q.get()
    if item == "Stop":
        terminated += 1
        print("Terminated task on channel %s" % channel.gateway.id)
        # The finish check belongs to the "Stop" branch: only an end marker
        # can change `terminated`, and an end marker must never fall through
        # to the result/task handling below.
        if terminated == len(mch):
            print("Got all results, Finish!")
            break
        continue
    if item != "ready":
        print("%s: Terminated: %s" % (channel.gateway.id, item))
    if not tasks:
        print("No tasks remain, sending termination request to all")
        mch.send_each(None)
        tasks = -1  # sentinel: termination has already been requested
    if tasks and tasks != -1:
        task = tasks.pop()
        channel.send(task)
        print("Sent task %r to channel %s" % (task, channel.gateway.id))

group.terminate()
Ronan Boiteau
  • 9,608
  • 6
  • 34
  • 56
  • 1
    Hey, your python code is all messed up with the indentation. I was going to edit it, but I don't know what you really intended. Did you try to paste tabs? Regardless of your indentation, I can't think of a reason to `continue` (after the `break`) in that python code at the bottom. Would your program behave correctly if you lowered the 15000 `range` in the `for` loop in `worker_process`? – Jason Feb 08 '19 at 20:14
  • 1
    Also, that "double freeing" of memory may come from the explicit free call here `lib.FreeDataStructure(data_list[i])`. You explicitly free memory that was added to the garbage collector. If the garbage collector frees it first, you will indeed try to free already freed memory. – Jason Feb 08 '19 at 20:18
  • Sorry, I think some indentation disappeared during the copy/paste. – Yannick Banaszak Feb 08 '19 at 21:01
  • My problem figures in the worker_process def, rest of the code is just to illustrate the problem on a multiprocess example. If I don't free my c_struct, the memory never stop to increase. – Yannick Banaszak Feb 08 '19 at 21:04
  • Something weird is going on with the call to `SetDummyValues` . You pass in the address of a pointer to Struct1, but expect only a pointer to Struct1. Setting the argument to NULL in that function won't affect anything in the caller. And if you 'fixed' it by changing the code to `*ptr = NULL`, then you would lose the pointer to the memory you just allocated. What is that function _supposed_ to do? – AShelly Feb 08 '19 at 23:15
  • To set the structure pointer to NULL – Yannick Banaszak Feb 08 '19 at 23:21
  • OT: it is a very poor programming practice to hide pointers in `typedef`s – user3629249 Feb 09 '19 at 00:56

1 Answer

0

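My solution: tell the PyPy garbage collector how much memory the C allocations hold (with `__pypy__.add_memory_pressure`) so that it runs often enough, and remove the manual freeing. There is also no need to set the pointer to NULL.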

import gc
from cffi import FFI
from time import sleep
import execnet
ffibuilder = FFI()
import __pypy__


# C declaration of the record type. The same text is handed to cdef() (so the
# Python side knows the layout) and placed in the compiled C source.
Create_TypeStructure = """
typedef struct _STRUCT1
{
    int Data1;
    int Data2;
    int Data3;
    double Data4;
    double Data5;
    double Data6;
    double Data7;
    double Data8;
} STRUCT1, *PSTRUCT1;
"""

# Compiled-only prelude (never passed to cdef()): the header that declares
# malloc/free, and the single definition of the array length used by the
# allocator and by the fill loop.
C_Prelude = """
#include <stdlib.h>

#define STRUCT1_COUNT 6000
"""

# Allocate an uninitialised array of STRUCT1_COUNT records.
Create_DataStructure = """
PSTRUCT1 CreateDataStructure(void)
{
    /* Returns NULL when the allocation fails; callers must check. */
    return (PSTRUCT1) malloc(sizeof(STRUCT1) * STRUCT1_COUNT);
}
"""

# Release an array obtained from CreateDataStructure(); used as ffi.gc destructor.
Free_DataStructure = """
void FreeDataStructure(PSTRUCT1 ptr)
{
    free(ptr);  /* free(NULL) is a no-op */
}
"""

# Fill every record of the array with fixed dummy values and return the array.
Some_Function = """
PSTRUCT1 SomeCFunction(STRUCT1 *p)
{
    int i;

    /* Guard against a failed allocation instead of writing through NULL. */
    if (p == NULL)
        return NULL;

    for (i = 0; i < STRUCT1_COUNT; i++)
    {
        p[i].Data1 = 1000000;
        p[i].Data2 = 1000000;
        p[i].Data3 = 1000000;
        p[i].Data4 = 2125585.265;
        p[i].Data5 = 2125585.265;
        p[i].Data6 = 2125585.265;
        p[i].Data7 = 2125585.265;
        p[i].Data8 = 2125585.265;
    }

    return p;
}
"""

# Declare the struct and the three C functions to CFFI.
ffibuilder.cdef(Create_TypeStructure)  # declare structures
ffibuilder.cdef('PSTRUCT1 CreateDataStructure(void);')  # declare function
ffibuilder.cdef('void FreeDataStructure(PSTRUCT1 ptr);')  # declare function
ffibuilder.cdef('PSTRUCT1 SomeCFunction(PSTRUCT1 ptr);')  # declare function

# Build the extension module named "c"; the prelude must come first so that
# STRUCT1_COUNT and the libc prototypes are visible to every function.
ffibuilder.set_source(
    "c",
    C_Prelude + Create_TypeStructure + Create_DataStructure
    + Free_DataStructure + Some_Function,
)
ffibuilder.compile(verbose=True)
from c import ffi, lib


def worker_process(channel):
    """Task processor run through ``group.remote_exec``, one per gateway.

    Sends "ready" once, then for every item received on *channel* allocates
    5000 C arrays, fills them with ``lib.SomeCFunction`` and replies "Ok!".
    Receiving ``None`` ends the loop.

    The arrays are owned by ``ffi.gc`` wrappers, so ``lib.FreeDataStructure``
    runs when a wrapper is collected; they must never be freed by hand as
    well. Because the garbage collector only sees the small wrapper objects,
    the real size of each malloc'ed array is reported to it with
    ``__pypy__.add_memory_pressure``.

    Args:
        channel: execnet channel connected to the dispatching process.

    Raises:
        MemoryError: if the C allocator returns NULL.
    """
    # Imports live inside the function because only its source is shipped to
    # the remote interpreter.
    from c import ffi, lib
    import __pypy__

    # Must match the element count malloc'ed by CreateDataStructure().
    elements_per_block = 6000
    # add_memory_pressure() expects bytes, not an element count.
    block_bytes = ffi.sizeof("STRUCT1") * elements_per_block

    # task processor, sits on each CPU
    channel.send("ready")

    for x in channel:
        if x is None:  # we can shutdown
            break
        # Keeps the wrappers (and therefore the C arrays) alive for this task.
        data_list = {}
        for i in range(5000):
            raw_ptr = lib.CreateDataStructure()
            if raw_ptr == ffi.NULL:
                raise MemoryError("CreateDataStructure() returned NULL")
            # Create a pointer to the data structure and tell the garbage
            # collector how to destroy it
            gc_c_pDataStructure = ffi.gc(raw_ptr, lib.FreeDataStructure)
            __pypy__.add_memory_pressure(block_bytes)
            lib.SomeCFunction(gc_c_pDataStructure)  # Populate the data structure
            data_list[i] = gc_c_pDataStructure  # Store some data

        channel.send("Ok!")


numberOfTasks = 500  # simulations to launch
workerCount = 3  # CPUs

# Start one execnet gateway per worker.
group = execnet.Group()
for cpu in range(workerCount):
    group.makegateway()

# Run the task processor on every gateway and collect all replies in a single
# queue; "Stop" is delivered when a channel closes.
mch = group.remote_exec(worker_process)
q = mch.make_receive_queue(endmarker="Stop")

# Pending work items (plain integers), handed out from the end of the list.
tasks = list(range(numberOfTasks))
terminated = 0  # channels that have delivered their end marker so far

while True:
    channel, item = q.get()

    if item == "Stop":
        # A worker has shut down; finish once every one of them has.
        terminated += 1
        print("Terminated task on channel %s" % channel.gateway.id)
        if terminated == len(mch):
            print("Got all results, Finish!")
            break
        continue

    if item != "ready":
        # Anything other than the initial handshake is a task result.
        print("%s: Terminated: %s" % (channel.gateway.id, item))

    if not tasks:
        print("No tasks remain, sending termination request to all")
        mch.send_each(None)
        tasks = -1  # sentinel: termination has already been requested
    elif tasks != -1:
        task = tasks.pop()
        channel.send(task)
        print("Sent task %r to channel %s" % (task, channel.gateway.id))

group.terminate()