
I have a function that connects to a MySQL DB and executes a query that takes quite long (approx. 10 minutes):

import pandas as pd
import sqlalchemy
from sqlalchemy.pool import QueuePool

def foo(connection_string):  # connection_string is something like "mysql://user:key@host/db"
    statement = "SELECT * FROM largtable"
    conn = None
    df = None
    try:
        engine = sqlalchemy.create_engine(
            connection_string,
            connect_args={
                "connect_timeout": 1500,
            },
            poolclass=QueuePool,
            pool_pre_ping=True,
            pool_size=10,
            pool_recycle=3600,
            pool_timeout=900,
        )
        conn = engine.connect()
        df = pd.read_sql_query(statement, conn)
    except Exception as e:
        # chain the original error so the root cause is not swallowed
        raise Exception("could not load data") from e
    finally:
        if conn:
            conn.close()
    return df

When I run this in my local environment, it works and takes about 600 seconds. When I run it via Airflow, it fails after about 5 to 6 minutes with the error (_mysql_exceptions.OperationalError) (2013, 'Lost connection to MySQL server during query').

I have tried the suggestions on Stack Overflow to adjust the timeouts of SQLAlchemy (e.g., this and this) and from the SQLAlchemy docs, which led to the additional args (pool_* and connect_args) for the create_engine() function. However, these didn't seem to have any effect at all.
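
Worth noting: the pool_* settings only govern how SQLAlchemy manages idle pooled connections, while error 2013 is usually governed by server-side session variables. A minimal diagnostic sketch (the connection string is a placeholder) that could be run from both environments to compare what the server actually reports:

import sqlalchemy

# Print the server-side session timeouts that typically govern error 2013;
# run this from both the local and the Airflow environment and compare.
engine = sqlalchemy.create_engine("mysql://user:key@host/db")  # placeholder URL
with engine.connect() as conn:
    rows = conn.execute(
        "SHOW SESSION VARIABLES WHERE Variable_name IN "
        "('wait_timeout', 'net_read_timeout', 'net_write_timeout', 'max_execution_time')"
    )
    for name, value in rows:
        print(name, "=", value)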

I've also tried to replace SQLAlchemy with PyMySQL, which led to the same error on Airflow. Thus, I didn't try Flask-SQLAlchemy yet, since I expect the same result.
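
For reference, the PyMySQL attempt looked roughly like this (a reconstruction; the timeout values mirror the SQLAlchemy ones above):

import pandas as pd
import pymysql

def foo_pymysql(host, user, password, db):
    # Same query, same generous timeouts; this failed with the same
    # 2013 error under Airflow.
    conn = pymysql.connect(
        host=host, user=user, password=password, database=db,
        connect_timeout=1500, read_timeout=1500, write_timeout=1500,
    )
    try:
        return pd.read_sql_query("SELECT * FROM largtable", conn)
    finally:
        conn.close()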

Since it works in basically the same environment (Python 3.7.x, SQLAlchemy 1.3.3, and pandas 1.3.x) when not run by Airflow but fails when run by Airflow, I think there is some global setting that overrides my timeout settings. But I have no idea where to start the search.
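
For completeness, the function is invoked from a task roughly like the following (a simplified sketch with assumed names and an assumed Airflow 2.x import path, not the production DAG):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def _load():
    foo("mysql://user:key@host/db")  # placeholder connection string

with DAG(
    dag_id="load_largtable",  # assumed name
    start_date=datetime(2021, 9, 1),
    schedule_interval=None,
) as dag:
    PythonOperator(task_id="load", python_callable=_load)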

And some additional info, in case somebody can work with it: I got it running with Airflow twice now during off-hours (5 am and Sundays), but not again since.

PS: unfortunately, pagination as suggested here is not an option, since the query runtime results from transformations and calculations.

Racooneer
  • I think it depends on your deployment. For example, you can have a firewall between your Airflow worker and the database that closes connections that are idle for more than 10 minutes. I think you should verify with your admins. – Jarek Potiuk Sep 24 '21 at 10:40
  • @JarekPotiuk thanks. I have an Airflow test environment running in a local Docker container on the same setup that I use for development. The Airflow call does not go through; the one from the dev environment does. So I can exclude the firewall from the problem list. – Racooneer Sep 24 '21 at 13:31
  • Did you try running it via Docker (without Airflow)? That should tell you whether this is a problem with the Docker network or not. Also, as a next step, you can exec into the running Airflow container and run it manually to exclude any "Airflow" impact. – Jarek Potiuk Sep 24 '21 at 13:37
  • @JarekPotiuk running the function from within the container is a good idea, I will test it. However, I'm not sure I get the difference between the two options in your two sentences. Could you please elaborate? I cannot figure out what different things would happen under the hood, since both approaches obviously intend to run the Python function in the container. – Racooneer Sep 24 '21 at 13:44
  • It depends on whether you use a "bare" Python image or the Airflow image. If you have the same problem when running it from a plain Python image (not Airflow), then it means that the problem is in the networking/container. – Jarek Potiuk Sep 24 '21 at 14:16
  • @JarekPotiuk so I ran the function in the container without Airflow and got the same error, which I still don't get when running the function outside of the container. That is progress, thanks for that, but I'm clueless from here. I wouldn't know where to start looking for settings that overrule every timeout setting I try to pass via args. – Racooneer Sep 27 '21 at 10:54
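
For anyone reproducing the isolation test from this comment thread, a minimal sketch (file name, module name, and connection string are all placeholders):

# test_query.py -- hypothetical standalone script for the isolation test:
# run it inside a bare Python container and inside the Airflow container.
# The same 2013 error in both points at the container/network, not Airflow.
from mydb import foo  # assumes foo() from the question is saved in mydb.py

if __name__ == "__main__":
    df = foo("mysql://user:key@host/db")  # placeholder connection string
    print(df.shape)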

0 Answers