
(ANSWERED) My answer to this is down below; I hope it helps.

I am quite new to SQLAlchemy and Python as a whole and I am looking for some advice. I am looking at moving data from one Postgres DB to another. I am moving about 20M+ records and I would like the job to run daily. I would like to know:

  1. Should I use SQLAlchemy core or the ORM? (I have mostly used the core so far)
  2. I am currently using SQLAlchemy version '1.3.23' (should I move to 1.4/2.x)?
  3. How do I optimize the insert to run faster? (I heard that there might be flags I need to enable?)

I unfortunately can't use the psycopg2 COPY function because I do not have SuperUser access to the DB.

I am trying to follow someone else's Stack Overflow example (the example I am following):

q = select([table_1])

proxy = conn_p.execution_options(stream_results=True).execute(q)

while 'batch not empty':  # equivalent of 'while True', but clearer
    batch = proxy.fetchmany(100000)  # 100,000 rows at a time
    
    if not batch:
        break

    for row in batch:
        ???

proxy.close()

The part that I get stuck on is in the for loop. How do I write the data to the next db? What function/s should I use?

Is this the best approach or have I gone horribly wrong?

My current iteration of code using version 1.4:

conn_p = create_engine(--db connection string--, echo=True)

conn_sl = create_engine(--db connection string--, echo=False)

q = select([table_1])

proxy = conn_p.execution_options(stream_results=True).execute(q)


while 'batch not empty':      
    batch = proxy.fetchmany(10000)  
    
    list1 = []

    if not batch:
        break
    
    for row in batch:
        d = dict(row.items())
        list1.append(d)    
    
    insert_stmt = table_2.insert().values(list1) 
    conn_sl.execute(insert_stmt)    


proxy.close()

Still very slow; it takes about 15 seconds to move 10k records. Any advice?
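
One thing worth checking in the code above (a sketch, not a guaranteed fix): `table_2.insert().values(list1)` renders every row into a single very large statement, which is slow to build and bypasses the driver-level executemany optimizations. Passing the list of dicts as execute() parameters and using one transaction looks roughly like this, assuming the conn_sl, proxy, and table_2 objects from the code above:

with conn_sl.begin() as conn:   # one transaction instead of an implicit commit per execute()
    while True:
        batch = proxy.fetchmany(10000)
        if not batch:
            break
        rows = [dict(row.items()) for row in batch]
        # executemany form: pass the list of dicts as parameters rather than .values(rows),
        # so the psycopg2 fast executemany path can apply
        conn.execute(table_2.insert(), rows)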

IronSoul
  • https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#psycopg2-executemany-mode – Gord Thompson Jan 10 '22 at 14:56
  • @GordThompson I have added the flag to my connection like this `conn_sl = create_engine(--db connection string--, echo=False, executemany_mode='values')` but I am not seeing any performance enhancement. Have I missed something? I am only doing INSERTs, no updates or anything like that; I am just trying to move data. – IronSoul Jan 12 '22 at 13:27
  • `values_only` is the default for SQLAlchemy 1.4 and it sounds like what you want – Gord Thompson Jan 12 '22 at 14:55
  • I updated to 1.4 and ran this code (added to my question above). I'm getting the same speed: 15 seconds to move 10k records, which is 2 and a half minutes for 100k. – IronSoul Jan 12 '22 at 15:39
  • Are you pushing the data to the destination database over a WAN or cloud connection? – Gord Thompson Jan 12 '22 at 18:35
  • @GordThompson I am pushing the data over a cloud connection. Could the issue be latency? – IronSoul Jan 13 '22 at 11:00
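
For reference, the settings described in the dialect docs linked in the comments look roughly like this in SQLAlchemy 1.4 (a sketch; the page sizes are illustrative, and these options only take effect for the executemany style of execute(), not for insert().values(...)):

from sqlalchemy import create_engine

conn_sl = create_engine(
    "postgresql+psycopg2://user:password@host/dbname",  # placeholder connection string
    executemany_mode="values_plus_batch",   # 'values_only' is already the default in 1.4
    executemany_values_page_size=10000,     # rows folded into each generated VALUES clause
    executemany_batch_page_size=500,        # statements per execute_batch() page
)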

2 Answers

  1. Postgres will not let you access both databases on the same session at the same time. There are tricks, but if you don't have admin powers you probably can't use them, so you need a second connection. It is significantly more expensive to spool data out to a client and back to the server than to do a server-side copy, so it is worth pushing for a DBA to set up some kind of scheduled copy for periodic operations.
  2. Adding ORM elements to a bulk operation like this will only slow you down. For millions of rows it could become debilitating.
  3. Not sure; SQLAlchemy 1.4+ has nicer query syntax, but that is probably not helpful for this particular problem.
  4. If you are using SQL directly, the tips are straightforward: defer indexing until after the data is loaded. You may have to drop the indexes and create them again once the rows have been added. Watch out for mega-sized transactions. See this discussion on bulk loading for more depth.

Therefore, instantiate a second connection/session, drop the indexes, execute basic INSERT statements with the properties from each row (as per the statement-level docs), and recreate the indexes afterwards.
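
A minimal sketch of that sequence, assuming the destination engine conn_sl and table_2 from the question; the index name, column name, and batches_of_dicts iterable are hypothetical placeholders:

from sqlalchemy import text

# drop the index so the bulk load does not pay index-maintenance cost per row
with conn_sl.begin() as conn:
    conn.execute(text("DROP INDEX IF EXISTS my_table_idx"))   # hypothetical index name

# load in moderately sized transactions rather than one mega-transaction
for rows in batches_of_dicts:            # e.g. each fetchmany() batch converted to dicts
    with conn_sl.begin() as conn:
        conn.execute(table_2.insert(), rows)

# recreate the index once all rows are in place
with conn_sl.begin() as conn:
    conn.execute(text("CREATE INDEX my_table_idx ON my_table (some_column)"))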

nerdstrike
  • Thank you for your feedback. I am pulling from one PostgreSQL DB into a different PostgreSQL DB, so I can't do a server-side copy. I have been playing around with the code and I am doing the following in the for loop: `for row in batch: d = dict(row.items()) list1.append(d) insert_stmt = table_2.insert().values(list1) conn_sl.execute(insert_stmt)` Please let me know if this is good or bad. – IronSoul Jan 11 '22 at 07:25
  • There are diminishing returns on increasing the size of `list1` in a single insert statement for the servers I've used. At some point the server parsing the SQL becomes the bottleneck. Depending on the server configuration there will be an optimal length, such as 1000 values per insert beyond which you slow down or fail versus sending more queries with smaller payloads. You'd have to benchmark your setup to know. – nerdstrike Jan 11 '22 at 10:13
  • Using the above code, it takes about 160 seconds to load 140k records. I am using AWS SageMaker to run the job; it runs on an **ml.t2.medium (4GB memory)**, which I know has limited memory. I did try a larger instance, an **ml.t2.xlarge (16GB memory)**, with a batch size of 10k records at a time, which took 180 seconds to load the 140k records. – IronSoul Jan 11 '22 at 12:39
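
A rough sketch of the benchmarking described above, assuming the conn_sl engine and table_2 from the question and a list of already-fetched row dicts called sample (run it against a scratch copy of the table, since it inserts the same rows repeatedly):

import time

for chunk in (500, 1000, 5000, 10000):
    start = time.time()
    with conn_sl.begin() as conn:
        for i in range(0, len(sample), chunk):
            # executemany in chunks of `chunk` rows per execute() call
            conn.execute(table_2.insert(), sample[i:i + chunk])
    print(chunk, "rows per execute():", round(time.time() - start, 1), "seconds")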

After about a week of work, testing and optimizing, I am left with the following.

I was working towards some ETL work, but mostly lifting and shifting data from one PostgreSQL DB to another. I needed it to be dynamic in the way I pass tables to it, so that I could move tables more easily. Please let me know what you think.

schema_s = "source schema"
schema_d = "destination schema"


source_creds = ('db connection string')
destination_creds = ('db connection string')

#same DB as the destination - I use psycopg2 directly for the insert because it shaved a few seconds off the insert time
conn_ssyc = psycopg2.connect('db connection string')
cur = conn_ssyc.cursor()


#list of tables to move
tbl_to_load = [
    'table_name_1',
    'table_name_2',
    'table_name_3',
    'table_name_4'
    ]

for current_table in tbl_to_load:
    start_time_per = time.time()
    
    #create the needed engines
    conn_s = create_engine(source_creds, echo=False, echo_pool=False)
    conn_d = create_engine(destination_creds, echo=False, echo_pool=False)
    
    #get the columns in each current_table 
    df = pd.read_sql(
                "SELECT \
                   column_name\
                FROM information_schema.columns\
                    WHERE table_schema = '{}'\
                    AND table_name   = '{}'".format(schema_d, current_table), conn_d)
    cur = conn_ssyc.cursor()
    print(current_table)
    
    
    #builds the placeholder string with one "%s" per column.
    #Eg a table with 5 columns gives (%s,%s,%s,%s,%s)

    inst_col = "("
    for i in df['column_name']:
        inst_col = inst_col + "%s,"
    inst_col = inst_col[:-1]    #removes the last comma
    inst_col = inst_col + ")"

    #defining the tables
    meta_p = MetaData(schema = schema_s)
    table_1 = Table(current_table,
                    meta_p,
                    autoload_with=conn_s
                   )


    meta_sl = MetaData(schema = schema_d)
    table_2 = Table(current_table,
                    meta_sl,
                    autoload_with=conn_d
                    )
    
    #truncate current table
    truncate_query = sqlalchemy.text("TRUNCATE TABLE {}.{}".format(schema_d, current_table))
    conn_d.execution_options(autocommit=True).execute(truncate_query)

    start_time= time.time()
    
    #stream the data
    q = select([table_1])
    proxy = conn_s.execution_options(stream_results=True).execute(q)
    
    end_time = time.time()
    total_time = end_time - start_time
    print("Time: ", total_time)
    
    batch_number = 0
    while 'batch not empty':  
        batch = proxy.fetchmany(10000)  # fetching batches of 10,000 rows at a time
        
        start_time= time.time()
        #batch_number is just to monitor progress
        batch_number = batch_number + 1
        print(batch_number)
        
        list1 = []

        if not batch:
            break

        for row in batch:
            d = dict(row.items())
            list1.append(d)    

        df = pd.DataFrame(list1)    

        tpls = [tuple(x) for x in df.to_numpy()]
        
        #Here is my insert. So far, based on the table size (52 columns wide, 4.3M records, ~3.5 GB of data as CSV), I can move 10k records in 3-5 seconds per batch
        
        args_str = ','.join(cur.mogrify(inst_col, x).decode('utf-8') for x in tpls)
        cur.execute("INSERT INTO {}.{} VALUES ".format(schema_d, current_table) + args_str)
        conn_ssyc.commit()
        #4-5 sec or so for 10k
        
        end_time = time.time()
        total_time = end_time - start_time
        print("Time: ", total_time)

    cur.close()
    proxy.close()
    
    end_time = time.time()
    total_time = end_time - start_time_per
    print("Time: ", total_time)
IronSoul