
I have a script where I need to append the contents of a dataframe to a SQL database table I created. I need to do this many times to several tables with several dataframes as sources.

I am using pandas with a SQLAlchemy engine on a pyodbc connection to an MS SQL Server database.

To ensure that I am only appending the data from the dataframe which has a corresponding column in the database, I have an "append data to sql" function:

import logging

import pandas as pd
import sqlalchemy


def append_data_to_sql(db_connection, new_rows: pd.DataFrame, table_name: str) -> bool:
    # Get column names for the destination table
    query = 'SELECT column_name, data_type ' \
            'FROM information_schema.columns ' \
            'WHERE table_name=?'
    result = db_connection.execute(query, table_name).fetchall()
    columns_in_sql = pd.DataFrame(data=result, columns=['COLUMN_NAME', 'DATA_TYPE'])
    new_table = pd.DataFrame(columns=list(columns_in_sql['COLUMN_NAME']))
    new_rows.columns = new_rows.columns.str.lower()
    new_table.columns = new_table.columns.str.lower()

    # Only keep the columns that are in destination and if there is no
    # column in the data to be appended then create an empty column
    for column in new_table.columns:
        if column in new_rows.columns:
            new_table[column] = new_rows[column]
        else:
            new_table[column] = pd.NA

    try:
        new_table.to_sql(table_name, db_connection, if_exists='append', index=False)
    except sqlalchemy.exc.DBAPIError as e:
        # logging.exception() already records the traceback, so exc_info is not needed
        logging.exception(f'Error while appending to {table_name}')
        return True

    return False

The context data I'm passing to my function is:

new_rows = pd.DataFrame.from_records([{
    'system': 'the_system_name',
    'data_update_time': pd.Timestamp('2022-03-02 10:00:48.958701'),
    'first_available_data_point': None,
    'last_available_data_point': None,
    'line_name': 'the_line_name',
    'server': 'the_server_name',
    'day_start_hours': 0.0,
    'bu': 'the_bu_name',
    'number_of_constraints': 3,
}])

columns_in_sql = pd.DataFrame(data=[
    ('system', 'varchar'),
    ('data_update_time', 'datetime'),
    ('first_available_data_point', 'datetime'),
    ('last_available_data_point', 'datetime'),
    ('line_name', 'varchar'),
    ('server', 'varchar'),
    ('day_start_hours', 'numeric'),
    ('bu', 'varchar'),
    ('number_of_constraints', 'int'),
], columns=['COLUMN_NAME', 'DATA_TYPE'])

The error that I am getting is:

sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('Invalid parameter type.  param-index=8 param-type=numpy.int64', 'HY105')
[SQL: INSERT INTO my_table (system, data_update_time, first_available_data_point, last_available_data_point, line_name, server, day_start_hours, bu, number_of_constraints) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)]
[parameters: ('the_system_name', Timestamp('2022-03-02 10:00:48.958701'), None, None, 'the_line_name', 'the_server_name', 0.0, 'the_bu_name', 3)]
(Background on this error at: https://sqlalche.me/e/14/f405)

The issue is clearly that the '3' at the end is the wrong kind of integer for the SQL database, and I found a similar question that addresses this exact error, but with a direct executemany() call through pyodbc.

The problem is that I'm trying to use SqlAlchemy through Pandas, so I'm not actually creating the insert statement myself.
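For what it's worth, the offending type is easy to reproduce without touching the database at all. This standalone sketch (only pandas and numpy, not part of my function) shows the scalar pandas hands over and one way to coerce it back to a built-in int:

```python
import numpy as np
import pandas as pd

row = pd.DataFrame({'number_of_constraints': [3]})
value = row['number_of_constraints'].iloc[0]

# pandas stores integer columns as numpy.int64, which is what pyodbc
# rejects with 'Invalid parameter type ... param-type=numpy.int64'
print(type(value))

# numpy scalars expose .item() to convert back to the built-in Python type
native = {col: (val.item() if isinstance(val, np.generic) else val)
          for col, val in row.iloc[0].items()}
print(type(native['number_of_constraints']))
```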

I've tried specifying the dtype of each column by adding:

from sqlalchemy import types

sql_dtypes = {'varchar': types.String(),
              'int': types.SmallInteger(),
              'datetime': types.DateTime(),
              'date': types.Date(),
              'nvarchar': types.String(),
              'numeric': types.Numeric(),
              'float': types.Float(),
              'real': types.Float(),
              'bool': types.Boolean(),
              }

new_dtypes = {}
for index, row in columns_in_sql.iterrows():
    new_dtypes[row['COLUMN_NAME']] = sql_dtypes[row['DATA_TYPE']]

and adding the dtype arg to to_sql:

new_table.to_sql(table_name, db_connection, if_exists='append', index=False, dtype=new_dtypes)

I then tried all the different integer types on the SQLAlchemy docs page (Integer(), BigInteger(), SmallInteger()), with the same error each time.

I'm hoping I can find a solution for this here before I re-write the function to do all the things pandas and sqlalchemy should (I think) be taking care of already.
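In case it helps show the shape of the workaround I'd rather avoid: converting the whole frame to plain Python objects and swapping the NA markers for None before calling to_sql does sidestep the driver error. This is only a sketch with made-up column values, not my actual function:

```python
import pandas as pd

new_table = pd.DataFrame({
    'day_start_hours': [0.0],
    'number_of_constraints': [3],
    'first_available_data_point': [pd.NA],
})

# Cast to object so each cell holds a built-in Python value, then replace the
# pandas/numpy missing-value markers with None, which pyodbc inserts as NULL
cleaned = new_table.astype(object).where(new_table.notna(), None)
```

The cleaned frame would then be passed to to_sql as before, but I'd much prefer a fix that lets pandas and SQLAlchemy handle the parameter types themselves.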

John K
  • What versions of pandas, numpy, and pyodbc are you using? I am unable to reproduce your issue with pandas 1.4.1, numpy 1.22.2 and pyodbc 4.0.32 using ODBC Driver 17 for SQL Server. – Gord Thompson Mar 02 '22 at 21:49
  • pandas 1.4.0, numpy 1.22.2, pyodbc 4.0.32, sqlalchemy 1.4.31 (hit enter before trying again with updated libs, will report back in a sec) – John K Mar 07 '22 at 14:49
  • I just tried it with the updated pandas, and I also noticed there was a new version of sqlalchemy - still having the issue. (Also ODBC Driver 17 for SQL Server, both) I've tried this on multiple different venvs and computers and I get the same one. – John K Mar 07 '22 at 14:57
  • [This code](https://pastebin.com/HV36xj5F) works for me. Does it work for you? – Gord Thompson Mar 07 '22 at 15:34
  • Yes, the provided code works on a simple table with only the id and n columns in the same venv and database server as my problem code. I was able to re-write my function to use the bare pyodbc dbapi connection in sqlalchemy with executemany() per the [sqlalchemy docs](https://docs.sqlalchemy.org/en/13/_modules/examples/performance/bulk_inserts.html). I reconstructed the dataframe as a list of lists and re-typed each element to the basic python type and I no longer get the error, but I am not happy with that solution cause it doesn't actually solve the problem (and is extremely inefficient) – John K Mar 08 '22 at 16:12

0 Answers