I am using the following package versions:
- SQLAlchemy==2.0.18
- pyodbc==4.0.38
- pandas==2.0.3
When I insert a dataframe with the to_sql() function, it takes 89 seconds for 44,777 rows and 28 columns, which is too slow for me. With an earlier version of SQLAlchemy, the same size of dataframe used to insert in about 8 seconds.
How can I make it faster? I want the insertion to take less than 15 seconds for those 44,777 rows.
If I enable fast_executemany=True, I get the following error:

```
(pyodbc.ProgrammingError) ('String data, right truncation: length 38 buffer 20', 'HY000')
```
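For reference, this is roughly how I turn it on; the connection string below is a placeholder for my real one:

```python
import urllib.parse

import sqlalchemy

# Placeholder DSN -- my real connection string is built elsewhere.
odbc_str = (
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=myserver;DATABASE=staging;Trusted_Connection=yes"
)
params = urllib.parse.quote_plus(odbc_str)

# The mssql+pyodbc dialect accepts fast_executemany directly on
# create_engine and forwards it to the pyodbc cursors.
engine = sqlalchemy.create_engine(
    f"mssql+pyodbc:///?odbc_connect={params}",
    fast_executemany=True,
)
```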
My code:

connection.py

```python
import urllib.parse

import pyodbc  # not used directly, but the mssql+pyodbc dialect needs it installed
import sqlalchemy


class DatabaseConnection:
    def __init__(self, connection_string: str) -> None:
        self.connection_string = str(connection_string)

    def get_engine(self) -> sqlalchemy.Engine:
        # Embed the raw ODBC connection string in the SQLAlchemy URL.
        params = urllib.parse.quote_plus(self.connection_string)
        engine: sqlalchemy.Engine = sqlalchemy.create_engine(
            f"mssql+pyodbc:///?odbc_connect={params}"
        )
        return engine

    def connect_to(self) -> sqlalchemy.Connection:
        # Note: this builds a fresh Engine on every call.
        return self.get_engine().connect()
```
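A usage sketch (the DSN here is again a placeholder):

```python
from connection import DatabaseConnection

conn_str = (
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=myserver;DATABASE=staging;Trusted_Connection=yes"
)
db = DatabaseConnection(conn_str)
engine = db.get_engine()  # connect_to() opens a Connection from a fresh Engine
```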
repository.py

```python
import logging
import time

import pandas as pd
from sqlalchemy import Connection, text

from connection import DatabaseConnection


class StagingRepository:
    def __init__(self, connection_string: str) -> None:
        self.connection = DatabaseConnection(connection_string)
        self.log = logging.getLogger('project.StagingRepository')

    def write_data_to_stage(self, row: pd.Series, df: pd.DataFrame, dtypedict: dict) -> int:
        try:
            self.drop_stg_table(row)
            engine = self.connection.get_engine()
            self.log.info('Start insertion to staging database')
            chunk_size = 400
            start_time = time.time()
            # Insert the dataframe in chunks of 400 rows.
            for i in range(0, len(df), chunk_size):
                chunk = df.iloc[i:i + chunk_size]
                chunk.to_sql(str(row['DataTable']), engine, if_exists='append',
                             index=False, dtype=dtypedict)
            elapsed_time = time.time() - start_time
            self.log.info(f'Dataframe insertion took {elapsed_time:.2f} seconds')
            return df.shape[0]
        except Exception as err:
            self.log.error(
                f'Insertion for File ID: {row["FileID"]}, Name: {row["FileName"]} '
                f'into table: {row["DataTable"]} failed\n {err}',
                exc_info=True,
            )
            raise

    def drop_stg_table(self, row: pd.Series) -> None:
        # Drop the staging table so to_sql() re-creates it on append.
        conn: Connection = self.connection.connect_to()
        try:
            conn.execute(text(f"DROP TABLE IF EXISTS {row['DataTable']};"))
            conn.commit()
        finally:
            conn.close()
```
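For completeness, dtypedict maps dataframe columns to explicit SQL types. A simplified, made-up example (my real column names and widths differ); the truncation error makes me suspect one of the string columns is declared narrower (buffer 20) than the incoming data (length 38):

```python
import sqlalchemy

# Hypothetical columns -- the real dict is built per input file.
dtypedict = {
    "FileID": sqlalchemy.types.Integer(),
    "FileName": sqlalchemy.types.NVARCHAR(length=255),  # wide enough for the longest value
    "LoadedAt": sqlalchemy.types.DateTime(),
}
```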