
I am trying to export a whole database schema (around 20 GB) from PostgreSQL, using a query, into a single HDF5 file.

Because this doesn't fit in my computer's memory, I am using the chunksize argument.

First I use this function to build the connection string:

def make_connectstring(prefix, db, uname, passa, hostname, port):
    """return an sql connectstring"""
    connectstring = prefix + "://" + uname + ":" + passa + "@" + hostname + \
                    ":" + port + "/" + db
    return connectstring
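
For example, with placeholder values (not the real credentials) this produces a standard SQLAlchemy URL:

# placeholder values for illustration only
connectstring = make_connectstring("postgresql", "mydb", "user", "secret", "localhost", "5432")
# -> "postgresql://user:secret@localhost:5432/mydb"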

Then I created a temporary folder and saved each chunk to its own HDF5 file:

import tempfile

import pandas as pd
import sqlalchemy


def query_to_hdf5(connectstring, query, verbose=False, chunksize=50000):

    engine = sqlalchemy.create_engine(connectstring,
        server_side_cursors=True)

    # read the query in chunks and write each chunk to its own temp HDF5 file
    i = 0
    paths_chunks = []
    with tempfile.TemporaryDirectory() as td:
        for df in pd.read_sql_query(sql=query, con=engine, chunksize=chunksize):
            path = td + "/chunk" + str(i) + ".hdf5"
            df.to_hdf(path, key='data')
            if verbose:
                print("wrote", path)
            paths_chunks.append(path)
            i += 1


connectstring = make_connectstring(prefix, db, uname, passa, hostname, port)
query = "SELECT * FROM public.zz_ges"
df = query_to_hdf5(connectstring, query)

What is the best way to merge all of these files into a single file that represents the whole DataFrame?

I tried something like this:

df = pd.DataFrame()
for path in paths_chunks:
    df_scratch = pd.read_hdf(path)
    df = pd.concat([df, df_scratch])
    if verbose:
        print("read", path)

However, memory usage goes up very fast. I need something more efficient.
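
For comparison, here is a sketch that keeps memory flat by appending each chunk file to a single on-disk store instead of concatenating DataFrames in memory (it assumes the chunk files written above are still on disk, i.e. that they outlive the temporary directory):

with pd.HDFStore('merged.h5', 'w') as store:
    for path in paths_chunks:
        df_scratch = pd.read_hdf(path)       # load one chunk at a time
        store.append('data', df_scratch)     # append its rows to the on-disk table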

Update:

def make_connectstring(prefix, db, uname, passa, hostname, port):
    """return an sql connectstring"""
    connectstring = prefix + "://" + uname + ":" + passa + "@" + hostname + \
                    ":" + port + "/" + db
    return connectstring

def query_to_df(connectstring, query, verbose=False, chunksize=50000):

    engine = sqlalchemy.create_engine(connectstring,
        server_side_cursors=True)

    # stream the query results in chunks straight into a single HDF5 store
    with pd.HDFStore('output.h5', 'w') as store:
        for df in pd.read_sql_query(sql=query, con=engine, chunksize=chunksize):
            store.append('data', df)
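
To sanity-check the result without loading the whole table back into memory, something like this should work (a sketch, assuming the 'data' key used above):

with pd.HDFStore('output.h5', 'r') as store:
    print(store.get_storer('data').nrows)    # total rows written across all chunks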
Cesar
1 Answer


I'd suggest using an HDFStore directly; that way you can append chunks as you get them from the database, something like:

with pd.HDFStore('output.h5', 'w') as store:
    for df in pd.read_sql_query(sql=query, con=engine, chunksize=chunksize):
        store.append('data', df)

This is based around your existing code, so it isn't complete; let me know if anything is unclear.

Note that I'm opening the store in 'w' mode, so it will delete the file every time; otherwise append would just keep adding the same rows to the end of the table. Alternatively, you could remove the key first.
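
For example, a sketch of the remove-the-key-first variant (the 'data' key and query/engine names are just the ones used above):

with pd.HDFStore('output.h5', 'a') as store:      # 'a' mode keeps the existing file
    if 'data' in store:
        store.remove('data')                       # drop the old rows so they aren't duplicated
    for df in pd.read_sql_query(sql=query, con=engine, chunksize=chunksize):
        store.append('data', df)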

When you open the store you also get lots of options, like which compression to use, but this doesn't seem to be well documented; help(pd.HDFStore) describes complevel and complib for me.
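
For example, a compressed store might be opened like this ('blosc' at level 9 is just one common choice, not a specific recommendation):

store = pd.HDFStore('output.h5', 'w', complevel=9, complib='blosc')   # blosc compression, max level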

Sam Mason
  • Hi Sam, thanks for your answer. See if my updated example is like you suggested – Cesar Sep 30 '19 at 18:58
  • When I run this function, the output file is created, but with no data. – Cesar Sep 30 '19 at 19:00
  • Hi Sam, it was related to dtype and column size issues. – Cesar Sep 30 '19 at 20:13
  • Glad to help! Note that there are various things you can do to make things faster, e.g. various suggestions in https://stackoverflow.com/a/20084843/1358308 – Sam Mason Oct 01 '19 at 12:38
  • Hi Sam, I am facing an error: "server closed the connection unexpectedly. This probably means the server terminated abnormally before or while processing the request." – Cesar Oct 02 '19 at 12:37
  • I added a try/except, but I am not sure how to re-establish the connection. – Cesar Oct 02 '19 at 12:38
  • sounds like a database issue unrelated to HDF5. I'd suggest trying to isolate the issue and posting a new question when you can get a reproducible test case – Sam Mason Oct 03 '19 at 08:19