I have a Python TCP client that needs to send a media (.mpg) file in a loop to a TCP server written in C.

I have the following code, in which a separate thread reads the file in 10K blocks, sends them, and starts over in a loop. After some time the program runs out of memory. I think the cause is my use of the threading module or of the TCP send. I am using a Queue to print the logs on my GUI (Tkinter).

UPDATE 1 - Added more code as requested

Thread class "Sendmpgthread" used to create thread to send data

.
. 
def __init__ ( self, otherparams,MainGUI):
    .
    .
    self.MainGUI = MainGUI
    self.lock = threading.Lock()
    Thread.__init__(self)

#This is the one causing leak, this is called inside loop
def pushlog(self,msg):
    self.MainGUI.queuelog.put(msg)

def send(self, mysocket, block):
    size = len(block)
    pos = 0
    while size > 0:
        try:
            curpos = mysocket.send(block[pos:])
        except socket.timeout, msg:
            if self.over:
                self.pushlog('Exit Send')
                return False
            continue  # timed out but not stopped: retry the same data
        except socket.error, msg:
            print 'Exception'
            return False
        pos = pos + curpos
        size = size - curpos
    return True

def run(self):
    media_file = None
    mysocket = None 

    try:
        mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        mysocket.connect((self.ip, string.atoi(self.port)))
        media_file = open(self.file, 'rb') 

        while not self.over:
            chunk = media_file.read(10000)
            if not chunk:   # EOF Reset it
                print 'resetting stream'
                media_file.seek(0, 0)
                continue
            if not self.send(mysocket, chunk): # If some error or thread is killed 
                break;

            #disabling this solves the issue
            self.pushlog('print how much data sent')       

    except socket.error, msg:
        print 'print exception'
    except Exception, msg:
        print 'print exception'

    try:
        if media_file is not None:
            media_file.close()
            media_file = None            
        if mysocket is not None:
            mysocket.close()
            mysocket = None
    finally:
            print 'some cleaning'   

def kill(self):
    self.over = True

I figured out that the cause is my incorrect use of the Queue, as commenting out that piece resolves the issue.

UPDATE 2 - MainGUI class which is called from above Thread class

class MainGUI(Frame):
    def __init__(self, other args):
        #some code
        .
        .
        #from the above thread class used to send data
        self.send_mpg_status = Sendmpgthread(params)
        self.send_mpg_status.start()
        self.after(100, self.updatelog)
        self.queuelog = Queue.Queue()

    def updatelog(self):
        try:
            msg = self.queuelog.get_nowait()

            while msg is not None:
                self.printlog(msg)
                msg = self.queuelog.get_nowait()
        except Queue.Empty:
            pass

        if self.send_mpg_status: # only continue when sending
            self.after(100, self.updatelog)

    def printlog(self,msg):
        #print in GUI
DevC
  • How many threads are you spawning? – Jeremy Friesner Oct 03 '13 at 14:06
  • *Why* do you 'have a requirement to run it in a loop'? Requirements shouldn't dictate tiny details like that. Unless you're in non-blocking mode, one send will send the whole thing. – user207421 Oct 04 '13 at 00:07
  • The server picks up the file and plays it, and it is required to be played in a loop. I cannot send the file to the server and let it play from there, as we have reverse-engineered the complete Linux kernel along with the media player to be controlled from the client. – DevC Oct 04 '13 at 07:20
  • Which version of Python are you using on Windows? If it's 2.x, but not the latest 2.7.y, then please upgrade and see if that helps. – pts Oct 15 '13 at 08:09
  • It is Python 2.7, Windows 7. – DevC Oct 15 '13 at 08:10
  • Please post a more complete code sample. The example above contains a `continue` without a loop to continue in, it doesn't show when the top half of the code is called, it doesn't show what type your socket objects are, and doesn't show how the threading factors into the system. I'd be worried about an infinite recursive loop of mysocket.send(), but that depends on the types involved. – Kylotan Oct 15 '13 at 09:40
  • @Kylotan I have updated the code with more details, let me know if you have any more doubts – DevC Oct 15 '13 at 10:06
  • Now that you know that queuelog is causing the problems, you should debug the updatelog function. Maybe it is not being called... are you seeing the logs in the GUI? – Mihai Stan Oct 17 '13 at 10:47
  • @MihaiStan yes I am seeing the log in GUI.. the issue is when it is called in the while loop it starts filling memory. I am also calling same outside loop to print timeout or exceptions and it is working without any issues – DevC Oct 17 '13 at 11:01
  • Can you comment out the call to self.printlog(msg), to see whether the problem is in the actual queue or in printing the log? – Mihai Stan Oct 17 '13 at 11:24
  • @MihaiStan this is simple tkinter "self.edit.insert(END, msg)" , I don't think it is causing issue. The main thing is this same method is called at so many other places and it works fine. – DevC Oct 17 '13 at 11:35
  • @DevC: In general, instead of assuming something (e.g. *I don't think it is causing the issue*), trying it is much more useful. – pts Oct 17 '13 at 11:46
  • If you are constantly calling self.edit.insert then the memory used by the edit control will always grow. But there is no need to guess, just comment it out and see what happens. A memory leak doesn't usually mean a function "doesn't work", but that some place is "accumulating memory". – Mihai Stan Oct 17 '13 at 11:46
  • @MihaiStan, pts sorry for my attitude and overconfidence, yes commenting edit.insert resolved the issue, I wonder when I print to the console then there is no issue. But edit.insert is required as I need to show details in GUI, it is causing issue when I call it fast in loop. Everywhere else it is fine. Can you suggest something – DevC Oct 18 '13 at 11:24
  • The console will only be showing the last X log messages, and you could do the same for your edit control. I'll add an answer for that since comments are not the best place for this. You should also update your question to mention the tkinter self.edit.insert call. – Mihai Stan Oct 18 '13 at 15:05

4 Answers

Since printlog is adding to a tkinter text control, the memory occupied by that control will grow with each message (it has to store all the log messages in order to display them).

Unless storing all the logs is critical, a common solution is to limit the maximum number of log lines displayed.

A naive implementation is to remove extra lines from the beginning once the control exceeds a maximum number of lines. Add a function that returns the number of lines in the control and then, in printlog, something similar to:

while getnumlines(self.edit) > self.maxloglines:
    # delete through '2.0' so the line's newline is removed too;
    # stopping at '1.end' would leave an empty line and loop forever
    self.edit.delete('1.0', '2.0')

(above code not tested)
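
A minimal sketch of the helper and the trimming step, assuming self.edit is a Tkinter Text widget (as the self.edit.insert(END, msg) call mentioned in the comments suggests). The name trim_log and the default limit of 1000 lines are illustrative and not part of the original code.

```python
def getnumlines(text_widget):
    """Return the line number of the last line in a Tkinter Text widget."""
    # index() returns 'line.column'; 'end-1c' skips the newline that the
    # widget always keeps at the very end.
    return int(text_widget.index('end-1c').split('.')[0])


def trim_log(text_widget, maxloglines=1000):
    """Delete the oldest lines so that at most maxloglines remain."""
    extra = getnumlines(text_widget) - maxloglines
    if extra > 0:
        # Deleting up to the start of line extra+1 removes whole lines,
        # including their newlines, in a single call.
        text_widget.delete('1.0', '%d.0' % (extra + 1))
```

Calling trim_log(self.edit) at the end of printlog, right after the insert, should keep the widget's memory use bounded.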

update: some general guidelines

Keep in mind that what might look like a memory leak does not always mean that a function is wrong, or that the memory is no longer accessible. Many times there is missing cleanup code for a container that is accumulating elements.

A basic general approach for this kind of problem:

  • form an opinion on what part of the code might be causing the problem
  • check it by commenting that code out (or keep commenting code until you find a candidate)
  • look for containers in the responsible code, add code to print their size
  • decide what elements can be safely removed from that container, and when to do it
  • test the result
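
As a rough illustration of the "print their size" step, the sketch below reports the size of the two containers involved here: the log queue and the text widget. The function name container_sizes is hypothetical, and qsize() is only approximate while other threads are using the queue.

```python
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7, as used in the question


def container_sizes(log_queue, text_widget=None):
    """Return approximate sizes of the containers suspected of growing."""
    sizes = {'queued messages': log_queue.qsize()}  # approximate by design
    if text_widget is not None:
        # Line number of the last line held by a Tkinter Text widget.
        sizes['widget lines'] = int(text_widget.index('end-1c').split('.')[0])
    return sizes
```

Printing the result from updatelog every few seconds shows which number keeps rising, and that container is the one to prune.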
Mihai Stan

I can't see anything obviously wrong with your code snippet.

To reduce memory usage a bit under Python 2.7, I'd use buffer(block, pos) instead of block[pos:]. Also I'd use mysocket.sendall(block) instead of your send method.
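
A sketch of both suggestions. sendall keeps writing until every byte has been sent (or raises), so the manual pos/size bookkeeping disappears. The second function uses memoryview rather than buffer because memoryview also exists in Python 3; that substitution and the function names are my own assumptions, not part of the original code.

```python
import socket


def send_block(sock, block):
    """Send a whole block with sendall; return False on a socket error."""
    try:
        sock.sendall(block)  # loops internally until every byte is written
        return True
    except socket.error:
        return False


def send_block_manual(sock, block):
    """Manual send loop that slices a memoryview instead of copying bytes."""
    view = memoryview(block)
    pos = 0
    while pos < len(view):
        pos += sock.send(view[pos:])  # view[pos:] does not copy the data
    return True
```

One caveat: if sendall fails part-way (for example on a timeout), there is no way to tell how much of the block was sent, so the manual loop remains useful when a partial send has to be resumed.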

If the ideas above don't solve your problem, then the bug is most probably elsewhere in your code. Could you please post the shortest possible version of the full Python script which still runs out of memory (http://sscce.org/)? That increases your chance of getting useful help.

pts
  • I am looping over an mpg file, reading it and sending the data over TCP. I am not sure whether there is a way to keep in memory only the 10K chunk that needs to be sent and flush everything else. – DevC Oct 15 '13 at 08:41
  • Please post a simplified, full Python source code (http://sscce.org/) to get help. – pts Oct 15 '13 at 09:34
  • For me everything seems to be OK with your updated code, but it's still incomplete. Are you sure that the code you have posted goes out of memory? Please create an http://sscce.org/ , check that it goes out of memory and post the full source code. – pts Oct 15 '13 at 11:39
  • @andrewcooke sorry, I am debugging code written by someone else, and there are 4000 lines of code, so I couldn't judge what to post. Btw I figured out it is because of the Queue; I updated the code with more details and marked the part that is causing the leak. It is being called in a loop. If possible, suggest something which can resolve it; I am pretty new to queues. – DevC Oct 17 '13 at 07:18
  • @pts I have made changes in both Update1 and Update2. Also commented which part is causing leaks – DevC Oct 17 '13 at 07:19

Out of memory errors are indicative of data being generated but not consumed or released. Looking through your code I would guess these two areas:

  • Messages are being pushed onto a Queue.Queue() instance in the pushlog method. Are they being consumed?
  • The MainGUI printlog method may be writing text somewhere, e.g. is it continually writing to some kind of GUI widget without any pruning of messages?

From the code you've posted, here's what I would try:

  1. Put a print statement in updatelog. If this is not being continually called for some reason such as a failed after() call, then the queuelog will continue to grow without bound.
  2. If updatelog is continually being called, then turn your focus to printlog. Comment out the contents of this function to see if out of memory errors still occur. If they don't, then something in printlog may be holding on to the logged data, and you'll need to dig deeper to find out what.

Apart from this, the code could be cleaned up a bit. self.queuelog is not created until after the thread is started which gives rise to a race condition where the thread may try to write into the queue before it has been created. Creation of queuelog should be moved to somewhere before the thread is started.

updatelog could also be refactored to remove redundancy:

def updatelog(self):
    try:
        while True:
            msg = self.queuelog.get_nowait()
            self.printlog(msg)
    except Queue.Empty:
        pass

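I assume the kill function is called from the GUI thread. To avoid thread race conditions, self.over should be a thread-safe variable such as a threading.Event object. Checks such as "while not self.over" then become "while not self.over.is_set()", because an Event object itself is always truthy.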

def __init__(...):
    self.over = threading.Event()

def kill(self):
    self.over.set()
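
Putting these suggestions together, here is a minimal sketch in which the queue exists before the thread starts and the stop flag is a threading.Event. The class name Sender and the placeholder loop body are hypothetical; the real run method would read and send the file instead.

```python
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7, as used in the question


class Sender(threading.Thread):
    def __init__(self, log_queue):
        threading.Thread.__init__(self)
        self.log_queue = log_queue      # created by the caller before start()
        self.over = threading.Event()

    def run(self):
        sent = 0
        while not self.over.is_set():
            sent += 1                   # placeholder for "read a chunk, send it"
            self.log_queue.put('sent %d chunks' % sent)
            self.over.wait(0.01)        # pauses, but returns at once on kill()

    def kill(self):
        self.over.set()
```

In the GUI constructor the order would then be: create the queue, create the thread with it, and only then call start().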
Austin Phillips

There is no data piling up in your TCP sending loop.

The memory error is probably caused by the logging queue. Since you have not posted the complete code, try using the following class for logging:

from threading import Thread, Event, Lock
from time import sleep, time as now


class LogRecord(object):
    __slots__ = ["txt", "params"]
    def __init__(self, txt, params):
        self.txt, self.params = txt, params

class AsyncLog(Thread):
    DEBUGGING_EMULATE_SLOW_IO = True

    def __init__(self, queue_max_size=15, queue_min_size=5):
        Thread.__init__(self)
        self.queue_max_size, self.queue_min_size = queue_max_size, queue_min_size
        self._queuelock = Lock()
        self._queue = []            # protected by _queuelock
        self._discarded_count = 0   # protected by _queuelock
        self._pushed_event = Event()
        self.setDaemon(True)
        self.start()

    def log(self, message, **params):
        with self._queuelock:
            self._queue.append(LogRecord(message, params))
            if len(self._queue) > self.queue_max_size:
                # empty the queue:
                self._discarded_count += len(self._queue) - self.queue_min_size
                del self._queue[self.queue_min_size:] # empty the queue instead of creating new list (= [])
            self._pushed_event.set()

    def run(self):
        while 1: # no reason for exit condition here
            logs, discarded_count = None, 0
            with self._queuelock:
                if len(self._queue) > 0:
                    # select buffered messages for printing, releasing lock ASAP
                    logs = self._queue[:]
                    del self._queue[:]
                    self._pushed_event.clear()
                    discarded_count = self._discarded_count
                    self._discarded_count = 0
            if not logs:
                self._pushed_event.wait()
                self._pushed_event.clear()
                continue
            else:
                # print logs
                if discarded_count:
                    print ".. {0} log records missing ..".format(discarded_count)
                for log_record in logs:
                    self.write_line(log_record)
                if self.DEBUGGING_EMULATE_SLOW_IO:
                    sleep(0.5)

    def write_line(self, log_record):
        print log_record.txt, " ".join(["{0}={1}".format(name, value) for name, value in log_record.params.items()])



if __name__ == "__main__":
    class MainGUI:
        def __init__(self):
            self._async_log = AsyncLog()
            self.log = self._async_log.log # stored as bound method

        def do_this_test(self):
            print "I am about to log 100 times per sec, while text output frequency is 2Hz (twice per second)"

            def log_100_records_in_one_second(itteration_index):
                for i in xrange(100):
                    self.log("something happened", timestamp=now(), session=3.1415, itteration=itteration_index)
                    sleep(0.01)

            for iter_index in range(3):
                log_100_records_in_one_second(iter_index)

    test = MainGUI()
    test.do_this_test()

I have noticed that you do not sleep() anywhere in the sending loop, which means data is read and sent as fast as possible. Note that this is not desirable behavior when playing media files: container time-stamps are there to dictate the data rate.
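
A rough sketch of pacing the reads, assuming a constant bit-rate stream whose rate is known in advance. The bitrate_bps parameter and both function names are hypothetical; a real player would follow the container time-stamps instead of a fixed rate.

```python
import time


def chunk_interval(chunk_size, bitrate_bps):
    """Seconds of playback represented by chunk_size bytes at bitrate_bps."""
    return chunk_size * 8.0 / bitrate_bps


def paced_chunks(fileobj, chunk_size, bitrate_bps, sleep=time.sleep):
    """Yield chunks from fileobj no faster than the given bit rate."""
    interval = chunk_interval(chunk_size, bitrate_bps)
    next_deadline = time.time()
    while True:
        chunk = fileobj.read(chunk_size)
        if not chunk:
            return                      # EOF: the caller may seek(0) and restart
        delay = next_deadline - time.time()
        if delay > 0:
            sleep(delay)                # wait until this chunk is due
        next_deadline += interval
        yield chunk
```

The sending loop could then iterate over paced_chunks(media_file, 10000, bitrate_bps) and pass each chunk to send, which also slows down the rate at which log messages are produced.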

Alex