
I have set up Airflow with SQL Server as the backend (SQL Azure). `airflow initdb` completes successfully. I am trying to run a simple DAG every 2 minutes.

The DAG has two tasks:

  1. print date
  2. sleep

When I start the Airflow scheduler, it creates task instances for both tasks; the first one succeeds and the second one appears to be stuck in the "running" state.

Looking at the scheduler logs, I see the following error repeatedly:

[2019-01-04 11:38:48,253] {jobs.py:397} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/jobs.py", line 389, in helper
    pickle_dags)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/jobs.py", line 1816, in process_file
    dag.sync_to_db()
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/sshuser/.local/lib/python2.7/site-packages/airflow/models.py", line 4296, in sync_to_db
    DagModel).filter(DagModel.dag_id == self.dag_id).first()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2755, in first
    ret = list(self[0:1])
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__
    return list(res)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2855, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2876, in _execute_and_instances
    close_with_result=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2885, in _get_bind_args
    **kw
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2867, in _connection_from_session
    conn = self.session.connection(**kw)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1019, in connection
    execution_options=execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 1024, in _connection_for_bind
    engine, execution_options)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 409, in _connection_for_bind
    conn = bind.contextual_connect()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2112, in contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2151, in _wrap_pool_connect
    e, dialect, self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1465, in _handle_dbapi_exception_noconnection
    exc_info
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2147, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 387, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 768, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 516, in checkout
    rec = pool._do_get()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1140, in _do_get
    self._dec_overflow()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1137, in _do_get
    return self._create_connection()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 333, in _create_connection
    return _ConnectionRecord(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 461, in __init__
    self.__connect(first_connect_check=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 651, in __connect
    connection = pool._invoke_creator(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/strategies.py", line 105, in connect
    return dialect.connect(*cargs, **cparams)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 393, in connect
    return self.dbapi.connect(*cargs, **cparams)
InterfaceError: (pyodbc.InterfaceError) ('28000', u"[28000] [unixODBC][Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Login failed for user 'airflowuser'. (18456) (SQLDriverConnect)")

Airflow is configured to use the LocalExecutor and pyodbc to connect to SQL Azure:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
#executor = SequentialExecutor
executor = LocalExecutor


# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines; more information on
# their website
#sql_alchemy_conn = sqlite:////home/sshuser/airflow/airflow.db
# connection string to MS SQL Server
sql_alchemy_conn = mssql+pyodbc://airflowuser@afdsqlserver76:<password>@afdsqlserver76.database.windows.net:1433/airflowdb?driver=ODBC+Driver+17+for+SQL+Server


# The encoding for the databases
sql_engine_encoding = utf-8

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 10

# The SqlAlchemy pool recycle is the number of seconds a connection
# can be idle in the pool before it is invalidated. This config does
# not apply to sqlite. If the number of DB connections is ever exceeded,
# a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 180

# How many seconds to retry re-establishing a DB connection after
# disconnects. Setting this to 0 disables retries.
sql_alchemy_reconnect_timeout = 300

The DAG is as follows:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 4),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval='*/2 * * * *', max_active_runs=1, catchup=False)

# t1 and t2 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

I am a bit lost as to why the scheduler would be unable to connect to the DB after it has run the first task successfully. Any pointers to resolve this are much appreciated.

I have a sample program that uses sqlalchemy to connect to SQL Azure using the same credentials, and it works:

from sqlalchemy import create_engine

engine = create_engine("mssql+pyodbc://afdadmin@afdsqlserver76:<password>@afdsqlserver76.database.windows.net:1433/airflowdb?driver=ODBC+Driver+17+for+SQL+Server")

connection = engine.connect()
result = connection.execute("select version_num from alembic_version")
for row in result:
    print("Version:", row['version_num'])
connection.close()
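
If the bare `@` signs in the connection string are what trips up the scheduler, the same test with the user name and password URL-encoded should confirm it. A minimal sketch (the password is a placeholder):

# Sketch: the same connectivity test, with the credentials URL-encoded so the
# extra '@' in "airflowuser@afdsqlserver76" cannot be mistaken for the
# user/host separator. <password> is a placeholder.
try:
    from urllib.parse import quote_plus  # Python 3
except ImportError:
    from urllib import quote_plus  # Python 2

from sqlalchemy import create_engine

user = quote_plus("airflowuser@afdsqlserver76")  # '@' becomes %40
password = quote_plus("<password>")

engine = create_engine(
    "mssql+pyodbc://%s:%s@afdsqlserver76.database.windows.net:1433/airflowdb"
    "?driver=ODBC+Driver+17+for+SQL+Server" % (user, password))
engine.connect().close()  # raises InterfaceError if the login fails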
Phani
  • Have you tried using the very same login credentials and attempted to log into Azure SQL with them from another GUI? – Evaldas Buinauskas Jan 04 '19 at 12:35
  • It looks like you have too many bare `@` signs in your connection string. You may need to do some URL encoding to avoid SQLAlchemy getting confused. See [this question](https://stackoverflow.com/q/1423804/2144390) for more information. – Gord Thompson Jan 04 '19 at 12:38
  • The same credentials to SQL Azure work in SQL Management studio. Also, I wrote a simple program that uses sqlalchemy to connect to DB using the same credentials & that works too... I have updated the question with that code sample as well... – Phani Jan 04 '19 at 13:42
  • @GordThompson - I will check if this is related to bare @ signs & if url encoding solves the issue. However, what is puzzling is that the first task in the dag gets executed. I would expect that, if this is an issue with connection string, it would have issues in creating/updating database records related to execution of the first task. – Phani Jan 04 '19 at 13:54
  • @Phani I am running into some issues here: When I `airflow initdb`, I get this-> `A table can only have one timestamp column. Because table 'task_reschedule' already has one, the column 'start_date' cannot be added.` Did you run into this problem? If so how did you resolve this. – Javiar Sandra Feb 15 '19 at 02:31
  • @JaviarSandra - no I did not run into the issue that you have mentioned. One option is to try airflow resetdb & then try to initialize again with airflow initdb. Do you see this issue if you point the sql_alchemy_conn to a brand new database? – Phani Feb 19 '19 at 05:57
  • @JaviarSandra , Phani I am facing the same issue. Were you able to resolve it? – R.S.K Feb 25 '19 at 11:13
  • @R.S.K I did not face the issue that was encountered with airflow initdb. I am using Airflow 1.10.1 & Azure SQL as the backend. I am assuming SLUGIFY_USES_TEXT_UNIDECODE=yes is already done. – Phani Feb 26 '19 at 12:05
  • @Phani thanks, I am trying this version out - I have had 1.10.4 and that gave me index creation "ti_pool" error on airflow initdb – khanna Sep 06 '19 at 09:52

1 Answer


The issue was resolved after connection pooling was enabled in odbcinst.ini:

[ODBC]
Pooling = Yes
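
For completeness, since `sql_alchemy_pool_enabled = True` is set in airflow.cfg above: a possible alternative workaround (untested here) is to disable SQLAlchemy's own pooling instead, so that each scheduler subprocess opens fresh connections rather than reusing pooled ones:

# airflow.cfg
sql_alchemy_pool_enabled = False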

Phani