I am about to start on an endeavour with Python. The goal is to run different tasks in separate threads and use queues to communicate between them. For the sake of clarity, I would like to be able to pass a queue to a sub-function and send information to the queue from there. Something like this:

from queue import Queue
from threading import Thread
import copy

# Object that signals shutdown
_sentinel = object()

# increment function
def increment(i, out_q):
    i += 1
    print(i)
    out_q.put(i)
    return

# A thread that produces data
def producer(out_q):
    i = 0
    while True:
        # Produce some data
        increment( i , out_q)

        if i > 5:
            out_q.put(_sentinel)
            break

# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data

        # Check for termination
        if data is _sentinel:
            in_q.put(_sentinel)
            break


# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

# Wait for all produced items to be consumed
q.join()

Currently the output is a row of 0's, where I would like it to be the numbers 1 to 6. I have read about the difficulty of passing references in Python, but would like to clarify: is this simply not possible in Python, or am I looking at the issue the wrong way?

JoeyD

1 Answer

The problem has nothing to do with the way the queue is passed; you're doing that right. The issue is how you're trying to increment i. Arguments in Python are passed by assignment, and integers are immutable, so i += 1 inside increment only rebinds the local name i; the producer's own i is unchanged. You have to return the incremented value to the caller and assign it there for the change to have any effect. Otherwise the local i is simply discarded when increment returns.
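To make the rebinding behaviour concrete, here is a minimal sketch that is separate from the queue code. The names rebind, mutate, count, and data are hypothetical and chosen only for illustration; it contrasts rebinding an immutable int with mutating a shared list.

```python
def rebind(i):
    # "i += 1" on an int creates a new int and rebinds the LOCAL name only.
    i += 1
    return i


def mutate(items):
    # append() changes the list object that the caller also references.
    items.append(1)


count = 0
rebind(count)           # return value discarded, so the caller sees no change
after_discard = count   # still 0

count = rebind(count)   # the caller rebinds its own name to the result

data = []
mutate(data)            # in-place mutation is visible to the caller

print(after_discard, count, data)  # prints: 0 1 [1]
```

This is also why passing the queue works as expected: put() acts on the queue object that both functions reference, rather than rebinding a name.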

You can also simplify your consumer function a bit by using the two-argument form of the built-in iter function with a for loop, which calls in_q.get repeatedly until it returns _sentinel, rather than a while True loop:

from queue import Queue
from threading import Thread

# Object that signals shutdown
_sentinel = object()

# increment function
def increment(i):
    i += 1
    return i

# A thread that produces data
def producer(out_q):
    i = 0
    while True:
        # Produce some data
        i = increment(i)
        print(i)
        out_q.put(i)
        if i > 5:
            out_q.put(_sentinel)
            break

# A thread that consumes data
def consumer(in_q):
    for data in iter(in_q.get, _sentinel):
        # Process the data
        pass


# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

Output:

1
2
3
4
5
6
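If you still want the sub-function to write to the queue itself, as in the question, one possible variant is to have increment both put the value and return it. This is only a sketch under assumptions: the results list and the extra consumer argument are hypothetical additions so the consumed values can be inspected, and the threads are joined explicitly.

```python
from queue import Queue
from threading import Thread

# Object that signals shutdown
_sentinel = object()


def increment(i, out_q):
    # Put the new value on the queue AND return it so the caller can rebind i.
    i += 1
    out_q.put(i)
    return i


def producer(out_q):
    i = 0
    while i <= 5:
        i = increment(i, out_q)
    out_q.put(_sentinel)


def consumer(in_q, results):
    # iter(callable, sentinel) calls in_q.get() until it returns _sentinel.
    for data in iter(in_q.get, _sentinel):
        results.append(data)


q = Queue()
results = []  # hypothetical container for the consumed values
t1 = Thread(target=consumer, args=(q, results))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

# Wait for both threads to finish before reading the results
t1.join()
t2.join()
print(results)  # prints: [1, 2, 3, 4, 5, 6]
```

Joining the threads, rather than calling q.join(), avoids needing a task_done() call for every item taken from the queue.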
dano