Say I have the following DAG (details omitted for clarity):
#dag.py
from airflow.operators.python import PythonOperator


def main():
    print("Task 1")
    #some code
    print("Task 2")
    #some more code
    print("Done")
    return 0


t1 = PythonOperator(python_callable=main)
t1
Say the program fails at #some more code, e.g. due to RAM issues. All I get in my log is an error such as:
[2021-05-25 12:49:54,211] {process_utils.py:137} INFO - Output:
[2021-05-25 12:52:44,605] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python.py", line 493, in execute
    super().execute(context=serializable_context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python.py", line 531, in execute_callable
    string_args_filename,
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/process_utils.py", line 145, in execute_in_subprocess
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/tmp/venv2wbjnabi/bin/python', '/tmp/venv2wbjnabi/script.py', '/tmp/venv2wbjnabi/script.in', '/tmp/venv2wbjnabi/script.out', '/tmp/venv2wbjnabi/string_args.txt']' died with <Signals.SIGKILL: 9>.
[2021-05-25 13:00:55,733] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=test_dag, task_id=clean_data, execution_date=20210525T105621, start_date=20210525T105732, end_date=20210525T110055
[2021-05-25 13:00:56,555] {local_task_job.py:146} INFO - Task exited with return code 1
However, none of the print statements show up in the log, so I can't tell where the program failed (I only know it now because of debugging).
I therefore assume that Airflow doesn't flush the output until the task is marked as "success". Is there a way to make Airflow flush/print at runtime?
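
To make clear what I mean by flushing at runtime, here is a minimal sketch of the callable with an explicit flush on every print. It only uses the built-in `flush=True` argument of `print`. Whether this is enough for the messages to reach the Airflow log when the subprocess is killed with SIGKILL is an assumption I have not verified.

```python
def main():
    # flush=True pushes each message out of Python's stdout buffer right away,
    # instead of leaving it in the buffer until it fills up or the process exits.
    print("Task 1", flush=True)
    # some code
    print("Task 2", flush=True)
    # some more code (the step that gets killed in my case)
    print("Done", flush=True)
    return 0
```

Setting the environment variable `PYTHONUNBUFFERED=1` for the worker process should have a similar effect on plain Python buffering without touching each print call, but again I don't know whether Airflow passes it through to the virtualenv subprocess.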