I have generated a list of permutations with either itertools.permutations (all permutations) or numpy.random.Generator.permuted (a random subset of all permutations) in Python, depending on how many permutations there are in total. This part of the code works well and runs quickly.

However, the resulting list is large (100k entries or more), and I would like to process it with multiple threads, but I don't really know how to accomplish that.

Here is what I have so far. The code works, but it is inefficient and takes a long time to complete. I have tried to use multiprocessing.Pool(), but I have not been able to make it work.

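from itertools import permutations

import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

avg = {'Block': np.arange(1, 17, 1),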
        'A': np.random.uniform(0,1,16),
        'B': np.random.uniform(0,1,16),
        'C': np.random.uniform(0,1,16)
}
avg = pd.DataFrame(avg)
seed = 1234
thr = 100000

def permuta (avg, seed, thr):
    kpi = []

    # Permutations
    if len(pd.unique(avg.Block)) > 9:
        rng = np.random.default_rng(seed)     
        perm = rng.permuted(np.tile(pd.unique((avg.Block)-1).astype(int), thr).reshape(thr, (pd.unique(avg.Block)-1).size), axis=1)
        aa = list(perm)
        aa1 = [tuple(x) for x in aa]
        bb = [np.arange(0, len(avg))]
        bb1 = tuple(bb[0])
                
        if bb1 not in aa1:
            bb.extend(aa)
            aa = [tuple(x) for x in bb]
        else:
            aa = [tuple(x) for x in aa]
    else:
        perm = permutations(avg.index)
        aa = list(perm)

    n0 = len(aa)
    for i in aa:
        if (aa.index(i)+1) % 1000 == 0:
            print('    progress: {:.2f}%'.format((aa.index(i)+1)/n0*100))
        df = avg.loc[list(i)]
        df.reset_index(drop=True, inplace=True)
        
        model_A = LinearRegression(fit_intercept=True).fit(df.index.values.reshape(-1,1), df.A)
        model_B = LinearRegression(fit_intercept=True).fit(df.index.values.reshape(-1,1), df.B)
        model_C = LinearRegression(fit_intercept=True).fit(df.index.values.reshape(-1,1), df.C)
        block_order_id = tuple(x+1 for x in i)
        model_kpi = [block_order_id, model_A.coef_[0], model_B.coef_[0], model_C.coef_[0]]
        
        kpi.append(model_kpi)
    
    kpi = pd.DataFrame (kpi, columns = ['Block_ord', 'm_A', 'm_B', 'm_C'])
      
    return kpi

I would be grateful if someone could help me speed up the code, whether by using all cores for the calculations, by replacing the "for" loop with a more efficient iterator, or by a mix of both.

Thanks for your help

WillyW0nka
  • It sounds like you want multi-***processing***, not multi-threading. https://docs.python.org/3/library/multiprocessing.html – MatBailie Jan 16 '23 at 09:12
  • @MatBailie To be honest, I'm not entirely sure. From most of the posts I have gone through, it seems that multi-threading is for I/O-bound work and multi-processing is for CPU-bound work, but that is not always the case. [link](https://stackoverflow.com/questions/62474360/multi-threading-vs-multi-processing-which-one-to-select). In any case, I haven't been able to make it work with multiprocessing.Pool(). Thanks for your reply – WillyW0nka Jan 16 '23 at 09:28
  • If you've tried using Pool, include your attempt in your question, state what the problem is, and ask about resolving that issue / error / etc. – MatBailie Jan 16 '23 at 09:30

1 Answer

I usually use concurrent.futures, as described at https://docs.python.org/3/library/concurrent.futures.html

Since you mentioned having trouble getting some implementations set up, here's a rough example pasted below that you just need to tweak a bit.

In this example, I'm using ThreadPoolExecutor, which you can try out and tune to your machine. However, if you want to use separate processes instead, you can use ProcessPoolExecutor, which has similar syntax and is also explained in the docs linked above.
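To illustrate the "similar syntax" point, here is a minimal sketch that is separate from the question's code. The names `sum_of_squares` and `run_all` are hypothetical and exist only for this illustration; the point is that the executor class is the only thing that changes.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def sum_of_squares(n):
    # Hypothetical CPU-bound stand-in for the per-permutation work.
    return sum(k * k for k in range(n))


def run_all(executor_cls, inputs, max_workers=2):
    # Both executor classes expose the same submit()/map() interface,
    # so only the class passed in differs between the two variants.
    with executor_cls(max_workers=max_workers) as executor:
        return list(executor.map(sum_of_squares, inputs))


if __name__ == "__main__":
    # The guard matters for ProcessPoolExecutor: worker processes may
    # re-import this module, and must not start pools of their own.
    inputs = [10, 100, 1000]
    print(run_all(ThreadPoolExecutor, inputs))
    print(run_all(ProcessPoolExecutor, inputs))
```

For CPU-bound work in CPython, threads often give little speedup because of the global interpreter lock, so the process-based executor is usually the one to try when the goal is to use all cores. With processes, the worker function and its arguments must be picklable, which is why the worker is defined at module level. The thread-based example for the question's code is below.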

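import concurrent.futures
from itertools import permutations

import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
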
avg = {'Block': np.arange(1, 17, 1),
       'A': np.random.uniform(0, 1, 16),
       'B': np.random.uniform(0, 1, 16),
       'C': np.random.uniform(0, 1, 16)
       }
avg = pd.DataFrame(avg)
seed = 1234
thr = 100000


def permuta(avg, seed, thr):
    kpi = []

    # Permutations
    if len(pd.unique(avg.Block)) > 9:
        rng = np.random.default_rng(seed)
        perm = rng.permuted(
            np.tile(pd.unique((avg.Block) - 1).astype(int), thr).reshape(thr, (pd.unique(avg.Block) - 1).size), axis=1)
        aa = list(perm)
        aa1 = [tuple(x) for x in aa]
        bb = [np.arange(0, len(avg))]
        bb1 = tuple(bb[0])

        if bb1 not in aa1:
            bb.extend(aa)
            aa = [tuple(x) for x in bb]
        else:
            aa = [tuple(x) for x in aa]
    else:
        perm = permutations(avg.index)
        aa = list(perm)

    n0 = len(aa)

    with concurrent.futures.ThreadPoolExecutor(max_workers=24) as executor:
        # One task per permutation; avg is passed explicitly to the worker.
        futures = [executor.submit(everything_in_for_loop, i, avg) for i in aa]

        error_count = 0
        # Collect results in submission order so rows line up with aa.
        for n, future in enumerate(futures, start=1):
            if n % 1000 == 0:
                print('    progress: {:.2f}%'.format(n / n0 * 100))
            try:
                result = future.result()
                if result is not None:
                    kpi.append(result)
            except Exception as exc:
                error_count += 1
                print(type(exc))
                print(exc.args)

    kpi = pd.DataFrame(kpi, columns=['Block_ord', 'm_A', 'm_B', 'm_C'])

    return kpi


def everything_in_for_loop(i, avg):
    # Reorder the rows by permutation i, then renumber them 0..n-1.
    df = avg.loc[list(i)].reset_index(drop=True)
    x = df.index.values.reshape(-1, 1)

    model_A = LinearRegression(fit_intercept=True).fit(x, df.A)
    model_B = LinearRegression(fit_intercept=True).fit(x, df.B)
    model_C = LinearRegression(fit_intercept=True).fit(x, df.C)
    block_order_id = tuple(b + 1 for b in i)

    # Return the row instead of appending to a list owned by the caller.
    return [block_order_id, model_A.coef_[0], model_B.coef_[0], model_C.coef_[0]]
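
As an alternative to parallelising the loop, the slopes can also be computed for all permutations at once. This is a sketch under one assumption: only the slope of a one-variable least-squares fit against the positions 0..n-1 is needed, as in the question. In that case the slope is sum((x - mean(x)) * y) / sum((x - mean(x))**2), which NumPy can evaluate for every permutation in a single matrix product. The name `slopes_for_permutations` is hypothetical.

```python
import numpy as np


def slopes_for_permutations(values, perms):
    # values: one data column (e.g. avg.A) in its original row order.
    # perms: integer array of shape (n_perms, n); each row is a
    #        permutation of the positions 0..n-1.
    values = np.asarray(values, dtype=float)
    perms = np.asarray(perms)
    n = perms.shape[1]
    # Centred x positions; they sum to zero, so y needs no centring.
    x_centered = np.arange(n) - (n - 1) / 2.0
    reordered = values[perms]  # shape (n_perms, n)
    return reordered @ x_centered / (x_centered @ x_centered)


rng = np.random.default_rng(1234)
a = rng.uniform(0, 1, 16)
# 1000 independent random permutations of 0..15, one per row.
perms = rng.permuted(np.tile(np.arange(16), (1000, 1)), axis=1)
m_A = slopes_for_permutations(a, perms)
```

The same call can be repeated for the B and C columns, and the three result arrays placed in a DataFrame next to the block orders. It is worth checking a few rows against LinearRegression before relying on it.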