Background: A Python tool that stores and manages daily stock prices. The data lives in a locally hosted PostgreSQL database. There is one "master table" listing all stocks, and each stock has its own pricing table (currently ~500 of them), each foreign-keyed to the master table.
One Python function pulls a list of stock names/symbols and stores them in the master table (not shown here, as it isn't really relevant). A second function retrieves the list of stock names from that table, then iteratively fetches pricing data with pandas-datareader and stores it in the database.
The underlying ORM is SQLAlchemy, with psycopg2 as the database adapter.
Problem: Database performance degrades over the course of the function's run. It starts off strong and holds that pace for roughly the first 150 inserts; after that it slows to a crawl, dropping to about 1 insert per second or slightly less. I've attached a screenshot of the pgAdmin dashboard, where you can clearly see throughput drop off after a while.
The slowdown does not seem to depend on the number of records per insert: pulling 1 day's price data per stock performs the same as pulling 50 days' worth. If I stop the script and restart it, performance starts off strong again, then degrades as before.
Attempts: I've tried a number of approaches. I have used both pandas' to_sql and Pangres. I've also tried writing the dataframe out to CSV and then using psycopg2's copy_from to insert the records into each table (a rough sketch of that attempt follows). Per earlier advice, I've updated my code (below) to no longer use the iter functions and instead zip the columns directly, but it seems to me the primary cost is still inserting records across so many tables.
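For reference, here is a sketch of that COPY-based approach. It is illustrative only: it uses copy_expert rather than the copy_from call I actually used (copy_expert handles schema-qualified, quoted table names more cleanly), and the column handling is simplified rather than my exact code.

# Rough sketch of the CSV/COPY attempt (illustrative; exact column handling is an assumption).
import io

def copy_prices(raw_conn, price_data, schema_name, table_name):
    """Stream one stock's price DataFrame into its table via PostgreSQL COPY."""
    buf = io.StringIO()
    price_data.to_csv(buf, header=False)  # Date index plus price columns, no header row.
    buf.seek(0)
    with raw_conn.cursor() as cur:
        cur.copy_expert(f'COPY {schema_name}."{table_name}" FROM STDIN WITH (FORMAT csv)', buf)
    raw_conn.commit()

# Called per stock with a raw DBAPI connection, e.g.:
# copy_prices(engine.raw_connection(), price_data, schema_name, stock_name)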
Questions: Since I'm not sure what's causing this behaviour, here are a few thoughts on what the underlying issue might be:
Using a separate table per stock is probably not a useful or scalable design; ideally all price data should live in one table (a rough sketch of that layout follows these questions).
Potentially I am not making the best use of the SQLAlchemy Core API and am relying too heavily on the ORM, or not implementing sessions properly (a session-free variant of the loop is sketched after the code below).
Is there a better or more efficient way to insert records than Pangres / pd.to_sql? Note that COPYing from a CSV didn't perform any better.
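For the single-table idea, here is a minimal sketch of what that layout and a Core-level bulk upsert could look like. The table and column names are assumptions for illustration, not my current schema.

# Minimal sketch of a single shared "price" table with a bulk ON CONFLICT insert via SQLAlchemy Core.
# Table and column names are assumptions, not the existing schema.
import sqlalchemy as sqla
from sqlalchemy.dialects.postgresql import insert

metadata = sqla.MetaData(schema=schema_name)
price = sqla.Table(
    'price', metadata,
    sqla.Column('Stock_id', sqla.Integer, primary_key=True),  # FK to the master stock table in the real schema.
    sqla.Column('Date', sqla.Date, primary_key=True),
    sqla.Column('Open', sqla.Numeric),
    sqla.Column('High', sqla.Numeric),
    sqla.Column('Low', sqla.Numeric),
    sqla.Column('Close', sqla.Numeric),
    sqla.Column('Volume', sqla.BigInteger),
)

def upsert_prices(engine, price_data, stock_id):
    """Insert one stock's rows into the shared price table, ignoring rows that already exist."""
    cols = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']  # Columns assumed to be present in the frame.
    rows = price_data.reset_index()[cols].assign(Stock_id=stock_id).to_dict(orient='records')
    stmt = insert(price).values(rows).on_conflict_do_nothing(index_elements=[price.c.Stock_id, price.c.Date])
    with engine.begin() as conn:  # One short transaction per stock.
        conn.execute(stmt)

With a composite primary key on (Stock_id, Date), ON CONFLICT has a natural target and the ~500 per-stock tables collapse into one.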
# Imports (not shown in the original snippet); dbuser, dbpass, dbhost, dbname,
# schema_name and prev_weekday are assumed to be defined elsewhere in the script.
import datetime as dt
import pandas as pd
import pandas_datareader.data as web
import pangres
import sqlalchemy as sqla
from sqlalchemy.orm import sessionmaker

# Database basics.
engine = sqla.create_engine(f'postgresql+psycopg2://{dbuser}:{dbpass}@{dbhost}/{dbname}')
conn = engine.connect()
Session = sessionmaker(engine, expire_on_commit=True)
session = Session()

# The main function.
def insertData():
    df = pd.read_sql_table('stock', schema=schema_name, con=conn)  # Read the names of all stocks from the master table.
    for stock_name, stock_id in zip(df['Stock_symbol'], df['Stock_id']):  # Zip the symbol and id columns (faster than itertuples).
        # Read the most recent price row (if any) for this stock to obtain its date.
        read_price_data = pd.read_sql_query(sql=f'SELECT * FROM stockschema."{stock_name}" ORDER BY "Date" DESC LIMIT 1', con=conn, parse_dates=['Date'])
        # Open a session. Removing this doesn't appear to affect performance at all; judging by the echo output from psycopg2, pangres commits every transaction anyway.
        with session.begin(subtransactions=True):
            start, end = dt.datetime(2020, 12, 12), prev_weekday(dt.date.today())  # Set the date range to fetch. Even 1 day of records is just as slow as 20.
            price_data = web.DataReader(stock_name, 'yahoo', start, end)  # Fetch the price data frame from Yahoo.
            price_data['Volume'] = price_data['Volume'].astype(int)  # Some volume values are decimals (e.g. 123.0); cast to int to match the DB column type.
            count = len(price_data.index)  # Ensure the foreign key (Stock_id) is captured in the df before it is written to the DB.
            sid = [stock_id] * count  # As above.
            price_data.insert(0, 'Stock_id', sid)  # Insert the new Stock_id column into the df.
            # Upsert the data frame into the relevant stock's table. Using pd.to_sql or psycopg2's copy_from instead provides no performance benefit either.
            pangres.upsert(engine=engine, df=price_data, table_name=stock_name, if_row_exists='ignore', schema=schema_name, create_schema=False, add_new_columns=False, adapt_dtype_of_empty_db_columns=False)
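On the session question above, one variant is to drop the long-lived session entirely and let pangres handle its own transaction per stock (which, per the echo output, it appears to do anyway). This is only an illustrative restructuring, not a confirmed fix:

# Sketch of the same loop without the long-lived session (illustrative restructuring, not a confirmed fix).
def insertDataScoped():
    stocks = pd.read_sql_table('stock', schema=schema_name, con=engine)  # Fresh read of the master table.
    for stock_name, stock_id in zip(stocks['Stock_symbol'], stocks['Stock_id']):
        start, end = dt.datetime(2020, 12, 12), prev_weekday(dt.date.today())
        price_data = web.DataReader(stock_name, 'yahoo', start, end)
        price_data['Volume'] = price_data['Volume'].astype(int)
        price_data.insert(0, 'Stock_id', [stock_id] * len(price_data.index))
        # No outer session is held open; as noted above, pangres appears to commit its own transaction when given the engine.
        pangres.upsert(engine=engine, df=price_data, table_name=stock_name, if_row_exists='ignore', schema=schema_name, create_schema=False, add_new_columns=False, adapt_dtype_of_empty_db_columns=False)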