
I want to be able to run multiple threads without writing a new line of code for every thread I want to run. With the code below I cannot dynamically add more accountIDs, or increase the number of threads, just by changing the value of `thread_count`.

For example this is my code now:

    import threading
    def get_page_list(account,thread_count):
        return list_of_pages_split_by_threads

    def pull_data(page_list,account_id):
        data = api(page_list,account_id)
        return data

    if __name__ == "__main__":
        accountIDs = [100]

        # number of threads to make
        thread_count = 3

        # returns a list of page lists, i.e.: [[1,2,3],[4,5,6],[7,8,9,10]]
        page_lists = get_page_list(accountIDs[0], thread_count)

        t1 = threading.Thread(target=pull_data, args=(page_lists[0], accountIDs[0]))
        t2 = threading.Thread(target=pull_data, args=(page_lists[1], accountIDs[0]))
        t3 = threading.Thread(target=pull_data, args=(page_lists[2], accountIDs[0]))

        t1.start()
        t2.start()
        t3.start()
        t1.join()
        t2.join()
        t3.join()

This is where I want to get to:

Any time I want to add an additional thread (if the server can handle it) or add more accountIDs, I shouldn't have to duplicate the code.

i.e. (this example is what I would like to do, but the code below doesn't work: it finishes a whole list of pages before moving on to the next thread):

    if __name__ == "__main__":
        accountIDs = [100, 101, 103]
        thread_count = 3
        for account in accountIDs:
            page_lists = get_page_list(account, thread_count)
            for pg_list in page_lists:
                t1 = threading.Thread(target=pull_data, args=(pg_list, account))
                t1.start()
                t1.join()
chowpay

2 Answers


One way of doing it is using Pool and Queue.

The pool will keep working while there are items in the queue, without holding the main thread.

Choose one of these imports:

    import multiprocessing as mp        # for process-based parallelization
    import multiprocessing.dummy as mp  # for thread-based parallelization

Creating the workers, pool and queue:

    the_queue = mp.Queue()  # store the account ids and page lists here


    def worker_main(queue):
        while waiting:
            while not queue.empty():
                account, pageList = queue.get(True)  # get an (account, page list) pair from the queue
                pull_data(pageList, account)


    waiting = True
    num_parallel_workers = 3  # how many workers the pool should keep running
    the_pool = mp.Pool(num_parallel_workers, worker_main, (the_queue,))
    #                                  don't forget the comma here   ^

    accountIDs = [100, 101, 103]
    thread_count = 3
    for account in accountIDs:
        list_of_page_lists = get_page_list(account, thread_count)
        for pg_list in list_of_page_lists:
            the_queue.put((account, pg_list))

    ....

    waiting = False  # until you do this, the pool will probably never end.
                     # not sure if it's a good practice, but you might want to have
                     # the pool hanging there for a while to receive more items
    the_pool.close()
    the_pool.join()

Another option is to fill the queue first and create the pool second, so the workers only run while there are items in the queue.

Then, if more data arrives later, you create another queue and another pool:

    import multiprocessing.dummy as mp
    # if you are not using dummy, you will probably need a queue for the results too,
    # as the processes will not access the vars from the main thread
    # something like worker_main(input_queue, output_queue):
    # and pull_data(pageList, account, output_queue)
    # and mp.Pool(num_parallel_workers, worker_main, (in_queue, out_queue))
    # and you get the results from the output queue after pool.join()

    the_queue = mp.Queue()  # store the account ids and page lists here


    def worker_main(queue):
        while not queue.empty():
            account, pageList = queue.get(True)  # get an (account, page list) pair from the queue
            pull_data(pageList, account)

    accountIDs = [100, 101, 103]
    thread_count = 3
    for account in accountIDs:
        list_of_page_lists = get_page_list(account, thread_count)
        for pg_list in list_of_page_lists:
            the_queue.put((account, pg_list))


    num_parallel_workers = 3  # how many workers the pool should run
    the_pool = mp.Pool(num_parallel_workers, worker_main, (the_queue,))
    #                                  don't forget the comma here   ^

    the_pool.close()
    the_pool.join()

    del the_queue
    del the_pool
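
The comments at the top of that listing only sketch the result-queue idea for the true process-based case. A rough, minimal illustration of that pattern (with hard-coded page lists, a placeholder `pull_data`, and `Manager` queues so they can be shared with pool workers) could look something like this:

    import multiprocessing as mp
    import queue  # only needed for the Empty exception

    def pull_data(page_list, account):
        # placeholder for the real API call
        return [(account, page) for page in page_list]

    def worker_main(in_queue, out_queue):
        # drain the input queue, pushing each result onto the output queue
        while True:
            try:
                account, page_list = in_queue.get_nowait()
            except queue.Empty:
                break  # nothing left to do, let this worker exit
            out_queue.put(pull_data(page_list, account))

    if __name__ == "__main__":
        manager = mp.Manager()
        in_queue, out_queue = manager.Queue(), manager.Queue()

        # fill the input queue first, as in the second variant above
        for account in [100, 101, 103]:
            for pg_list in [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]:
                in_queue.put((account, pg_list))

        num_parallel_workers = 3
        the_pool = mp.Pool(num_parallel_workers, worker_main, (in_queue, out_queue))
        the_pool.close()
        the_pool.join()

        # collect the results from the output queue after the pool has finished
        results = []
        while not out_queue.empty():
            results.append(out_queue.get())
        print(results)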
Daniel Möller
  • Pooling is definitely the way to go. Another approach to pooled execution is [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor). – kungphu Oct 08 '19 at 03:35
  • I keep getting this error: not sure what to make of it `the_pool.join() File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 545, in join assert self._state in (CLOSE, TERMINATE)` – chowpay Oct 08 '19 at 03:56
  • removing `the_pool.join()` seems to work .. sounds important is it ok to leave out? and why would it fail you think? – chowpay Oct 08 '19 at 04:05
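
For reference, the `concurrent.futures.ProcessPoolExecutor` route mentioned in the first comment above could look roughly like this (a sketch reusing `get_page_list` and `pull_data` from the question; the executor replaces the manual pool/queue wiring):

    from concurrent.futures import ProcessPoolExecutor, as_completed

    if __name__ == "__main__":
        accountIDs = [100, 101, 103]
        thread_count = 3
        with ProcessPoolExecutor(max_workers=thread_count) as executor:
            futures = []
            for account in accountIDs:
                for pg_list in get_page_list(account, thread_count):
                    # schedule one pull_data call per (page list, account) pair
                    futures.append(executor.submit(pull_data, pg_list, account))
            # gather the results as the workers finish
            results = [f.result() for f in as_completed(futures)]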

I couldn't get MP to work correctly, so I did this instead and it seems to work great. But MP is probably the better way to tackle this problem.

    # just keeps track of the threads
    threads = []
    # generates a thread for whatever thread_count = N is set to
    for thread in range(thread_count):
        # get_page_list returns a list of page lists stored in page_lists;
        # this ensures each thread gets a unique list
        page_list = page_lists[thread]
        # actual function for each thread to work on
        t = threading.Thread(target=pull_data, args=(page_list, account))
        # puts all threads into a list
        threads.append(t)
        # starts all the threads up
        t.start()
    # after all threads are complete, control returns to the main thread
    # (technically this join loop is not needed)
    for t in threads:
        t.join()

I also didn't understand why you would "need" `.join()`; there's a great answer on that here: what is the use of join() in python threading

chowpay
  • I updated my answer as I suspect the problem was the worker hanging there eternally with `while True:`. See the new versions and check whether they work. – Daniel Möller Oct 10 '19 at 12:39
  • Don't manually manage groups of processes/threads! It's exactly what pools are for; they manage multiple processes/threads and take into account failure conditions to avoid leaks and other issues that are _very_ easy to run into with multithreading. Check out [`concurrent.futures.ThreadPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor). Python also has `multiprocessing.pool.ThreadPool`, which mimics [`multiprocessing.pool.Pool`](https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing.pool.Pool) with threads instead of processes. – kungphu Oct 25 '19 at 04:40
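
As kungphu's comment suggests, `concurrent.futures` can also replace the hand-rolled thread list above. A minimal thread-based sketch (reusing `get_page_list` and `pull_data` from the question) might be:

    from concurrent.futures import ThreadPoolExecutor

    if __name__ == "__main__":
        accountIDs = [100, 101, 103]
        thread_count = 3
        with ThreadPoolExecutor(max_workers=thread_count) as executor:
            for account in accountIDs:
                page_lists = get_page_list(account, thread_count)
                # run pull_data over every page list for this account,
                # at most thread_count calls at a time, results in order
                results = list(executor.map(pull_data, page_lists,
                                            [account] * len(page_lists)))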