
As an attempt to create an MWE of more complicated code, I have a function `do()` which is supposed to be run by two threads. A shared variable `temp` should be manipulated by both threads using what the `queue` module provides. Here is the code:

import random
from threading import Thread
from queue import Queue

epochs = 2

success = [0 for x in range(epochs)]

def do(temp,q):
    temp = q.get()
    if random.random() > 0.5:
        temp += 1
    print("The current temp is {0}".format(temp))
    return q.put(temp)

for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    temp = 0
    q = Queue()
    Thread(target=do, args=(temp,q)).start()
    Thread(target=do, args=(temp,q)).start()
    success[epoch] = temp

However the output is just

iteration: 0
iteration: 1

Obviously, even the print statements inside the `do()` calls produce no output.

Can anyone point out what I am doing wrong here?

User
  • Does this answer your question? [Python creating a shared variable between threads](https://stackoverflow.com/questions/17774768/python-creating-a-shared-variable-between-threads) – Thomas Dignan Dec 30 '20 at 00:14
  • you're not sharing that variable, each thread is getting a copy of it and what you're seeing is the result of your random() value's effect on that if statement. – Thomas Dignan Dec 30 '20 at 00:14
  • @ThomasDignan: I'd rather not use global variables for that sharing mechanism. Any solution using queue would be appreciated, then. – User Dec 30 '20 at 00:17
  • you could have race conditions for the queue object, you should think more about the problem and post more details -- if you want to use the queue you'll need to put temp on the queue before you pass it in to the threads and there's no need to pass it in args -- then you'll need to use synchronization around the queue unless your design can tolerate async accesses to it -- note the queue is thread safe, but your design seems to depend on using it to get a single variable - it's possible between the get and put call you'll starve the other thread i.e. there might be nothing there for it. – Thomas Dignan Dec 30 '20 at 00:25
  • 2
    There are several things wrong with your code. First, your function does a `q.get()` but prior to that there has never been a `q.put` call, so you will hang forever. Second, assuming the queue was not empty, you are assigning `q.get()` to variable `temp`, which is the argument to your `do` function. What is even the point of passing `temp` to this function is you never read it and instead overlay it immediately? Thirdly, if you are trying to initialize `success` to be a list of `epochs` zeroes, then just do: `success = [0] * epochs`. There's more I could say, but I've run out of characters.. – Booboo Dec 30 '20 at 00:43
  • You really should save references to your threads and then call `join()` on them to wait for them to terminate, I have no idea what you are even trying to accomplish. – Booboo Dec 30 '20 at 00:44
  • @Booboo: Your comments, especially the one regarding the wrong put/get order, saved me. Thanks. – User Dec 30 '20 at 00:54
  • If you are attempting to have multiple threads update a shared variable, forget the queue and investigate using a `threading.Lock` instance to serialize access. – Booboo Dec 30 '20 at 01:52
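Pulling the comments together, a minimal sketch of a queue-based fix for the question's code might look like the following: the initial value must be put on the queue before any thread calls `q.get()`, and the threads must be joined before the result is read back.

```python
import random
from threading import Thread
from queue import Queue

epochs = 2
success = [0] * epochs

def do(q):
    temp = q.get()           # take the current value off the queue
    if random.random() > 0.5:
        temp += 1
    print("The current temp is {0}".format(temp))
    q.put(temp)              # put it back for the next thread

for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    q = Queue()
    q.put(0)                 # seed the queue so the first get() does not block
    t1 = Thread(target=do, args=(q,))
    t2 = Thread(target=do, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    success[epoch] = q.get() # final value left on the queue
```

Because each thread blocks on `q.get()` until the other has put the value back, access to `temp` is naturally serialized here, at the cost of the starvation concern raised above.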

1 Answer


Because threads share the same memory, you can use a global variable like in a normal program:

import random
from threading import Thread

epochs = 2

success = [0] * epochs

def do():
    global temp
    
    if random.random() > 0.5:
        print('  add')
        temp += 1

    print("The current temp is {0}".format(temp))


for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    
    temp = 0
    
    t1 = Thread(target=do)
    t2 = Thread(target=do)

    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    success[epoch] = temp

You could also use `Lock()` to block other threads while you access the shared variable, and you can pass the lock as an argument. But because Python (with the GIL) can't run two threads at the same time, I don't know if it is really needed.

import random
from threading import Thread
from threading import Lock

epochs = 2

success = [0] * epochs

def do(lock):
    global temp
    
    if random.random() > 0.5:
        print('  add')
        lock.acquire()  # block other threads
        temp += 1
        lock.release()  # unblock other threads
        
    print("The current temp is {0}".format(temp))

lock = Lock()

for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    
    temp = 0
    
    t1 = Thread(target=do, args=(lock,))
    t2 = Thread(target=do, args=(lock,))

    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    success[epoch] = temp

If you only need to add some value without displaying it, I would rather use a queue to send 0 or 1 to the main thread and add it to `temp` there.

import random
from threading import Thread
from queue import Queue

epochs = 2

success = [0] * epochs

def do(q):
    if random.random() > 0.5:
        q.put(1)
    else:
        q.put(0)

q = Queue()

for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    
    temp = 0
    
    t1 = Thread(target=do, args=(q,))
    t2 = Thread(target=do, args=(q,))

    t1.start()
    t2.start()
    
    temp += q.get()
    temp += q.get()
    
    t1.join()
    t2.join()
    
    success[epoch] = temp
    
    print(temp)

And this method should also work with multiprocessing, ray, joblib, etc.


EDIT:

Last version with ray. I use more epochs and more tasks per epoch.

import random
import ray


ray.init()

@ray.remote
def do():
    if random.random() > 0.5:
        print('do: 1')
        return 1
    else:
        print('do: 0')
        return 0

epochs = 5

success = [0] * epochs

for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    
    results = ray.get([do.remote() for _ in range(5)])
    
    temp = sum(results)
    
    success[epoch] = temp
    
    print('Temp:', temp)

Last version with joblib

import random
from joblib import Parallel, delayed


def do():
    if random.random() > 0.5:
        print('do: 1')
        return 1
    else:
        print('do: 0')
        return 0

epochs = 5

success = [0] * epochs

pool = Parallel(n_jobs=3)

for epoch in range(epochs):
    print("iteration: {0}".format(epoch))
    
    #results = Parallel(n_jobs=3)(delayed(do)() for _ in range(5))
    results = pool(delayed(do)() for _ in range(5))
    
    temp = sum(results)
    
    success[epoch] = temp
    
    print('Temp:', temp)
furas
  • for last example I added version in `ray` and `joblib` – furas Dec 30 '20 at 02:05
  • Thanks for all the variations, especially the `ray` one, as my application may be better handled in a multiprocessing way rather than a multithreading one. – User Dec 30 '20 at 02:06
  • For what it's worth, appending to a list (as an example) is thread safe. But augmented assignment operators (e.g. `+=`) would not be. According to the docs: **Unlike normal assignments, augmented assignments evaluate the left-hand side *before* evaluating the right-hand side.** So the assignment does not appear to be atomic and a lock would be necessary if two threads were doing augmented assignments against the same shared variable. – Booboo Dec 30 '20 at 02:41
  • @Booboo you're right, I assumed that `+=` would block other threads the whole time. – furas Dec 30 '20 at 03:17
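A minimal sketch of the point made in the comments above: because `+=` on a shared variable is not atomic, guarding it with a `threading.Lock` makes the final count deterministic (the names and iteration counts here are only illustrative):

```python
from threading import Thread, Lock

counter = 0
lock = Lock()

def add(n):
    global counter
    for _ in range(n):
        with lock:      # serialize the read-modify-write performed by +=
            counter += 1

threads = [Thread(target=add, args=(100000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # with the lock this is always 400000; without it updates can be lost
```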