
I am trying to create a dataframe from a fixed-width file and load it into a PostgreSQL database. My input file is very large (~16 GB, ~20 million records), so building a single dataframe consumes most of the available RAM and takes a long time to complete. I therefore tried the chunksize option (via a Python generator) and committing records to the table chunk by chunk, but it fails with 'AttributeError: 'generator' object has no attribute 'to_sql''.

Inspired by this answer: https://stackoverflow.com/a/47257676/2799214

input file: test_file.txt

XOXOXOXOXOXO9
AOAOAOAOAOAO8
BOBOBOBOBOBO7
COCOCOCOCOCO6
DODODODODODO5
EOEOEOEOEOEO4
FOFOFOFOFOFO3
GOGOGOGOGOGO2
HOHOHOHOHOHO1

sample.py

import pandas.io.sql as psql
import pandas as pd
from sqlalchemy import create_engine

def chunck_generator(filename, header=False,chunk_size = 10 ** 5):
    for chunk in pd.read_fwf(filename, colspecs=[[0,12],[12,13]],index_col=False,header=None, iterator=True, chunksize=chunk_size):
        yield (chunk)

def _generator( engine, filename, header=False,chunk_size = 10 ** 5):
    chunk = chunck_generator(filename, header=False,chunk_size = 10 ** 5)
    chunk.to_sql('sample_table', engine, if_exists='replace', schema='sample_schema', index=False)
    yield row

if __name__ == "__main__":
    filename = r'test_file.txt'
    engine = create_engine('postgresql://ABCD:ABCD@ip:port/database')
    c = engine.connect()
    conn = c.connection
    generator = _generator(engine=engine, filename=filename)
    while True:
       print(next(generator))
    conn.close()

Error:

    chunk.to_sql('sample_table', engine, if_exists='replace', schema='sample_schema', index=False)
AttributeError: 'generator' object has no attribute 'to_sql'

My primary goal is to improve performance. Please help me resolve this issue or suggest a better approach. Thanks in advance.

goks
  • `chunck_generator` is a generator object, which does not have the method `to_sql()`. You may need to use `current_chunk = next(chunk)` to get the chunk. Also, `row` is not defined. – TwistedSim May 01 '18 at 15:58
  • @TwistedSim Yes, I agree. Is there any way I can resolve this issue? I need to retain the dataframe properties. – goks May 01 '18 at 16:00
  • Where are you expecting this `to_sql` method to be defined? Certainly not on all generators, or all iterables, or the specific generator you created by just `yield`ing values from a function? If you want to call a method of a DataFrame, you have to call it on a DataFrame, not on some other kind of object. – abarnert May 01 '18 at 16:01
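A minimal sketch of the fix the comments describe: iterate the generator so that each item is a DataFrame chunk, then call to_sql on that chunk. The connection string and column specs below are simply copied from the question; note that if_exists='replace' on every chunk would keep dropping the table, so switching to 'append' after the first chunk is assumed here.

import pandas as pd
from sqlalchemy import create_engine

def chunk_generator(filename, chunk_size=10 ** 5):
    # read_fwf with chunksize yields one DataFrame per chunk
    for chunk in pd.read_fwf(filename, colspecs=[[0, 12], [12, 13]],
                             index_col=False, header=None, chunksize=chunk_size):
        yield chunk

if __name__ == "__main__":
    engine = create_engine('postgresql://ABCD:ABCD@ip:port/database')
    for i, chunk in enumerate(chunk_generator('test_file.txt')):
        # each `chunk` is a DataFrame here, so to_sql is available;
        # replace the table on the first chunk, then append
        chunk.to_sql('sample_table', engine, schema='sample_schema',
                     if_exists='replace' if i == 0 else 'append', index=False)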

3 Answers


`chunck_generator` returns a generator object, not an actual chunk. You need to iterate the object to get the chunks out of it:

>>> def my_generator(x):
...     for y in range(x):
...         yield y
...
>>> g = my_generator(10)
>>> print(g.__class__)
<class 'generator'>
>>> ele = next(g, None)
>>> print(ele)
0
>>> ele = next(g, None)
>>> print(ele)
1

So to fix your code, you just need to loop over the generator:

for chunk in chunck_generator(filename, header=False, chunk_size=10 ** 5):
    yield chunk.to_sql('sample_table', engine, if_exists='replace', schema='sample_schema', index=False)

But it seems convoluted. I would just do this:

import pandas as pd
from sqlalchemy import create_engine

def sql_generator(engine, filename, header=False, chunk_size=10 ** 5):
    frame = pd.read_fwf(
        filename,
        colspecs=[[0, 12], [12, 13]],
        index_col=False,
        header=None,
        iterator=True,
        chunksize=chunk_size
    )

    for chunk in frame:
        yield chunk.to_sql(
            'sample_table',
            engine,
            if_exists='replace',
            schema='sample_schema',
            index=False
        )


if __name__ == "__main__":
    filename = r'test_file.txt'
    engine = create_engine('postgresql://USER:PWD@IP:PORT/DB')
    for sql in sql_generator(engine, filename):
        print(sql)
gbtimmon
  • The code is working. Can I release each chunk's memory using del[df] or the garbage collector? – goks May 02 '18 at 17:35
  • It should garbage collect on its own. – gbtimmon May 02 '18 at 18:15
  • The code is working fine, but to_sql is running very slowly: it took 30 minutes to insert 100k records (100k rows, 98 columns, all text-type columns). Any insights? – goks May 03 '18 at 14:50
  • You don't want to chunk -- you probably want to use a database loading utility that can turn off transactions. Postgres provides a COPY command to make loading large files more efficient: https://dba.stackexchange.com/questions/151930/import-large-sql-file-to-postgres – gbtimmon May 03 '18 at 17:00
  • Thanks. I found a way to load 9.8 million records: I used part of your code plus the psycopg2 package, and the data loaded in 30 minutes. – goks May 04 '18 at 16:23
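On the memory question in the comments above: each chunk normally becomes collectable as soon as the loop moves on to the next one, but the reference can be dropped explicitly if memory pressure is a concern. A minimal sketch (table name and column specs mirror this thread; the explicit gc call is optional):

import gc
import pandas as pd

def insert_chunks(engine, filename, chunk_size=10 ** 5):
    reader = pd.read_fwf(filename, colspecs=[[0, 12], [12, 13]],
                         index_col=False, header=None, chunksize=chunk_size)
    for chunk in reader:
        chunk.to_sql('sample_table', engine, schema='sample_schema',
                     if_exists='append', index=False)
        # each chunk is normally freed once the loop rebinds `chunk`,
        # but an explicit del/gc.collect() makes the release deterministic
        del chunk
        gc.collect()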

Conclusion: the to_sql method is not efficient for loading large files. So I used the copy_from method from the psycopg2 package, together with the chunksize option when creating the dataframe. This loaded 9.8 million records (~17 GB), 98 columns each, in 30 minutes.

I have removed references to my actual file (I am using the sample file from the original post).

import pandas as pd
import psycopg2
import io

def sql_generator(cur, con, filename, boundries, col_names, header=False, chunk_size=2000000):
    frame = pd.read_fwf(filename, colspecs=boundries, index_col=False, header=None,
                        iterator=True, chunksize=chunk_size, names=col_names)
    for chunk in frame:
        # write the chunk to an in-memory buffer as pipe-delimited text ...
        output = io.StringIO()
        chunk.to_csv(output, sep='|', quoting=3, escapechar='\\', index=False,
                     header=False, encoding='utf-8')
        output.seek(0)
        # ... and stream it into Postgres with COPY, which is much faster
        # than the row-by-row INSERTs issued by to_sql
        cur.copy_from(output, 'sample_schema.sample_table', null="", sep="|")
        yield con.commit()

if __name__ == "__main__":
    boundries = [[0,12],[12,13]]
    col_names = ['col1','col2']
    filename = r'test_file.txt'  #Refer to sample file in the original post
    con = psycopg2.connect(database='database',user='username', password='pwd', host='ip', port='port')
    cur = con.cursor()
    for sql in sql_generator(cur,con, filename, boundries, col_names):
        print(sql)
    con.close()
goks
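One caveat with copy_from: newer psycopg2 releases quote the table name as a single identifier, so a schema-qualified name like 'sample_schema.sample_table' may fail there. A minimal sketch that reuses the same serialization as the answer above but issues the COPY statement itself through copy_expert:

import io

def copy_chunk(cur, chunk):
    # serialize the chunk to an in-memory pipe-delimited buffer, as above
    buf = io.StringIO()
    chunk.to_csv(buf, sep='|', quoting=3, escapechar='\\',
                 index=False, header=False, encoding='utf-8')
    buf.seek(0)
    # copy_expert takes the full COPY statement, so the schema-qualified
    # table name is spelled out explicitly
    cur.copy_expert(
        "COPY sample_schema.sample_table FROM STDIN "
        "WITH (FORMAT text, DELIMITER '|', NULL '')",
        buf
    )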

I suggested something like this:

def _generator(engine, filename, ...):
    for chunk in pd.read_fwf(filename, ...):
        yield chunk.to_sql('sample_table', engine, ...)  # not sure about this since row was not defined

for row in _generator(engine=engine, filename=filename):
    print(row)
TwistedSim