I have a function that connects to a MySQL DB and executes a query that takes quite long (approx. 10 min):
import pandas as pd
import sqlalchemy
from sqlalchemy.pool import QueuePool

def foo(connection_string):  # connection_string is something like "mysql://user:key@host/db"
    statement = "SELECT * FROM largtable"
    conn = None
    df = None
    try:
        engine = sqlalchemy.create_engine(
            connection_string,
            connect_args={
                "connect_timeout": 1500,
            },
            poolclass=QueuePool,
            pool_pre_ping=True,
            pool_size=10,
            pool_recycle=3600,
            pool_timeout=900,
        )
        conn = engine.connect()
        df = pd.read_sql_query(statement, conn)
    except Exception as e:
        raise Exception("could not load data") from e
    finally:
        if conn:
            conn.close()
    return df
When I run this in my local environment, it works and takes about 600 seconds. When I run it via Airflow, it fails after about 5 to 6 minutes with the error (_mysql_exceptions.OperationalError) (2013, 'Lost connection to MySQL server during query').
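To narrow this down, I plan to compare the timeout variables the two environments actually see, since the server side might differ between them. This is a minimal diagnostic sketch; the connection URL is a placeholder:

import sqlalchemy

# Diagnostic sketch: print the MySQL timeout variables as seen from this
# client, to compare the local run against the Airflow run.
engine = sqlalchemy.create_engine("mysql://user:key@host/db")  # placeholder URL
with engine.connect() as conn:
    for name, value in conn.execute("SHOW SESSION VARIABLES LIKE '%timeout%'"):
        print(name, "=", value)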
I have tried the suggestions from Stack Overflow for adjusting the SQLAlchemy timeouts (e.g., this and this) and from the SQLAlchemy docs, which led to the additional pool_* and connect_args arguments to the create_engine() function shown above. However, these didn't seem to have any effect at all.
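The only other driver-level knobs I know of are the read/write timeouts. A sketch of what I mean (read_timeout and write_timeout are options of the mysqlclient and pymysql drivers; whether they help under Airflow is an assumption on my part):

import sqlalchemy

# Sketch: connect_timeout only covers the initial handshake, so this also
# raises the driver-side read/write timeouts (values in seconds). Untested
# under Airflow.
engine = sqlalchemy.create_engine(
    "mysql://user:key@host/db",  # placeholder connection string
    connect_args={
        "connect_timeout": 1500,
        "read_timeout": 3600,   # wait up to an hour for the result set
        "write_timeout": 3600,
    },
)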
I've also tried replacing SQLAlchemy with plain pymysql, which led to the same error on Airflow. I therefore haven't tried flask-sqlalchemy yet, since I expect the same result.
Since it works in basically the same environment (Python 3.7.x, SQLAlchemy 1.3.3, pandas 1.3.x) when not run by Airflow but fails when run by Airflow, I suspect there is some global setting that overrides my timeout settings. But I have no idea where to start the search.
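One thing still on my list is to raise the server-side session timeouts explicitly right after connecting, in case the Airflow environment inherits lower defaults from somewhere. A sketch under that assumption (the values are arbitrary, and SET SESSION requires the server to allow changing these variables per session):

import pandas as pd
import sqlalchemy

engine = sqlalchemy.create_engine("mysql://user:key@host/db")  # placeholder URL
conn = engine.connect()
# Assumption: the Airflow environment may inherit lower server-side session
# defaults, so raise them explicitly before the long query. Values are in
# seconds, chosen to exceed the ~10 minute runtime.
conn.execute("SET SESSION net_write_timeout = 3600")
conn.execute("SET SESSION wait_timeout = 3600")
df = pd.read_sql_query("SELECT * FROM largtable", conn)
conn.close()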
Some additional info, in case someone can work with it: I have gotten it to run via Airflow twice now, both during off-hours (5 a.m. and Sundays), but not again since.
PS: Unfortunately, pagination as suggested here is not an option, since the query's runtime results from transformations and calculations.