0

The question is somehow related to this one

I need to create a Http triggered Azure Durable Function that:

  1. Gets an API Call with some parameters, where the most important one is the list of search_terms
  2. Downloads a source data from Azure Blob (once)
  3. Conducts some operations / matching search term with the data from 2.

Based on the response from the linked question I was thinking about the chaining pattern mixed with fan-out-fan-in pattern.

My functions are below (simplified, without exceptions handling etc.)

Durable Function Http Start

import json
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    requestBody = json.loads(req.get_body().decode())
    instance_id = await client.start_new(req.route_params["functionName"], client_input=requestBody)

    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)

Orchestrator

import logging
import json
import pandas as pd
import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    num_results = requestBody['num_results']
    params = requestBody['params']

    # Call ActivityFunction1 to read the Parquet file
    blob_data = yield context.call_activity("ActivityFunction1", None)

    tasks = []
    for search_term in search_terms:
        # Call ActivityFunction2 to perform matching
        activityVar = {
            'blob_data': blob_data,
            'messy_term': search_term,
            'num_matches': num_results
        }
        tasks.append(context.call_activity("ActivityFunction2", activityVar))

    results = yield context.task_all(tasks)

    response = {
        "success": 1,
        "error_code": 0,
        **params,
        "results": results
    }

    return response

main = df.Orchestrator.create(orchestrator_function)

ActivityFunction1

import os
import io
import pandas as pd
from azure.storage.blob import BlobServiceClient

def main(name):
    account_name = os.getenv('accountname')
    account_key = os.getenv('accountkey')
    container_name = os.getenv('containername')
    blob_name = os.getenv('blobname')

    # Create a connection string to the Azure Blob storage account
    connect_str = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"

    # Create a BlobServiceClient object using the connection string
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)

    # Get a reference to the Parquet blob
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    # Download the blob data as a stream
    blob_data = blob_client.download_blob()

    # Read the Parquet data from the stream into a pandas DataFrame
    df = pd.read_parquet(io.BytesIO(blob_data.readall()))

    # Convert DataFrame to a JSON-serializable dictionary
    df_dict = df.to_dict(orient='records')

    clean_list = [item['column'] for item in df_dict]

    return clean_list

ActivityFunction2

import logging
import time
import numpy as np
import pandas as pd

def main(activityVar: dict) -> dict:
    '''
    Matching function
    '''

    # Read the DataFrame from Azure Blob storage
    print('Reading data from blob...')
    clean_list = activityVar['blob_data']

    ...further processing

    return results

results is a response to an API call. API call example:

{
  "search_terms": ["term1", "term2", "term3"],
  "num_results": 3,
  "params": {
    "search_id": "123",
    "engine_name": "engine"}   
}

Questions:

  1. Something feels off here, but I'm not sure what it is. Based on the logs the function reads the data every time, not only once - is it possible with such construction? If so - what can//should I change?

When I trigger it locally the initial terminal logs are:

For detailed output, run func with --verbose flag.
[2023-06-05T09:30:43.504Z] Worker process started and initialized.
[2023-06-05T09:30:47.329Z] Host lock lease acquired by instance ID '00000000000000000000000014B240CF'.
[2023-06-05T09:30:51.136Z] Executing 'Functions.DurableFunctionsHttpStart' (Reason='This function was programmatically called via the host APIs.', Id=3befc3b3-c8c0-430e-a240-0c3273c5106f)
[2023-06-05T09:30:51.447Z] Started orchestration with ID = 'b77f67233edc4a6a910c5312ab49455a'.
[2023-06-05T09:30:51.508Z] Executed 'Functions.DurableFunctionsHttpStart' (Succeeded, Id=3befc3b3-c8c0-430e-a240-0c3273c5106f, Duration=389ms)
[2023-06-05T09:30:51.531Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=388c0ef0-608e-465d-bbd2-8ae88e0fd547)
[2023-06-05T09:30:51.574Z] Executed 'Functions.Orchestrator' (Succeeded, Id=388c0ef0-608e-465d-bbd2-8ae88e0fd547, Duration=47ms)
[2023-06-05T09:30:51.665Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=66206934-9d82-4fbf-b83a-d24765b99f2c)
[2023-06-05T09:30:51.674Z] Request URL: 'https://myaccount.blob.core.windows.net/mycontainer/myfile.parquet'
Request method: 'GET'
Request headers:
    'x-ms-range': 'REDACTED'
    'x-ms-version': 'REDACTED'
    'Accept': 'application/xml'
    'User-Agent': 'azsdk-python-storage-blob/12.16.0 Python/3.10.11 (Windows-10-10.0.22621-SP0)'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'a9a34d03-0383-11ee-a0ba-f889d283a98c'
    'Authorization': 'REDACTED'
