
I recently ran into this nasty error where Airflow's `apply_defaults` decorator throws the following stack trace (my `**kwargs` do contain `job_flow_id`):

File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/mysql_import/dags/mysql_import_dag.py", line 23, in <module>
    sync_dag_builder.build_sync_dag()
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/mysql_import/dags/builders/sync_dag_builders/emr_sync_dag_builder.py", line 26, in build_sync_dag
    create_emr_task, terminate_emr_task = self._create_job_flow_tasks()
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/mysql_import/dags/builders/sync_dag_builders/emr_sync_dag_builder.py", line 44, in _create_job_flow_tasks
    task_id=GlobalConstants.EMR_TERMINATE_STEP)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/aws/operators/emr_terminate_ancestor_job_flows_operator.py", line 31, in __init__
    EmrTerminateJobFlowOperator.__init__(self, *args, **kwargs)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/contrib/operators/emr_terminate_job_flow_operator.py", line 44, in __init__
    super(EmrTerminateJobFlowOperator, self).__init__(*args, **kwargs)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/utils/decorators.py", line 94, in wrapper
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Argument ['job_flow_id'] is required

The disturbing part is this: looking into `decorators.py`, I felt that `sig_cache` might be messing something up. In fact, from the commit that introduced it, I cannot figure out how function-signature caching is supposed to work at all (at the very least, it isn't working here).
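For reference, here is a simplified sketch of how I read `decorators.py`. This is my paraphrase of the mechanism, not the actual Airflow source; the real wrapper also merges `default_args`, DAG-level params, etc.:

```python
import inspect
from functools import wraps


def apply_defaults_sketch(func):
    # The signature is inspected once, at decoration time, and captured in the
    # wrapper's closure. Each decorated function therefore carries its own,
    # independent sig_cache; there is no global table to invalidate.
    sig_cache = inspect.signature(func)
    non_optional_args = {
        name for name, param in sig_cache.parameters.items()
        if param.default is param.empty
        and name != 'self'
        and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
    }

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Required arguments are validated against **kwargs only: anything
        # bound positionally, or already consumed by an enclosing __init__,
        # does not count as present.
        missing_args = non_optional_args - set(kwargs)
        if missing_args:
            raise Exception("Argument {} is required".format(sorted(missing_args)))
        return func(*args, **kwargs)

    return wrapper
```

If this reading is right, the cache only lives as long as the decorated function object, so a scheduler/webserver restart rebuilds it from scratch and there is nothing persistent to clear.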


I've tried deleting all `__pycache__` directories and restarting the scheduler and webserver, without luck (I run them in separate Linux `screen` sessions).

  • What could be causing this error?
  • How does `sig_cache` work, and does it ever need to be cleared manually? If so, how?

Environment

  • Python 3.6.6
  • Airflow 1.10.2
  • LocalExecutor

  • Here's the [link](https://lists.apache.org/thread.html/9f8dd58e612dedbb03a1d5a2675045078dedf17e17bc4a2d3cc3d961@%3Cdev.airflow.apache.org%3E) to my query on `Airflow`'s [dev mailing list](https://lists.apache.org/list.html?dev@airflow.apache.org) – y2k-shubham Feb 05 '19 at 09:01
  • I was using **multiple inheritance** in my `emr_terminate_ancestor_job_flows_operator` (I was feeling adventurous, [like the `Qubole` guys](https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/operators/qubole_check_operator.py#L27)) and was invoking the *individual* `__init__` methods of both `operator`s, as shown [here](https://www.journaldev.com/14623/python-multiple-inheritance). The `decorator`s on the *inner* `__init__` calls were creating problems (a minimal sketch of the failing pattern is below). In the end I had to fall back on using the individual `operator`s separately (instead of *fusing* them) – y2k-shubham Feb 06 '19 at 17:58
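To make the comment above concrete, here is a minimal, self-contained reproduction of the pattern. The class and argument names are illustrative stand-ins, not my actual operators, and `apply_defaults_sketch` (repeated from the snippet above so this runs standalone) only approximates Airflow's decorator:

```python
import inspect
from functools import wraps


def apply_defaults_sketch(func):  # same simplified stand-in as above
    sig = inspect.signature(func)
    required = {
        name for name, param in sig.parameters.items()
        if param.default is param.empty
        and name != 'self'
        and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
    }

    @wraps(func)
    def wrapper(*args, **kwargs):
        missing = required - set(kwargs)
        if missing:
            raise Exception("Argument {} is required".format(sorted(missing)))
        return func(*args, **kwargs)

    return wrapper


class CreateOp:  # stand-in for EmrCreateJobFlowOperator
    @apply_defaults_sketch
    def __init__(self, job_flow_overrides, **kwargs):
        self.job_flow_overrides = job_flow_overrides


class TerminateOp:  # stand-in for EmrTerminateJobFlowOperator
    @apply_defaults_sketch
    def __init__(self, job_flow_id, **kwargs):
        self.job_flow_id = job_flow_id


class FusedOp(CreateOp, TerminateOp):  # the "fused" operator
    @apply_defaults_sketch
    def __init__(self, job_flow_id, job_flow_overrides, **kwargs):
        # Invoking each parent __init__ individually: job_flow_id was bound
        # to a named parameter of this __init__, so it is no longer inside
        # **kwargs, and the decorator wrapping TerminateOp.__init__ reports
        # it as missing even though it was passed to FusedOp.
        CreateOp.__init__(self, job_flow_overrides=job_flow_overrides, **kwargs)
        TerminateOp.__init__(self, **kwargs)


FusedOp(job_flow_id='j-XXXXXXXX', job_flow_overrides={})
# Exception: Argument ['job_flow_id'] is required
```

Whether this is exactly what happened inside my operator depends on details elided above, but it shows how a decorator on an inner `__init__` can reject an argument that the outer call did receive; splitting the fused operator back into two separate operators avoids the problem entirely.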

0 Answers