
I have a process that's becoming IO bound, where I pull a large dataset from a database into a pandas DataFrame, do some line-by-line processing, and then persist the results to a gzip file. I'm trying to find a way to use multiprocessing to split the creation of the gzip across multiple processes and then merge them into one file, or to process in parallel without one worker overwriting another's output. I found the p_tqdm package, but I'm running into EOF issues, probably because the processes overwrite each other. Here's a sample of my current solution:

import gzip

import pandas as pd
from p_tqdm import p_map

def process(row):
    # every worker re-opens the same file in "wb" mode, which is where I suspect it goes wrong
    with gzip.open("final.gz", "wb") as f:
        value = do_something(row)
        f.write(value.encode())

df = pd.read_sql(some_sql, engine)
things = []
for index, row in df.iterrows():
    things.append(row)
p_map(process, things)
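
For reference, the split-and-merge idea I mention above would look roughly like this untested sketch, where do_something stands in for my real per-row processing (gzip members can be concatenated back to back and still decompress as a single stream):

import gzip
import shutil

def write_part(part_path, rows):
    # each worker writes its own part file, so nobody touches final.gz concurrently
    with gzip.open(part_path, "wb") as f:
        for row in rows:
            f.write(do_something(row).encode())
    return part_path

def merge_parts(part_paths, final_path="final.gz"):
    # concatenating the raw bytes of the parts yields one valid multi-member gzip file
    with open(final_path, "wb") as out:
        for path in part_paths:
            with open(path, "rb") as part:
                shutil.copyfileobj(part, out)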
dweeb

1 Answer


I don't know p_tqdm, but if I understand your question correctly, this can be done easily with multiprocessing: the workers only transform the rows, and a single process owns the gzip file, so nothing gets overwritten.

Something like this:

import gzip
import multiprocessing

import pandas as pd

def process(row):
    # take care that do_something must return an object with an encode() method (e.g. a string)
    return do_something(row)

if __name__ == "__main__":
    df = pd.read_sql(some_sql, engine)
    things = []
    for index, row in df.iterrows():
        things.append(row)

    # only the parent process writes to the file, so the workers cannot clobber each other
    with gzip.open("final.gz", "wb") as f, multiprocessing.Pool() as pool:
        for processed_row in pool.imap(process, things):
            f.write(processed_row.encode())

Just a few side notes:

  • The pandas iterrows method is slow; avoid it if possible (see "Does pandas iterrows have performance issues?").

  • Also, you don't need to build the things list; just pass an iterable to imap (even passing df.iterrows() directly should be possible) and save yourself some memory (see the sketch after this list).

  • And finally, since it appears that you are reading SQL data, why not connect to the db directly and iterate over the cursor from the SELECT ... query, skipping pandas altogether.
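
To make the second side note concrete, here is a minimal sketch that feeds the pool straight from the DataFrame without building an intermediate list; some_sql, engine and do_something are the placeholders from the question, and name=None makes itertuples() yield plain tuples, which pickle cleanly across processes:

import gzip
import multiprocessing

import pandas as pd

def process(row):
    # row is a plain tuple of column values here, not a pandas Series
    return do_something(row)

if __name__ == "__main__":
    df = pd.read_sql(some_sql, engine)
    with gzip.open("final.gz", "wb") as f, multiprocessing.Pool() as pool:
        # no intermediate `things` list: rows go straight from the DataFrame into the pool;
        # chunksize just cuts down on inter-process communication overhead for many small rows
        rows = df.itertuples(index=False, name=None)
        for processed_row in pool.imap(process, rows, chunksize=1000):
            f.write(processed_row.encode())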

Marek Schwarz
  • Thanks for your answer and thoughtful comments. I also realized after further research that iterrows() is a nightmare for performance. I changed my DataFrame to a Dask DataFrame, then passed df.to_array() and iterated over that to build the .gz file. The performance is much better. I wonder if passing df.to_array() into the multiprocessing pool would do better. I am also curious about reading the SQL tables directly; I don't want to hamper the DB's performance, but I'd like to speed up that part. Right now I basically do a fetch all and just wait blindly for 10s on millions of rows. Any tips? – dweeb Feb 01 '20 at 14:45
  • As I've said, iterate directly over the cursor object. You don't say which db engine you are using, but if, for example, it is something like SQLite, then something like this is usually possible: `cur.execute("SELECT * FROM x")` and then `for row in cur: ...`. (The row is usually a tuple of data.) Check out e.g. https://docs.python.org/3.8/library/sqlite3.html. I don't think that multiprocessing would give you a lot of perf improvement (it has quite an overhead), but it depends on your use case. – Marek Schwarz Feb 02 '20 at 19:48
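
To illustrate the cursor iteration described in the comment above, here is a minimal sketch of skipping pandas and streaming rows straight into the gzip file. sqlite3 is used only as a stand-in because the question doesn't name the DB engine, and example.db, some_table and do_something are hypothetical placeholders:

import gzip
import sqlite3  # stand-in driver; other DB-API drivers use conn.cursor() before execute()

with gzip.open("final.gz", "wb") as f:
    conn = sqlite3.connect("example.db")                    # hypothetical database file
    for row in conn.execute("SELECT * FROM some_table"):    # hypothetical table
        # rows stream in one at a time as plain tuples instead of one big fetchall()
        f.write(do_something(row).encode())
    conn.close()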