
I have a table in a PostgreSQL database that is ~900,000 rows. I want to copy it row by row to another table that has some extra columns, transforming each row and filling the new columns as I go. The problem is that RAM gets full.

Here is the relevant part of the code:

import datetime
import sqlalchemy
from sqlalchemy.engine.url import URL
from sqlalchemy.orm import sessionmaker

engine = sqlalchemy.create_engine(URL(**REMOTE), echo=False)
Session = sessionmaker(bind=engine)
session = Session()

n = 1000
counter = 1
for i in range(1, total + 1, n):
    ids = ','.join(str(j) for j in range(i, i + n))
    q = "SELECT * from table_parts where id in (%s)" % ids
    r = session.execute(q).fetchall()
    for element in r:
        data = {}
        # ... taking data from each row, extracting strings, doing
        #     calculations, and filling the extra columns that the
        #     new table has ...
        # query is the INSERT statement for the new table, defined
        # in a part of the script omitted here
        query = query.bindparams(**data)
        try:
            session.execute(query)
        except:
            session.rollback()
            raise
        if counter % n == 0:
            print "COMMITTING....", counter, datetime.datetime.now().strftime("%H:%M:%S")
            session.commit()
        counter += 1

The queries are correct, so there are no errors there. Right up until I press Ctrl+C, the new table gets updated correctly.

The problem seems to be the query: `SELECT * from table_parts where id in (1,2,3,4...1000)`. I have already tried it with a PostgreSQL array instead.
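
The array variant looked roughly like this (a simplified sketch; the whole id list goes in as one bound parameter, which psycopg2 adapts to a PostgreSQL array):

# same loop as above, but the ids are bound as a single parameter
q = sqlalchemy.text("SELECT * from table_parts where id = ANY(:ids)")
r = session.execute(q, {"ids": list(range(i, i + n))}).fetchall()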

Things I have already tried:

  • `results = connection.execution_options(stream_results=True).execute(query)` (from here). As far as I know this uses a server-side cursor when used with PostgreSQL. For this I ditched the session object from my posted code and used `engine.connect()` instead (see the sketch after the quote below).

  • Creating a new connection object on each iteration; surprisingly this does not work either, and RAM still gets full.

From the documentation:

> Note that the stream_results execution option is enabled automatically if the yield_per() method is used.

So, as far as I can tell, yield_per() from the Query API is the same thing as the stream_results option mentioned above.
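
Roughly what the streaming attempt looked like (simplified; the per-row work is the same as in the loop above):

connection = engine.connect().execution_options(stream_results=True)
result = connection.execute(q)        # q is the same SELECT as above
while True:
    chunk = result.fetchmany(n)       # pull n rows at a time instead of fetchall()
    if not chunk:
        break
    for element in chunk:
        # ... same per-row extraction and INSERT as in the loop above ...
        pass
result.close()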

thanks

dimitris_r
  • Possible duplicate of [memory-efficient built-in SqlAlchemy iterator/generator?](http://stackoverflow.com/questions/7389759/memory-efficient-built-in-sqlalchemy-iterator-generator) – RazerM Mar 13 '17 at 13:34
  • It looks like you're only fetching 1000 rows to memory at a time, which should most likely fit in memory just fine. Does it help if you only do 10 rows at a time, for example? – univerio Mar 13 '17 at 19:20
  • Yes, fetching 1000 rows at a time. I have also tried with 10 rows at a time; same result regarding the RAM issue, only it fills up more slowly. – dimitris_r Mar 14 '17 at 07:22
  • That sounds like a memory leak somewhere. Unfortunately, the code you've posted does not have any obvious leaks. It may lie in the part of your code you've omitted. Try the solutions for [this question](http://stackoverflow.com/questions/1435415/python-memory-leaks). – univerio Mar 14 '17 at 07:41
  • Thanks. The problem seems to be the `query = query.bindparams(**data)` line; it seems that it stores the dict of each iteration in an internal object. – dimitris_r Mar 14 '17 at 09:48
  • You're modifying `query` every iteration with new bind params, without ever letting it get garbage collected, so the bind params accumulate inside `query`. There's your leak. – univerio Mar 14 '17 at 22:34
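
A minimal sketch of the fix univerio describes in the last comment: keep the INSERT statement unchanged and pass each row's dict straight to execute(), so nothing accumulates on the statement object (insert_query stands for the INSERT defined in the omitted part of the script):

for element in r:
    data = {}
    # ... per-row extraction and calculations as before ...
    # parameters are passed per call instead of being bound onto
    # the statement with bindparams()
    session.execute(insert_query, data)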

1 Answer

create table table_parts ( id serial primary key, data text );
-- Insert 1M rows of about 32kB data =~ 32GB of data
-- Needs only 0.4GB of disk space because of builtin compression
-- Might take a few minutes
insert into table_parts(data)
  select rpad('',32*1024,'A') from generate_series(1,1000000);

The code below, using SQLAlchemy Core, does not use a lot of memory:

import sqlalchemy
import datetime
import getpass

metadata = sqlalchemy.MetaData()
table_parts = sqlalchemy.Table('table_parts', metadata,
    sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column('data', sqlalchemy.String)
)

engine = sqlalchemy.create_engine(
    'postgresql:///'+getpass.getuser(),
    echo=False
)
connection = engine.connect()

n = 1000

# Keyset pagination: fetch the next n rows that come after the last id
# already processed.
select_table_parts_n = sqlalchemy.sql.select([table_parts]).\
    where(table_parts.c.id>sqlalchemy.bindparam('last_id')).\
    order_by(table_parts.c.id).\
    limit(n)

# A single parameterized UPDATE, reused for every row.
update_table_parts = table_parts.update().\
    where(table_parts.c.id == sqlalchemy.bindparam('table_part_id')).\
    values(data=sqlalchemy.bindparam('table_part_data'))

last_id = 0
while True:
    with connection.begin() as transaction:
        row = None
        for row in connection.execute(select_table_parts_n, last_id=last_id):
            data = row.data.replace('A', 'B')
            connection.execute(
                update_table_parts,
                table_part_id=row.id,
                table_part_data=data
            )
        if not row:
            # nothing left to process
            break
        else:
            print "COMMITTING {} {:%H:%M:%S}".\
                format(row.id, datetime.datetime.now())
            transaction.commit()
            last_id = row.id

You don't seem to use any ORM features, so I suggest you also use SQLAlchemy Core.
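
If you are copying into a separate table with extra columns instead of updating in place, the same paging loop works with an INSERT; a rough sketch (new_table and its extra column are made-up names here, not from the question):

# assumes new_table already exists, e.g. created with metadata.create_all(engine)
new_table = sqlalchemy.Table('new_table', metadata,
    sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column('data', sqlalchemy.String),
    sqlalchemy.Column('extra', sqlalchemy.String)
)

insert_new = new_table.insert()

last_id = 0
while True:
    with connection.begin() as transaction:
        row = None
        for row in connection.execute(select_table_parts_n, last_id=last_id):
            connection.execute(
                insert_new,
                id=row.id,
                data=row.data.replace('A', 'B'),   # your per-row transformation
                extra='computed for %d' % row.id   # fill the extra column here
            )
        if not row:
            break
        else:
            transaction.commit()
            last_id = row.id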

Tometzky
  • Thank you for replying. Isn't it pretty much the same as what I do in my script? The main problem is the line `q = "SELECT * from table_parts where id in (%s)" % ids`; it constantly accumulates the rows on each iteration. – dimitris_r Mar 14 '17 at 08:15
  • Your code is fragmentary pseudocode with a strange mix of ORM and plain SQL. It's hard to reproduce the problem from it. I'm afraid you'd need to provide an [SSCCE](http://sscce.org/) to expect more help. – Tometzky Mar 14 '17 at 20:02