Let's say I have a script that reads data from a database into a dataframe, runs some logic on that dataframe, and then exports the result to another database table, like below. The issue is that the dataframe in transform.py isn't getting overwritten after the exec call.
NOTE: This is a simple example to demonstrate the issue, not an actual problem I'm trying to solve using this approach.
Expectation:
Before exec
+---------+---------------+--------------+----------+
| metric | modified_date | current_date | datediff |
+---------+---------------+--------------+----------+
| metric1 | 2019-03-31 | 2019-05-03 | 33 |
| metric2 | 2019-03-31 | 2019-05-03 | 33 |
| metric3 | 2019-03-31 | 2019-05-03 | 33 |
| metric4 | 2019-03-20 | 2019-05-03 | 44 |
+---------+---------------+--------------+----------+
After exec
+---------+---------------+--------------+----------+
| metric | modified_date | current_date | datediff |
+---------+---------------+--------------+----------+
| metric4 | 2019-03-20 | 2019-05-03 | 44 |
+---------+---------------+--------------+----------+
Actual:
Before exec
+---------+---------------+--------------+----------+
| metric | modified_date | current_date | datediff |
+---------+---------------+--------------+----------+
| metric1 | 2019-03-31 | 2019-05-03 | 33 |
| metric2 | 2019-03-31 | 2019-05-03 | 33 |
| metric3 | 2019-03-31 | 2019-05-03 | 33 |
| metric4 | 2019-03-20 | 2019-05-03 | 44 |
+---------+---------------+--------------+----------+
After exec
+---------+---------------+--------------+----------+
| metric | modified_date | current_date | datediff |
+---------+---------------+--------------+----------+
| metric1 | 2019-03-31 | 2019-05-03 | 33 |
| metric2 | 2019-03-31 | 2019-05-03 | 33 |
| metric3 | 2019-03-31 | 2019-05-03 | 33 |
| metric4 | 2019-03-20 | 2019-05-03 | 44 |
+---------+---------------+--------------+----------+
They are the same!
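The same behavior shows up in a stripped-down sketch with no pandas or database involved:

def run():
    x = 1
    exec("x = 2")
    print(x)  # prints 1, not 2 -- the assignment inside exec does not rebind the local

run()

So the exec'd code runs without error, but the function-local name never picks up the new value.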
transform.py
def dataframe_transform(logic, source_table, dest_table, database, existing_rows='truncate'):
    ...
    # Load the source table into a dataframe.
    df = table_to_df(table=source_table, database=database)
    try:
        # Run the externally supplied transformation code; it is expected to rebind df.
        exec(logic)
    except Exception:
        raise
    # Export the (supposedly transformed) dataframe to the destination table.
    result = df_to_table(dataframe=df, database=database, table=dest_table, existing_rows=existing_rows)
    return result
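For comparison, here is a rough sketch (not my actual code) of running the logic against an explicit namespace dict, assuming the logic only needs df and pd:

namespace = {'df': df, 'pd': pd}
exec(logic, namespace)
df = namespace['df']  # assignments made by the exec'd code land in this dict

In that variant the reassigned df is visible afterwards, so the problem seems specific to how exec interacts with function locals.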
The logic filters the dataframe down to the records that need to be updated, kicks off another process, and overwrites the original dataframe with the filtered data.
logic.py
# This is just an example I made up - please don't focus on solving this.
import pandas as pd

late_df = pd.DataFrame()

# Check if data is late.
late_cutoff = 40
for index, row in df.iterrows():
    if row['datediff'] >= late_cutoff:
        late_df = late_df.append(row, ignore_index=True)
        ...  # Do something else

df = late_df  # Save flagged records by updating the original dataframe.
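For context, logic is just the contents of logic.py read in as a string. A hypothetical driver (the table and database names here are made up) would be:

with open('logic.py') as f:
    logic = f.read()

dataframe_transform(
    logic=logic,
    source_table='metrics',      # hypothetical name
    dest_table='late_metrics',   # hypothetical name
    database='analytics',        # hypothetical name
)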
Why would I do this? In this case I know the input is trusted, and this approach lets me reuse the same code across various scripts while keeping the transform logic separate.