
I have looked through similar posts on SO, but they are specific to Docker environments and have not been much help. Our setup is a little different: we run a Docker image of Airflow hosted on Azure App Service, but it connects to a hosted Azure Database for PostgreSQL server (version 11).

Python = 3.8  
Apache Airflow = 2.1.4  
SQLAlchemy = 1.3.24  
Executor = Local

The environment has been set up and works fine in most cases. However, when we run DAGs that handle large amounts of data (typically several GB), we suddenly encounter heartbeat issues. I have tried setting keepalive values in the Airflow config through the sql_alchemy_connect_args option, and also raising web_server_master_timeout and web_server_worker_timeout, to no avail.
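For reference, here is a minimal sketch of the kind of keepalive settings meant above. In Airflow 2.1, sql_alchemy_connect_args takes the import path of a dict, whose entries are passed as keyword arguments to the database driver. The module name `airflow_local_settings` and all numeric values are assumptions for illustration; the keys are the standard libpq keepalive parameters. Keepalives act on connections that are already open, whereas the traceback below fails while opening a new one, so this may well be unrelated to the error.

```python
# airflow_local_settings.py (hypothetical module name; it must be importable by Airflow)
#
# Referenced from airflow.cfg, e.g.:
#   [core]
#   sql_alchemy_connect_args = airflow_local_settings.keepalive_kwargs

# libpq TCP keepalive parameters, forwarded through SQLAlchemy to psycopg2.connect().
# The values are illustrative assumptions, not tuned recommendations.
keepalive_kwargs = {
    "keepalives": 1,            # turn TCP keepalives on
    "keepalives_idle": 30,      # seconds of inactivity before the first probe
    "keepalives_interval": 5,   # seconds between unanswered probes
    "keepalives_count": 5,      # unanswered probes before the connection is dropped
}
```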

The error:

{base_job.py:222} ERROR - LocalTaskJob heartbeat got an exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect
        return fn()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 364, in connect
        return _ConnectionFairy._checkout(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 778, in _checkout
        fairy = _ConnectionRecord.checkout(pool)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 495, in checkout
        rec = pool._do_get()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 241, in _do_get
        return self._create_connection()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 309, in _create_connection
        return _ConnectionRecord(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 440, in __init__
        self.__connect(first_connect_check=True)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 661, in __connect
        pool.logger.debug("Error on connect(): %s", e)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
        compat.raise_(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
        raise exception
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 656, in __connect
        connection = pool._invoke_creator(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
        return dialect.connect(*cargs, **cparams)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 508, in connect
        return self.dbapi.connect(*cargs, **cparams)
      File "/usr/local/lib/python3.8/site-packages/psycopg2/__init__.py", line 122, in connect
        conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
    psycopg2.OperationalError: could not translate host name "<address>" to address: Temporary failure in name resolution


    The above exception was the direct cause of the following exception:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 194, in heartbeat
        session.merge(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2166, in merge
        return self._merge(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2244, in _merge
        merged = self.query(mapper.class_).get(key[1])
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 1018, in get
        return self._get_impl(ident, loading.load_on_pk_identity)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 1135, in _get_impl
        return db_load_fn(self, primary_key_identity)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", line 286, in load_on_pk_identity
        return q.one()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3490, in one
        ret = self.one_or_none()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3459, in one_or_none
        ret = list(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
        return self._execute_and_instances(context)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3556, in _execute_and_instances
        conn = self._get_bind_args(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3571, in _get_bind_args
        return fn(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3550, in _connection_from_session
        conn = self.session.connection(**kw)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1142, in connection
        return self._connection_for_bind(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1150, in _connection_for_bind
        return self.transaction._connection_for_bind(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 433, in _connection_for_bind
        conn = bind._contextual_connect()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2302, in _contextual_connect
        self._wrap_pool_connect(self.pool.connect, None),
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2339, in _wrap_pool_connect
        Connection._handle_dbapi_exception_noconnection(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1583, in _handle_dbapi_exception_noconnection
        util.raise_(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
        raise exception
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect
        return fn()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 364, in connect
        return _ConnectionFairy._checkout(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 778, in _checkout
        fairy = _ConnectionRecord.checkout(pool)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 495, in checkout
        rec = pool._do_get()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 241, in _do_get
        return self._create_connection()
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 309, in _create_connection
        return _ConnectionRecord(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 440, in __init__
        self.__connect(first_connect_check=True)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 661, in __connect
        pool.logger.debug("Error on connect(): %s", e)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
        compat.raise_(
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
        raise exception
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 656, in __connect
        connection = pool._invoke_creator(self)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
        return dialect.connect(*cargs, **cparams)
      File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 508, in connect
        return self.dbapi.connect(*cargs, **cparams)
      File "/usr/local/lib/python3.8/site-packages/psycopg2/__init__.py", line 122, in connect
        conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
    sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not translate host name "<address>" to address: Temporary failure in name resolution

    (Background on this error at: http://sqlalche.me/e/13/e3q8)

Could someone shed some light on this issue? I am at my wits' end, and I am not sure whether I am heading in the right direction in debugging it.

Vinay Kulkarni
  • Are you able to connect to the postgres via psql from the machine which has got airflow installed? – floating_hammer Oct 27 '21 at 12:28
  • Airflow is not installed on a machine, but on a web hosting service in Azure called App Service. Also, we have not been able to get SSH into the Docker container working, so we cannot test psql. – Vinay Kulkarni Oct 27 '21 at 22:09
  • In that case the app service is not able to resolve the endpoint address of the postgres service. – floating_hammer Oct 28 '21 at 05:10
  • Well, it does resolve and it works for most tasks. It only gives this error when processing larger tables. I reckon this might have something to do with heartbeat configuration in Airflow. – Vinay Kulkarni Oct 28 '21 at 21:28
  • @VinayKulkarni, we're having a similar issue only when handling large amounts of data. Did you find a solution to the problem? We're using Airflow in AKS with Azure Database for PostgreSQL server as database. – Nander Speerstra Apr 26 '22 at 12:06
  • @NanderSpeerstra We have attributed this issue to the many connections Airflow makes to the database. We haven't been able to fully resolve it yet. We have refactored our code to make connections to the database using context managers, which seems to have reduced it. We are considering implementing PgBouncer to manage this better in the future. – Vinay Kulkarni Apr 28 '22 at 02:48
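
A minimal sketch of the context-manager approach mentioned in the last comment, so that task code always releases its database connection. This is not the commenter's actual code: `managed_connection` is a hypothetical helper, and the standard-library `sqlite3` driver stands in for the real PostgreSQL driver so the example runs on its own.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def managed_connection(connect, *args, **kwargs):
    """Open a DB-API connection and always close it when the block exits."""
    conn = connect(*args, **kwargs)
    try:
        yield conn
        conn.commit()      # commit only if the block finished without an error
    except Exception:
        conn.rollback()    # undo partial work before re-raising
        raise
    finally:
        conn.close()       # release the connection in every case


# sqlite3 is an assumption for the demo; with PostgreSQL one would pass the
# driver's own connect function and connection parameters instead.
with managed_connection(sqlite3.connect, ":memory:") as conn:
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.execute("INSERT INTO t VALUES (1)")
    row_count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
```

The point of the pattern is that a task that fails midway still closes its connection, so failed or long-running tasks do not leave connections open against the server's limit.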

1 Answer


I had the same issue with my deployment of Airflow on a Kubernetes cluster. It is apparently caused by a high number of simultaneous connections to the database.

I fixed it by enabling PgBouncer, as recommended in the official production guide.

# PgBouncer settings
pgbouncer:
  # Enable PgBouncer
  enabled: true