
Title:

How to return an HTTP response from Azure Durable Functions?

Content:

I would like to return an HTTP response from Azure Durable Functions and display the result of the activity function on the site.

Environment and Assumptions

  • Azure Free Tier
  • Python Programming Model v2
  • VSCode

In the environment above, I want to run a Durable Functions activity function, have the client function receive the result, and display it on the site via the HTTP response.

Workflow

  1. The client function receives the HTTP request and its parameters.
  2. The orchestrator function is invoked and, based on the parameters, calls the activity function.
  3. The activity function processes the request and returns the result to the orchestrator function.
  4. The orchestrator function returns the result to the client function.
  5. The client function displays the result of the activity function on the site.

Issue/Error Message

While attempting to retrieve the result of an activity function within the orchestrator function using call_activity() in Azure Durable Functions, I encountered the following error:

Exception: TypeError: Object of type AtomicTask is not JSON serializable

This issue prevents me from passing the activity function's result to the client function and subsequently displaying it via an HTTP response.
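
For reference, the same kind of TypeError can be reproduced with the standard json module alone. The sketch below uses a hypothetical stand-in class, FakeTask, in place of the real AtomicTask. It only assumes that the orchestrator's return value gets serialized to JSON and that the task object has no JSON representation.

```python
import json


class FakeTask:
    """Hypothetical stand-in for the task object returned by call_activity()."""

    def __init__(self, activity_name: str) -> None:
        self.activity_name = activity_name


def try_serialize(value) -> str:
    """Return the JSON text for value, or the error message if it cannot be encoded."""
    try:
        return json.dumps(value)
    except TypeError as exc:
        return f"TypeError: {exc}"


# A plain string (what the activity eventually returns) serializes fine.
print(try_serialize("failed_function executed successfully."))

# A task object (what is left when the task itself is returned) does not.
print(try_serialize(FakeTask("failed")))
```

The second call reports a TypeError that names the class of the object, which matches the shape of the message above.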

Source Code (Simplified Version)

Below is a simplified version of the source code, with exception handling omitted:

Python Programming Model v2

#####################
## client_function ##
#####################
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    
    # Get parameters from HTTP request
    process = req.params.get("process")
    if process is None:
        try:
            process = req.get_json().get('process')
        except (ValueError, KeyError):
            process = 0
    process = int(process)

    instance_id = await client.start_new("orchestrator", None, {"process": process,})
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    
    # Wait for orchestration to complete
    await client.wait_for_completion_or_create_check_status_response(req, instance_id)

    # Get orchestration execution status
    status = await client.get_status(instance_id)

    # Retrieves orchestration execution results and displays them on the screen
    runtime = status.runtime_status
    output = status.output  
    return f"runtime: {runtime}\n\n  output:{output}"
    

###########################
## orchestrator function ##
###########################
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> str:
    parameters = context.get_input()
    process = parameters.get("process")

    result = None
    if process == 0:
        result = context.call_activity("failed")
    else:
        result = "Executed successfully. Pass a char and string in the query or in the request body."

    context.set_custom_status(result)
    return result

#######################
## activity_function ##
#######################
@app.activity_trigger
def failed() -> str:
    return "failed_function executed successfully."

What I've Tried

I've attempted various methods to retrieve the activity function's result in the client function:

  • Using set_custom_status() to transmit information from the orchestrator function to the client function.
  • Employing client.wait_for_completion_or_create_check_status_response(req, instance_id) to obtain information from the orchestrator function in the client function.

Despite trying these approaches, I have been unable to identify the correct method.

TY00

1 Answer


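It worked for me after adding yield to the call: result = yield context.call_activity("failed",""). Without yield, call_activity() returns a task object rather than the activity's result, and that task object is what cannot be serialized to JSON.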
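
Why yield matters can be sketched without Azure. In the Python programming model the orchestrator is a generator function: it yields a task, and the framework sends the task's result back into the generator. The driver below is a simplified, hypothetical stand-in for that framework (run_orchestrator, FakeTask and ACTIVITIES are names made up for illustration), not the actual Durable Functions runtime.

```python
from typing import Any, Callable, Dict, Generator


class FakeTask:
    """Hypothetical stand-in for the task returned by call_activity()."""

    def __init__(self, name: str, input_: Any) -> None:
        self.name = name
        self.input_ = input_


# Hypothetical activity registry, playing the role of the Functions host.
ACTIVITIES: Dict[str, Callable[[Any], Any]] = {
    "failed": lambda blank: "failed_function executed successfully." + blank,
}


def orchestrator(process: int) -> Generator[FakeTask, Any, str]:
    if process == 0:
        # yield hands the task to the driver and resumes with its result.
        result = yield FakeTask("failed", "")
    else:
        result = "Executed successfully."
    return result


def run_orchestrator(gen: Generator[FakeTask, Any, Any]) -> Any:
    """Drive the generator: run each yielded task and send its result back in."""
    try:
        task = next(gen)
        while True:
            task = gen.send(ACTIVITIES[task.name](task.input_))
    except StopIteration as done:
        # The generator's return value travels on StopIteration.
        return done.value


print(run_orchestrator(orchestrator(0)))
print(run_orchestrator(orchestrator(1)))
```

In this sketch the value assigned to result is the string produced by the activity, not the task object, because the driver sends the result back in at the yield.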

Please note: you need to pass a parameter to the activity function, because its method signature requires one; otherwise the function will not be triggered.

See this GitHub issue, which says the same.

Code:

import azure.functions as func
import logging
import azure.durable_functions as df

app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

#####################
## client_function ##
#####################

@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:

    # Get parameters from HTTP request
    process = req.params.get("process")
    if process is None:
        try:
            process = req.get_json().get('process')
        except (ValueError, KeyError):
            process = 0
    process = int(process)
    instance_id = await client.start_new("orchestrator", None, {"process": process,})
    logging.info(f"Started orchestration with ID = '{instance_id}'.")

    # Wait for orchestration to complete
    await client.wait_for_completion_or_create_check_status_response(req, instance_id)

    # Get orchestration execution status
    status = await client.get_status(instance_id)

    # Retrieves orchestration execution results and displays them on the screen
    runtime = status.runtime_status
    output = status.output
    return f"runtime: {runtime}\n\n output:{output}"

###########################
## orchestrator function ##
###########################

@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> str:
    parameters = context.get_input()
    process = parameters.get("process")
    result = None
    if process == 0:
        result = yield context.call_activity("failed","")
    else:
        result = "Executed successfully. Pass a char and string in the query or in the request body."
    context.set_custom_status(result)
    return result

#######################
## activity_function ##
#######################

@app.activity_trigger(input_name="blank")
def failed(blank: str) -> str:
    return "failed_function executed successfully." + blank

Output:

With the incorrect signature, the activity trigger function does not appear in the functions list:

[image]

After correcting the signature, it is visible:

[image]

Run result:

[image]

[image]

Ikhtesam Afrin
  • I thought it was working correctly if "failed_function executed successfully." was displayed in the "output" field of the HTTP response. If "output" is "None", does that mean it is working correctly? Also, why does it show "runtime: OrchestrationRuntimeStatus.Running" instead of "runtime: OrchestrationRuntimeStatus.Completed"? – TY00 Aug 25 '23 at 16:19
  • Modified my answer, please check – Ikhtesam Afrin Aug 25 '23 at 18:46
  • I was able to get it working. Thank you from the bottom of my heart. Having to provide arguments was a blind spot for me, so this was really helpful. If I have more than one argument, do I just do something like call_activity("failed", "x", "y")? – TY00 Aug 26 '23 at 07:22
  • Yes and you need to pass @app.activity_trigger(input_name="blank") as well – Ikhtesam Afrin Aug 26 '23 at 07:58
  • The delete function below needs two arguments. How do I pass the two input_names to activity_trigger()?
    @app.activity_trigger(input_name="char"...)
    def delete(char: str, long_string: str) -> str:
        long_string = long_string.replace(char, '')
        return f"The character [{char}] was deleted => [{long_string}]."
    – TY00 Aug 26 '23 at 15:32
  • Refer to this: https://stackoverflow.com/questions/65231702/how-to-pass-multiple-parameters-to-azure-durable-activity-function – Ikhtesam Afrin Aug 27 '23 at 06:28
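
One common way to handle several values, assuming call_activity() takes a single input value as in the answer's code, is to pack them into one JSON-serializable object such as a dict and unpack it inside the activity. The function below is plain Python so that it runs on its own. The decorator and the orchestrator call appear only as comments and are assumptions about how it would be wired up, and the parameter name payload is hypothetical.

```python
import json
from typing import Dict


# In the function app this would carry a decorator such as
# @app.activity_trigger(input_name="payload")  (assumed wiring, not run here).
def delete(payload: Dict[str, str]) -> str:
    """Remove every occurrence of payload["char"] from payload["long_string"]."""
    char = payload["char"]
    long_string = payload["long_string"].replace(char, "")
    return f"The character [{char}] was deleted => [{long_string}]."


# The orchestrator would pass one dict instead of two positional arguments, e.g.
# result = yield context.call_activity("delete", {"char": "a", "long_string": "banana"})
payload = {"char": "a", "long_string": "banana"}

# The input is assumed to be serialized on its way to the activity,
# so it should survive a JSON round trip unchanged.
assert json.loads(json.dumps(payload)) == payload

print(delete(payload))
```

A dict keeps the values named, which makes the activity's unpacking step easier to read than a positional list or tuple.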