I'm using the below code to port the data from MySQL to postgreSQL database and it is working fine, but it's rather slow though( more than 3 minutes is taking for porting some 500 rows, so i think the for loop in one of the function is iterating each rows and inserting it to database, but is there a way to execute the whole rows at once rather than using for loop to loop through all rows? Can it be done via executemany, if so what all changes i need to do? or is there a faster way to do the porting want the gap to be as narrow as possible.
Below the function i need to change(Whether the commit in the for loop is making an issue? Or is it fine that i exclude the commit in for loop?) , If anyone have any reference it will be of great help.
Note: using PostgreSQL and Python, Source server using MySQL.
def psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,msql, psql, msql_command, psql_command):
print("Function Call..")
msql.execute(msql_command)
for row in msql:
try:
print("Insertion of rows..")
print(row)
psql.execute(psql_command, row)
cnx_psql.commit()
Below is the whole porting script for reference.
import psycopg2
import os
import time
import MySQLdb
import sys
from pprint import pprint
from datetime import datetime
from psycopg2 import sql
from utils.config import Configuration as Config
from utils.postgres_helper import get_connection
from utils.utils import get_global_config
def psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,msql, psql, msql_command, psql_command):
print("Function Call..")
msql.execute(msql_command)
for row in msql:
try:
print("Insertion of rows..")
print(row)
psql.execute(psql_command, row)
cnx_psql.commit()
except psycopg2.Error as e:
print ("Cannot execute the query!!", e.pgerror)
sys.exit("Problem occured with the query!!!")
def dB_Fetch():
# MySQLdb connection
try:
source_host = 'magento'
conf = get_global_config()
cnx_msql = MySQLdb.connect(host=conf.get(source_host, 'host'),
user=conf.get(source_host, 'user'),
passwd=conf.get(source_host, 'password'),
port=int(conf.get(source_host, 'port')),
db=conf.get(source_host, 'db'))
except mysql.connector.Error as e:
print ("MYSQL: Unable to connect!", e.msg)
sys.exit(1)
# Postgresql connection
try:
cnx_psql = get_connection(get_global_config(), 'pg_dwh')
except psycopg2.Error as e:
print('PSQL: Unable to connect!\n{0}').format(e)
sys.exit(1)
# Cursors initializations
cur_msql = cnx_msql.cursor()
cur_psql = cnx_psql.cursor()
try:
print("creating table using cursor")
SQL_create_Staging_schema="""CREATE SCHEMA IF NOT EXISTS staging AUTHORIZATION postgres;"""
SQL_create_sales_flat_quote="""DROP TABLE IF EXISTS staging.sales_flat_quote;CREATE TABLE IF NOT EXISTS staging.sales_flat_quote
(
customer_id BIGINT
, entity_id BIGINT
, store_id BIGINT
, customer_email TEXT
, customer_firstname TEXT
, customer_middlename TEXT
, customer_lastname TEXT
, customer_is_guest BIGINT
, customer_group_id BIGINT
, created_at TIMESTAMP WITHOUT TIME ZONE
, updated_at TIMESTAMP WITHOUT TIME ZONE
, is_active BIGINT
, items_count BIGINT
, items_qty BIGINT
, base_currency_code TEXT
, grand_total NUMERIC(12,4)
, base_to_global_rate NUMERIC(12,4)
, base_subtotal NUMERIC(12,4)
, base_subtotal_with_discount NUMERIC(12,4)
)
;"""
SQL_create_sales_flat_quote_item="""DROP TABLE IF EXISTS staging.sales_flat_quote_item;CREATE TABLE IF NOT EXISTS staging.sales_flat_quote_item
(store_id INTEGER
, row_total NUMERIC
, updated_at TIMESTAMP WITHOUT TIME ZONE
, qty NUMERIC
, sku CHARACTER VARYING
, free_shipping INTEGER
, quote_id INTEGER
, price NUMERIC
, no_discount INTEGER
, item_id INTEGER
, product_type CHARACTER VARYING
, base_tax_amount NUMERIC
, product_id INTEGER
, name CHARACTER VARYING
, created_at TIMESTAMP WITHOUT TIME ZONE
);"""
SQL_create_sales_flat_quote_item="""DROP TABLE IF EXISTS staging.catalog_product_flat_1;CREATE TABLE IF NOT EXISTS staging.catalog_product_flat_1
(name CHARACTER VARYING
, sku CHARACTER VARYING
, type_id CHARACTER VARYING
, created_at TIMESTAMP WITHOUT TIME ZONE
, url_path CHARACTER VARYING
, price NUMERIC
, short_description CHARACTER VARYING
, url_key CHARACTER VARYING
, thumbnail_label CHARACTER VARYING
, small_image CHARACTER VARYING
, thumbnail CHARACTER VARYING
);"""
print("Creating Schema...")
cur_psql.execute(SQL_create_Staging_schema)
print("Creating tables...")
cur_psql.execute(SQL_create_sales_flat_quote)
cur_psql.execute(SQL_create_sales_flat_quote_item)
cnx_psql.commit();
print("Fetching data from source server")
#select from Magento & insert into DWH: SELECT & INSERT SQL statements are created as PAIRS for looping.
commands = [
("""SELECT customer_id, entity_id, store_id, customer_email , customer_firstname, customer_middlename
, customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at, is_active
, items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal
, base_subtotal_with_discount from sales_flat_quote
where is_active=1
AND items_count != '0'
AND updated_at > '2019-05-12 00:00:00';""",
"""INSERT INTO staging.sales_flat_quote (customer_id, entity_id, store_id, customer_email , customer_firstname
, customer_middlename, customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at
, is_active, items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal
, base_subtotal_with_discount)
SELECT %s, %s, %s,%s,%s,%s,%s,%s,%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;"""),
("""SELECT store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type
,base_tax_amount,product_id,name,created_at from sales_flat_quote_item WHERE updated_at > '2019-05-12 00:00:00';""",
"""INSERT INTO staging.sales_flat_quote_item (store_id,row_total,updated_at,qty,sku,free_shipping,quote_id
,price,no_discount ,item_id,product_type,base_tax_amount,product_id,name
,created_at)
SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;""")
,("""SELECT created_at,url_path,price,short_description,url_key,thumbnail_label,small_image,thumbnail
,name,sku,type_id from catalog_product_flat_1;""",
"""INSERT INTO staging.catalog_product_flat_1 (created_at,url_path,price,short_description,url_key,thumbnail_label
,small_image,thumbnail,name,sku,type_id)
SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;""")
]
for msql_command, psql_command in commands:
psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,cur_msql, cur_psql, msql_command, psql_command)
except (Exception, psycopg2.Error) as error:
print ("Error while fetching data from PostgreSQL", error)
finally:
## Closing cursors
cur_msql.close()
cur_psql.close()
## Committing
cnx_psql.commit()
## Closing database connections
cnx_msql.close()
cnx_psql.close()
if __name__ == '__main__':
dB_Fetch()