No body was attached to the request
[2023-06-05T09:30:51.889Z] Response status: 206
Response headers:
    'Content-Length': '33554432'
    'Content-Type': 'application/octet-stream'
    'Content-Range': 'REDACTED'
    'Last-Modified': 'Thu, 01 Jun 2023 08:00:30 GMT'
    'Accept-Ranges': 'REDACTED'
    'ETag': '"0x8DB627644CFEA3E"'
    'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
    'x-ms-request-id': '9ae77aaa-d01e-001e-1290-972efb000000'
    'x-ms-client-request-id': 'a9a34d03-0383-11ee-a0ba-f889d283a98c'
    'x-ms-version': 'REDACTED'
    'x-ms-creation-time': 'REDACTED'
    'x-ms-blob-content-md5': 'REDACTED'
    'x-ms-lease-status': 'REDACTED'
    'x-ms-lease-state': 'REDACTED'
    'x-ms-blob-type': 'REDACTED'
    'Content-Disposition': 'REDACTED'
    'x-ms-server-encrypted': 'REDACTED'
    'Date': 'Mon, 05 Jun 2023 09:30:50 GMT'

where the part responsible, probably, for reading the parquet from Blob repeats many times.

Then it either states a Success but I get no output in my runtime/webhook durabletask or continues to endlessly:

[2023-06-05T09:32:19.050Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=22e8638a-ea0b-43ef-b039-b72917b47216)
[2023-06-05T09:32:25.231Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=4ac12c65-7b65-4c6d-954d-e74930ba0ab9)
[2023-06-05T09:32:30.864Z] Executed 'Functions.ActivityFunction1' (Succeeded, Id=66206934-9d82-4fbf-b83a-d24765b99f2c, Duration=99199ms)
[2023-06-05T09:32:51.586Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=d6740019-b687-49fa-b047-f5c5f4ecb95d)
[2023-06-05T09:32:54.766Z] Executing 'Functions.Orchestrator' (Reason='(null)', Id=7c421729-5e8a-45f3-9a25-2be647b1b8bd)
[2023-06-05T09:33:22.473Z] Error writing message type InvocationRequest to workerId: 499a8573-4cb0-4e06-824d-ca721786c04e
[2023-06-05T09:33:22.474Z] Grpc.AspNetCore.Server: Request aborted while sending the message.
[2023-06-05T09:35:23.431Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=81b99052-e35f-4179-a332-1c3114fc2191)
[2023-06-05T09:35:23.489Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=86d5b3aa-fc9b-4ffd-b808-4c775062c441)
[2023-06-05T09:35:30.044Z] Executing 'Functions.ActivityFunction2' (Reason='(null)', Id=e8b7fd1f-3fae-4db6-b980-e31fb5c3a911)
[2023-06-05T09:35:31.555Z] Executing 'Functions.ActivityFunction1' (Reason='(null)', Id=23197fcf-0235-4044-8482-aa534bfb229c)
[2023-06-05T09:35:41.798Z] Executing 'Functions.ActivityFunction2' (Reason='(null)', Id=03162147-f450-46d0-adc5-1167e1033f61)
  1. At the moment I convert the result of ActivityFunction1 from dataframe to dict so I pass it to the ActivityFunction2 - is it possible to pass a pandas DataFrame instead of the dictionary?

I feel that I messed something here, maybe ActivityFunction1 shouldn't be defined as an activity function but within the orchestrator? I'm not sure how to continue with it and would appreciate any help.

