
I'm trying to figure out how best to approach the problem below. Essentially I have an external API service that I am sending requests to and receiving results from.

POST = Send a request; the response you get back is a URL which you can use in GET requests to retrieve your results.

GET = Poll the URL returned by the POST request until you get a successful result.

What would be the best way to approach this in Airflow? My idea is essentially to have 2 tasks running in parallel:

  1. One sending the POST requests and saving each response URL to XCom.
  2. The other continuously running in a while loop, reading new URLs from the XCom store and fetching their responses. It would then delete the XCom entry once it has retrieved a successful result from that URL.

Do you think this is the correct way of going about it? Or should I possibly use the asyncio library in Python?

Any help much appreciated

Thanks,

adan11

1 Answer


You can achieve what you are describing using SimpleHttpOperator and HttpSensor from Airflow (no need to install any extra package).

Consider this example that uses the http_default connection pointing to httpbin.org.

The task to perform the POST request:

import json

from airflow.providers.http.operators.http import SimpleHttpOperator

task_post_op = SimpleHttpOperator(
    task_id='post_op',
    # http_conn_id='your_conn_id',
    endpoint='post',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    response_check=lambda response: response.json()['json']['priority'] == 5,
    # The value returned here is pushed to XCom; in this example it is the
    # endpoint to poll in the next task.
    response_filter=lambda response: 'get',  # e.g. lambda response: json.loads(response.text)
    dag=dag,
)

By providing response_filter you can manipulate the response result; the returned value is what gets pushed to XCom. In your case, you should return the endpoint you want to poll in the next task.
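For instance, if your API returned the polling URL in the body of the POST response, a response_filter along these lines could extract it (a sketch only; the result_url field is a hypothetical name, adjust it to your API's actual schema):

def extract_poll_endpoint(response):
    # Hypothetical response body: {"result_url": "results/abc123"}
    return response.json()['result_url']

You would then pass response_filter=extract_poll_endpoint instead of the lambda above.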

response_filter: A function allowing you to manipulate the response text, e.g. response_filter=lambda response: json.loads(response.text). The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary.

Note that the response_check param is optional.

The task to perform GET requests:

Use the HttpSensor to poke until the response_check callable evaluates to true.

from airflow.providers.http.sensors.http import HttpSensor

task_http_sensor_check = HttpSensor(
    task_id='http_sensor_check',
    # http_conn_id='your_conn_id',
    endpoint=task_post_op.output,
    request_params={},
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=5,
    dag=dag,
)

As the endpoint param we pass the XCom value pulled from the previous task, using XComArg. Use poke_interval to define the time in seconds that the sensor should wait between tries.
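If you prefer Jinja templating over XComArg, the same value can be pulled explicitly, since endpoint is a templated field of the sensor. A sketch, assuming the POST task is named post_op as above:

endpoint="{{ ti.xcom_pull(task_ids='post_op') }}",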

Remember to create a Connection of your own defining the base URL, port, etc.
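One way to do that, as a sketch: Airflow can also read connections from environment variables named AIRFLOW_CONN_{CONN_ID} containing a connection URI, so the http_default connection used above could be defined in the scheduler/worker environment like this:

AIRFLOW_CONN_HTTP_DEFAULT=http://httpbin.org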

Let me know if that worked for you!

NicoE
  • Hi NicoE. Thanks a lot for your response. Would this solution be able to handle multiple POST/GET requests at the same time? i.e. how would I handle these operators if I had 10 httpbin requests that I needed to send at the same time, and continuously 'poke' each of them with an HttpSensor? Also, would the SimpleHttpOperator work with HTTPS API calls? Thanks again, – adan11 Jun 16 '21 at 00:54
  • @adan11 In order to handle multiple of these request cycles in parallel, simply iterate over a list and create as many tasks as you need dynamically. Apply that pattern within a `TaskGroup` to keep the UI cleaner (see the first sketch after these comments). Check [this answer](https://stackoverflow.com/a/66907844/10569220) for details on how to implement it. – NicoE Jun 16 '21 at 02:27
  • @adan11 `SimpleHttpOperator` uses the Python `requests` module under the hood, so there is no problem with HTTPS. Give it a try, and if it works for you please do remember to mark the answer as [accepted](https://stackoverflow.com/help/accepted-answer) – NicoE Jun 16 '21 at 02:32
  • Thanks NicoE. The end goal would be to have 100,000+ API calls running asynchronously. Do you think that if I am dynamically creating this with one task per API call, it could get a bit messy? I was thinking of using the requests library in one task to loop through my 100,000 API calls and save each endpoint to a list. Then, in the same task, once the POST API calls are complete, loop through the list and get the response from each endpoint's GET request, removing it from the list once a result is successfully found. Do you think this approach would also work? @NicoE – adan11 Jun 16 '21 at 04:47
  • Although I am not sure whether to keep the saved responses in a list in the same task, or send them to another task via XCom. My reasoning for keeping them in the same task was to keep the tasks atomic. – adan11 Jun 16 '21 at 04:50
  • If passed to another task via XCom, I would need to delete the XCom value for each successful endpoint result; otherwise the second task would continuously 'poll' previously successful results. – adan11 Jun 16 '21 at 05:15
  • I think it's a broader discussion and domain requirements must be taken into consideration. In general my suggestion would be to begin with a proof of concept and work from there, trying to split the whole batch into manageable parts and triggering them consecutively. I don't think dynamic task generation in such big numbers would be performant (even for the UI). Maybe you could loop from a file and keep the cycle (POST-GET) in one single task (a sketch of that single-task loop follows below). Watch out for timeouts and retries to avoid eternal lockups and similar scenarios. – NicoE Jun 16 '21 at 16:03
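As a rough illustration of the dynamic-task pattern mentioned in the comments, here is a minimal sketch reusing the operators from the answer inside a TaskGroup (the payloads list and the task ids are made up for illustration):

from airflow.utils.task_group import TaskGroup

with TaskGroup(group_id='request_cycles', dag=dag):
    for i, payload in enumerate(payloads):  # payloads: hypothetical list of request bodies
        post = SimpleHttpOperator(
            task_id=f'post_op_{i}',
            endpoint='post',
            data=json.dumps(payload),
            headers={"Content-Type": "application/json"},
            response_filter=lambda response: 'get',
            dag=dag,
        )
        HttpSensor(
            task_id=f'http_sensor_check_{i}',
            endpoint=post.output,  # the XComArg also sets the post >> sensor dependency
            response_check=lambda response: "httpbin" in response.text,
            poke_interval=5,
            dag=dag,
        )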
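And a minimal sketch of the single-task POST-then-poll approach discussed in the last comments, using the requests library directly inside one callable (the URL, field names, and timing are assumptions to adapt to your API):

import time

import requests

def post_then_poll(payloads, base_url='https://httpbin.org'):
    # Send every POST first, collecting the endpoints to poll afterwards.
    pending = []
    for payload in payloads:
        resp = requests.post(f'{base_url}/post', json=payload, timeout=30)
        resp.raise_for_status()
        pending.append('get')  # hypothetical: extract the real poll URL from resp.json()
    results = []
    deadline = time.monotonic() + 3600  # hard stop to avoid the eternal lockups mentioned above
    # Keep polling until every endpoint has returned a successful result.
    while pending and time.monotonic() < deadline:
        for endpoint in list(pending):
            resp = requests.get(f'{base_url}/{endpoint}', timeout=30)
            if resp.ok:
                results.append(resp.json())
                pending.remove(endpoint)
        time.sleep(5)  # back off between polling rounds
    return results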