I am fairly new to Airflow and am currently trying to pass information between my SimpleHttpOperators.

This is where the data is retrieved:

request_city_information = SimpleHttpOperator(
    http_conn_id='overpass',
    task_id='basic_city_information',
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    method='POST',
    data=f'[out:json]; node[name={name_city}][capital]; out center;',
    response_filter=lambda response: response.json()['elements'][0],
    dag=dag,
)

And then I want to use the response from this in the following operator:

request_city_attractions = SimpleHttpOperator(
    http_conn_id='overpass',
    task_id='city_attractions',
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    method='POST',
    data=f"[out:json];(nwr[tourism='attraction'][wikidata](around:{search_radius},"
         f"{request_city_information.xcom_pull(context='ti')['lat']}"
         f",10););out body;>;out skel qt;",
    dag=dag,
)

As you can see, I tried to access the response via request_city_information.xcom_pull(context='ti'). However, my context seems to be wrong here.

Since my data is already written to XCom, I take it that I don't need xcom_push=True, as suggested here.

There seem to have been changes to XCom since Airflow 2.x, as many of the suggested solutions I found do not work for me.
I believe there is a major gap in my thought process; I just don't know where.

I would appreciate any references to examples or help! Thanks in advance.

HenrikB

1 Answer

I have now solved it with a completely different approach. If anyone knows how to make the first one work, I would be happy for an explanation.

Here is my solution:

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

# default_args, name_city and search_radius are assumed to be defined elsewhere in the DAG file.
with DAG(
    'city_info',
    default_args=default_args,
    description='xcom test',
    schedule_interval=None,
) as dag:
    # TODO: Tasks with conn_id
    def get_city_information(**kwargs):
        payload = f'[out:json]; node[name={name_city}][capital]; out center;'
        # TODO: make the request via an Airflow connection
        r = requests.post('https://overpass-api.de/api/interpreter', data=payload)
        # Push the full JSON response to XCom under an explicit key.
        ti = kwargs['ti']
        ti.xcom_push('basic_city_information', r.json())

    get_city_information_task = PythonOperator(
        task_id='get_city_information_task',
        python_callable=get_city_information,
    )

    def get_city_attractions(**kwargs):
        ti = kwargs['ti']
        # Pull the city data pushed by the upstream task at run time.
        city_information = ti.xcom_pull(task_ids='get_city_information_task', key='basic_city_information')
        city = city_information['elements'][0]
        payload = (
            f"[out:json];(nwr[tourism='attraction'][wikidata](around:{search_radius}"
            f",{city['lat']},{city['lon']}"
            f"););out body;>;out skel qt;"
        )
        r = requests.post('https://overpass-api.de/api/interpreter', data=payload)
        # TODO: JSON as object
        ti.xcom_push('city_attractions', r.json())

    get_city_attractions_task = PythonOperator(
        task_id='get_city_attractions_task',
        python_callable=get_city_attractions,
    )

    get_city_information_task >> get_city_attractions_task
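
As for the first approach, a likely explanation is that the f-string in data is evaluated when the DAG file is parsed, before any task has run, so there is no XCom value to read yet (and xcom_pull expects the task context dictionary, not the string 'ti'). Since data is a templated field of SimpleHttpOperator, one option is to put a Jinja expression in the string and let Airflow render it at run time. The sketch below only builds such a template string in plain Python; the helper name build_attractions_template and the example radius are assumptions made for illustration and are not part of Airflow.

```python
def build_attractions_template(search_radius: int, upstream_task_id: str) -> str:
    """Return an Overpass query containing Jinja placeholders (hypothetical helper).

    The placeholders are left unrendered here; Airflow would fill them in
    at run time if the string is passed to a templated field such as `data`.
    """
    # Jinja expression that pulls the upstream task's return value from XCom.
    pull = f"ti.xcom_pull(task_ids='{upstream_task_id}')"
    lat = "{{ " + pull + "['lat'] }}"
    lon = "{{ " + pull + "['lon'] }}"
    return (
        "[out:json];(nwr[tourism='attraction'][wikidata]"
        f"(around:{search_radius},{lat},{lon}););out body;>;out skel qt;"
    )


# Example values; the radius is an arbitrary assumption.
attractions_template = build_attractions_template(1000, "basic_city_information")
print(attractions_template)
```

The resulting string could then be passed as data=attractions_template to the second SimpleHttpOperator, with request_city_information >> request_city_attractions setting the order. This assumes the first operator's response_filter result is stored as its return value in XCom, which is the default behaviour in Airflow 2.x as far as I know.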
HenrikB