EDIT: I also tried with a slightly different approach, with Orchestrator created like this:

Alternative Orchestrator

import logging
import json
import os
import pandas as pd
import azure.functions as func
import azure.durable_functions as df
from azure.storage.blob import BlobServiceClient
import io

def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    num_results = requestBody['num_results']
    params = requestBody['params']

    # Call ActivityFunction1 to read the Parquet file
    blob_data = yield context.call_activity("ActivityFunction1", None)

    # Call ActivityFunction2 to perform matching

    activityVar = {
        'blob_data': blob_data,
        'search_terms': search_terms,
        'num_matches': num_results
    }

    results = yield context.call_activity("ActivityFunction2", activityVar)

    response = {
        "success": 1,
        "error_code": 0,
        **params,
        "results": results
    }

    return response

main = df.Orchestrator.create(orchestrator_function)

And for loop in ActivityFunction2:

Alternative ActivityFunction2

def main(activityVar: dict) -> dict:
    '''
    Matching function
    '''
    start_time = time.time()

    clean_list = activityVar['blob_data']
    
    print('Starting matching...')
    start_time_matching = time.time()
    results = []

    messy_list = activityVar['search_terms']

    for i in tqdm(range(len(messy_list))):
        term = messy_list[i]

...

Which works locally but fails due to 137 error when deployed to Azure, even when I use a Premium Plan with 14 GB RAM.

Freejack
  • 168
  • 10
  • So just initial thoughts, For question 2: No you cannot pass a dataframe from Activity1 to Activity2. What you are doing is correct that is convert to dict and pass. Also, the logic in Activity1 should not be inside the Orchestrator. It should be separate as you have done. Is it possible to attach the parquet file so we can recreate exactly what you've done. Not sure if there is some failure during the parquet read and conversion to dataframe and then to dict. Have you tried debugging the Activity1 to make sure it runs through properly? – Anupam Chand Jun 07 '23 at 04:48
  • Unfortunately I cannot send the parquet as it contains confidential data. It's not extremely big though (app 200 mb) - the function works just fine locally when is not wrapped into the Durable Azure Function; however, when I test it (even locally) it almost never runs succesfully; a couple of seconds ago I received [2023-06-07T07:16:54.147Z] Exception encountered while listening to EventStream [2023-06-07T07:16:54.148Z] System.Private.CoreLib: Exception of type 'System.OutOfMemoryException' was thrown. even though I use 32 GB RAM... – Freejack Jun 07 '23 at 07:28
  • I added an edited versions to the question, it works locally, but also encounters 137 error when deployed to Azure. :/ – Freejack Jun 07 '23 at 08:04

1 Answers1

0

For question 2: No you cannot pass a dataframe from Activity1 to Activity2. What you are doing is correct that is convert to dict and pass. Also, the logic in Activity1 should not be inside the Orchestrator. It should be separate as you have done. The parquet file may be 200MB but when converted to a dataframe, it will be more.

I found some online documentation regarding out of memory errors in Durable functions and some of the suggestions may help. It seems it has something to do with Orchestrator replays. But I cannot reproduce the error as I do not have a sample of the parquet file to work with. I have never encountered this error myself.

Have a look at this question and its proposed solution.

Anupam Chand
  • 2,209
  • 1
  • 5
  • 14
  • Thank you - I will test it during the weekend. Do you think the alternatywę approach with simplified Orchestrator and for loop in ActivityFunction2 - as in Edited version - may be a valid alternative? – Freejack Jun 08 '23 at 09:08
  • I would not recommend it. With the alt approach you are moving away from fan out pattern which is the main advantage of durable functions. For sure this approach would take longer than the fan out approach which is definitely not what you would want. – Anupam Chand Jun 08 '23 at 15:04
  • I've been experimenting with the arguments in host.json for a while, but still no success. The parquet table has app 15 mln rows and I'm still encountering System.Private.CoreLib: Exception of type 'System.OutOfMemoryException' was thrown - even when using the local machine. Not sure how to cope with it without taking this preprocessing to ADF. – Freejack Jun 12 '23 at 12:16