I have a script where I need to append the contents of a dataframe to a SQL database table I created. I need to do this many times to several tables with several dataframes as sources.
I am using Pandas with a sqlalchemy engine on a pyodbc connection to an MSSQL database
To ensure that I am only appending the data from the dataframe which has a corresponding column in the database, I have an "append data to sql" function:
import logging

import pandas as pd
import sqlalchemy


def append_data_to_sql(db_connection, new_rows: pd.DataFrame, table_name: str) -> bool:
    # Get column names for the destination table
    query = 'SELECT column_name, data_type ' \
            'FROM information_schema.columns ' \
            'WHERE table_name=?'
    result = db_connection.execute(query, table_name).fetchall()
    columns_in_sql = pd.DataFrame(data=result, columns=['COLUMN_NAME', 'DATA_TYPE'])

    new_table = pd.DataFrame(columns=list(columns_in_sql['COLUMN_NAME']))
    new_rows.columns = new_rows.columns.str.lower()
    new_table.columns = new_table.columns.str.lower()

    # Only keep the columns that exist in the destination; if a destination
    # column is missing from the data to be appended, create an empty column
    for column in new_table.columns:
        if column in new_rows.columns:
            new_table[column] = new_rows[column]
        else:
            new_table[column] = pd.NA

    try:
        new_table.to_sql(table_name, db_connection, if_exists='append', index=False)
    except sqlalchemy.exc.DBAPIError:
        logging.exception(f'Error while appending to {table_name}')
        return True  # report that an error occurred
    return False  # success
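For reference, I call the function along these lines (the connection string and table name below are placeholders, not my real ones):

from sqlalchemy import create_engine

engine = create_engine('mssql+pyodbc://user:password@my_dsn')  # placeholder DSN
with engine.connect() as db_connection:
    had_error = append_data_to_sql(db_connection, new_rows, 'my_table')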
The context data I'm passing to my function is:
new_rows = pd.DataFrame.from_records([{
    'system': 'the_system_name',
    'data_update_time': pd.Timestamp('2022-03-02 10:00:48.958701'),
    'first_available_data_point': None,
    'last_available_data_point': None,
    'line_name': 'the_line_name',
    'server': 'the_server_name',
    'day_start_hours': 0.0,
    'bu': 'the_bu_name',
    'number_of_constraints': 3
}])
columns_in_sql = pd.DataFrame(data=[
    ('system', 'varchar'),
    ('data_update_time', 'datetime'),
    ('first_available_data_point', 'datetime'),
    ('last_available_data_point', 'datetime'),
    ('line_name', 'varchar'),
    ('server', 'varchar'),
    ('day_start_hours', 'numeric'),
    ('bu', 'varchar'),
    ('number_of_constraints', 'int')
], columns=['COLUMN_NAME', 'DATA_TYPE'])
The error that I am getting is:
sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('Invalid parameter type. param-index=8 param-type=numpy.int64', 'HY105')
[SQL: INSERT INTO my_table (system, data_update_time, first_available_data_point, last_available_data_point, line_name, server, day_start_hours, bu, number_of_constraints) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)]
[parameters: ('the_system_name', Timestamp('2022-03-02 10:00:48.958701'), None, None, 'the_line_name', 'the_server_name', 0.0, 'the_bu_name', 3)]
(Background on this error at: https://sqlalche.me/e/14/f405)
The issue is clearly that the 3 on the end is the wrong kind of integer for the SQL database, and I found a similar question that addresses this exact error, but with a direct executemany() call through pyodbc.
The problem is that I'm using SQLAlchemy through Pandas, so I'm not actually building the insert statement myself.
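That pyodbc-level fix, as I understand it, boiled down to casting the numpy scalars to native Python types before handing the parameters to executemany(). My paraphrase of the idea (to_native() is my own name for it, not from that answer):

import numpy as np

def to_native(value):
    # numpy scalars expose .item(), which returns the equivalent
    # native Python type (numpy.int64 -> int, numpy.float64 -> float)
    return value.item() if isinstance(value, np.generic) else value

row = ('the_system_name', np.int64(3))
clean_row = tuple(to_native(v) for v in row)  # ('the_system_name', 3) with a plain int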
I've tried specifying the dtype of each column by adding:
from sqlalchemy import types

sql_dtypes = {'varchar': types.String(),
              'int': types.SmallInteger(),
              'datetime': types.DateTime(),
              'date': types.Date(),
              'nvarchar': types.String(),
              'numeric': types.Numeric(),
              'float': types.Float(),
              'real': types.Float(),
              'bool': types.Boolean(),
              }

new_dtypes = {}
for index, row in columns_in_sql.iterrows():
    new_dtypes[row['COLUMN_NAME']] = sql_dtypes[row['DATA_TYPE']]
and adding the dtype arg to to_sql:

new_table.to_sql(table_name, db_connection, if_exists='append', index=False, dtype=new_dtypes)
I then tried all the different integer types on the SQLAlchemy docs page (Integer(), BigInteger(), SmallInteger()), with the same error.
I'm hoping I can find a solution here before I rewrite the function to do all the things that pandas and SQLAlchemy should (I think) already be taking care of.
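In case it clarifies what I mean, the manual rewrite I'm hoping to avoid would look roughly like this (a sketch only; it relies on the fact that casting to object dtype converts numpy scalars to native Python types):

import pandas as pd

def coerce_to_native(df: pd.DataFrame) -> pd.DataFrame:
    # astype(object) turns numpy.int64/float64 values into plain int/float,
    # and where() swaps NaN/NaT for None so the driver gets proper NULLs
    return df.astype(object).where(df.notna(), None)

new_table = coerce_to_native(new_table)
new_table.to_sql(table_name, db_connection, if_exists='append', index=False)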