I updated the tasks in my DAG and their sequence in code (Airflow shows the update), but the change is not showing in the Airflow GUI (tree/graph view). I have tried refreshing, restarting Airflow, etc., as suggested in other posts and their solutions. For some reason the GUI is not picking up 'extract_api_data_task', even though the dependency chain is declared as: create_psql_schema_task >> create_psql_table_task >> extract_api_data_task >> insert_data_task. I'm not sure what to do here. Any help would be appreciated. [1]

import os
import logging
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from psycopg2.extras import execute_values
from airflow import AirflowException

########################################################
#
#   DAG Settings
#
#########################################################

from airflow import DAG

dag_default_args = {
    'owner': 'Nigel',
    'start_date': datetime.now() - timedelta(days=2),
    'email': [],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'depends_on_past': False,
    'wait_for_downstream': False,
}

dag = DAG(
    dag_id='AT2_DAG',
    default_args=dag_default_args,
    schedule_interval="@daily",
    catchup=True,
    max_active_runs=1,
    concurrency=5
)


#########################################################
#
#   Custom Logic for Operator
#
#########################################################

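def api_url():
    # Inside Airbnb listings snapshot for Sydney (2021-04-10), gzip-compressed CSV.
    return "http://data.insideairbnb.com/australia/nsw/sydney/2021-04-10/data/listings.csv.gz"

def extract_api_data_func():
    # Download the CSV and return it as a dict so it can be passed on via XCom.
    url = api_url()
    df = pd.read_csv(url, compression='gzip', header=0, sep=',', quotechar='"')
    df_dict = df.to_dict()
    return df_dict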

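def insert_data_func(**kwargs):
    # Connect to Postgres through the Airflow connection named "postgres".
    ps_pg_hook = PostgresHook(postgres_conn_id="postgres")
    conn_ps = ps_pg_hook.get_conn()

    # Pull the dict returned by the extract task from XCom and rebuild the DataFrame.
    ti = kwargs['ti']
    insert_df_dict = ti.xcom_pull(task_ids='extract_api_data_task_id')
    insert_df = pd.DataFrame.from_dict(insert_df_dict)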

    if len(insert_df) > 0:
        col_names = ['listing_url','scrape_id','last_scraped','name','description','neighborhood_overview','picture_url','host_id','host_url','host_name','host_since','host_location','host_about','host_response_time','host_response_rate','host_acceptance_rate','host_is_superhost','host_thumbnail_url','host_picture_url','host_neighbourhood','host_listings_count','host_total_listings_count','host_verifications','host_has_profile_pic','host_identity_verified','neighbourhood','neighbourhood_cleansed','neighbourhood_group_cleansed','latitude','longitude','property_type','room_type','accommodates','bathrooms','bathrooms_text','bedrooms','beds','amenities','price','minimum_nights','maximum_nights','minimum_minimum_nights','maximum_minimum_nights','minimum_maximum_nights','maximum_maximum_nights','minimum_nights_avg_ntm','maximum_nights_avg_ntm','calendar_updated','has_availability','availability_30','availability_60','availability_90','availability_365','calendar_last_scraped','number_of_reviews','number_of_reviews_ltm','number_of_reviews_l30d','first_review','last_review','review_scores_rating','review_scores_accuracy','review_scores_cleanliness','review_scores_checkin','review_scores_communication','review_scores_location','review_scores_value','license','instant_bookable','calculated_host_listings_count','calculated_host_listings_count_entire_homes','calculated_host_listings_count_private_rooms','calculated_host_listings_count_shared_rooms','reviews_per_month']

        values = insert_df[col_names].to_dict('split')
        values = values['data']
        logging.info(values)

        insert_sql = """
                    INSERT INTO bde_at2.airbnb (listing_url,scrape_id, last_scraped, name, description, neighborhood_overview, picture_url, host_id, host_url, host_name, host_since, host_location, host_about, host_response_time, host_response_rate, host_acceptance_rate, host_is_superhost, host_thumbnail_url, host_picture_url, host_neighbourhood, host_listings_count, host_total_listings_count, host_verifications, host_has_profile_pic, host_identity_verified, neighbourhood, neighbourhood_cleansed, neighbourhood_group_cleansed, latitude, longitude, property_type, room_type, accommodates, bathrooms, bathrooms_text, bedrooms, beds, amenities, price, minimum_nights, maximum_nights, minimum_minimum_nights, maximum_minimum_nights, minimum_maximum_nights, maximum_maximum_nights, minimum_nights_avg_ntm, maximum_nights_avg_ntm, calendar_updated, has_availability, availability_30, availability_60, availability_90, availability_365, calendar_last_scraped, number_of_reviews, number_of_reviews_ltm, number_of_reviews_l30d, first_review, last_review, review_scores_rating, review_scores_accuracy, review_scores_cleanliness, review_scores_checkin, review_scores_communication, review_scores_location, review_scores_value, license, instant_bookable, calculated_host_listings_count, calculated_host_listings_count_entire_homes, calculated_host_listings_count_private_rooms, calculated_host_listings_count_shared_rooms, reviews_per_month)
                    VALUES %s
                    """

        # Insert all rows in one batch, then commit the transaction.
        execute_values(conn_ps.cursor(), insert_sql, values, page_size=len(insert_df))
        conn_ps.commit()

#########################################################
#
#   DAG Operator Setup
#
#########################################################

from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

extract_api_data_task = PythonOperator(
    task_id='extract_api_data_task_id',
    python_callable=extract_api_data_func,
    op_kwargs={},
    provide_context=True,
    dag=dag
)

create_psql_schema_task = PostgresOperator(
    task_id="create_psql_schema_task_id",
    postgres_conn_id="postgres",
    sql="""
        CREATE SCHEMA IF NOT EXISTS bde_at2;
    """,
    dag=dag
)

create_psql_table_task = PostgresOperator(
    task_id="create_psql_table_task_id",
    postgres_conn_id="postgres",
    sql="""
        CREATE TABLE IF NOT EXISTS bde_at2.airbnb (
            id                                              TEXT,
            listing_url                                     TEXT,
            scrape_id                                       TEXT,
            last_scraped                                    TEXT,
            name                                            TEXT,
            description                                     TEXT,
            neighborhood_overview                           TEXT,
            picture_url                                     TEXT,
            host_id                                         TEXT,
            host_url                                        TEXT,
            host_name                                       TEXT,
            host_since                                      TEXT,
            host_location                                   TEXT,
            host_about                                      TEXT,
            host_response_time                              TEXT,
            host_response_rate                              TEXT,
            host_acceptance_rate                            TEXT,
            host_is_superhost                               TEXT,
            host_thumbnail_url                              TEXT,
            host_picture_url                                TEXT,
            host_neighbourhood                              TEXT,
            host_listings_count                             TEXT,
            host_total_listings_count                       TEXT,
            host_verifications                              TEXT,
            host_has_profile_pic                            TEXT,
            host_identity_verified                          TEXT,
            neighbourhood                                   TEXT,
            neighbourhood_cleansed                          TEXT,
            neighbourhood_group_cleansed                    TEXT,
            latitude                                        TEXT,
            longitude                                       TEXT,
            property_type                                   TEXT,
            room_type                                       TEXT,
            accommodates                                    TEXT,
            bathrooms                                       TEXT,
            bathrooms_text                                  TEXT,
            bedrooms                                        TEXT,
            beds                                            TEXT,
            amenities                                       TEXT,
            price                                           TEXT,
            minimum_nights                                  TEXT,
            maximum_nights                                  TEXT,
            minimum_minimum_nights                          TEXT,
            maximum_minimum_nights                          TEXT,
            minimum_maximum_nights                          TEXT,
            maximum_maximum_nights                          TEXT,
            minimum_nights_avg_ntm                          TEXT,
            maximum_nights_avg_ntm                          TEXT,
            calendar_updated                                TEXT,
            has_availability                                TEXT,
            availability_30                                 TEXT,
            availability_60                                 TEXT,
            availability_90                                 TEXT,
            availability_365                                TEXT,
            calendar_last_scraped                           TEXT,
            number_of_reviews                               TEXT,
            number_of_reviews_ltm                           TEXT,
            number_of_reviews_l30d                          TEXT,
            first_review                                    TEXT,
            last_review                                     TEXT,
            review_scores_rating                            TEXT,
            review_scores_accuracy                          TEXT,
            review_scores_cleanliness                       TEXT,
            review_scores_checkin                           TEXT,
            review_scores_communication                     TEXT,
            review_scores_location                          TEXT,
            review_scores_value                             TEXT,
            license                                         TEXT,
            instant_bookable                                TEXT,
            calculated_host_listings_count                  TEXT,
            calculated_host_listings_count_entire_homes     TEXT,
            calculated_host_listings_count_private_rooms    TEXT,
            calculated_host_listings_count_shared_rooms     TEXT,
            reviews_per_month                               TEXT

            );
    """,
    dag=dag
)

insert_data_task = PythonOperator(
    task_id='insert_data_task_id',
    python_callable=insert_data_func,
    op_kwargs={},
    provide_context=True,
    dag=dag
)

create_psql_schema_task >> create_psql_table_task >> extract_api_data_task >> insert_data_task


  [1]: https://i.stack.imgur.com/bcYVv.png

Log:

Reading local file: /opt/airflow/logs/AT2_DAG/insert_data_task_id/2021-05-12T00:00:00+00:00/1.log
[2021-05-13 06:05:52,878] {taskinstance.py:845} INFO - Dependencies not met for <TaskInstance: AT2_DAG.insert_data_task_id 2021-05-12T00:00:00+00:00 [queued]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'extract_api_data_task_id'}
[2021-05-13 06:05:52,880] {local_task_job.py:93} INFO - Task is not able to be run
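
To make the log line above easier to read, here is a minimal sketch of what an 'all_success' check amounts to. It is a simplified illustration under my own assumptions, not Airflow's actual implementation; the function name `all_success_met` is hypothetical, and the counts are copied from the log. Note that 'failed' and 'done' are both 0, which suggests the upstream task had not finished at all rather than having failed.

```python
# Simplified illustration only; this is NOT Airflow's real dependency code.
# all_success_met is a hypothetical helper name chosen for this sketch.
def all_success_met(upstream_tasks_state):
    """Return True when every upstream task has succeeded."""
    return upstream_tasks_state["successes"] == upstream_tasks_state["total"]


# Counts copied from the upstream_tasks_state in the log line above.
state_from_log = {
    "total": 1,
    "successes": 0,
    "skipped": 0,
    "failed": 0,
    "upstream_failed": 0,
    "done": 0,
}

# The single upstream task (extract_api_data_task_id) has not succeeded,
# so the check fails and insert_data_task_id cannot run.
print(all_success_met(state_from_log))  # False
```
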
  • This code generated the 4 expected tasks for me without any modifications. Is there any useful information in the Airflow webserver logs? Was the missing task added to the DAG code at the same time as the others? Side note: I would highly recommend not setting a dynamic start_date for a DAG. Check out this SO post [here](https://stackoverflow.com/questions/41134524/why-is-it-recommended-against-using-a-dynamic-start-date-in-airflow) – Josh Fell May 13 '21 at 01:02
  • Thanks Josh, will do. As for the file, it's now working. I was so baffled that I copied and pasted the code into another file, uploaded it as a DAG, and it worked. – Nigel Visser May 13 '21 at 06:05
  • I've posted the error log. The upstream task that failed had no log. – Nigel Visser May 13 '21 at 06:10
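
As a rough illustration of the start_date side note in the first comment, the sketch below compares the dynamic setting from the question with a fixed date. It uses only the standard library so it runs without Airflow; the helper names and the date 2021-05-10 are assumptions made for this example, not values from the original DAG.

```python
from datetime import datetime, timedelta


def dynamic_default_args():
    """Mimics the question's settings: start_date is recomputed on every call."""
    return {"owner": "Nigel", "start_date": datetime.now() - timedelta(days=2)}


def static_default_args():
    """Same idea with a fixed start_date (the date itself is an assumption)."""
    return {"owner": "Nigel", "start_date": datetime(2021, 5, 10)}


# A DAG file is evaluated more than once, so compare two evaluations of each.
first_dynamic = dynamic_default_args()["start_date"]
second_dynamic = dynamic_default_args()["start_date"]
first_static = static_default_args()["start_date"]
second_static = static_default_args()["start_date"]

# The dynamic value depends on when the file was evaluated;
# the static value is the same every time.
print("dynamic start_date stable:", first_dynamic == second_dynamic)
print("static start_date stable:", first_static == second_static)
```

In the DAG from the question, this would mean replacing the `datetime.now() - timedelta(days=2)` entry in `dag_default_args` with a fixed `datetime(...)` value.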

0 Answers