2

I have two threads (producer and consumer) and I share the data with Queue. The problem is, that when I forcefully abort the producer, the consumer sometimes locks.

I read in the docs that canceling a thread with queue might corrupt the queue and cause deadlock. I don't acquire any locks explicitly but reading the source of Queue.py says that put and get are doing so.

Please, does anyone know of it might be the case that when I abort the thread, it might be in the middle of get/put, i.e. using the lock and then not releasing it? What can I do about it? I sometimes need to terminate the producer prematurely. Would using processes, instead of threads, make any difference?

Ecir Hana
  • 10,864
  • 13
  • 67
  • 117

2 Answers2

0

Most probably your deadlock is due to not finished threads. If you have linux you can use injector from pyrasite to print backtrace (you would know where you program hung)

If you are using any locks in your signal handler - then probably this is your deadlock (that's a bit complicated, please ask if you want explanation for that)

Creating processes instead of threads surely change situation but remember that any data exchange and synchronization is much complicated.

Wilfred Hughes
  • 29,846
  • 15
  • 139
  • 192
ddzialak
  • 1,042
  • 7
  • 15
  • How do I "finish" a thread? I thought that when run() reaches the end it's finished. And I don't use any signal handler myself, maybe Queue.py does? http://hg.python.org/cpython/file/2.7/Lib/Queue.py – Ecir Hana May 25 '12 at 22:04
  • Yes, if run() reach end then it's finished (for elegance main thread should do "join()" on it) -> you can add debug information to be sure that your thread (producer/consument) ended because it may be hung on queue.get() or even queue.put() (!) – ddzialak May 25 '12 at 22:08
  • I think this "hanging" is exactly what's happening. The question is, I do abort one thread so it wont reach the end - what am I supposed to do? I mean, I need to abort it but then the queue seems to hang..? – Ecir Hana May 25 '12 at 22:14
  • If you abort producer then consumer that is waiting on queue.get() must be awaken, so you can not just abort one thread. – ddzialak May 25 '12 at 22:23
  • I've sent "MyQueue" class that is a bit simpler and got "abort" function - you can verify. If you still need more help then please say something more at least about system (linux/windows/mac) and python version, then I will suggest how to find the location of deadlock. – ddzialak May 25 '12 at 22:47
  • let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/11763/discussion-between-ddzialak-and-ecir-hana) – ddzialak May 26 '12 at 01:07
0

Maybe this will help:

import threading

class MyQueue:
    def __init__(self):
        self.tasks = []
        self.tlock = threading.Semaphore(0)
        self.dlock = threading.Lock()
        self.aborted = False

    def put(self, arg):
        try:
            self.dlock.acquire()
            self.tasks.append(arg)
        finally:
            self.dlock.release()
            self.tlock.release()

    def get(self):
        if self.aborted:
            return None
        self.tlock.acquire()
        if self.aborted:
            self.tlock.release()
            return None
        try:
            self.dlock.acquire()
            if self.tasks:
                return self.tasks.pop()
            else: # executed abort
                return None
        finally:
            self.dlock.release()

    def abort(self):
        self.aborted = True
        self.tlock.release()

# TESTING

mq = MyQueue()
import sys

def tlog(line):
    sys.stdout.write("[ %s ] %s\n" % (threading.currentThread().name, line))
    sys.stdout.flush()

def reader():
    arg = 1
    while arg is not None:
        tlog("start reading")
        arg = mq.get()
        tlog("read: %s" % arg)
    tlog("END")

import time, random
def writer():
    try:
        pos = 1
        while not mq.aborted:
            x = random.random() * 5
            tlog("writer sleep (%s)" % x)
            pending = x
            while pending > 0:
                tosleep = min(0.5, pending)
                if mq.aborted:
                    return
                time.sleep(tosleep)
                pending -= tosleep

            tlog("write: %s" % x)
            mq.put("POS %s  val=%s" % (pos, x))
            pos += 1
    finally:
        tlog("writer END")

def testStart():
    try:
        for i in xrange(9):
            th = threading.Thread(None, reader, "reader %s" % i, (), {}, None)
            th.start()
        for i in xrange(3):
            th = threading.Thread(None, writer, "writer %s" % i, (), {}, None)
            th.start()
        time.sleep(30) # seconds for testing
    finally:
        print "main thread: abort()"
        mq.abort()

if __name__ == "__main__":
    testStart()
ddzialak
  • 1,042
  • 7
  • 15
  • Thank you very much, but I need to abort the thread as above because a task might take too long to finish. – Ecir Hana May 26 '12 at 07:17