I have an Airflow pipeline that produces 12 staging tables from Google Cloud Storage files and then performs some downstream processing. I have a DummyOperator to collect these tasks before proceeding to the next stages.
I'm getting an error on the wait_stg_load operator saying it is in an upstream_failed state, even though all of the upstream tasks are marked as success; the DAG run itself is then marked as failed. If I clear the status on wait_stg_load, everything proceeds fine. Any ideas on what I'm doing wrong?
I am using Google Cloud Composer, which runs Airflow 1.9 on Python 3.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# default_args, tables and DATASET_NAME are defined earlier (omitted here)

with DAG('load_data',
         default_args=default_args,
         schedule_interval='0 9 * * *',
         concurrency=3
         ) as dag:

    # Join point: downstream processing waits here until all staging loads finish.
    t2 = DummyOperator(
        task_id='wait_stg_load',
        dag=dag
    )

    # One GCS -> BigQuery load task per staging table.
    for t in tables:
        t1 = GoogleCloudStorageToBigQueryOperator(
            task_id='load_stg_{}'.format(t.replace('.', '_')),
            bucket='my-bucket',
            source_objects=['data/{}.json'.format(t)],
            destination_project_dataset_table='{}.stg_{}'.format(DATASET_NAME, t.replace('.', '_')),
            schema_object='data/schemas/{}.json'.format(t),
            source_format='NEWLINE_DELIMITED_JSON',
            write_disposition='WRITE_TRUNCATE',
            dag=dag
        )

        t1 >> t2
Update 1
I believe this is a concurrency issue within Airflow. Looking at the logs, one of the load tasks does briefly go into a failed state when the DAG's concurrency limit is hit at task runtime, but it is rescheduled and runs anyway. It ends up marked as success, yet by then the DummyOperator apparently doesn't see that and has already gone to upstream_failed.
[2019-02-14 09:00:14,734] {cli.py:374} INFO - Running on host airflow-worker
[2019-02-14 09:00:16,686] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dag.task 2019-02-13 09:00:00 [queued]>
[2019-02-14 09:00:16,694] {models.py:1189} INFO - Dependencies not met for <TaskInstance: dag.task 2019-02-13 09:00:00 [queued]>, dependency 'Task Instance Slots Available' FAILED: The maximum number of running tasks (3) for this task's DAG 'dag' has been reached.
[2019-02-14 09:00:16,694] {models.py:1389} WARNING -
-------------------------------------------------------------------------------
FIXME: Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 1. State set to NONE
-------------------------------------------------------------------------------
[2019-02-14 09:00:16,694] {models.py:1392} INFO - Queuing into pool None
[2019-02-14 09:00:26,619] {cli.py:374} INFO - Running on host airflow-worker
[2019-02-14 09:00:28,563] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dag.task 2019-02-13 09:00:00 [failed]>
[2019-02-14 09:00:28,570] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dag.task 2019-02-13 09:00:00 [failed]>
[2019-02-14 09:00:28,570] {models.py:1406} INFO -
-------------------------------------------------------------------------------
Starting attempt 1 of
-------------------------------------------------------------------------------
[2019-02-14 09:00:28,607] {models.py:1427} INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): task> on 2019-02-13 09:00:00
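For reference, the workaround I'm considering is only a sketch based on my reading of the log above, not a confirmed fix: raise the DAG-level concurrency so the 12 load tasks never trip the runtime concurrency check, and give each task a retry so a transient failed state gets retried instead of cascading into upstream_failed on wait_stg_load. The start_date, retries and retry_delay values below are placeholders I made up, since my real default_args are not shown here:

from datetime import datetime, timedelta

# Assumed values for illustration only.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 2, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
}

with DAG('load_data',
         default_args=default_args,
         schedule_interval='0 9 * * *',
         concurrency=12  # assumed: one slot per staging table instead of 3
         ) as dag:
    ...  # same load_stg_* tasks and wait_stg_load join as above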