
I'm trying to add multithreading to a very time-consuming program, and I've come across this SO answer: https://stackoverflow.com/a/28463266/3451339, which offers this solution:

from multiprocessing.dummy import Pool as ThreadPool

pool = ThreadPool(4)
results = pool.map(my_function, my_array)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

and, for passing multiple arrays:

results = pool.starmap(function, zip(list_a, list_b))

The following is the code I have so far, which must be refactored to use threading. It iterates over 4 arrays, passing arguments to the function at each iteration and appending every result to a final container:

    strategies = ['strategy_1', 'strategy_2']
    budgets = [90,100,110,120,130,140,150,160]
    formations=['343','352','433','442','451','532','541']
    models = ['model_1', 'model_2', 'model_3']

    all_teams = pd.DataFrame()

    for strategy in strategies:
        for budget in budgets:
            for formation in formations:
                for model in models:

                    team = function(strategy=strategy, 
                                    budget=budget, 
                                    curr_formation=formation,
                                    model=model)
                       
                    all_teams = all_teams.append(team, ignore_index=True, sort=False)\
                                         .reset_index(drop=True)\
                                         .copy()

Note: each function call makes API web requests.

What is the way to go with multithreading in this scenario?
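
From the linked answer, I imagine flattening the four loops into one list of argument tuples and feeding them to `starmap`, something like the sketch below (`itertools.product` should generate the same combinations as my loops, and the `run_one` wrapper is just a guess at how to keep the keyword arguments), but I'm not sure this is the right approach:

from itertools import product
from multiprocessing.dummy import Pool as ThreadPool
import pandas as pd

# the same (strategy, budget, formation, model) combinations the loops produce
combos = list(product(strategies, budgets, formations, models))

def run_one(strategy, budget, formation, model):
    # keyword arguments, so function's parameter order doesn't matter
    return function(strategy=strategy, budget=budget,
                    curr_formation=formation, model=model)

pool = ThreadPool(8)  # thread count is a guess; tune for the API
teams = pool.starmap(run_one, combos)
pool.close()
pool.join()

# assuming each result is a DataFrame, as in my append() version
all_teams = pd.concat(teams, ignore_index=True, sort=False)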

8-Bit Borges
  • First, multithreading in Python does not make processor-intensive programs faster (it actually makes them slightly slower). Python multithreading excels when you have a large amount of idle time, for example a program that downloads 40 images, or a program that waits for 3 seconds 500 times in parallel (can't figure out the use of this one, but it seems to be popular on forums). It is not fast when generating 40,000,000,000 random numbers. (I learned this the hard way) – a1cd May 05 '21 at 00:29
  • But can the code be run in parallel somehow? – 8-Bit Borges May 05 '21 at 00:42
  • not real parallelism, it's just splitting the tasks into micro-tasks and quickly switching between them. Python was built in a way (the GIL) that means its threads can never run on multiple cores at once. You can do this artificial parallelism, but it's actually slower (unless you're doing web requests). How you do this, I cannot remember, nor do I want to. I upvoted because this is a good question. – a1cd May 05 '21 at 00:49

2 Answers


Python has the multiprocessing module, which can run multiple tasks in parallel; inside each process you can have multiple threads or async IO code.

Here is a working example that uses 3 processes, each running its work in multiple threads:

import pandas as pd
import multiprocessing
from multiprocessing import Queue
from threading import Thread

strategies = ['strategy_1', 'strategy_2']
budgets = [90, 100, 110, 120, 130, 140, 150, 160]
formations = ['343', '352', '433', '442', '451', '532', '541']
models = ['model_1', 'model_2', 'model_3']

# shared Queue; if you want to reduce write locking, use 3 Queues
Q = Queue()

# retrieve asynchronously if you want to speed up the process
def function(q, strategy, budget, curr_formation, model):
    q.put("Team")

def runTask(model, q):
    threads = []
    for strategy in strategies:
        for budget in budgets:
            for formation in formations:
                t = Thread(target=function, args=(q, strategy, budget, formation, model))
                t.start()
                threads.append(t)
    for t in threads:  # wait for every thread before the process exits
        t.join()

def main():
    p1 = multiprocessing.Process(target=runTask, args=('model_1', Q))
    p2 = multiprocessing.Process(target=runTask, args=('model_2', Q))
    p3 = multiprocessing.Process(target=runTask, args=('model_3', Q))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

    # drain a known number of items; Queue.qsize() is not reliable on every platform
    total = len(strategies) * len(budgets) * len(formations) * len(models)
    all_results = [Q.get() for _ in range(total)]
    print(all_results)
    print(len(all_results))

if __name__ == "__main__":
    main()
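
If the real `function` puts a team DataFrame on the queue instead of the `"Team"` placeholder, the drained items can be concatenated back into the asker's `all_teams`; a minimal sketch, assuming each queue item is a DataFrame:

# inside main(), after draining the queue into all_results
all_teams = pd.concat(all_results, ignore_index=True, sort=False)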

A useful article: Multiprocessing in Python | Set 2
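
For the async IO option mentioned at the top, each process could run an event loop instead of spawning threads. A hypothetical sketch of a drop-in `runTask` replacement; `async_function` here only stands in for an awaited API request, it is not the asker's actual code:

import asyncio

async def async_function(q, strategy, budget, curr_formation, model):
    await asyncio.sleep(0)  # placeholder for an awaited API request
    q.put("Team")

def runTask(model, q):
    async def run_all():
        await asyncio.gather(*(
            async_function(q, strategy, budget, formation, model)
            for strategy in strategies
            for budget in budgets
            for formation in formations
        ))
    asyncio.run(run_all())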

firephil
  • Thank you for your answer, but your solution is generic and does not address my code above, does it? – 8-Bit Borges May 05 '21 at 01:36
  • Split the work into 3 parts, one for each model, and run them in 3 processes; this way you reduce the cyclomatic complexity by 1. I will update the example. – firephil May 05 '21 at 01:45
  • @8-BitBorges I think the code is complete now, have a look and let me know – firephil May 05 '21 at 06:40

Here is one approach.

Note: threads vs. multiprocessing. In another SO answer I have shown execution through `map`; that will not work here because of `map`'s limitations with this kind of parameter list, so `submit` is used instead.

  1. Run your nested for loops and build a list of parameters ==> financial_options:
    financial_options=[]
    for strategy in strategies:
        for budget in budgets:
            for formation in formations:
                for model in models:
                    financial_options.append([strategy,budget,formation,model])
    financial_options_len=len(financial_options)
  2. Create a new function that will handle the API calls:
def access_url(url,parameter_list):
    #response=requests.get(url) # request goes here
    print(parameter_list)
    time.sleep(2)
    print("sleep done!")
    return "Hello"#,parameter_list # return type

Now run the threads over these permuted parameters. The complete program will look like this:

import concurrent.futures
import requests # just in case needed
from bs4 import BeautifulSoup # just in case needed
import time
import pandas as pd

def access_url(url,parameter_list):
    #response=requests.get(url) # request goes here
    print(parameter_list)
    time.sleep(2)
    print("sleep done!")
    return "Hello"#,parameter_list # return type

def multi_threading():
    test_url = "http://example.com/"  # placeholder; the real request happens in access_url

    strategies = ['strategy_1', 'strategy_2']
    budgets = [90, 100, 110, 120, 130, 140, 150, 160]
    formations = ['343', '352', '433', '442', '451', '532', '541']
    models = ['model_1', 'model_2', 'model_3']

    start = time.perf_counter()  # start time for performance
    financial_options = []
    decision_results = []
    for strategy in strategies:
        for budget in budgets:
            for formation in formations:
                for model in models:
                    financial_options.append([strategy, budget, formation, model])
    financial_options_len = len(financial_options)
    print(f"Total options: {financial_options_len}")
    future_list = []
    THREAD_MULTI_PROCESSING_LOOP = True
    if THREAD_MULTI_PROCESSING_LOOP:
        with concurrent.futures.ThreadPoolExecutor() as executor:  # through executor
            for each in range(financial_options_len):
                future = executor.submit(access_url, test_url, financial_options[each])  # submit each option
                future_list.append(future)
        for f1 in concurrent.futures.as_completed(future_list):
            r1 = f1.result()
            decision_results.append(r1)

    end = time.perf_counter()  # finish time for performance
    print(f'Threads: Finished in {round(end - start, 2)} second(s)')
    df = pd.DataFrame(decision_results)
    df.to_csv("multithread_for.csv")
    return df, decision_results

df, results = multi_threading()
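
Because `as_completed` yields futures out of submission order, pairing each result with the options that produced it means returning them together, as the commented-out part of the `return` already hints; a minimal sketch:

def access_url(url, parameter_list):
    result = "Hello"  # the real API call goes here
    return result, parameter_list  # carry the inputs along with the output

# ...then, when collecting:
for f1 in concurrent.futures.as_completed(future_list):
    result, options = f1.result()
    decision_results.append({"result": result, "options": options})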

simpleApp
  • thanks for your detailed answer. just one thing needs to be cleared up for me, please: where did `function()` in my working code go? `access_url()` is not really needed here, because the API calls happen implicitly inside `function()`. are all `function()` parameters implicit as well? must they all be passed in the same order as in the nested for loop? – 8-Bit Borges May 05 '21 at 04:58
  • That's right, your `function()` can replace `access_url()`, or you can wrap your function like `def access_url(url, parameter_list): function()`. The parameters are passed in the 2nd and 3rd positions: `(access_url, test_url, financial_options[each])`. Since you were using nested loops, the list was built in the same sequence. Even though the threads are created in sequence, their completion is not guaranteed in that sequence, because some will take more time as you are calling an API in them. FYI: you need to define a boundary or task for each thread, which is the reason the function was defined. – simpleApp May 05 '21 at 13:44
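
A minimal sketch of the wrapper described in the last comment, assuming `function()` accepts the same keyword arguments as in the question; the unpacking order follows the loop order used to build `financial_options`:

def access_url(url, parameter_list):
    # unpack in the order the nested loops appended them
    strategy, budget, formation, model = parameter_list
    # url is unused here because function() makes its own API requests
    return function(strategy=strategy, budget=budget,
                    curr_formation=formation, model=model)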