I'm updating a DAG and its task sequence in code, and Airflow shows the updated code, but the change isn't reflected in the Airflow GUI (tree/graph view). I've tried refreshing the DAG, restarting Airflow, etc., as suggested in other posts, but it still isn't picking up `extract_api_data_task`, even though the dependencies are declared as `create_psql_schema_task >> create_psql_table_task >> extract_api_data_task >> insert_data_task`. I'm not really sure what to do here; any help would be appreciated. [1]
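For reference, here's a minimal sketch of the kind of check that should show whether the scheduler can parse the file at all and which task IDs it sees (the `/opt/airflow/dags` path and the `include_examples` flag are just assumptions about the standard container layout):

```
# Hypothetical parse check, run inside the scheduler/webserver container.
# Assumes DAG files live in the default /opt/airflow/dags folder.
from airflow.models import DagBag

bag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)
print(bag.import_errors)          # {} means every file parsed cleanly
dag = bag.get_dag("AT2_DAG")
if dag is not None:
    print(sorted(dag.task_ids))   # should include 'extract_api_data_task_id'
```

The full DAG file: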
```
import os
import logging
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from psycopg2.extras import execute_values
from airflow import AirflowException
########################################################
#
# DAG Settings
#
#########################################################
from airflow import DAG
dag_default_args = {
    'owner': 'Nigel',
    'start_date': datetime.now() - timedelta(days=2),
    'email': [],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'depends_on_past': False,
    'wait_for_downstream': False,
}
dag = DAG(
    dag_id='AT2_DAG',
    default_args=dag_default_args,
    schedule_interval="@daily",
    catchup=True,
    max_active_runs=1,
    concurrency=5
)
#########################################################
#
# Custom Logic for Operator
#
#########################################################
def api_url():
    return "http://data.insideairbnb.com/australia/nsw/sydney/2021-04-10/data/listings.csv.gz"

def extract_api_data_func():
    # Download the listings CSV and return it as a dict so it gets pushed to XCom
    url = api_url()
    df = pd.read_csv(url, compression='gzip', header=0, sep=',', quotechar='"')
    df_dict = df.to_dict()
    return df_dict
def insert_data_func(**kwargs):
    ps_pg_hook = PostgresHook(postgres_conn_id="postgres")
    conn_ps = ps_pg_hook.get_conn()
    ti = kwargs['ti']
    # Pull the dict returned by the extract task from XCom and rebuild the dataframe
    insert_df_dict = ti.xcom_pull(task_ids='extract_api_data_task_id')
    insert_df = pd.DataFrame.from_dict(insert_df_dict)
    if len(insert_df) > 0:
        col_names = ['listing_url','scrape_id','last_scraped','name','description','neighborhood_overview','picture_url','host_id','host_url','host_name','host_since','host_location','host_about','host_response_time','host_response_rate','host_acceptance_rate','host_is_superhost','host_thumbnail_url','host_picture_url','host_neighbourhood','host_listings_count','host_total_listings_count','host_verifications','host_has_profile_pic','host_identity_verified','neighbourhood','neighbourhood_cleansed','neighbourhood_group_cleansed','latitude','longitude','property_type','room_type','accommodates','bathrooms','bathrooms_text','bedrooms','beds','amenities','price','minimum_nights','maximum_nights','minimum_minimum_nights','maximum_minimum_nights','minimum_maximum_nights','maximum_maximum_nights','minimum_nights_avg_ntm','maximum_nights_avg_ntm','calendar_updated','has_availability','availability_30','availability_60','availability_90','availability_365','calendar_last_scraped','number_of_reviews','number_of_reviews_ltm','number_of_reviews_l30d','first_review','last_review','review_scores_rating','review_scores_accuracy','review_scores_cleanliness','review_scores_checkin','review_scores_communication','review_scores_location','review_scores_value','license','instant_bookable','calculated_host_listings_count','calculated_host_listings_count_entire_homes','calculated_host_listings_count_private_rooms','calculated_host_listings_count_shared_rooms','reviews_per_month']
        values = insert_df[col_names].to_dict('split')
        values = values['data']
        logging.info(values)
        insert_sql = """
            INSERT INTO bde_at2.airbnb (listing_url,scrape_id, last_scraped, name, description, neighborhood_overview, picture_url, host_id, host_url, host_name, host_since, host_location, host_about, host_response_time, host_response_rate, host_acceptance_rate, host_is_superhost, host_thumbnail_url, host_picture_url, host_neighbourhood, host_listings_count, host_total_listings_count, host_verifications, host_has_profile_pic, host_identity_verified, neighbourhood, neighbourhood_cleansed, neighbourhood_group_cleansed, latitude, longitude, property_type, room_type, accommodates, bathrooms, bathrooms_text, bedrooms, beds, amenities, price, minimum_nights, maximum_nights, minimum_minimum_nights, maximum_minimum_nights, minimum_maximum_nights, maximum_maximum_nights, minimum_nights_avg_ntm, maximum_nights_avg_ntm, calendar_updated, has_availability, availability_30, availability_60, availability_90, availability_365, calendar_last_scraped, number_of_reviews, number_of_reviews_ltm, number_of_reviews_l30d, first_review, last_review, review_scores_rating, review_scores_accuracy, review_scores_cleanliness, review_scores_checkin, review_scores_communication, review_scores_location, review_scores_value, license, instant_bookable, calculated_host_listings_count, calculated_host_listings_count_entire_homes, calculated_host_listings_count_private_rooms, calculated_host_listings_count_shared_rooms, reviews_per_month)
            VALUES %s
        """
        # Bulk-insert all rows in a single execute_values call
        result = execute_values(conn_ps.cursor(), insert_sql, values, page_size=len(insert_df))
        conn_ps.commit()
    return None
#########################################################
#
# DAG Operator Setup
#
#########################################################
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
extract_api_data_task = PythonOperator(
    task_id='extract_api_data_task_id',
    python_callable=extract_api_data_func,
    op_kwargs={},
    provide_context=True,
    dag=dag
)
create_psql_schema_task = PostgresOperator(
    task_id="create_psql_schema_task_id",
    postgres_conn_id="postgres",
    sql="""
        CREATE SCHEMA IF NOT EXISTS bde_at2;
    """,
    dag=dag
)
create_psql_table_task = PostgresOperator(
    task_id="create_psql_table_task_id",
    postgres_conn_id="postgres",
    sql="""
        CREATE TABLE IF NOT EXISTS bde_at2.airbnb (
            id TEXT,
            listing_url TEXT,
            scrape_id TEXT,
            last_scraped TEXT,
            name TEXT,
            description TEXT,
            neighborhood_overview TEXT,
            picture_url TEXT,
            host_id TEXT,
            host_url TEXT,
            host_name TEXT,
            host_since TEXT,
            host_location TEXT,
            host_about TEXT,
            host_response_time TEXT,
            host_response_rate TEXT,
            host_acceptance_rate TEXT,
            host_is_superhost TEXT,
            host_thumbnail_url TEXT,
            host_picture_url TEXT,
            host_neighbourhood TEXT,
            host_listings_count TEXT,
            host_total_listings_count TEXT,
            host_verifications TEXT,
            host_has_profile_pic TEXT,
            host_identity_verified TEXT,
            neighbourhood TEXT,
            neighbourhood_cleansed TEXT,
            neighbourhood_group_cleansed TEXT,
            latitude TEXT,
            longitude TEXT,
            property_type TEXT,
            room_type TEXT,
            accommodates TEXT,
            bathrooms TEXT,
            bathrooms_text TEXT,
            bedrooms TEXT,
            beds TEXT,
            amenities TEXT,
            price TEXT,
            minimum_nights TEXT,
            maximum_nights TEXT,
            minimum_minimum_nights TEXT,
            maximum_minimum_nights TEXT,
            minimum_maximum_nights TEXT,
            maximum_maximum_nights TEXT,
            minimum_nights_avg_ntm TEXT,
            maximum_nights_avg_ntm TEXT,
            calendar_updated TEXT,
            has_availability TEXT,
            availability_30 TEXT,
            availability_60 TEXT,
            availability_90 TEXT,
            availability_365 TEXT,
            calendar_last_scraped TEXT,
            number_of_reviews TEXT,
            number_of_reviews_ltm TEXT,
            number_of_reviews_l30d TEXT,
            first_review TEXT,
            last_review TEXT,
            review_scores_rating TEXT,
            review_scores_accuracy TEXT,
            review_scores_cleanliness TEXT,
            review_scores_checkin TEXT,
            review_scores_communication TEXT,
            review_scores_location TEXT,
            review_scores_value TEXT,
            license TEXT,
            instant_bookable TEXT,
            calculated_host_listings_count TEXT,
            calculated_host_listings_count_entire_homes TEXT,
            calculated_host_listings_count_private_rooms TEXT,
            calculated_host_listings_count_shared_rooms TEXT,
            reviews_per_month TEXT
        );
    """,
    dag=dag
)
insert_data_task = PythonOperator(
    task_id='insert_data_task_id',
    python_callable=insert_data_func,
    op_kwargs={},
    provide_context=True,
    dag=dag
)
create_psql_schema_task >> create_psql_table_task >> extract_api_data_task >> insert_data_task
```
[1]: https://i.stack.imgur.com/bcYVv.png
Log:
Reading local file: /opt/airflow/logs/AT2_DAG/insert_data_task_id/2021-05-12T00:00:00+00:00/1.log
[2021-05-13 06:05:52,878] {taskinstance.py:845} INFO - Dependencies not met for <TaskInstance: AT2_DAG.insert_data_task_id 2021-05-12T00:00:00+00:00 [queued]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'extract_api_data_task_id'}
[2021-05-13 06:05:52,880] {local_task_job.py:93} INFO - Task is not able to be run