I have 10 functions; each one queries a database and returns a pandas DataFrame.

I cannot run them one after another: I join the results at the end, and if the timestamps don't match I get null values. Each query pulls a large chunk of data and takes a while, so I want to run them in parallel.

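def df1(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df2(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df3(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df4(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df5(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df6(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df7(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df8(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df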

def final_df(domain,duration):
    df = pd.concat([df1(domain,duration),
                    df2(domain,duration),
                    df3(domain,duration),
                    df4(domain,duration),
                    df5(domain,duration),
                    df6(domain,duration),
                    df7(domain,duration),
                    df8(domain,duration)
                    ],axis=1,sort=False).reset_index()
    df = df.set_index('time')
    return df

df = final_df(domain,duration)

I want to call all eight functions (df1 through df8) in parallel inside final_df.

P.S. I am familiar with multiprocessing, but I don't just want to run the functions in parallel; I also need to collect their return values.

  • I'd suggest looking at a multiprocessing Pool – tomgalpin Mar 26 '20 at 10:53
  • This question lists multiple ways to get the return values from multiprocessing: https://stackoverflow.com/questions/10415028/how-can-i-recover-the-return-value-of-a-function-passed-to-multiprocessing-proce – backcab Mar 26 '20 at 10:56
  • As @tomgalpin suggested, take a look at multiprocessing Pool. It has a map function that does just what you want: https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.pool.Pool.map . Also look at the example a few lines below where the link points. – Farbod Shahinfar Mar 26 '20 at 11:34
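
For reference, the pool approach from the comments could look roughly like the sketch below. Pool.map applies one function to many inputs, whereas here there are many functions sharing the same inputs, so the sketch uses apply_async, which returns a handle whose get() gives back the return value. It uses ThreadPool, the thread-based class in multiprocessing.pool with the same interface as Pool, so it runs without a main guard; with a process-based Pool the functions would need to be picklable and the call would normally sit under an `if __name__ == "__main__":` guard. The names collect_results, first_query and second_query are made up for illustration and stand in for df1, df2, and so on.

```python
from multiprocessing.pool import ThreadPool


def collect_results(funcs, *args):
    """Run each function with the same arguments and return the results in order."""
    with ThreadPool(processes=max(1, len(funcs))) as pool:
        # apply_async schedules one call and returns an AsyncResult handle.
        pending = [pool.apply_async(func, args) for func in funcs]
        # get() waits for that call to finish and returns its return value.
        return [handle.get() for handle in pending]


# Hypothetical stand-ins for the real query functions (df1, df2, ...).
def first_query(domain, duration):
    return ("first", domain, duration)


def second_query(domain, duration):
    return ("second", domain, duration)


results = collect_results([first_query, second_query], "example.com", 60)
```

With the real functions, the returned list could be passed straight to pd.concat(..., axis=1, sort=False), since the order of the results matches the order of the functions.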

1 Answer

I was able to solve it with a thread pool from concurrent.futures:

import concurrent.futures  # needed for ThreadPoolExecutor below
import pandas as pd
import numpy as np
import threading
import time


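def df1(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df2(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df3(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df4(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df5(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df6(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df7(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df8(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df9(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df10(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df

def df11(domain, duration):
    df = ...  # placeholder: query the DB and build the DataFrame
    return df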

def getdf(domain,duration):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        f1 = executor.submit(df1, domain,duration)
        f2 = executor.submit(df2, domain,duration)
        f3 = executor.submit(df3, domain,duration)
        f4 = executor.submit(df4, domain,duration)
        f5 = executor.submit(df5, domain,duration)
        f6 = executor.submit(df6, domain,duration)
        f7 = executor.submit(df7, domain,duration)
        f8 = executor.submit(df8, domain,duration)
        f9 = executor.submit(df9, domain,duration)
        f10 = executor.submit(df10, domain,duration)
        f11 = executor.submit(df11, domain,duration)
        # result() blocks until the corresponding query has finished
        df = pd.concat([f1.result(), f2.result(), f3.result(), f4.result(),
                        f5.result(), f6.result(), f7.result(), f8.result(),
                        f9.result(), f10.result(), f11.result()],
                       axis=1, sort=False).reset_index()
        df['time'] = pd.to_datetime(df['time'])
        df = df.set_index('time')
    return df

df = getdf(domain, duration)
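
The same idea can be written more compactly by keeping the functions in a list, so adding a twelfth query means adding one list entry. This is only a sketch: run_in_parallel, query_a and query_b are names I made up for illustration, and the two small functions stand in for the real df1 ... df11.

```python
from concurrent.futures import ThreadPoolExecutor


def run_in_parallel(funcs, *args, max_workers=None):
    """Call every function in `funcs` with the same arguments on a thread pool.

    Results come back in the same order as `funcs`, regardless of which
    call finishes first.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # submit() starts each call and immediately returns a Future.
        futures = [executor.submit(func, *args) for func in funcs]
        # result() blocks until that call is done and re-raises any
        # exception the call raised.
        return [future.result() for future in futures]


# Hypothetical stand-ins for the real query functions (df1, df2, ...).
def query_a(domain, duration):
    return f"a:{domain}:{duration}"


def query_b(domain, duration):
    return f"b:{domain}:{duration}"


results = run_in_parallel([query_a, query_b], "example.com", "1h")
```

With the real functions, getdf would reduce to something like pd.concat(run_in_parallel([df1, df2, ..., df11], domain, duration), axis=1, sort=False).reset_index(), followed by the same to_datetime and set_index steps. Threads are a reasonable fit here on the assumption that the functions spend most of their time waiting on the database; if they were CPU-bound, a process pool would likely be the better choice.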