I am writing an Airflow DAG and having some problems with a function. I am trying to debug by printing data to stdout and using the logging library.

My example DAG is:

    from datetime import timedelta
    
    import airflow
    import logging
    
    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.contrib.hooks.datadog_hook import DatadogHook
    
    def datadog_event(title, text, dag_id, task_id):
        hook = DatadogHook()
        tags = [
            f'dag:{dag_id}',
            f'task:{task_id}',
        ]
    
        hook.post_event(title, text, tags)
    
    def datadog_event_success(context):
        dag_id = context['task_instance'].dag_id
        task_id = context['task_instance'].task_id
        text = f'Airflow DAG success for {dag_id}\n\nDAG: {dag_id}\nTasks: {task_id}'
        title = f'Airflow DAG success for {dag_id}'
    
        logging.info(title)
        logging.info(text)
        logging.info(dag_id)
        logging.info(task_id)
    
        datadog_event(title, text, dag_id, task_id)
    
    args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
    }
    
    dag = DAG(
        dag_id='example_callback',
        default_args=args,
        schedule_interval='*/5 * * * *',
        dagrun_timeout=timedelta(minutes=60),
        on_success_callback=datadog_event_success,
    )
    
    my_task = DummyOperator(
        task_id='run_this_last',
        dag=dag,
    )

During a run I get an error:

    airflow[9490]: Process DagFileProcessor4195-Process:
    airflow[9490]: Traceback (most recent call last):
    airflow[9490]:   File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    airflow[9490]:     self.run()
    airflow[9490]:   File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    airflow[9490]:     self._target(*self._args, **self._kwargs)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 148, in _run_file_processor
    airflow[9490]:     result = scheduler_job.process_file(file_path, pickle_dags)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    airflow[9490]:     return func(*args, **kwargs)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1542, in process_file
    airflow[9490]:     self._process_dags(dagbag, dags, ti_keys_to_schedule)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1239, in _process_dags
    airflow[9490]:     self._process_task_instances(dag, tis_out)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    airflow[9490]:     return func(*args, **kwargs)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 732, in _process_task_instances
    airflow[9490]:     run.update_state(session=session)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    airflow[9490]:     return func(*args, **kwargs)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/models/dagrun.py", line 318, in update_state
    airflow[9490]:     dag.handle_callback(self, success=True, reason='success', session=session)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    airflow[9490]:     return func(*args, **kwargs)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/models/dag.py", line 620, in handle_callback
    airflow[9490]:     callback(context)
    airflow[9490]:   File "/home/airflow/analytics/etl_v2/airflow_data/dags/example_bash_operator_andy.py", line 68, in datadog_event_success
    airflow[9490]:     datadog_event(title, text, dag_id, task_id)
    airflow[9490]:   File "/home/airflow/analytics/etl_v2/airflow_data/dags/example_bash_operator_andy.py", line 45, in datadog_event
    airflow[9490]:     hook.post_event(title, text, tags)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/contrib/hooks/datadog_hook.py", line 157, in post_event
    airflow[9490]:     self.validate_response(response)
    airflow[9490]:   File "/home/airflow/virtualenv/lib/python3.6/site-packages/airflow/contrib/hooks/datadog_hook.py", line 58, in validate_response
    airflow[9490]:     if response['status'] != 'ok':
    airflow[9490]: KeyError: 'status'

But none of my logged output appears before or after the error in the scheduler, webserver, worker, or task logs.

I have tested the `datadog_event` call on my Airflow worker by importing the code manually, and it logs properly when I run it that way:

    airflow@airflow-worker-0:~/analytics$ /home/airflow/virtualenv/bin/python -i /home/airflow/analytics/etl_v2/airflow_data/dags/example_bash_operator_andy.py
    [2019-08-07 20:48:01,890] {settings.py:213} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=29941
    [2019-08-07 20:48:02,227] {__init__.py:51} INFO - Using executor DaskExecutor

    >>> datadog_event('My title', 'My task', 'example_bash_operator_andy', 'run_this_last')
    [2019-08-07 20:51:17,542] {datadog_hook.py:54} INFO - Setting up api keys for Datadog
    [2019-08-07 20:51:17,544] {example_bash_operator_andy.py:38} INFO - My title
    [2019-08-07 20:51:17,544] {example_bash_operator_andy.py:39} INFO - My task
    [2019-08-07 20:51:17,544] {example_bash_operator_andy.py:40} INFO - example_bash_operator_andy
    [2019-08-07 20:51:17,545] {example_bash_operator_andy.py:41} INFO - run_this_last
    [2019-08-07 20:51:17,658] {api_client.py:139} INFO - 202 POST https://api.datadoghq.com/api/v1/events (113.2174ms)

My airflow.cfg is posted at https://gist.github.com/andyshinn/d743ddc61956ed7440c500fca962ce92 and I am using Airflow 1.10.4.

How can I output logging or messages from the DAG itself to better debug what might be happening?

    what is your logging_level on airflow.cfg? – Chengzhi Aug 08 '19 at 02:34
  • `INFO` and I can post the `airflow.cfg` if it is helpful. It is default except for settings to use Dask as an executor and setting the proper connections for my environment. Also, I have finally fixed my main issue. But my question is still valid for future debugging and logging from a DAG. – Andy Shinn Aug 08 '19 at 02:42
  • Can you also add which version of Airflow you are using? Is it just on_success_callback or logging in other places won't work. Two things I can think of you may want to check, 1. have you set up the logging_config_class in the config https://github.com/apache/airflow/blob/master/UPDATING.md#logging-update. 2. Do you have remote_logging setup, if yes, are you getting any data there? – Chengzhi Aug 08 '19 at 03:29
  • I am using Airflow 1.10.4. I have edited the question to add a link to the `airflow.cfg`. I haven't changed any logging. It is default (logs to files, no remote logging or custom class). – Andy Shinn Aug 08 '19 at 18:58
  • one thing I think you can try is to setup the logging_config_class, it is not something out of box for now. – Chengzhi Aug 08 '19 at 21:21
  • I'm also seeing this behavior. Did you reach any resolution? I think it's because we're both using the on_success_callback/on_failure_callback param on the DAG model and not passing it in (via default_args or directly) as params to the TASK instances. However, I don't want to pass to the TASK instance, because then the callback is called PER TASK and I want it to be PER DAG. However, it looks like DAG level logging just goes to the void? – fizloki Jan 29 '20 at 19:34
  • No, I still haven't figured this out. Direct `print()` and the logger in a DAG seem to get swallowed up somewhere. – Andy Shinn Jan 29 '20 at 20:51
  • This issue says 1.10.7+ but it seems like it is very relevant and maybe the same issue: https://github.com/apache/airflow/issues/8484. I'm now on 1.10.9 and the issue persists. – Andy Shinn May 22 '20 at 01:18

1 Answer


DAG-level callbacks (`on_success_callback`, `on_failure_callback`) run in the main scheduler loop. See this open issue about moving execution of these functions out of the scheduler thread. Exceptions raised in your callback function will appear in the scheduler logs. However, annoyingly, `print` and `logging` output does NOT appear to make it into the scheduler logs.

For debugging purposes, I typically just raise the information I'm trying to log as an exception, so that it shows up in the scheduler logs.
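
For example, a throwaway callback along these lines (a minimal sketch; the callback name and message are only illustrative, not part of the question's DAG) will push its information into the scheduler log as part of the resulting traceback:

    def debug_success_callback(context):
        dag_id = context['task_instance'].dag_id
        task_id = context['task_instance'].task_id
        # Exceptions raised from a DAG-level callback are logged by the scheduler,
        # so raising here is a crude way to get debug info into the scheduler log.
        raise RuntimeError(f'on_success_callback fired: dag={dag_id}, task={task_id}')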

Alternatively, you could move the callback to the TASK level. This means moving it into your default_args, like so:

    args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
        'on_success_callback': datadog_event_success,
    }

    dag = DAG(
        dag_id='example_callback',
        default_args=args,
        schedule_interval='*/5 * * * *',
        dagrun_timeout=timedelta(minutes=60),
    )

Your callback's logging will now appear in the task logs (not the scheduler logs). However, this means the callback will be called for every qualifying task, not just once for the DAG.
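
If you want the task-level logging but still only one event per DAG run, one workaround (just a sketch reusing the question's DAG, not something from the original answer) is to attach the callback to a single task, such as the final one, instead of setting it in `default_args`:

    my_task = DummyOperator(
        task_id='run_this_last',
        dag=dag,
        # Attaching the callback to only this task means it fires once per run,
        # and its logging output lands in this task's log file.
        on_success_callback=datadog_event_success,
    )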

  • Maybe this has changed in a recent Airflow version? I'm not seeing this work. I have non-callback tasks that use the regular logging library and the logs are still nowhere to be found. I'm logging a static word `ETLPipeline` and cannot find the word appearing in any log file on the system or stdout from the processes. – Andy Shinn Jan 29 '20 at 22:56
  • I had a typo. Let me know if my reworded answer helps. – fizloki Jan 30 '20 at 02:33
  • Yes, I confirm this works: when the function is run at the task level, the logs do appear in the task's log, whereas DAG-level callbacks are run at the scheduler level and hence so are their logs – Xavier Bourret Sicotte Nov 15 '21 at 18:36