Airflow version 1.10.0 doesn't recognize QUEUED as a possible state for Dataflow. You can see this by following the execution path of the template operator:
- The template operator calls the underlying DataflowHook to start the template
- The start_template_dataflow() method calls a "private" _start_template_dataflow() method
- Inside _start_template_dataflow() there's a call to the wait_for_done() method of the _DataflowJob class
- Finally, in the wait_for_done() method we can see an if/else block that handles the expected job states, and QUEUED is not one of them
This is the same for other Airflow versions supported by Composer, i.e., 1.10.6, 1.10.9 and even the most recent 1.10.12.
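To make the failure mode concrete, here is a simplified, stand-alone model of that if/else dispatch. It is only a sketch, not the Airflow source: next_action() is a hypothetical helper, and the returned strings "return" and "sleep" are labels invented for illustration. It shows that any state outside the handled set, QUEUED included, falls through to the final else and raises.

```python
def next_action(job):
    """Hypothetical helper mirroring the branch order of wait_for_done()."""
    state = job["currentState"]
    if state == "JOB_STATE_DONE":
        return "return"
    elif state == "JOB_STATE_RUNNING" and job.get("type") == "JOB_TYPE_STREAMING":
        return "return"
    elif state == "JOB_STATE_FAILED":
        raise Exception(
            "Google Cloud Dataflow job {} has failed.".format(job["name"]))
    elif state == "JOB_STATE_CANCELLED":
        raise Exception(
            "Google Cloud Dataflow job {} was cancelled.".format(job["name"]))
    elif state in ("JOB_STATE_RUNNING", "JOB_STATE_PENDING"):
        return "sleep"
    else:
        # QUEUED ends up here because no branch above matches it.
        raise Exception(
            "Google Cloud Dataflow job {} was unknown state: {}".format(
                job["name"], state))


def describe(job):
    """Return the action, or the error message if the state is not handled."""
    try:
        return next_action(job)
    except Exception as exc:
        return str(exc)


pending = describe({"name": "demo", "type": "JOB_TYPE_BATCH",
                    "currentState": "JOB_STATE_PENDING"})
queued = describe({"name": "demo", "type": "JOB_TYPE_BATCH",
                   "currentState": "JOB_STATE_QUEUED"})
print(pending)
print(queued)
```

The PENDING job is simply polled again, while the QUEUED job produces the "unknown state" error, even though the job itself has not failed.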
As a workaround, my suggestion is to use a monkey patch to handle the QUEUED state. For example, you can add the following code to your DAG file to replace the wait_for_done() method in _DataflowJob at runtime:
from airflow.contrib.hooks.gcp_dataflow_hook import _DataflowJob
import time


def wait_for_done(self):
    while True:
        if self._job and 'currentState' in self._job:
            if 'JOB_STATE_DONE' == self._job['currentState']:
                return True
            elif 'JOB_STATE_RUNNING' == self._job['currentState'] and \
                    'JOB_TYPE_STREAMING' == self._job['type']:
                return True
            elif 'JOB_STATE_FAILED' == self._job['currentState']:
                raise Exception("Google Cloud Dataflow job {} has failed.".format(
                    self._job['name']))
            elif 'JOB_STATE_CANCELLED' == self._job['currentState']:
                raise Exception("Google Cloud Dataflow job {} was cancelled.".format(
                    self._job['name']))
            elif 'JOB_STATE_RUNNING' == self._job['currentState']:
                time.sleep(self._poll_sleep)
            elif 'JOB_STATE_PENDING' == self._job['currentState']:
                time.sleep(15)
            elif 'JOB_STATE_QUEUED' == self._job['currentState']:
                # Keep exactly one of the following lines uncommented;
                # a branch with only comments is a syntax error.
                time.sleep(15)  # As if QUEUED was a PENDING state
                # time.sleep(self._poll_sleep)  # As if QUEUED was a RUNNING state
                # return True  # As if QUEUED was a final state
            else:
                self.log.debug(str(self._job))
                raise Exception(
                    "Google Cloud Dataflow job {} was unknown state: {}".format(
                        self._job['name'], self._job['currentState']))
        else:
            time.sleep(15)

        # Refresh the job description before the next check
        self._job = self._get_job()


# Replace the original method at runtime (monkey patch)
_DataflowJob.wait_for_done = wait_for_done
The difference from the original code is the elif statement that checks for the QUEUED state:

            elif 'JOB_STATE_QUEUED' == self._job['currentState']:
                # Keep exactly one of the following lines uncommented;
                # a branch with only comments is a syntax error.
                time.sleep(15)  # As if QUEUED was a PENDING state
                # time.sleep(self._poll_sleep)  # As if QUEUED was a RUNNING state
                # return True  # As if QUEUED was a final state

Notice that the branch lists the three most natural ways to handle this state: sleep (for 15 seconds or for the poll_sleep time) and keep waiting for the job to execute and complete, or simply return True and not wait for the execution. Make sure exactly one of these lines is uncommented, since an elif branch that contains only comments is a syntax error in Python. You can also add your own logic here.
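If you want to check the idea without an Airflow installation, the monkey-patch mechanics can be tried on a stand-in class. This is a minimal sketch under stated assumptions: FakeDataflowJob is a hypothetical class that only mimics the _job, _poll_sleep and _get_job() members the patch relies on, the state handling is trimmed to three branches, and the scripted state sequence is made up for the demo.

```python
import time


class FakeDataflowJob:
    """Hypothetical stand-in for _DataflowJob, fed a scripted list of states."""

    def __init__(self, states, poll_sleep=0):
        self._states = iter(states)
        self._poll_sleep = poll_sleep
        self._job = self._get_job()

    def _get_job(self):
        # Each call returns the next scripted job description.
        return {"name": "demo-job", "type": "JOB_TYPE_BATCH",
                "currentState": next(self._states)}

    def wait_for_done(self):
        # Original-style behavior: QUEUED is not handled.
        while True:
            state = self._job["currentState"]
            if state == "JOB_STATE_DONE":
                return True
            elif state == "JOB_STATE_RUNNING":
                time.sleep(self._poll_sleep)
            else:
                raise Exception("unknown state: {}".format(state))
            self._job = self._get_job()


def wait_for_done_patched(self):
    # Same loop, but QUEUED is polled like RUNNING.
    while True:
        state = self._job["currentState"]
        if state == "JOB_STATE_DONE":
            return True
        elif state in ("JOB_STATE_RUNNING", "JOB_STATE_QUEUED"):
            time.sleep(self._poll_sleep)
        else:
            raise Exception("unknown state: {}".format(state))
        self._job = self._get_job()


def run(states):
    """Run the wait loop and return its result, or the error message."""
    try:
        return FakeDataflowJob(states).wait_for_done()
    except Exception as exc:
        return str(exc)


SCRIPT = ["JOB_STATE_QUEUED", "JOB_STATE_RUNNING", "JOB_STATE_DONE"]

before = run(SCRIPT)
FakeDataflowJob.wait_for_done = wait_for_done_patched  # the monkey patch
after = run(SCRIPT)
print(before)
print(after)
```

Before the assignment, the QUEUED job raises the "unknown state" error; after it, the same sequence of states runs through to completion. Because the assignment replaces the method on the class itself, every instance created afterwards picks up the new behavior.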