I'm currently learning to use threads in Python, and I'm playing around with this dummy bit of code for practice:

import threading
import queue
import time

my_queue = queue.Queue()

lock = threading.Lock()

for i in range(5):

    my_queue.put(i)

def something_useful(CPU_number):

    while not my_queue.empty():

        lock.acquire()
        print("\n CPU_C " + str(CPU_number) + ": " + str(my_queue.get()))
        lock.release()

    print("\n CPU_C " + str(CPU_number) + ": the next line is the return")

    return

number_of_threads = 8

practice_threads = []

for i in range(number_of_threads):
    thread = threading.Thread(target=something_useful, args=(i, ))
    practice_threads.append(thread)
    thread.start()

All this does is create a queue with 5 items, and pull them out and print them with different threads.

What I noticed, though, is that some of the threads aren't terminating properly. For example, if I later add something to the queue (e.g. my_queue.put(7)) then some thread will instantly print that number.

That's why I added the last print line print("\n CPU_C " + str(CPU_number) + ": the next line is the return"), and I noticed that only one thread will terminate. In other words, when I run the code above, only one thread will print "the next line is the return".

The weird thing is, this issue disappears when I remove the lock. Without the lock, it works perfectly fine.

What am I missing?

amanb
Mohamad Zeina
    Generally speaking, using a lock to access a queue somewhat defeats the purpose of the `queue`. You might just as well use a list or `collections.deque` instead of `queue.Queue` in your code. See [here](https://stackoverflow.com/questions/35160417/threading-queue-working-example) for one example without a lock. How you end your threads (the `except queue.Empty:`) depends on your problem. Common alternatives are a stop flag (which you would check instead of simply returning) or putting a "stop" item on the queue to stop a thread (then you no longer need a timeout for the `get`). – stephan Mar 31 '18 at 20:40
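The "stop item" alternative mentioned in the comment above could look roughly like the sketch below. The names `STOP`, `worker` and `run_with_sentinels` are made up for illustration and do not come from the question. It assumes one stop item is put on the queue per thread, so every blocking `get()` eventually returns.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel; any unique object works


def worker(work_queue, results, worker_id):
    while True:
        item = work_queue.get()  # blocks until an item arrives; no extra lock needed
        if item is STOP:
            break  # each worker consumes exactly one STOP item and exits
        results.append((worker_id, item))  # list.append is atomic in CPython


def run_with_sentinels(items, number_of_threads=8):
    work_queue = queue.Queue()
    results = []
    for item in items:
        work_queue.put(item)
    # One STOP per thread, queued after the real work.
    for _ in range(number_of_threads):
        work_queue.put(STOP)
    threads = [
        threading.Thread(target=worker, args=(work_queue, results, i))
        for i in range(number_of_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # returns because every worker receives a STOP
    return results, threads


results, threads = run_with_sentinels(range(5))
print(sorted(item for _, item in results))
```

Because `queue.Queue` does its own locking, no `threading.Lock` appears here at all.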

1 Answer

Actually, it's not always just one thread that prints "the next line is the return". Anywhere from 1 to 8 threads can reach that line.

In my runs, the threads that returned were sometimes 1,3,4,5,6,7, or 1,2,3,4,5,6,7, or 1,4,5,6,7, or only 5,6,7, etc.

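You have a race condition.

The race is between the while check not my_queue.empty() and the lock.acquire().

Essentially, .empty() can report "not empty", but before you acquire the lock another thread may have taken the last item out. The thread that then gets the lock calls my_queue.get() on an empty queue, which blocks indefinitely while still holding the lock, so every thread waiting in lock.acquire() hangs as well. That is also why a later my_queue.put(7) gets printed immediately: it wakes the thread that was blocked in get(). Hence you need to do the emptiness check inside the lock.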

Here is a safer implementation:

import threading
import queue
import time

my_queue = queue.Queue()

lock = threading.Lock()

for i in range(50):
    my_queue.put(i)

def something_useful(CPU_number):
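    while True:
        lock.acquire()
        # Check and get under the same lock, so no other thread can empty the queue in between.
        if not my_queue.empty():
            print("CPU_C " + str(CPU_number) + ": " + str(my_queue.get()))
            lock.release()
        else:
            # The queue is drained: release the lock before leaving the loop.
            lock.release()
            break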

    print("CPU_C " + str(CPU_number) + ": the next line is the return")

    return

number_of_threads = 8

practice_threads = []

for i in range(number_of_threads):
    thread = threading.Thread(target=something_useful, args=(i, ))
    practice_threads.append(thread)
    thread.start()
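As an alternative sketch, the separate emptiness check can be dropped entirely by asking the queue for an item without blocking and treating `queue.Empty` as the signal to stop. This is not the code from the question; `drain` and `seen` are illustrative names, and it assumes nothing is added to the queue after the threads start.

```python
import queue
import threading

my_queue = queue.Queue()
for i in range(50):
    my_queue.put(i)

seen = []  # (thread number, value) pairs; list.append is atomic in CPython


def drain(CPU_number):
    while True:
        try:
            # get_nowait() never blocks: it returns an item or raises queue.Empty.
            value = my_queue.get_nowait()
        except queue.Empty:
            break  # nothing left, so this thread can return
        seen.append((CPU_number, value))


practice_threads = [threading.Thread(target=drain, args=(i,)) for i in range(8)]
for thread in practice_threads:
    thread.start()
for thread in practice_threads:
    thread.join()

print(len(seen), "items handled; all threads finished")
```

The check and the removal happen in one call inside the queue's own lock, so there is no window for another thread to slip in between them.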

Note: in your current code, the only work each thread does is get the value, and that happens under the lock, so only one thread runs at a time for the whole loop. Ideally you would do:

if not my_queue.empty():
    val = my_queue.get()
    lock.release()
    print("CPU_C " + str(CPU_number) + ": " + str(val))
    heavy_processing(val)  # While this is going on another thread can read the next val
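Put together as a complete script, the fragment above might look like the following sketch. `heavy_processing` is a hypothetical stand-in (here it just sleeps briefly and records a square), and `with lock:` is used as the equivalent of the acquire/release pairs, since it releases the lock on `break` as well.

```python
import queue
import threading
import time

my_queue = queue.Queue()
lock = threading.Lock()
processed = []  # results of the hypothetical heavy_processing step

for i in range(20):
    my_queue.put(i)


def heavy_processing(val):
    # Hypothetical placeholder for real work; runs outside the lock.
    time.sleep(0.01)
    processed.append(val * val)


def something_useful(CPU_number):
    while True:
        with lock:  # held only for the check and the get
            if my_queue.empty():
                break
            val = my_queue.get()
        # The lock is released here, so other threads can fetch the next value
        # while this one is still processing.
        heavy_processing(val)


practice_threads = [
    threading.Thread(target=something_useful, args=(i,)) for i in range(8)
]
for thread in practice_threads:
    thread.start()
for thread in practice_threads:
    thread.join()

print(sorted(processed))
```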
AbdealiLoKo
  • Thanks, I didn't realise this. Any idea how to fix it? – Mohamad Zeina Mar 31 '18 at 16:20
  • The [docs](https://docs.python.org/3.6/library/threading.html#threading.Lock) say: "Once a thread has acquired a lock, subsequent attempts to acquire it block, until it is released; any thread may release it." – amanb Mar 31 '18 at 16:23
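The behaviour quoted from the docs can be observed without hanging the interpreter by using the non-blocking and timeout forms of `acquire()`. This is a small illustration only, and the variable names are arbitrary.

```python
import threading

lock = threading.Lock()
lock.acquire()  # the first acquire succeeds immediately

# A second attempt would block forever, so ask without blocking instead.
second_try = lock.acquire(blocking=False)

# With a timeout, acquire() gives up and returns False once the time is up.
timed_try = lock.acquire(timeout=0.1)

# "Any thread may release it": release from a different thread.
releaser = threading.Thread(target=lock.release)
releaser.start()
releaser.join()

after_release = lock.acquire(blocking=False)  # the lock is free again
lock.release()

print(second_try, timed_try, after_release)  # False False True
```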