
I have some code that creates a generator with read_sql() and loops through the generator to print each chunk:

execute.py

import pandas as pd
from sqlalchemy import event, create_engine

engine = create_engine('path-to-driver')

def getDistance(chunk):
    print(chunk)
    print(type(chunk))

df_chunks = pd.read_sql("select top 2 * from SCHEMA.table_name", engine, chunksize=1)

for chunk in df_chunks:
    result = getDistance(chunk)

It works, and each chunk is printed as a DataFrame. When I attempt to do the same thing with multiprocessing like this...

outside_function.py

def getDistance(chunk):
    print(chunk)
    print(type(chunk))
    df = chunk
    return df

execute.py

import pandas as pd
from multiprocessing import Pool
from sqlalchemy import event, create_engine

from outside_function import getDistance

engine = create_engine('path-to-driver')

df_chunks = pd.read_sql("select top 2 * from SCHEMA.table_name", engine, chunksize=1)

if __name__ == '__main__':
    global result
    p = Pool(20)
    for chunk in df_chunks:
        print(chunk)
        result = p.map(getDistance, chunk)
    p.terminate()
    p.join()

...the chunks print in the console as column names with the type 'str'. Printing out result reveals this: ['column_name'].

Why are the chunks turning into strings that are just the column names when multiprocessing is applied?

OverflowingTheGlass
    You are not properly looping over the data_frame. The chunk will be a string regardless of using multiprocessing or not. If you want to loop over the values or rows you should use `df.itertuples()` or `df.iterrows()`. What exactly are you trying to do? – Scott Skiles Mar 20 '19 at 14:43
  • I guess I'm saying you should not be surprised here. If all you did was `for a in df:` that is a string. So no matter what you do with it after that, `a` is a string. Doesn't matter if you're using multiprocessing or not. – Scott Skiles Mar 20 '19 at 14:45
  • In my experience with multiprocessing, you want to avoid terminating processes how you are doing. Check [this answer](https://stackoverflow.com/questions/32053618/how-to-to-terminate-process-using-pythons-multiprocessing) for a pretty in-depth explanation on "gracefully" exiting processes. – d_kennetz Mar 20 '19 at 14:51

1 Answer


This is because p.map expects a function and an iterable. In your loop you pass a single DataFrame (chunk) as the iterable, and iterating over a DataFrame yields its column names, so each worker receives a column-name string instead of a DataFrame.
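You can see this behaviour without any multiprocessing at all. A minimal sketch, using a small DataFrame in place of one chunk from read_sql:

```python
import pandas as pd

# A tiny DataFrame standing in for one chunk returned by read_sql
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

# Iterating over a DataFrame yields its column names, not its rows
print(list(df))  # ['a', 'b']
```

This is exactly what p.map does internally with the chunk you hand it, which is why result ends up as a list of column-name strings.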

You need to pass the collection of DataFrames (the generator itself) to the map method, i.e.:

    p = Pool(20)
    result = p.map(getDistance, df_chunks)
    p.close()
    p.join()
John Sloper