I want to parse and act on a huge amount of data stored in a Pandas dataframe. I've tried using multiprocessing, and although I'm getting "results", I don't feel that I'm doing it right and would very much appreciate a sanity check.

Essentially, the code takes a Pandas dataframe (df) and splits it into a number of smaller dataframes (10 splits if there are 10 cores). It then creates a multiprocessing.Process for each split, supplying each Process instance with one of the smaller dataframes and the target function (doTheWork). The target function (doTheWork) does all the Pandas work on the smaller dataframe it receives and adds its results to a queue. Later, when all processes have finished, the data is retrieved from the queue.

Here's a trimmed-down version of the code:

import multiprocessing


def doTheWork(df, someString, someData, queue):
    newPatientsList = []
    #do all the Pandas work in here on object df
    #lots of Pandas and data manipulation here
    #then add my results to the queue
    queue.put(newPatientsList)


def startHere(df, numCores, someString, someData):
    splitList = []
    #split the pandas dataframe by the number of cores and add each
    #as an element to the splitList

    queue = multiprocessing.Queue()
    print("...using " + str(numCores) + " cores.")

    with multiprocessing.Manager() as manager:
        workers = []
        for i in range(numCores):
            #build a new process for each split and point it at the doTheWork function
            workers.append(multiprocessing.Process(
                           target=doTheWork,
                           args=(splitList[i], someString, someData, queue)))

        #go through each worker and start their job
        for worker in workers:
            worker.start()

        #go through the queue pulling out data and do stuff
        newPatients = Patients()
        for i in range(numCores):
            patientList = queue.get()
            for patient in patientList:
                newPatients.addPatient(patient)

        for worker in workers:
            worker.join()

        return newPatients
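
For completeness, the splitting step that's trimmed out above is roughly equivalent to the sketch below (illustrative only; numpy.array_split is just one way of cutting the frame into numCores chunks of similar size):

import numpy as np

#illustrative stand-in for the elided split step: array_split keeps the
#column structure and tolerates a row count that doesn't divide evenly
splitList = np.array_split(df, numCores)
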
  • Have you tried working with the Dask framework? It has native Pandas-like behavior. Note that improvements are dependent on your system, the data size, and partition of the dataframe. In many cases, you can get good improvements with multi-threading rather than multi-processing: https://www.dask.org – SNygard Jul 07 '22 at 11:22
  • Keep in mind that if the dataframe is huge, you would be essentially making a whole copy of that data and sending it to other processes after splitting. Not only will this bloat your memory, but the pickling and unpickling calls (needed for sending data to other processes) will add additional overhead as well when starting the processes. If performance like this is something you want to improve on, I suggest using managers to share the dataframe like in this [answer](https://stackoverflow.com/a/72817277/16310741) – Charchit Agarwal Jul 07 '22 at 11:27
  • @SNygard thanks for that information. dask looks great, but unfortunately, I won't be able to control partitioning a dataframe precisely how I need to i.e., there is one column with a "person id", and thus multiple rows in the dataframe are grouped by the same "person id". – Anthony Nash Jul 07 '22 at 11:32
  • @Charchit thanks for your help. The huge dataframe is split into N smaller dataframes (e.g., where N is 36 for 36 cores) before any multiprocessing tasks begin. I've recorded the overhead, and it is negligible compared to the time it takes to parse the data in each dataframe e.g., approx 20 seconds to set up the workers, then 3 hours to process the N dataframe splits. When it's N=1, it takes around 14 hours. – Anthony Nash Jul 07 '22 at 11:34
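
Following up on the Dask suggestion in the comments, here is a minimal sketch of what that route could look like. It assumes df and numCores are the same objects as in the question, that the grouping column is literally named "person_id" (a hypothetical name), and that per_person stands in for the real per-person Pandas work; Dask's groupby-apply shuffles rows so each person's group is processed whole, however the frame was partitioned:

import dask.dataframe as dd

def per_person(group):
    #placeholder for the real per-person Pandas work
    return group

ddf = dd.from_pandas(df, npartitions=numCores)

#meta is an empty frame describing the output columns/dtypes;
#scheduler="processes" runs the work in separate processes
result = (ddf.groupby("person_id")
             .apply(per_person, meta=df.iloc[:0])
             .compute(scheduler="processes"))
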
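Similarly, the manager-based sharing idea mentioned in the comments can be sketched roughly as follows (this is not the linked answer's code, just an illustration): the big frame lives once inside a manager process, and each worker pulls only the slice it needs through a proxy instead of receiving a pickled copy of the whole frame.

import multiprocessing
from multiprocessing.managers import BaseManager

import pandas as pd


class DataFrameHolder:
    #lives in the manager process and owns the single copy of the big frame
    def __init__(self, df):
        self._df = df

    def get_rows(self, start, stop):
        #only the requested slice is pickled and sent back to the caller
        return self._df.iloc[start:stop]


class DataFrameManager(BaseManager):
    pass


DataFrameManager.register("DataFrameHolder", DataFrameHolder)


def worker(holder, start, stop, queue):
    chunk = holder.get_rows(start, stop)  #proxy call into the manager process
    queue.put(len(chunk))


if __name__ == "__main__":
    df = pd.DataFrame({"person_id": range(100), "value": range(100)})
    queue = multiprocessing.Queue()
    with DataFrameManager() as manager:
        holder = manager.DataFrameHolder(df)
        workers = [multiprocessing.Process(target=worker,
                                           args=(holder, i * 25, (i + 1) * 25, queue))
                   for i in range(4)]
        for w in workers:
            w.start()
        results = [queue.get() for _ in workers]
        for w in workers:
            w.join()
    print(results)
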
