
I'm looking to process a loop in a multi-threaded way. The work for each key is done independently, and the results are collected in a final result dictionary.

My attempt so far, inspired by How do I parallelize a simple Python loop?

import time
from joblib import Parallel, delayed

# global dict to capture all results
res_dict = {}

# list of keys to be processed independently and in parallel
keys = ["k1", "k2", "k3"]
for k in keys:
    res_dict[k] = None

def process(key, i):
    # modifying the global dict, for that specific key
    global res_dict
    print("Processing ", key, i)
    res_dict[key] = key + str(i*i)
    #return i * i

Parallel(n_jobs=4)(delayed(process)(k, 5) for k in keys)

Result is {"k1": None, "k2": None, "k3": None}, the same as at initialization. Nothing changes after processing. Any thoughts? Any recommendation on how to do this better? Thanks
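For reference, joblib's default backend (loky) runs the workers in separate processes, so each worker mutates its own copy of res_dict and the parent's dict never changes. A minimal sketch of a variant that should mutate the dict in place, assuming joblib's require='sharedmem' option (shared-memory semantics, i.e. the thread-based backend):

from joblib import Parallel, delayed

res_dict = {k: None for k in ["k1", "k2", "k3"]}

def process(key, i):
    # threads share the parent's memory, so this write is visible after the call
    res_dict[key] = key + str(i * i)

# require='sharedmem' asks joblib for shared-memory semantics (thread-based backend)
Parallel(n_jobs=4, require='sharedmem')(delayed(process)(k, 5) for k in res_dict)
print(res_dict)  # expected: {'k1': 'k125', 'k2': 'k225', 'k3': 'k325'}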

UPDATE: ATTEMPT 2

Learned that READ works but MUTATE does not.

res_dict = {}
keys = list(range(0, 5))
for k in keys:
    res_dict[k] = k

def process(k, i):
    global res_dict

    # try reading and re-assigning back
    print(k, i)
    # assigning back is not working here, but reading DOES
    res_dict[k] = res_dict[k] + 2
    return res_dict[k]

tmp_res = Parallel(n_jobs=2)(delayed(process)(k, 5) for k in keys)
print(tmp_res)  # [2, 3, 4, 5, 6]

# assign result back to overwrite
for idx, k in enumerate(keys):
    res_dict[k] = tmp_res[idx]

Concerns:

  1. tmp_res comes back as a list that seemingly collects the loop results in order. I'm not sure whether that ordering is guaranteed given the parallel nature (see the sketch after this list).
  2. In reality, each res_dict[k] is a dataframe. I have ~100 of them to be processed ASAP, hence the intention to multithread. With res_dict[k] = tmp_res[idx] reassigning every time, will there be a memory issue? Each time, the older df version stays in memory but is no longer used.
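
On both concerns, a sketch that returns (key, value) pairs instead of relying on result order, and rebinds the dict so the old values become garbage-collectable (I believe Parallel preserves input order anyway, but this variant does not depend on it):

from joblib import Parallel, delayed

# sketch: each task returns a (key, value) pair, so ordering never matters
def process_pair(k, i):
    return k, res_dict[k] + 2

pairs = Parallel(n_jobs=2)(delayed(process_pair)(k, 5) for k in keys)
res_dict = dict(pairs)  # rebinding drops the old values; they can be garbage-collected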
Kenny
  • Instead of `print("Processing ",k, i)` do you mean `print("Processing ",key, i)`? Does it print the `Processing…` messages? – DisappointedByUnaccountableMod Jun 25 '21 at 19:27
  • Nothing is printed either. Good catch on k - key – Kenny Jun 25 '21 at 19:30
  • OK, so now your problem is finding why `process` isn't called. Try removing the `delayed()`? Why can't you use the threaded version of `map()` instead of joblib? – DisappointedByUnaccountableMod Jun 25 '21 at 19:33
  • ThreadPoolExecutor - see https://docs.python.org/3/library/concurrent.futures.html?highlight=threadpoolexecutor#concurrent.futures.ThreadPoolExecutor - so your question isn’t anything to do with global variables, title should be more like “How do I get my function called when using joblib delayed?” But I can’t believe there aren’t simple examples like what you want to do. – DisappointedByUnaccountableMod Jun 25 '21 at 19:39
  • I think you can return res_dict[key], but I'm not sure it's the solution for your purpose. Here is a document for you; it works for arrays: https://joblib.readthedocs.io/en/latest/parallel.html#shared-memory-semantics – nazlikrmn Jun 25 '21 at 19:49
  • Keep in mind that multithreading generally does not improve performance in CPython due to the Global Interpreter Lock (GIL), except in IO-bound code and a few specific cases. Multiprocessing can, but Inter-Process Communication (IPC) has a significant overhead, which makes it not really better when the computation is not very intensive. Multiprocessing is incompatible with mutability unless shared memory is used, but shared memory cannot be used in many cases... still because of the GIL and the GC. Unfortunately, there are not a lot of options for pure-Python objects. – Jérôme Richard Jun 25 '21 at 19:50
  • The most likely reason your code doesn't update `res_dict` is that Parallel is using multiprocessing. – DisappointedByUnaccountableMod Jun 25 '21 at 19:58
  • barny: IKR. I thought it would be a well-established problem. I followed the example linked in the post; I don't think delayed is the issue. See updates. naz, Jerome: the mutation did not work, nor did printing, but I realized the READ is working. Calculation will be intensive: each res_dict[k] is a dataframe undergoing a bunch of calculations, and there will be about 100 dfs that need to be calculated ASAP before the next round comes in, hence I want to optimize by parallelizing. – Kenny Jun 25 '21 at 19:58
  • @barny Supposedly Parallel and joblib are the new way of parallelization in py3: https://stackoverflow.com/questions/9786102/how-do-i-parallelize-a-simple-python-loop – Kenny Jun 25 '21 at 19:59
  • Hmm. Will try it once it’s in the Python standard library - thanks – DisappointedByUnaccountableMod Jun 25 '21 at 20:00

1 Answer


You don't need a third-party library.

import concurrent.futures
import threading
import time

res_dict = {k: k for k in range(0,5)}

def process(k, i):
    print("Process key %s with %s in thread %s" % (k, i, threading.get_ident()))
    time.sleep(1)
    res_dict[k] = res_dict[k] + 2

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for k in res_dict:
        executor.submit(process, k, 5)
    executor.shutdown(wait=True)

print(res_dict)

But keep in mind that due to GIL limitations, multi-threading is only relevant if you spend lots of time waiting for IO.

Using a global variable in the process function is bad design.

import concurrent.futures
import threading
import time

def process(value):
    print("Process value %s in thread %s" % (value, threading.get_ident()))
    time.sleep(1)
    return value + 2

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    values = {k: k for k in range(0,5)}
    # Launch threads
    res = {k: executor.submit(process, v) for k, v in values.items()}
    # get result
    res = {k: future.result() for k, future in res.items()}
    executor.shutdown(wait=True)

print(res)

This second version is also compatible with ProcessPoolExecutor if your process function is compute-intensive.
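A sketch of that swap, assuming process stays a plain module-level function (so it can be pickled) and that the submitting code sits under the if __name__ == "__main__" guard that multiprocessing requires on spawn-based platforms:

import concurrent.futures

def process(value):
    # CPU-bound work would go here; separate processes are not blocked by the GIL
    return value + 2

if __name__ == "__main__":
    values = {k: k for k in range(0, 5)}
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = {k: executor.submit(process, v) for k, v in values.items()}
        res = {k: future.result() for k, future in futures.items()}
    print(res)  # {0: 2, 1: 3, 2: 4, 3: 5, 4: 6}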

Balaïtous