The question is somewhat related to this one
I need to create an HTTP-triggered Azure Durable Function that:
1. Receives an API call with some parameters, the most important one being the list of search_terms
2. Downloads the source data from an Azure Blob (once)
3. Performs some operations, matching each search term against the data from step 2
Based on the response to the linked question, I was thinking about the chaining pattern mixed with the fan-out/fan-in pattern.
My functions are below (simplified, without exception handling etc.).
Durable Function Http Start
import json
import logging

import azure.functions as func
import azure.durable_functions as df


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    requestBody = json.loads(req.get_body().decode())
    instance_id = await client.start_new(req.route_params["functionName"], client_input=requestBody)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)
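For reference, the starter uses the standard Durable Functions template bindings; its function.json looks roughly like this (trimmed) - the route is what makes req.route_params["functionName"] resolve:

{
    "scriptFile": "__init__.py",
    "bindings": [
        {
            "authLevel": "anonymous",
            "name": "req",
            "type": "httpTrigger",
            "direction": "in",
            "route": "orchestrators/{functionName}",
            "methods": ["post"]
        },
        {
            "name": "$return",
            "type": "http",
            "direction": "out"
        },
        {
            "name": "starter",
            "type": "durableClient",
            "direction": "in"
        }
    ]
}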
Orchestrator
import logging
import json
import pandas as pd
import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    num_results = requestBody['num_results']
    params = requestBody['params']

    # Chaining part: call ActivityFunction1 to read the Parquet file (once)
    blob_data = yield context.call_activity("ActivityFunction1", None)

    # Fan out: call ActivityFunction2 once per search term to perform matching
    tasks = []
    for search_term in search_terms:
        activityVar = {
            'blob_data': blob_data,
            'messy_term': search_term,
            'num_matches': num_results
        }
        tasks.append(context.call_activity("ActivityFunction2", activityVar))

    # Fan in: wait until all matching tasks have completed
    results = yield context.task_all(tasks)

    response = {
        "success": 1,
        "error_code": 0,
        **params,
        "results": results
    }
    return response


main = df.Orchestrator.create(orchestrator_function)
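A side note for reading the logs further down: as far as I understand, orchestrator code is replayed, so repeated "Executing 'Functions.Orchestrator'" entries may be expected. A minimal sketch of guarding orchestrator-side logging against replay, assuming context.is_replaying behaves the way I think it does:

def orchestrator_function(context: df.DurableOrchestrationContext):
    # Log only on the first, non-replayed execution of this code path
    if not context.is_replaying:
        logging.info("Orchestration input received")
    ...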
ActivityFunction1
import os
import io
import pandas as pd
from azure.storage.blob import BlobServiceClient


def main(name):
    account_name = os.getenv('accountname')
    account_key = os.getenv('accountkey')
    container_name = os.getenv('containername')
    blob_name = os.getenv('blobname')

    # Create a connection string to the Azure Blob storage account
    connect_str = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"

    # Create a BlobServiceClient object using the connection string
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)

    # Get a reference to the Parquet blob
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    # Download the blob data as a stream
    blob_data = blob_client.download_blob()

    # Read the Parquet data from the stream into a pandas DataFrame
    df = pd.read_parquet(io.BytesIO(blob_data.readall()))

    # Convert the DataFrame to records and keep only the relevant column
    # as a plain, JSON-serializable list
    df_dict = df.to_dict(orient='records')
    clean_list = [item['column'] for item in df_dict]
    return clean_list
ActivityFunction2
import logging
import time
import numpy as np
import pandas as pd


def main(activityVar: dict) -> dict:
    '''
    Matching function
    '''
    # The list extracted from the blob arrives via the activity input;
    # no blob read happens here
    print('Reading data from blob...')
    clean_list = activityVar['blob_data']
    # ...further processing
    return results
results is the response returned to the API call. Example API call:
{
    "search_terms": ["term1", "term2", "term3"],
    "num_results": 3,
    "params": {
        "search_id": "123",
        "engine_name": "engine"
    }
}
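For completeness, this is roughly how I call it locally (assuming the default func host port 7071 and that the orchestrator function is named Orchestrator):

import requests

body = {
    "search_terms": ["term1", "term2", "term3"],
    "num_results": 3,
    "params": {"search_id": "123", "engine_name": "engine"}
}

# The route comes from the starter's binding: orchestrators/{functionName}
resp = requests.post("http://localhost:7071/api/orchestrators/Orchestrator", json=body)
print(resp.json())  # includes statusQueryGetUri for polling the result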
Questions:
- Something feels off here, but I'm not sure what it is. Based on the logs, the function reads the data every time, not only once - is that possible with this construction? If so, what can/should I change?
When I trigger it locally, the initial terminal logs are:
For detailed output, run func with --verbose flag.
[2023-06-05T09:30:43.504Z] Worker process started and initialized.
[2023-06-05T09:30:47.329Z] Host lock lease acquired by instance ID '00000000000000000000000014B240CF'.
[2023-06-05T09:30:51.136Z] Executing 'Functions.DurableFunctionsHttpStart' (Reason='This function was programmatically called via the host APIs.', Id=3befc3b3-c8c0-430e-a240-0c3273c5106f)
[2023-06-05T09:30:51.447Z] Started orchestration with ID = 'b77f67233edc4a6a910c5312ab49455a'.
[2023-06-05T09:30:51.508Z] Executed 'Functions.DurableFunctionsHttpStart' (Succeeded, Id=3befc3b3-c8c0-430e-a240-0c3273c5106f, Duration=389ms)
[2023-06-05T09:30:51.531Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=388c0ef0-608e-465d-bbd2-8ae88e0fd547)
[2023-06-05T09:30:51.574Z] Executed 'Functions.Orchestrator' (Succeeded, Id=388c0ef0-608e-465d-bbd2-8ae88e0fd547, Duration=47ms)
[2023-06-05T09:30:51.665Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=66206934-9d82-4fbf-b83a-d24765b99f2c)
[2023-06-05T09:30:51.674Z] Request URL: 'https://myaccount.blob.core.windows.net/mycontainer/myfile.parquet'
Request method: 'GET'
Request headers:
'x-ms-range': 'REDACTED'
'x-ms-version': 'REDACTED'
'Accept': 'application/xml'
'User-Agent': 'azsdk-python-storage-blob/12.16.0 Python/3.10.11 (Windows-10-10.0.22621-SP0)'
'x-ms-date': 'REDACTED'
'x-ms-client-request-id': 'a9a34d03-0383-11ee-a0ba-f889d283a98c'
'Authorization': 'REDACTED'
No body was attached to the request
[2023-06-05T09:30:51.889Z] Response status: 206
Response headers:
'Content-Length': '33554432'
'Content-Type': 'application/octet-stream'
'Content-Range': 'REDACTED'
'Last-Modified': 'Thu, 01 Jun 2023 08:00:30 GMT'
'Accept-Ranges': 'REDACTED'
'ETag': '"0x8DB627644CFEA3E"'
'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
'x-ms-request-id': '9ae77aaa-d01e-001e-1290-972efb000000'
'x-ms-client-request-id': 'a9a34d03-0383-11ee-a0ba-f889d283a98c'
'x-ms-version': 'REDACTED'
'x-ms-creation-time': 'REDACTED'
'x-ms-blob-content-md5': 'REDACTED'
'x-ms-lease-status': 'REDACTED'
'x-ms-lease-state': 'REDACTED'
'x-ms-blob-type': 'REDACTED'
'Content-Disposition': 'REDACTED'
'x-ms-server-encrypted': 'REDACTED'
'Date': 'Mon, 05 Jun 2023 09:30:50 GMT'
where the part that (probably) corresponds to reading the Parquet file from the Blob repeats many times.
Then it either reports success - but I get no output at the runtime/webhooks/durabletask status endpoint - or it continues endlessly:
[2023-06-05T09:32:19.050Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=22e8638a-ea0b-43ef-b039-b72917b47216)
[2023-06-05T09:32:25.231Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=4ac12c65-7b65-4c6d-954d-e74930ba0ab9)
[2023-06-05T09:32:30.864Z] Executed 'Functions.ActivityFunction1' (Succeeded, Id=66206934-9d82-4fbf-b83a-d24765b99f2c, Duration=99199ms)
[2023-06-05T09:32:51.586Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=d6740019-b687-49fa-b047-f5c5f4ecb95d)
[2023-06-05T09:32:54.766Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=7c421729-5e8a-45f3-9a25-2be647b1b8bd)
[2023-06-05T09:33:22.473Z] Error writing message type InvocationRequest to workerId: 499a8573-4cb0-4e06-824d-ca721786c04e
[2023-06-05T09:33:22.474Z] Grpc.AspNetCore.Server: Request aborted while sending the message.
[2023-06-05T09:35:23.431Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=81b99052-e35f-4179-a332-1c3114fc2191)
[2023-06-05T09:35:23.489Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=86d5b3aa-fc9b-4ffd-b808-4c775062c441)
[2023-06-05T09:35:30.044Z] Executing 'Functions.ActivityFunction2' (Reason='(null)', Id=e8b7fd1f-3fae-4db6-b980-e31fb5c3a911)
[2023-06-05T09:35:31.555Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=23197fcf-0235-4044-8482-aa534bfb229c)
[2023-06-05T09:35:41.798Z] Executing 'Functions.ActivityFunction2' (Reason='(null)', Id=03162147-f450-46d0-adc5-1167e1033f61)
- At the moment I convert the result of ActivityFunction1 from a DataFrame to a dict so that I can pass it to ActivityFunction2 - is it possible to pass a pandas DataFrame instead of the dictionary?
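To make that question concrete: since activity inputs/outputs apparently have to be JSON-serializable, I assume a DataFrame would need an explicit round trip, something like the sketch below using pandas' to_json/read_json - unless there is a built-in way I'm missing:

import io
import pandas as pd

frame = pd.DataFrame({'column': ['a', 'b', 'c']})

# Sender side (e.g. ActivityFunction1): serialize the DataFrame to a JSON string
payload = frame.to_json(orient='split')

# Receiver side (e.g. ActivityFunction2): rebuild the DataFrame from that string
frame_again = pd.read_json(io.StringIO(payload), orient='split')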
I feel that I messed something up here - maybe ActivityFunction1 shouldn't be defined as an activity function at all, but live inside the orchestrator? I'm not sure how to continue with it and would appreciate any help.
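One variant I've been wondering about, but haven't verified: caching the list at module level inside the activity, so a warm worker wouldn't re-download the blob on every invocation. A rough sketch, where load_list_from_blob is a hypothetical helper wrapping the blob-reading code from ActivityFunction1 (and I'm unsure how this behaves across scaled-out or recycled workers):

_CLEAN_LIST = None  # hypothetical module-level cache

def main(activityVar: dict) -> dict:
    global _CLEAN_LIST
    if _CLEAN_LIST is None:
        # hypothetical helper wrapping the blob download from ActivityFunction1
        _CLEAN_LIST = load_list_from_blob()
    clean_list = _CLEAN_LIST
    # ...matching as before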
EDIT: I also tried a slightly different approach, with the orchestrator created like this:
Alternative Orchestrator
import logging
import json
import os
import pandas as pd
import azure.functions as func
import azure.durable_functions as df
from azure.storage.blob import BlobServiceClient
import io


def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    num_results = requestBody['num_results']
    params = requestBody['params']

    # Call ActivityFunction1 to read the Parquet file
    blob_data = yield context.call_activity("ActivityFunction1", None)

    # Call ActivityFunction2 once, with all search terms, to perform matching
    activityVar = {
        'blob_data': blob_data,
        'search_terms': search_terms,
        'num_matches': num_results
    }
    results = yield context.call_activity("ActivityFunction2", activityVar)

    response = {
        "success": 1,
        "error_code": 0,
        **params,
        "results": results
    }
    return response


main = df.Orchestrator.create(orchestrator_function)
And with a for loop inside ActivityFunction2:
Alternative ActivityFunction2
import time
from tqdm import tqdm


def main(activityVar: dict) -> dict:
    '''
    Matching function
    '''
    start_time = time.time()
    clean_list = activityVar['blob_data']

    print('Starting matching...')
    start_time_matching = time.time()

    results = []
    messy_list = activityVar['search_terms']
    for i in tqdm(range(len(messy_list))):
        term = messy_list[i]
        # ...
This works locally but fails with error 137 (which I believe indicates an out-of-memory kill) when deployed to Azure, even when I use a Premium plan with 14 GB RAM.