
I have a 10 million row table in a MySQL database that I need to read, run some validation checks on from my client machine, and load into a table in a Postgres database. I am able to get the data onto my machine successfully, but I am running into out-of-memory issues while trying to process the data and load it into the Postgres database.

Is there a way to use an iterator to process the data in memory and insert it into Postgres in chunks?

Here is the code I currently have:

from sqlalchemy import create_engine, MetaData, Table

# MySQL database connection
source_engine = create_engine('mysql+pymysql://user:pwd@serveraddress:3306/dbname')
source_connection = source_engine.connect()

# Read the entire data
data = source_connection.execute('SELECT * FROM table')

# close the MySQL connection
source_connection.close()

# function to transform data
def transform(data):

    def process_row(row):
        """do data validation on the row"""
        return row

    # process and return the incoming dataset as a list of dicts
    processed_data = [dict(zip(data.keys(), process_row(d))) for d in data]
    return processed_data

transformed_data = transform(data)

# Postgres database connection
dest_connection = create_engine('postgresql://user:pwd@serveraddress:5432/dbname')
dest_meta = MetaData(bind=dest_connection, reflect=True, schema='test')

table = Table('table_name', dest_meta, autoload=True)
dest_connection.execute(table.insert().values(transformed_data))

dest_connection.dispose()

Can anyone suggest a simple way to do this?

user2714753

1 Answer


You are on the right path! I had the same problem with some code I was working on a couple of weeks ago.

One way to accomplish what you want and avoid memory issues is to do the reading inside a generator function that loops over your query and yields the rows in batches. This saves memory and lets you do your work in chunks. The downside is that it may take a bit longer to execute, but it keeps memory usage low. I don't have much information about your data, but the code would look something like this:

from sqlalchemy import create_engine, MetaData, Table

# MySQL database connection
source_engine = create_engine('mysql+pymysql://user:pwd@serveraddress:3306/dbname')
source_connection = source_engine.connect()

# Read the data in batches
def read_data():
    '''reads the data and yields it batch by batch to save memory'''
    data = source_connection.execute('SELECT * FROM table')
    batch_counter = 0
    batch_of_rows = []
    for row in data:
        batch_of_rows.append(row)
        batch_counter = batch_counter + 1
        if batch_counter == 5000:  # set this to the batch size that balances memory use and execution time
            batch_counter = 0
            yield batch_of_rows
            batch_of_rows = []  # start a fresh batch
    if batch_of_rows:
        yield batch_of_rows  # yield the final, partially filled batch
    # close the MySQL connection once the result set has been fully consumed
    source_connection.close()

# function to transform a batch of rows
def transform(data):

    def process_row(row):
        """do data validation on the row"""
        return row

    # process and return the incoming batch as a list of dicts
    processed_data = [dict(zip(d.keys(), process_row(d))) for d in data]
    return processed_data


# Postgres database connection
dest_connection = create_engine('postgresql://user:pwd@serveraddress:5432/dbname')
dest_meta = MetaData(bind=dest_connection, reflect=True, schema='test')

table = Table('table_name', dest_meta, autoload=True)
for batch in read_data():
    transformed_data = transform(batch)
    dest_connection.execute(table.insert().values(transformed_data))

dest_connection.dispose()

I think that will solve your memory issues.

Note: if you need some extra explanations about yield, visit this stackoverflow question.
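One more thing worth checking on the read side: depending on the MySQL driver, SELECT * FROM table may still buffer the entire result set on the client before the generator ever sees a row, which would undo part of the savings. If that turns out to be the case, SQLAlchemy's stream_results execution option can request a server-side (unbuffered) cursor. The sketch below is only an illustration, assuming a SQLAlchemy 1.x / pymysql setup where that option is supported; the connection URL and batch size are placeholders.

from sqlalchemy import create_engine

# placeholder connection URL; use your own credentials
source_engine = create_engine('mysql+pymysql://user:pwd@serveraddress:3306/dbname')

def read_data_streaming(batch_size=5000):
    '''yields rows in batches without buffering the whole result on the client'''
    with source_engine.connect() as conn:
        # stream_results=True asks the dialect for a server-side cursor
        # (pymysql's SSCursor), so rows are fetched as they are iterated
        result = conn.execution_options(stream_results=True).execute('SELECT * FROM table')
        batch = []
        for row in result:
            batch.append(row)
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:
            yield batch  # the final, partially filled batch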

Marcus Vinicius Melo
  • Thanks for answering this question. The solution looks elegant, with good use of the yield statement. But wouldn't this execute 10 million SQL insert statements? Any idea how to insert in batches? – user2714753 Jan 19 '18 at 20:40
  • I see. Well, you can put a counter inside the for loop of read_data and accumulate the rows on each iteration. Then add an if statement that contains the yield line and resets the accumulated rows, with its condition set to fire whenever the counter reaches a multiple of some number. For example, every time the counter hits a multiple of 5000 it yields the batch, which divides the number of inserts by 5000 (see the short sketch after these comments). – Marcus Vinicius Melo Jan 19 '18 at 21:39
  • Let me know if it solves your problem so I can add it to the answer above. Also, if it does, don't forget to flag it as the accepted answer to help others who have the same issue as you :) – Marcus Vinicius Melo Jan 19 '18 at 21:41
  • Working on it now and will flag this as the accepted answer. – user2714753 Jan 20 '18 at 13:35
  • Added the batch part to the answer. – Marcus Vinicius Melo Jan 22 '18 at 17:42
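As a sketch of the batching discussed in the comments: instead of building one multi-row VALUES clause per batch with table.insert().values(...), the same insert can be passed a list of dicts so SQLAlchemy runs it as a DBAPI executemany over the batch. This is only an illustrative alternative, not the answer's exact code, it reuses the table, read_data and transform names defined above, and whether it is faster than the multi-row VALUES form depends on the driver.

# assumes `dest_connection`, `table`, `read_data` and `transform` as defined in the answer above
with dest_connection.connect() as conn:
    for batch in read_data():
        transformed_data = transform(batch)
        # passing a list of dicts runs the insert as an executemany over the batch
        conn.execute(table.insert(), transformed_data)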