
I'm using the code below to port data from a MySQL database to a PostgreSQL database. It works, but it is rather slow: porting around 500 rows takes more than 3 minutes. I suspect the for loop in one of the functions is iterating over each row and inserting it individually. Is there a way to insert all the rows at once rather than looping through them? Can it be done with executemany, and if so, what changes do I need to make? Or is there a faster way to do the porting? I want the gap to be as narrow as possible.

Below is the function I need to change. (Is the commit inside the for loop causing the problem? Would it be fine to move the commit out of the loop?) If anyone has a reference, it would be a great help.

Note: the destination is PostgreSQL and the script is Python; the source server uses MySQL.

def psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,msql, psql, msql_command, psql_command):
    print("Function Call..")
    msql.execute(msql_command)

    for row in msql:
        try:
            print("Insertion of rows..")
            print(row)
            psql.execute(psql_command, row)
            cnx_psql.commit()
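
For reference, here is a rough, untested sketch of what I think an executemany-based version of this function would look like (same connections, imports, and SQL pairs as in the full script below; the whole batch is committed once instead of once per row):

def psql_Source_fetch_and_DestInsert(cnx_msql, cnx_psql, msql, psql, msql_command, psql_command):
    print("Function Call..")
    msql.execute(msql_command)

    # Fetch all source rows at once instead of iterating one by one
    rows = msql.fetchall()
    if not rows:
        return
    try:
        # One executemany call plus a single commit for the whole batch
        psql.executemany(psql_command, rows)
        cnx_psql.commit()
    except psycopg2.Error as e:
        print("Cannot execute the query!!", e.pgerror)
        sys.exit("Problem occured with the query!!!")

From what I've read, psycopg2 also provides psycopg2.extras.execute_values, which is supposed to be faster than executemany because it sends fewer statements, but it would need the INSERTs rewritten around a single VALUES %s placeholder, so I haven't tried it yet.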

Below is the whole porting script for reference.

import psycopg2
import os
import time
import MySQLdb
import sys
from pprint import pprint
from datetime import datetime
from psycopg2 import sql
from utils.config import Configuration as Config
from utils.postgres_helper import get_connection
from utils.utils import get_global_config


def psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,msql, psql, msql_command, psql_command):
    print("Function Call..")
    msql.execute(msql_command)

    for row in msql:
        try:
            print("Insertion of rows..")
            print(row)
            psql.execute(psql_command, row)
            cnx_psql.commit()
        except psycopg2.Error as e:
            print ("Cannot execute the query!!", e.pgerror)
            sys.exit("Problem occured with the query!!!")


def dB_Fetch():

# MySQLdb connection
 try:
    source_host = 'magento'
    conf = get_global_config()
    cnx_msql = MySQLdb.connect(host=conf.get(source_host, 'host'),
                               user=conf.get(source_host, 'user'),
                               passwd=conf.get(source_host, 'password'),
                               port=int(conf.get(source_host, 'port')),
                               db=conf.get(source_host, 'db'))
 except MySQLdb.Error as e:
   print("MYSQL: Unable to connect!", e)
   sys.exit(1)

# Postgresql connection
 try:
   cnx_psql = get_connection(get_global_config(), 'pg_dwh')
 except psycopg2.Error as e:
   print('PSQL: Unable to connect!\n{0}'.format(e))
   sys.exit(1)

# Cursors initializations
 cur_msql = cnx_msql.cursor()
 cur_psql = cnx_psql.cursor()

 try:
   print("creating table using cursor")

   SQL_create_Staging_schema="""CREATE SCHEMA IF NOT EXISTS staging AUTHORIZATION postgres;"""

   SQL_create_sales_flat_quote="""DROP TABLE IF EXISTS staging.sales_flat_quote;CREATE TABLE IF NOT EXISTS staging.sales_flat_quote
            (
                  customer_id         BIGINT
                , entity_id           BIGINT
        , store_id        BIGINT
        , customer_email      TEXT
        , customer_firstname  TEXT
        , customer_middlename TEXT
        , customer_lastname   TEXT
        , customer_is_guest   BIGINT
        , customer_group_id   BIGINT
        , created_at          TIMESTAMP WITHOUT TIME ZONE
        , updated_at          TIMESTAMP WITHOUT TIME ZONE
        , is_active           BIGINT
        , items_count         BIGINT
        , items_qty           BIGINT
        , base_currency_code  TEXT
        , grand_total         NUMERIC(12,4)
        , base_to_global_rate NUMERIC(12,4)
        , base_subtotal       NUMERIC(12,4)
        , base_subtotal_with_discount   NUMERIC(12,4)
        )
   ;"""

   SQL_create_sales_flat_quote_item="""DROP TABLE IF EXISTS staging.sales_flat_quote_item;CREATE TABLE IF NOT EXISTS staging.sales_flat_quote_item
                 (store_id        INTEGER
        , row_total       NUMERIC
        , updated_at      TIMESTAMP WITHOUT TIME ZONE
        , qty             NUMERIC
        , sku             CHARACTER VARYING
        , free_shipping   INTEGER
        , quote_id        INTEGER
        , price       NUMERIC
        , no_discount     INTEGER
        , item_id     INTEGER
        , product_type    CHARACTER VARYING
        , base_tax_amount NUMERIC
        , product_id      INTEGER
        , name        CHARACTER VARYING
        , created_at      TIMESTAMP WITHOUT TIME ZONE
        );"""

   SQL_create_sales_flat_quote_item="""DROP TABLE IF EXISTS staging.catalog_product_flat_1;CREATE TABLE IF NOT EXISTS staging.catalog_product_flat_1
                 (name              CHARACTER VARYING
        , sku               CHARACTER VARYING
        , type_id           CHARACTER VARYING
        , created_at        TIMESTAMP WITHOUT TIME ZONE
        , url_path          CHARACTER VARYING
        , price             NUMERIC
        , short_description CHARACTER VARYING
        , url_key           CHARACTER VARYING
        , thumbnail_label   CHARACTER VARYING
        , small_image       CHARACTER VARYING
        , thumbnail         CHARACTER VARYING

        );"""


   print("Creating  Schema...")   
   cur_psql.execute(SQL_create_Staging_schema)

   print("Creating tables...")
   cur_psql.execute(SQL_create_sales_flat_quote)

   cur_psql.execute(SQL_create_sales_flat_quote_item)
   cur_psql.execute(SQL_create_catalog_product_flat_1)
   cnx_psql.commit()

   print("Fetching data from source server")
   #select from Magento & insert into DWH: SELECT & INSERT SQL statements are created as PAIRS for looping.
   commands = [
              ("""SELECT customer_id, entity_id, store_id, customer_email , customer_firstname, customer_middlename
                 , customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at, is_active
                 , items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal
                 , base_subtotal_with_discount from sales_flat_quote
                 where is_active=1
                 AND items_count != '0'
                 AND updated_at > '2019-05-12 00:00:00';""",
              """INSERT INTO staging.sales_flat_quote (customer_id, entity_id, store_id, customer_email , customer_firstname
                , customer_middlename, customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at
                , is_active, items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal
                , base_subtotal_with_discount) 
              SELECT %s, %s, %s,%s,%s,%s,%s,%s,%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;"""),

            ("""SELECT store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type
                       ,base_tax_amount,product_id,name,created_at from sales_flat_quote_item WHERE updated_at > '2019-05-12 00:00:00';""",
             """INSERT INTO staging.sales_flat_quote_item (store_id,row_total,updated_at,qty,sku,free_shipping,quote_id
                                                          ,price,no_discount ,item_id,product_type,base_tax_amount,product_id,name
                                                          ,created_at)
                SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;""")

             ,("""SELECT created_at,url_path,price,short_description,url_key,thumbnail_label,small_image,thumbnail
                        ,name,sku,type_id from catalog_product_flat_1;""", 
             """INSERT INTO staging.catalog_product_flat_1 (created_at,url_path,price,short_description,url_key,thumbnail_label
                                                            ,small_image,thumbnail,name,sku,type_id) 
                SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;""")            
            ]

   for msql_command, psql_command in commands:
       psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,cur_msql, cur_psql, msql_command, psql_command)

 except (Exception, psycopg2.Error) as error:
     print ("Error while fetching data from PostgreSQL", error)
 finally:
     ## Closing cursors
     cur_msql.close()
     cur_psql.close()
     ## Committing
     cnx_psql.commit()
     ## Closing database connections
     cnx_msql.close()
     cnx_psql.close()


if __name__ == '__main__':
    dB_Fetch()

  • You can try disabling constraints on the target db and seeing if it speeds things up. You can also try to insert several lines at a time using `COPY ... FROM STDIN` (see https://www.postgresql.org/docs/9.2/sql-copy.html). – Paulo Scardine Jun 11 '19 at 07:56
  • It is not uncommon to get a 100x speed increase by using `COPY` instead of `INSERT`. You may find this useful: https://stackoverflow.com/questions/8144002/use-binary-copy-table-from-with-psycopg2 – Roman-Stop RU aggression in UA Jun 11 '19 at 08:08
  • @PauloScardine Thanks a lot for your reply! I will check this and update. Other than that, is there anything else I have to modify in the above code? – Linu Jun 11 '19 at 08:48
  • @RomanKonoval Is there anything I have to modify in the current code? – Linu Jun 11 '19 at 08:49
  • @Linu yes. What exactly you need to change depends on your intentions. Basically, whether you will try to optimize the solution with inserts or adapt the solution with `COPY`. – Roman-Stop RU aggression in UA Jun 11 '19 at 09:23
  • @RomanKonoval Mainly I'm trying to optimize the existing solution and check how the execution goes. If it's still taking more time, I will try to modify the solution to use the COPY method. So I would be grateful if you could point out the mistakes in the current code. – Linu Jun 11 '19 at 09:35
  • I second Roman's advice. I can't see anything you can do to optimize the code you have other than using COPY with several lines instead of INSERT line by line. On the target db you can try to run the same statements you get from `pg_dump --section=pre-data` in order to disable triggers and constraints. – Paulo Scardine Jun 11 '19 at 10:16
  • @PauloScardine Thanks a lot, Paulo. But is the commit() inside the for loop making an issue? And if we use executemany() instead of execute(), will it be beneficial? – Linu Jun 11 '19 at 10:20
  • It is hard to give advice without knowing your system - many speedups trade memory for CPU for I/O - I guess `executemany` will not help that much, but prepared statements would skip the parsing step. I wish I had time to post a proper answer so forgive me, you will have to research that. – Paulo Scardine Jun 11 '19 at 10:30
  • @Linu don't commit on each row. This will slow you down and will burn through transaction ids. Unless you are very concerned about being able to start where you left off after any error, this is just wasteful. I'd also reconsider printing out each row. The best advice is to just copy all the mysql data to a csv file (either in memory or disk, depending on the size) and use COPY to push it all in at once. – Jeremy Jun 11 '19 at 12:37
  • @Linu Right now we can give only some very generic recommendations about best practices (as was already said, use `COPY` and don't commit the transaction after every insert). Given that the current speed is 1 record per second, this is too slow. You need to profile your program, figure out where it spends the time, and optimize the part that takes the most. It is quite possible that it is not the insertion that is slow but the querying. You never know before you profile and have numbers that back up your theories. And only after you know what is slow can you fix it. – Roman-Stop RU aggression in UA Jun 11 '19 at 13:30
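
Following the suggestions in the comments, this is roughly how I understand the COPY-based approach would look (an untested sketch, assuming Python 3: dump the MySQL rows into an in-memory CSV buffer and load it with psycopg2's copy_expert; the COPY statement per table would have to list the same columns as the INSERTs above):

import csv
import io

def psql_source_fetch_and_copy(cnx_msql, cnx_psql, msql, psql, msql_command, copy_command):
    # Pull all rows from MySQL into an in-memory CSV buffer
    msql.execute(msql_command)
    buf = io.StringIO()
    csv.writer(buf).writerows(msql.fetchall())
    buf.seek(0)

    # Load the whole buffer into PostgreSQL with a single COPY, e.g.
    # copy_command = "COPY staging.sales_flat_quote (customer_id, entity_id, ...) FROM STDIN WITH (FORMAT csv)"
    psql.copy_expert(copy_command, buf)
    cnx_psql.commit()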

0 Answers