
I must use the multiprocessing library. I am trying to run the following loops in parallel:

tag = []
# get all model IDs
all_model_id = get_models_id()    # returns a list of lists
liste_all_img_id = []

# I want to start multiprocessing here
for i in range(len(all_model_id)):
    tag = get_tags(all_model_id[i][0])  # get_tags returns a list
    for l in range(len(tag)):
        liste_all_img_id.append(get_images_id(tag[l][0], all_model_id[i][0]))  # get_images_id returns a list


I tried this:

def funcs(start,end):
    tag=[]
    list_all_img_id=[]
    for i in range(start,end):
        tag=get_tags(all_model_id[i][0])
        for l in range(0,len(tag)):
            list_all_img_id.append(get_images_id(tag[l][0],all_model_id[i][0]))
    return(list_all_img_id)



from multiprocessing import Pool
import multiprocessing
def main():
    all_model_id=get_models_id()
    len_all_model_id=len(all_model_id)
    div_total = int(len_all_model_id / 3)
    rest_div_total = len_all_model_id%3
    t1 = multiprocessing.Process(target = funcs,name = "", args=(0, div_total))
    t2 = multiprocessing.Process(target = funcs,name = "", args=(div_total, div_total*2))
    t3 = multiprocessing.Process(target = funcs,name = "", args=(div_total*2, div_total*3 + rest_div_total + 1))
    list_threads = [t1,t2,t3]
    for i in list_threads:
          i.start()
    for i in list_threads:
          i.join()

if __name__ == "__main__":
    main()

But :

  • I'm not sure the main function is well defined

  • I don't know how to store my results

akhetos

3 Answers

def funcs(start, end):
    list_all_img_id = []
    for i in range(start, end):
        tag = get_tags(all_model_id[i][0])
        for l in range(len(tag)):
            list_all_img_id.append(get_images_id(tag[l][0], all_model_id[i][0]))
    return list_all_img_id

from multiprocessing.pool import Pool

def main():
    global all_model_id               # funcs reads this as a global (inherited by the workers with the fork start method)
    all_model_id = get_models_id()
    len_all_model_id = len(all_model_id)
    div_total = len_all_model_id // 3
    rest_div_total = len_all_model_id % 3
    with Pool(3) as pool:
        results = []
        # submit 3 tasks without blocking
        results.append(pool.apply_async(funcs, args=(0, div_total)))
        results.append(pool.apply_async(funcs, args=(div_total, div_total*2)))
        results.append(pool.apply_async(funcs, args=(div_total*2, div_total*3 + rest_div_total)))  # last chunk takes the remainder
        # now await 3 results:
        for result in results:
            print(result.get())

if __name__ == "__main__":
    main()

Note that apply_async takes an optional callback argument where you can specify a function to be called with a result (the actual return value from the task) as soon as it becomes available, which may not be in the order in which the tasks were submitted. The above method of obtaining the results (relying on the result objects returned from apply_async, on which a blocking call to get can be made) will always obtain the results in task-submission order. So does the starmap function, a reasonable alternative if you have all the call arguments for all the task submissions in an iterable such as a list or a tuple:

with Pool(3) as pool:
    results = pool.starmap(funcs, [
        (0, div_total),
        (div_total, div_total*2),
        (div_total*2, div_total*3 + rest_div_total)  # last chunk takes the remainder
    ])
    for result in results:
        print(result)
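To make the callback mechanism concrete, here is a minimal sketch. It uses a toy stand-in for funcs (so it runs without your get_tags/get_images_id) and ThreadPool so it works in any environment; the same callback argument exists on the process Pool. Each callback receives one task's return value as soon as it is ready, possibly out of submission order:

```python
from multiprocessing.pool import ThreadPool

def funcs(start, end):
    # toy stand-in for the real funcs: just return the indices it was given
    return list(range(start, end))

results = []  # the callback appends each task's return value as soon as it is ready

with ThreadPool(3) as pool:
    for args in [(0, 3), (3, 6), (6, 10)]:
        pool.apply_async(funcs, args=args, callback=results.append)
    pool.close()
    pool.join()  # after join, every callback has fired

# the order of the chunks is not guaranteed, so sort after flattening
all_ids = sorted(x for chunk in results for x in chunk)
```

Because the chunks can arrive in any order, the callback style is best when you don't care about ordering, or when you tag each result with its own key.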

I, too, am a fan of the concurrent.futures module, but I wanted to make the minimal number of changes to your program. Note, though, that you can use the undocumented but nevertheless existent ThreadPool class, which is compatible with the multiprocessing Pool class, simply by invoking:

from multiprocessing.pool import ThreadPool

instead of

from multiprocessing.pool import Pool

and then specifying:

with ThreadPool(3) as pool:

If your tasks are very I/O intensive, then threading may be a better option.
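A minimal sketch of that drop-in swap, again with a toy stand-in for funcs: only the class name changes, the Pool API (here starmap) stays the same.

```python
from multiprocessing.pool import ThreadPool

def funcs(start, end):
    # toy stand-in for the real, I/O-bound funcs
    return list(range(start, end))

# identical usage to Pool(3); only the class name differs
with ThreadPool(3) as pool:
    results = pool.starmap(funcs, [(0, 4), (4, 8), (8, 10)])

flat = [x for chunk in results for x in chunk]  # starmap preserves submission order
```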

Booboo
  • When I run *main()*, the code runs but nothing happens, and I have to restart my kernel after a while. Sorry, I'm very new to parallelization and Python in general – akhetos Sep 18 '20 at 11:46
  • Are you running Jupyter Notebook under Windows? – Booboo Sep 18 '20 at 11:50
  • Spyder, anaconda. And when I use the ThreadPool solution, I got an error message **TypeError: string indices must be integers** when the function calls *get_images_id* – akhetos Sep 18 '20 at 11:50
  • First, you should try running this in a standard .py file at the command prompt. I know that for Jupyter Notebook under Windows you would have to put function `funcs` in a .py file and import it, or else it would not be found. See [this answer](https://stackoverflow.com/questions/63936096/example-from-documentaion-doesnt-work-in-jupiter-notebook/63937530#63937530). – Booboo Sep 18 '20 at 11:53
  • As far as your TypeError, you need to get your program working first without threading or multiprocessing. Verify that start and end parameters to `funcs` are reasonable. If so, your problem has nothing to do with threading or multiprocessing, so why confuse the issue? You would then have to open a second question on SO. – Booboo Sep 18 '20 at 11:57
  • *funcs* is working well when I'm not trying multithreading – akhetos Sep 18 '20 at 12:01
  • So if you were to replace the calls to `pool.apply_async(...` with `funcs(0, div_total)`, `funcs(div_total, div_total*2)`, etc. there would be no errors? – Booboo Sep 18 '20 at 12:09
  • yes this is working well. `funcs(0,div_total) funcs(div_total, div_total*2)` – akhetos Sep 18 '20 at 13:19
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/221691/discussion-between-booboo-and-akhetos). – Booboo Sep 18 '20 at 13:25

I've modified your code a little, but I haven't run it yet. I think it will work for your problem. Let me know if it doesn't work for you.

import multiprocessing as mp

def funcs(tupl):
    start, end = tupl
    list_all_img_id = []
    for i in range(start, end):
        tag = get_tags(all_model_id[i][0])
        for l in range(len(tag)):
            list_all_img_id.append(get_images_id(tag[l][0], all_model_id[i][0]))
    return list_all_img_id

def main():
    all_model_id=get_models_id()
    len_all_model_id=len(all_model_id)
    div_total = len_all_model_id // 3
    rest_div_total = len_all_model_id % 3
    lst_args = [(0, div_total), (div_total, div_total*2), (div_total*2, div_total*3 + rest_div_total)]  # last chunk takes the remainder
    with mp.Pool(processes=3) as pool:
        res = pool.map(funcs, lst_args)  # loop through res to get your results

if __name__ == "__main__":
    main()
leminhnguyen
  • You could have (should have) left the arguments to `funcs` as they were and used `pool.starmap`. And if the computer has 16 CPUs, isn't creating 16 processes, an expensive operation, excessive just to run 3 tasks? – Booboo Sep 18 '20 at 11:02

Try this out, using the concurrent.futures module.

With ThreadPoolExecutor(max_workers=10) you can specify the maximum number of workers.

Moreover, if you want multiple processes instead of threads, you can simply replace ThreadPoolExecutor with ProcessPoolExecutor.

import concurrent.futures

all_model_id = get_models_id()
liste_all_img_id = []

def func(model_id):
    tag = get_tags(model_id[0])
    for l in range(len(tag)):
        liste_all_img_id.append(get_images_id(tag[l][0], model_id[0]))

with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(func, all_model_id)
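Note that appending to the shared liste_all_img_id only works with threads; with ProcessPoolExecutor each worker process gets its own copy of the list, so the appends are lost. A variant that works for both executors is to return the ids and collect them from executor.map, which yields results in submission order. A sketch with toy stand-ins for get_tags and get_images_id (the real functions come from your code):

```python
import concurrent.futures

def get_tags(model_id):            # toy stand-in for the real get_tags
    return [[f"tag{model_id}-{k}"] for k in range(2)]

def get_images_id(tag, model_id):  # toy stand-in for the real get_images_id
    return [f"img-{tag}-{model_id}"]

def func(model_id):
    # return the ids instead of appending to a shared list,
    # so the same code also works with ProcessPoolExecutor
    out = []
    for tag in get_tags(model_id[0]):
        out.extend(get_images_id(tag[0], model_id[0]))
    return out

all_model_id = [[1], [2], [3]]  # toy input; really get_models_id()
with concurrent.futures.ThreadPoolExecutor() as executor:
    # executor.map yields one list per model, in submission order
    liste_all_img_id = [img for chunk in executor.map(func, all_model_id) for img in chunk]
```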
BATMAN