My problem: I am trying to retrieve data from Google Ads. Sometimes the API call hangs indefinitely.
My idea: Create a separate process that monitors whether the process doing the API call has returned data or has hit a timeout limit.
What I have tried:
- I looked into threading. This doesn't work because I cannot terminate a thread the way I can terminate a process. I have read that I can use a flag to make the thread stop itself, but that won't work here because the API call hangs and no further Python code is ever executed to check the flag.
- I looked into multiprocessing. This works fine in isolation (a sketch of what I tried is below this list), but you cannot spawn a new process from within an Airflow task.
- I looked into using signals, as mentioned here. This also works well in isolation (see the second sketch below), but the alarm signal is sent to the main process, which is Airflow itself, not the process running my task, and my error handling only happens inside my own task.
- I tried to find out how to launch a single task in a new Python interpreter instance, but couldn't. There apparently is a way to make all tasks start in a new Python interpreter, via the execute_tasks_new_python_interpreter setting in airflow.cfg, but when I set it to True my task gets stuck in the scheduling state and never starts running.
- I tried the PythonVirtualenvOperator (third sketch below), but after installing virtualenv the way the documentation states, Airflow says it cannot find the package. It is definitely there, because if I activate Airflow's own virtual environment I can import it.
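
For reference, this is roughly the multiprocessing approach that works when I run it outside Airflow. `fetch_report` and the timeout value are just placeholders for my actual Google Ads call:

```python
import multiprocessing

def fetch_report(queue):
    # placeholder for the actual Google Ads API call that sometimes hangs
    queue.put("report data")

def fetch_with_timeout(timeout=300):
    queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=fetch_report, args=(queue,))
    proc.start()
    proc.join(timeout)        # wait up to `timeout` seconds for the call
    if proc.is_alive():       # still running -> the API call is hanging
        proc.terminate()
        proc.join()
        raise TimeoutError("Google Ads call exceeded the timeout")
    return queue.get()
```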
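And this is the signal-based version, which also works in isolation but not inside an Airflow task (again, `fetch_report` stands in for my real call; SIGALRM is Unix-only):

```python
import signal

def fetch_report():
    # placeholder for the actual Google Ads API call that sometimes hangs
    return "report data"

def alarm_handler(signum, frame):
    raise TimeoutError("Google Ads call exceeded the timeout")

signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(300)             # deliver SIGALRM after 300 seconds
try:
    report = fetch_report()
finally:
    signal.alarm(0)           # cancel the pending alarm
```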
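Finally, this is how I am trying to use PythonVirtualenvOperator. The dag_id, task_id, and requirements list are only illustrative, not my real DAG:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator

def fetch_google_ads_report():
    # imports must live inside the callable, since it runs in a fresh virtualenv
    pass

with DAG(
    dag_id="google_ads_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:
    fetch_task = PythonVirtualenvOperator(
        task_id="fetch_google_ads_report",
        python_callable=fetch_google_ads_report,
        requirements=["google-ads"],
        system_site_packages=False,
    )
```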
Can anyone help me?