Good Day Python Gurus! Thank you for taking the time to review my question.
Use Case: I want to find the best way to compare two DataFrames, one sourced from SQL Server and one from Snowflake (on Azure), for data validation, and then export to a CSV file ONLY the rows in the SQL Server DataFrame that do not match, or are missing from, the Snowflake DataFrame. This way I can take the results and research in SQL Server why those records did not make it over to Snowflake.
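To make the goal concrete, here is a toy sketch of the intended result. The data is made up, and the helper name rows_missing_from_target, the ID and NAME columns, and the output file name are hypothetical. It uses a left merge with indicator=True as one possible way to get "rows in source with no exact match in target"; it is not the approach used in the script further down.

```python
import pandas as pd

# Made-up stand-ins for the SQL Server (source) and Snowflake (target) results.
sql_df = pd.DataFrame({"ID": [1, 2, 3, 4], "NAME": ["A", "B", "C", "D"]})
sf_df = pd.DataFrame({"ID": [1, 2, 4], "NAME": ["A", "X", "D"]})


def rows_missing_from_target(source, target):
    """Return source rows with no exact match (on all shared columns) in target."""
    shared = [c for c in source.columns if c in target.columns]
    merged = source.merge(
        target[shared].drop_duplicates(),  # avoid multiplying source rows
        on=shared,
        how="left",
        indicator=True,  # adds a "_merge" column: "both" or "left_only"
    )
    only_in_source = merged["_merge"] == "left_only"
    return merged.loc[only_in_source, source.columns].reset_index(drop=True)


diff = rows_missing_from_target(sql_df, sf_df)
print(diff)
# diff.to_csv("DataDiff_example.csv", index=False)  # hypothetical file name
```

In this toy data, ID 2 differs in NAME and ID 3 is absent from the target, so both rows end up in diff. Unlike an element-wise comparison, this does not require the two frames to have the same number of rows.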
Additionally, I need to remove any columns that do not match or are missing between the Source and Target tables (I did this manually, as you can see in my code below), convert columns and data to uppercase, and fill NaN with zeros.
Lastly, I added a reindex() on the sorted() column names to make sure the columns are in alphabetical order for the comparison.
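The column clean-up described above could be sketched roughly as follows. The frames are made up and the helper name align_columns is hypothetical. Instead of listing the columns to drop by hand, it keeps only the columns present in both frames. It handles column names only, not uppercasing the data itself.

```python
import pandas as pd


def align_columns(source, target):
    """Uppercase column names, keep only shared columns in sorted order, fill NaN with 0."""
    source = source.rename(columns=str.upper)
    target = target.rename(columns=str.upper)
    common = sorted(set(source.columns) & set(target.columns))
    return source[common].fillna(0), target[common].fillna(0)


# Made-up frames: differing case, differing column order, one extra column each.
sql_df = pd.DataFrame({"id": [1, 2], "Amount": [10.0, None], "ROW1": [9, 9]})
sf_df = pd.DataFrame({"AMOUNT": [10.0, 20.0], "ID": [1, 2], "ROWX": [0, 0]})

sql_aligned, sf_aligned = align_columns(sql_df, sf_df)
print(list(sql_aligned.columns))
print(sql_aligned.columns.equals(sf_aligned.columns))
```

After this step both frames carry the same column labels in the same order, which is one of the two conditions an element-wise comparison needs (the other being identical row labels).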
Question: Reviewing my code below, and taking into account the code I tried earlier with errors, do you have a more elegant solution or do you see a flaw in my code I can correct and hopefully make this work?
I am attempting the solution linked above in my code, but I keep getting this error:
Traceback (most recent call last):
  File "D:\PythonScripts\Projects\DataCompare\DFCmpr_SQL_ManualSQL.py", line 166, in <module>
    df_diff = sql_df[sf_df != sql_df].dropna(how='all', axis=1).dropna(how='all', axis=0).astype('Int64')
  File "C:\Program Files\Python39\lib\site-packages\pandas\core\ops\common.py", line 69, in new_method
    return method(self, other)
  File "C:\Program Files\Python39\lib\site-packages\pandas\core\arraylike.py", line 36, in __ne__
    return self._cmp_method(other, operator.ne)
  File "C:\Program Files\Python39\lib\site-packages\pandas\core\frame.py", line 6851, in _cmp_method
    self, other = ops.align_method_FRAME(self, other, axis, flex=False, level=None)
  File "C:\Program Files\Python39\lib\site-packages\pandas\core\ops\__init__.py", line 288, in align_method_FRAME
    raise ValueError(
ValueError: Can only compare identically-labeled DataFrame objects
I have manually checked the output of print(sf_df.columns) and print(sql_df.columns) and I do not see any difference. The column indexes look identical to me. Am I doing something wrong?
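One thing worth checking besides the column labels: the != operator between two DataFrames requires the row labels (the index) to be identical as well. The toy sketch below uses made-up frames a and b and reproduces the same kind of ValueError with identical columns but a different number of rows. Whether this is the actual cause for the data in this question is an assumption that has not been verified.

```python
import pandas as pd

# Made-up frames: same column labels, different number of rows.
a = pd.DataFrame({"ID": [1, 2, 3], "VAL": [10, 20, 30]})
b = pd.DataFrame({"ID": [1, 2], "VAL": [10, 99]})

print(a.columns.equals(b.columns))  # compares column labels
print(a.index.equals(b.index))      # compares row labels
print(a.shape, b.shape)

try:
    a != b  # element-wise comparison needs matching index AND columns
    raised = False
except ValueError as exc:
    raised = True
    print(exc)
```

Running the same three checks (columns.equals, index.equals, shape) on sql_df and sf_df right before the comparison line would show which axis differs, if any.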
The SQL Server source is a table; the Snowflake target is a view. The columns are named exactly the same in each database.
My Code is below (I renamed some columns for security reasons):
# ############## VERSION 2.0 #############
# ###### NOTES ########
# -------------- Import packages needed ----------------------------
import sys, os, pyodbc, datetime, collections
import pandas as pd
import snowflake.connector as sf
import sqlalchemy as sa
import SNCC_Conn as sfconn
pd.set_option("display.max_rows", 999)
# set params for Snowflake Connection
sncc_db = 'DATABASE'
sncc_sch = 'SCHEMA'
sncc_tbl = 'TABLE_1'
sncc_qry = 'SELECT * FROM '+sncc_sch+'.'+sncc_tbl+''
sf_qry = r'' + sncc_qry
# can't use a tuple, as a tuple cannot be updated.
# set params for SQL Connection TST . This is setup for trusted site meaning it will use SSO.
sql_srvr = 'SQLSERVER'
sql_db = 'DATABASE'
sql_sch = 'SCHEMA'
sql_tbl = 'TABLE_1'
ms_sql_qry = 'SELECT * FROM '+sql_sch+'.' +sql_tbl+''
fileName = 'SQL_SF_CombinedPolicy'
# --------------------------- Snowflake Connection ---------------------------
try:
    sf_conn = sfconn.snowflake_connect(schema=sncc_sch, database=sncc_db)
except Exception as e:
    print('Connection Failed. Please try again.')
    print('Error: ' + str(e))
    quit()
print('Snowflake Connection established!')
print(sf_qry)
try:
    # Execute the query
    sf_conn.execute(sf_qry)
    # Fetch all Snowflake results into a Pandas DataFrame
    sf_df = sf_conn.fetch_pandas_all()
    # Make all DataFrame columns uppercase
    sf_df.columns = map(str.upper, sf_df.columns)
    # Replace NaN (not a number) data values with a zero (0).
    sf_df = sf_df.fillna(0)
    # Remove columns that are not in the source table on a per-need basis OR comment it out with a #.
    sf_df = sf_df.loc[:, ~sf_df.columns.isin(['ROWx', 'ROWy', 'ROWz'])]
    # Put the columns in alphabetical order (this orders the columns, not the rows).
    sf_df = sf_df.reindex(sorted(sf_df.columns), axis=1)
    # Print out results on screen during development phase.
    print(sf_df)
    print(sf_df.columns)
    print('Snowflake Dataframe Load Successful.')
except Exception as e:
    print('Snowflake Dataframe load Unsuccessful. Please try again.')
    print('Error: ' + str(e))
# --------------------------- SQL Server Connection ---------------------------
try:
    # Adjacent string literals are joined, so DRIVER, SERVER, DATABASE and the
    # trusted connection setting form a single connection string.
    sql_conn = pyodbc.connect(
        'DRIVER={SQL Server};'
        'SERVER=' + sql_srvr + ';'
        'DATABASE=' + sql_db + ';'
        'Trusted_Connection=yes;'  # Using Windows User Account for authentication.
    )
    # cursor = sql_conn.cursor()
    print('SQL Connection established!')
except Exception as e:
    print('Connection Failed. Please try again.')
    print('Error: ' + str(e))
try:
    # SQLquery = input("What is your query for SQL Server?: ") -- Add "IF" statements to check manual input?
    # Query results and place them in variable
    # cursor.execute(sql_qry)
    sql_qry = pd.read_sql_query(ms_sql_qry, sql_conn)
    # Put results into a Data Frame from Pandas
    sql_df = pd.DataFrame(sql_qry)
    # Make all DataFrame columns uppercase
    sql_df.columns = map(str.upper, sql_df.columns)
    # Replace NaN (not a number) data values with a zero (0).
    sql_df = sql_df.fillna(0)
    # Remove columns that are not in the target table on a per-need basis OR comment it out with a #.
    sql_df = sql_df.loc[:, ~sql_df.columns.isin(['ROW1', 'ROW2', 'ROW3'])]
    # Put the columns in alphabetical order (this orders the columns, not the rows).
    sql_df = sql_df.reindex(sorted(sql_df.columns), axis=1)
    # Print out results during development phase.
    print(sql_df)
    print(sql_df.columns)
    print('SQL Server Dataframe Load Successful')
    print('Comparing SQL to SNCC Dataframes')
    # /********************* COMPARISON SCRIPT **************/
    # sql_df.compare(sf_df)
    # Compare the two DataFrames and produce results from Source (sql_df) that do not match Target (sf_df).
    # ---------- ERROR: ValueError: Can only compare identically-labeled DataFrame objects
    df_diff = sql_df[sf_df != sql_df].dropna(how='all', axis=1).dropna(how='all', axis=0).astype('Int64')
    # Print out results of differences during development phase.
    print(df_diff)
    # Export out to CSV using a variable for the name of the file, future state.
    df_diff.to_csv(r'D:\PythonResults\DataDiff_' + fileName + '.csv', index=False)
    print('Dataframe output from comparison exported to PythonResults folder as DataDiff_' + fileName + '.csv')
except pyodbc.Error as e:
    # Message stating export unsuccessful.
    print("MSSQL Dataframe load unsuccessful.")
finally:
    sf_conn.close()
    print("Connection to Snowflake closed")
    sql_conn.commit()
    sql_conn.close()
    print("Connection to MSSQL Server closed")
EDIT 1:
I wanted to add that the data sets I am bringing in from SQL Server and Snowflake have a mixture of data types: Integer, VarChar, Date, DateTime, etc. I am not sure if that makes a difference.
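On the data type point: an element-wise comparison treats 1 and "1" as different values, so a column that arrives as an integer from one driver and as text from the other would be flagged as mismatched everywhere. A small sketch of how the dtypes of the two frames could be compared side by side, using made-up frames and hypothetical column names ID and LOAD_DATE:

```python
import pandas as pd

# Made-up frames: same values, but one side delivers them as text.
sql_df = pd.DataFrame({
    "ID": [1, 2],
    "LOAD_DATE": pd.to_datetime(["2021-01-01", "2021-01-02"]),
})
sf_df = pd.DataFrame({
    "ID": ["1", "2"],
    "LOAD_DATE": ["2021-01-01", "2021-01-02"],
})

# Side-by-side dtype report; keep only the columns whose dtypes differ.
dtype_report = pd.DataFrame({
    "sql": sql_df.dtypes.astype(str),
    "sf": sf_df.dtypes.astype(str),
})
mismatched = dtype_report[dtype_report["sql"] != dtype_report["sf"]]
print(mismatched)

# Cast the target side to the source side's types before comparing.
sf_fixed = sf_df.astype({"ID": "int64"})
sf_fixed["LOAD_DATE"] = pd.to_datetime(sf_fixed["LOAD_DATE"])
print((sql_df.dtypes == sf_fixed.dtypes).all())
```

Note that filling NaN with 0 in a date or text column also changes that column's content, which is another way the two sides can drift apart before the comparison.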