
I want to add 5 to every element in range(1,100) using the threading module, so I can see which result comes from which thread. I have finished most of the code, but how do I pass arguments into threading.Thread?

import threading,queue
x=range(1,100)
y=queue.Queue()
for i in x:
    y.put(i)

def myadd(x):
    print(x+5)


for i in range(5):
    print(threading.Thread.getName())
    threading.Thread(target=myadd,args=x).start() #it is wrong here
    y.join()

Thanks to dano, it works now. To run it interactively, I rewrote it as:

Method 1: run interactively.

from concurrent.futures import ThreadPoolExecutor
import threading
x = range(1, 100)

def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))

def run():
    t = ThreadPoolExecutor(max_workers=5)
    t.map(myadd, x)
    t.shutdown()
run()

Method 2:

from concurrent.futures import ThreadPoolExecutor
import threading
x = range(1, 100)
def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))
def run():
    t = ThreadPoolExecutor(max_workers=5)
    t.map(myadd, x)
    t.shutdown()
if __name__=="__main__":
    run()

What if more arguments need to be passed into ThreadPoolExecutor? I want to calculate 1+3, 2+4, 3+5, up to 100+102 with the multiprocessing module. And what about 20+1, 20+2, 20+3, up to 20+100 with the multiprocessing module?

from multiprocessing.pool import ThreadPool
do = ThreadPool(5)
def myadd(x,y):
    print(x+y)

do.apply(myadd,range(3,102),range(1,100))

How do I fix it?

showkey
  • Are you trying to create a thread pool with the `for i in range(5)` thing you're doing? – dano Aug 01 '14 at 03:32
  • You put a hundred things on a Queue and never take them off, then `.join()` to wait for it to become empty? That's not going to fly. Surely your `myadd()` function should have no argument but read an element from the Queue. Then you stand some chance of detecting it's empty. Sadly at present the `join()` call stops at the first iteration because the Queue isn't empty, and has no chance to become so. So I suspect you see one line of output then your program stalls. It would have been helpful to have such information ... – holdenweb Aug 01 '14 at 03:42

3 Answers


Here you need to pass a tuple rather than a single element.

To make a one-element tuple, add a trailing comma:

import threading

# connFile and processLine are assumed to be defined elsewhere
dReceived = connFile.readline()
processThread = threading.Thread(target=processLine, args=(dReceived,))
processThread.start()

Please refer here for more explanation.
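A small runnable sketch of the same rule applied to the question's myadd, with one thread per number. The results list, the lock and the choice of five inputs are illustrative additions, not part of the original answer.

```python
import threading

results = []
results_lock = threading.Lock()

def myadd(x):
    # Record which thread computed x + 5
    with results_lock:
        results.append((threading.current_thread().name, x + 5))

threads = []
for i in range(1, 6):
    # (i,) is a one-element tuple. args=i fails when the thread runs,
    # because an int is not a sequence of arguments, and (i) is just i.
    t = threading.Thread(target=myadd, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()  # wait for every thread before reading results

print(sorted(value for _, value in results))  # [6, 7, 8, 9, 10]
```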

Avinash Babu


It looks like you're trying to create a thread pool manually, so that five threads are used to process all 99 numbers. If this is the case, I would recommend using multiprocessing.pool.ThreadPool for this:

from multiprocessing.pool import ThreadPool
import threading

x = range(1, 100)

def myadd(x):
    print("Current thread: {}. Result: {}.".format(
               threading.current_thread(), x+5))

t = ThreadPool(5)
t.map(myadd, x)
t.close()
t.join()

If you're using Python 3.x, you could use concurrent.futures.ThreadPoolExecutor instead:

from concurrent.futures import ThreadPoolExecutor
import threading

x = range(1, 100)

def myadd(x):
    print("Current thread: {}. Result: {}.".format(threading.current_thread(), x+5))

t = ThreadPoolExecutor(max_workers=5)
t.map(myadd, x)
t.shutdown()
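Since the goal is to see which result comes from which thread, a sketch that returns (thread name, result) pairs instead of printing them may be easier to inspect. It assumes Python 3 and uses the executor as a context manager, which calls shutdown(wait=True) on exit; the grouping with defaultdict is an illustrative addition.

```python
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def myadd(x):
    # Return the result together with the worker thread's name
    return threading.current_thread().name, x + 5

# Leaving the with-block calls shutdown(wait=True), so every task has
# finished and the worker threads have exited before the next line runs.
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(myadd, range(1, 100)))  # keeps input order

# Group the results by the thread that produced them
by_thread = defaultdict(list)
for name, value in results:
    by_thread[name].append(value)

for name in sorted(by_thread):
    print(name, by_thread[name])
```

How the 99 items are spread across the threads varies from run to run; only the order of results (which follows the input) is guaranteed.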

I think there are two issues with your original code. First, you need to pass a tuple to the args keyword argument, not a single element:

threading.Thread(target=myadd,args=(x,))

However, you're also trying to pass the entire list (or range object, if using Python 3.x) returned by range(1,100) to myadd, which isn't really what you want to do. Because Thread unpacks args, args=x actually calls myadd(1, 2, ..., 99), which raises a TypeError. It's also not clear what you're using the queue for. Maybe you meant to pass that to myadd?
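If the queue was meant to feed the threads, one possible shape (a sketch along the lines of holdenweb's comment on the question, not the answer's own code) is to pass the queue itself as the argument and let each of five workers drain it. get_nowait() is used so a worker exits instead of blocking once the queue is empty; the results list and lock are illustrative.

```python
import queue
import threading

def myadd(q, results, lock):
    # Worker: take numbers off the shared queue until it is empty
    while True:
        try:
            n = q.get_nowait()
        except queue.Empty:
            return  # nothing left, so this thread finishes
        with lock:
            results.append((threading.current_thread().name, n + 5))
        q.task_done()  # lets q.join() know this item is processed

q = queue.Queue()
for i in range(1, 100):
    q.put(i)  # fill the queue before any worker starts

results = []
lock = threading.Lock()
threads = [threading.Thread(target=myadd, args=(q, results, lock)) for _ in range(5)]
for t in threads:
    t.start()

q.join()  # returns once task_done() has been called for every item
for t in threads:
    t.join()
```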

One final note: CPython has a Global Interpreter Lock (GIL), which allows only one thread to execute Python bytecode at a time. This means that doing CPU-bound operations (like addition) in threads provides no performance boost, since the threads take turns rather than running in parallel. Therefore, in Python it's preferred to use multiple processes to parallelize CPU-bound operations. You could make the above code use multiple processes by replacing the ThreadPool in the first example with multiprocessing.Pool (from multiprocessing import Pool). In the second example, you would use ProcessPoolExecutor instead of ThreadPoolExecutor. You would also probably want to replace threading.current_thread() with os.getpid().
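A minimal sketch of the process-based variant described above, assuming Python 3: ProcessPoolExecutor replaces ThreadPoolExecutor and os.getpid() replaces threading.current_thread(). The run() helper is a hypothetical name. The __main__ guard matters on platforms that start workers by re-importing the module (for example Windows and macOS).

```python
import os
from concurrent.futures import ProcessPoolExecutor

def myadd(x):
    # Return (worker process id, result) to see which process did the work
    return os.getpid(), x + 5

def run():
    # The with-block calls shutdown(wait=True) on exit
    with ProcessPoolExecutor(max_workers=5) as executor:
        return list(executor.map(myadd, range(1, 100)))

# Workers re-import this module under "spawn", so the pool must only be
# created when the file is run as the main program.
if __name__ == "__main__":
    results = run()
    for pid, value in results[:3]:
        print("Process {}: {}".format(pid, value))
```

For work this small, starting processes and pickling arguments costs far more than the additions themselves; processes pay off only when each task does substantial CPU work.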

Edit:

Here's how to handle the case where there are two different arguments to pass:

from multiprocessing.pool import ThreadPool

def myadd(x,y):
    print(x+y)

def do_myadd(x_and_y):
    return myadd(*x_and_y)

do = ThreadPool(5)
do.map(do_myadd, zip(range(3, 102), range(1, 100)))
do.close()  # no more tasks will be submitted
do.join()   # wait for the worker threads to exit

We use zip to pair together corresponding elements of the two ranges (in Python 3, zip returns an iterator rather than a list):

[(3, 1), (4, 2), (5, 3), ...]

We use map to call do_myadd with each tuple, and do_myadd uses argument unpacking (*x_and_y) to expand the tuple into two separate arguments, which get passed to myadd.
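On Python 3.3+ the wrapper can be avoided, and the same idea covers both cases from the question (1+3 up to 100+102, and 20+1 up to 20+100). This sketch returns values instead of printing them; Pool.starmap unpacks each tuple, ThreadPoolExecutor.map takes several iterables and stops at the shortest, and itertools.repeat or functools.partial supplies a fixed first argument. The variable names are illustrative.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import repeat
from multiprocessing.pool import ThreadPool

def myadd(x, y):
    return x + y

# Pool.starmap calls myadd(*pair) for each pair, so do_myadd isn't needed
with ThreadPool(5) as pool:
    pairs = pool.starmap(myadd, zip(range(3, 102), range(1, 100)))

# Executor.map passes one item from each iterable: myadd(1, 3), myadd(2, 4), ...
with ThreadPoolExecutor(max_workers=5) as executor:
    shifted = list(executor.map(myadd, range(1, 101), range(3, 103)))

# Fixed first argument: repeat(20) is infinite, but map stops when the
# range runs out, giving 20+1, 20+2, ..., 20+100. partial does the same.
with ThreadPoolExecutor(max_workers=5) as executor:
    plus_twenty = list(executor.map(myadd, repeat(20), range(1, 101)))
    also_plus_twenty = list(executor.map(partial(myadd, 20), range(1, 101)))
```

The same calls exist on multiprocessing.Pool and ProcessPoolExecutor, so switching to processes only changes the pool class (plus the __main__ guard).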

dano
  • Something is wrong in example 2: there is no join(). When I run it, it can't exit; some threads can't be killed. How do I fix it? – showkey Aug 01 '14 at 05:53
  • @it_is_a_literature I've added in the calls to properly close/shutdown both pool objects. However, leaving those out shouldn't have prevented the threads from being killed, as far as I know. Does it work with the `shutdown` call added in? – dano Aug 01 '14 at 14:12
  • Would you mind helping me solve this problem: http://stackoverflow.com/questions/25075356/how-to-write-the-map-sentence-here-to-call-array-to-calculate-with-threadpool-mo ? – showkey Aug 01 '14 at 14:31
  • It should be `t.map(myadd, x)`, `t.join()`, `t.close()`, not `t.map(myadd, x)`, `t.close()`, `t.join()`. – showkey Aug 02 '14 at 05:31
  • @it_is_a_literature, no [the docs](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.join) state you must call `close` before `join`. – dano Aug 02 '14 at 13:58
  • Thanks for telling me. I have a new problem; please see my edited post at the top: the apply function can't run. – showkey Aug 02 '14 at 14:05
  • @it_is_a_literature I edited my answer to show you how to do what you're trying. But in the future, you should ask a new question when you have new requirements, rather than change the question and unaccept the accepted answer. – dano Aug 02 '14 at 15:23
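To illustrate dano's point about ordering, a sketch showing that ThreadPool.join() refuses to run before close(). Current Python 3 versions raise ValueError here; older versions used an assert instead, so both are caught. The flag name is illustrative.

```python
from multiprocessing.pool import ThreadPool

def myadd(x):
    return x + 5

pool = ThreadPool(5)
results = pool.map(myadd, range(1, 100))  # map blocks until all results are in

# join() on a pool that is still running is rejected
try:
    pool.join()
    join_before_close_ok = True
except (ValueError, AssertionError):
    join_before_close_ok = False

pool.close()  # no more tasks may be submitted
pool.join()   # now waits for the worker threads to exit
```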

From:

import threading,queue
x=range(1,100)
y=queue.Queue()
for i in x:
    y.put(i)

def myadd(x):
    print(x+5)


for i in range(5):
    print(threading.Thread.getName())
    threading.Thread(target=myadd,args=x).start() #it is wrong here
    y.join()

To:

import threading
import queue

# Serialize print() calls so output from different threads doesn't get
# garbled. A plain Lock() would also work here; RLock() only differs in
# letting the thread that already holds it acquire it again.
screen_lock = threading.RLock()

# In Python 3, range() returns an immutable sequence (not an iterator),
# and it is only read here, before any thread starts.
argument1 = range(1, 100)
argument2 = [100] * 100 # will add 100 to each item in argument1

# zip() returns an iterator of (arg1, arg2) tuples; tuples are immutable,
# so handing them to different threads is safe. zip() stops at the
# shorter input, so this produces 99 pairs.
data = zip(argument1, argument2)

# A thread-safe FIFO: multiple threads can put and get items without extra locking.
q = queue.Queue()

# Fill the thread-safe queue with mock data
for item in data:
    q.put(item)

# It could be wiser to use one queue for each inbound data stream.
# For example one queue for file reads, one queue for console input,
# one queue for each network socket. Remember that the rates of
# reading files, reading console input and receiving network traffic
# all differ, and you don't want one I/O operation to block another.
# inbound_file_data = queue.Queue()
# inbound_console_data = queue.Queue() # etc.

# This function is a thread target
def myadd(thread_name, a_queue):

    # Each thread running this function gets its own local variables,
    # so the only shared state is the queue and the screen lock.
    #
    # A blocking queue.get() pauses the calling thread while the queue
    # is empty, and queue.put() pauses it while a bounded queue is full.
    # With one shared queue, every worker would block forever once the
    # queue drains. To avoid that, this version uses the non-blocking
    # get_nowait() and simply exits when the queue is empty. This
    # presumes the data is finite, like a closed file, rather than a
    # continuous, slow stream like a network connection or a rotating
    # log file.
    #
    # This is a bad use-case for threading. If each thread had a
    # separate queue it would be a better use-case: you don't want one
    # slow stream of data blocking the processing of a fast stream.
    # For a single stream of data it is likely better not to use
    # threads at all. However, here is a single "global" queue example.

    while True:
        # Checking qsize() and then calling get() is racy: another thread
        # can take the last item in between and leave this one blocked.
        try:
            arg1, arg2 = a_queue.get_nowait()
        except queue.Empty:
            break  # nothing left; let the thread finish

        # Hold the lock so lines from different threads don't interleave.
        # acquire() with the default blocking=True always succeeds, so a
        # with-block is all that is needed.
        with screen_lock:
            print('{}: {}'.format(thread_name, arg1 + arg2))
            print('{}: {}'.format(thread_name, arg1 + 5))
            print()

        # allows q.join() to keep track of when the queue is finished
        a_queue.task_done()


# create threads and pass in thread name and queue to thread-target function
threads = []
for i in range(5):
    thread_name = 'Thread-{}'.format(i)
    thread = threading.Thread(
        name=thread_name, 
        target=myadd, 
        args=(thread_name, q))

    # Recommended: one queue per thread
    # queues = [queue.Queue() for index in range(5)] # put at top of file
    # thread = threading.Thread(
    #   target=myadd,
    #   name=thread_name,
    #   args=(thread_name, queues[i]))
    threads.append(thread)

# some applications should start threads after all threads are created.
for thread in threads:
    thread.start()

# Each thread pulls items off the queue and returns once the queue is
# empty; join() then waits for every worker to finish.
for thread in threads:
    thread.join()
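A common alternative for the single-queue case is to keep the blocking get() and put one sentinel ("poison pill") per worker after the data; this is a swapped-in technique, not DevPlayer's code. It also suits the slow, continuous stream case from the comments, since workers simply wait until data or a sentinel arrives. NUM_WORKERS, STOP and worker are hypothetical names.

```python
import queue
import threading

NUM_WORKERS = 5
STOP = object()  # sentinel: a unique object that can never be real data

def worker(q, results, lock):
    while True:
        item = q.get()  # blocks until something is available
        try:
            if item is STOP:
                return  # one sentinel per worker, so each exits exactly once
            arg1, arg2 = item
            with lock:
                results.append((threading.current_thread().name, arg1 + arg2))
        finally:
            q.task_done()  # runs for sentinels too, so q.join() could finish

q = queue.Queue()
results = []
lock = threading.Lock()
threads = [
    threading.Thread(name="Thread-{}".format(i), target=worker, args=(q, results, lock))
    for i in range(NUM_WORKERS)
]
for t in threads:
    t.start()

# Data can be produced while the workers are already running
for pair in zip(range(1, 100), [100] * 100):
    q.put(pair)

# One sentinel per worker tells every thread to stop
for _ in range(NUM_WORKERS):
    q.put(STOP)

for t in threads:
    t.join()
```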
DevPlayer