
I'm stuck transforming an Azure HTTP-triggered function into something more robust that can run for more than 230 seconds.

I'm struggling to divide the code into functions, and I'm not sure how to structure the activity, orchestrator and client functions in my case. I would really appreciate some help here.

The google_search module is defined as below:

from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import logging

def calculate_score(link, term):
    if term and term in link:
        return 100
    elif 'xxx' in link and 'yyy' in link:
        return 75
    elif 'xxx' in link:
        return 50
    elif link:
        return 25
    else:
        return None

def search(search_terms, api_key, cse_id, num_results=5, country_code='uk'):
    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
    results = []
    error_values = {key: 'Error' for key in ['urls', 'score']}
    success = True
    error_code = 0
    for term in tqdm(search_terms):
        try:
            if term is None:
                row = {
                    'search_item': term,
                    'urls': [],
                    'score': []
                }
            else:
                result = service.cse().list(q=term, cx=cse_id, num=num_results, gl=country_code).execute()
                items = result.get('items', [])
                top_results = [item.get('link') for item in items[:num_results]]
                scores = [calculate_score(link, term) for link in top_results]

                row = {
                    'search_item': term,
                    'urls': top_results,
                    'score': scores
                }
                logging.info('Search completed successfully')
        except Exception as e:
            success = False
            error_code = 74
            row = {'search_item': term, 
                   **error_values}
            logging.error(f'An error occurred during calling the Search function. {e}')

        results.append(row)

    return success, results, error_code

The __init__.py function:

import azure.functions as func
import json
import logging
import os
from google_search_scoring_clean import search

def main(req: func.HttpRequest) -> func.HttpResponse:
    # Defined up front so the except block can still use it if parsing fails early
    params = {}
    try:
        logging.info('Python HTTP trigger function processed a request.')
        api_key = os.getenv('apikey')
        cse_id = os.getenv('cseid')
        req_body = req.get_json()
        search_terms = req_body.get('search_terms')
        num_results = int(req_body.get('num_results', 5))
        country_code = req_body.get('country_code', 'uk')
        params = req_body.get('params', {})

        if not search_terms or not api_key or not cse_id:
            logging.error('Missing required parameters')
            return func.HttpResponse('Missing required parameters', status_code=400)

        success, results, error_code = search(search_terms=search_terms,
                                              num_results=num_results,
                                              country_code=country_code,
                                              api_key=api_key,
                                              cse_id=cse_id)

        response_data = {
            'success': int(success),
            'error_code': int(error_code),
            **params,
            'results': results
        }
        response_json = json.dumps(response_data)

        logging.info('API Call completed successfully')
        return func.HttpResponse(response_json, mimetype='application/json')
    
    except Exception as e:
        logging.error(f'An error occurred: {str(e)}')
        error_code = 66
        response_data = {
            'success': 0,
            'error_code': int(error_code),
            **params
        }
        response_json = json.dumps(response_data)
        return func.HttpResponse(response_json, status_code=500, mimetype='application/json')

And a sample request:

{
  "search_terms": ["term1", "term2", "term3"],
  "num_results": 3,
  "params": {
    "search_id": "123",
    "engine_name": "Google Search"
  }
}

Desired output example:

{
    "success": 1,
    "error_code": 0,
    "search_id": "123",
    "engine_name": "Google Search",
    "results": [
        {
            "search_item": "term1",
            "urls": [
                "https://sampleresult3.com",
                "https://sampleresult2.com",
                "https://sampleresult3.com"
            ],
            "score": [
                25,
                25,
                25
            ]
        },
        {
            "search_item": "term2",
            "urls": [
                "https://whatever1.com",
                "https://whatever.2.com",
                "https://whatever3.com"
            ],
            "score": [
                25,
                25,
                75
            ]
        },
        {
            "search_item": "term3",
            "urls": [
                "https://www.link1.com",
                "https://link2.com",
                "https://www.link3.com"
            ],
            "score": [
                25,
                25,
                25
            ]
        }
    ]
}

EDIT

I tried the following activity function:

from google_search_scoring_clean import search
import os

def main(search_terms, num_results, country_code):
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    if not search_terms or not api_key or not cse_id:
        return False, []

    # search() returns three values: (success, results, error_code)
    success, results, _error_code = search(search_terms=search_terms,
                                           num_results=num_results,
                                           country_code=country_code,
                                           api_key=api_key,
                                           cse_id=cse_id)

    return success, results

but received this error message:

Result: Failure Exception: FunctionLoadError: cannot load the ActivityFunction function: the following parameters are declared in Python but not in function.json: {'country_code', 'search_terms', 'num_results'}

After editing function.json to:

{
  "bindings": [
    {
      "name": "search_terms",
      "type": "string[]",
      "direction": "in"
    },
    {
      "name": "num_results",
      "type": "int",
      "direction": "in"
    },
    {
      "name": "country_code",
      "type": "string",
      "direction": "in"
    }
  ]
}

I then received:

The 'ActivityFunction' function is in error: The binding name country_code is invalid. Please assign a valid name to the binding.

EDIT2:

The following doesn't work either:

import os
from googleapiclient import discovery
import logging

def main(searchTerm: str) -> str:
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)

    try:
        if searchTerm is None:
            results = {
                'search_term': searchTerm,
                'urls': [],
                'scores': []
            }
        else:
            result = service.cse().list(q=searchTerm, cx=cse_id, num=3).execute()
            items = result.get('items', [])
            top_results = [item.get('link') for item in items]

            results = {
                'search_term': searchTerm,
                'urls': top_results,
            }

        return results

    except Exception as e:
        error_values = {key: 'Error' for key in ['urls']}
        results = {'search_term': searchTerm, **error_values}
        logging.error(f'An error occurred during the search: {e}')
        return results

I changed the binding name from 'name' to 'searchTerm' in function.json. The status output is:

{
    "name": "Orchestrator",
    "instanceId": "4de8cc4818554208ad599e8687ca77a7",
    "runtimeStatus": "Running",
    "input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
    "customStatus": null,
    "output": null,
    "createdTime": "2023-05-31T10:37:24Z",
    "lastUpdatedTime": "2023-05-31T10:37:24Z"
}

EDIT3: It worked with the following adjustments:

  1. In the activity function's function.json, I changed the binding name from 'name' to activityVar. For some reason it does not accept activity_var as a name; I have no idea why.
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "activityVar",
      "type": "activityTrigger",
      "direction": "in"
    }
  ]
}
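The three outcomes above (country_code rejected, activity_var rejected, activityVar accepted) are consistent with the Functions host accepting only binding names that start with a letter and contain nothing but letters and digits. The rule below is an assumption inferred from that observed behavior, not copied from the host source; it's just a quick local pre-check before deploying a function.json.

```python
import re

# Assumed rule, inferred from the errors above: one letter followed by
# letters/digits only, so underscores (and hyphens) are rejected.
BINDING_NAME_RE = re.compile(r"[A-Za-z][A-Za-z0-9]*")


def is_valid_binding_name(name: str) -> bool:
    """Return True if `name` passes the assumed binding-name rule."""
    return BINDING_NAME_RE.fullmatch(name) is not None


for candidate in ["activityVar", "activity_var", "country_code", "searchTerm", "name"]:
    print(candidate, is_valid_binding_name(candidate))
```

Whatever name passes, the Python parameter must match it exactly, so def main(activityVar: dict) pairs with "name": "activityVar" in function.json.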

Orchestrator function:

import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    print("Orchestrator " + str(search_terms))
    tasks = []
    for search_term in search_terms:
        # An activity takes a single input, so bundle everything into one dict
        activity_var = {
            'search_term': search_term,
            'num_results': requestBody['num_results'],
            'params': requestBody['params'],
        }
        print(activity_var)
        # Fan out: schedule one activity per search term
        tasks.append(context.call_activity("ActivityFunction", activity_var))

    # Fan in: wait until all activities have finished
    results = yield context.task_all(tasks)
    return results

main = df.Orchestrator.create(orchestrator_function)

My activity function folder is named "ActivityFunction".

The activity function as it stands (it still needs tidying up):

import os
from googleapiclient import discovery
import logging

def main(activityVar: dict) -> dict:
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')

    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)

    try:
        if activityVar['search_term'] is None:
            results = {
                'search_term': activityVar['search_term'],
                'urls': [],
                'scores': []
            }
        else:
            result = service.cse().list(q=activityVar['search_term'], cx=cse_id, num=3).execute()
            items = result.get('items', [])
            top_results = [item.get('link') for item in items]

            results = {
                'search_term': activityVar['search_term'],
                'urls': top_results,
            }

        return results

    except Exception as e:
        error_values = {key: 'Error' for key in ['urls']}
        results = {'search_term': activityVar['search_term'], **error_values}
        logging.error(f'An error occurred during the search: {e}')
        return results

God, it's been a long day. Gotta wrap my head around it once again.

Freejack
  • If the line where you call the search function is taking all of the time, then you would need to somehow split that into executable parts to get benefit of using Durable Functions. As it stands, it doesn't look like it would get any benefit. – juunas May 30 '23 at 09:38
  • In fact all I need is to allow it to run more than 230-240 seconds and durable functions seem to be one of the most 'popular' solutions. I simply need to avoid the timeout. – Freejack May 30 '23 at 09:48
  • The way Durable Functions allow longer execution is by splitting the execution to activity functions. These activity functions still have the same timeout as functions normally have. If you only have one step that takes all the time, moving it to an activity does not change anything. – juunas May 30 '23 at 10:11
  • So I should somehow split the search_terms into chunks or even single queries so they can be conducted in the async way, right? – Freejack May 30 '23 at 11:10
  • Yeah, exactly :) So that each activity function can run one chunk within the normal limits. – juunas May 30 '23 at 11:11
  • And that's exactly where I struggle, I believe. :) I can imagine my search function taking one element, but I have no idea how to construct the required functions (activity/trigger and orchestrator) to deal with this case. – Freejack May 30 '23 at 11:29
  • @Freejack Can you provide a sample request ? What do the search terms look like in the request? You will need to split the search terms request into individual terms and the search will be conducted by multiple activity functions. Once you give a sample, we can help you more. – Anupam Chand May 30 '23 at 11:36
  • Sure, I will edit the code in a moment. – Freejack May 30 '23 at 11:50
  • Edited the code so it includes all the workflow now... – Freejack May 30 '23 at 11:58
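juunas's suggestion boils down to splitting search_terms so that each activity call handles a piece small enough to finish within the normal function timeout. A minimal chunking helper is sketched below; `chunked` is a hypothetical name, and the right chunk size is an assumption you would tune against how long a single search takes.

```python
from typing import Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: List[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `items` with at most `size` elements each."""
    if size < 1:
        raise ValueError("size must be >= 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


print(list(chunked(["term1", "term2", "term3", "term4", "term5"], 2)))
# [['term1', 'term2'], ['term3', 'term4'], ['term5']]
```

With size=1 this degenerates to one search term per activity, which is the layout used in the answer below.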

1 Answer


The pattern you need to follow is the fan-out/fan-in pattern. I won't write the full code for you, but you can follow the example given here. My response below should guide you in writing the code you need.

The aim is to split the search terms list into separate values so you can trigger multiple activity functions, each of which searches for a single term independently. Since these activity functions are not HTTP-triggered, they are not bound by the 230-second HTTP response limit. Each one is still subject to the function app's normal timeout, though, so a single search must finish within it.

Your HTTP-triggered function will look like this. It needs to pass the request body into the orchestrator so you can split the search terms up there before calling the activity functions.

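import json
import logging

import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    requestBody = json.loads(req.get_body().decode())
    # The orchestrator name comes from the route, e.g. orchestrators/{functionName}
    instance_id = await client.start_new(req.route_params["functionName"], client_input=requestBody)

    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    # Responds with the management URLs, including statusQueryGetUri
    return client.create_check_status_response(req, instance_id)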
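The original HTTP function returned a 400 when required parameters were missing, whereas the starter above would start an orchestration even for a malformed body. A sketch of validating first, using a hypothetical helper `validate_request_body` whose rules are assumptions based on the sample request:

```python
from typing import Optional


def validate_request_body(body: object) -> Optional[str]:
    """Return an error message for an unusable request body, or None if it is OK."""
    if not isinstance(body, dict):
        return "Request body must be a JSON object"
    terms = body.get("search_terms")
    if not isinstance(terms, list) or not terms:
        return "search_terms must be a non-empty list"
    try:
        int(body.get("num_results", 5))
    except (TypeError, ValueError):
        return "num_results must be an integer"
    if not isinstance(body.get("params", {}), dict):
        return "params must be an object"
    return None


print(validate_request_body({"search_terms": ["term1"], "num_results": 3}))
print(validate_request_body({"search_terms": []}))
```

In the starter, you would call it right after parsing and return func.HttpResponse(error, status_code=400) when it gives back a message. The apikey/cseid check can stay in the activity, where those settings are read.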

Your orchestrator will now read the body back as a dictionary and pass the relevant values as a single variable to each activity function. The only difference is that each activity function receives only one search term. You will get back a list in results, which you can format into whatever you need before returning a response.

import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    requestBody = context.get_input()
    search_terms = requestBody['search_terms']
    print("Orchestrator " + str(search_terms))
    tasks = []
    for search_term in search_terms:
        # An activity takes a single input, so bundle everything into one dict
        activity_var = {
            'search_term': search_term,
            'num_results': requestBody['num_results'],
            'params': requestBody['params'],
        }
        print(activity_var)
        # Fan out: schedule one activity per search term
        tasks.append(context.call_activity("Activity", activity_var))

    # Fan in: wait until all activities have finished
    results = yield context.task_all(tasks)
    return results

main = df.Orchestrator.create(orchestrator_function)
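To get from that list back to the "Desired output example" in the question, the fan-in step can merge the per-term rows before returning. A sketch, assuming each activity returns a dict with search_item, urls and score, and that a failed term is marked with 'Error' the way the original search() did; `build_response` is a hypothetical helper, and reusing error code 74 is an assumption carried over from search().

```python
from typing import Any, Dict, List


def build_response(request_body: Dict[str, Any],
                   results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge per-term activity results into the single response shape
    of the original HTTP function (success, error_code, params, results)."""
    # A term failed if its activity reported 'Error' instead of a URL list
    failed = any(r.get("urls") == "Error" for r in results)
    return {
        "success": int(not failed),
        "error_code": 74 if failed else 0,
        **request_body.get("params", {}),
        "results": results,
    }


body = {"search_terms": ["term1", "term2"], "num_results": 3,
        "params": {"search_id": "123", "engine_name": "Google Search"}}
per_term = [
    {"search_item": "term1", "urls": ["https://a.com"], "score": [25]},
    {"search_item": "term2", "urls": "Error", "score": "Error"},
]
print(build_response(body, per_term))
```

The function is deterministic and does no I/O, so it is safe to call inside the orchestrator: return build_response(requestBody, results) instead of return results.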

Finally, your activity function will hold the main logic: it does the search and returns the results for a single search term.
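A sketch of that single-term logic, reusing calculate_score from the question and honoring num_results from the dict the orchestrator passes in. To keep it testable without network access, the Custom Search call is injected as `fetch_links`, a hypothetical parameter; in the real activity it would wrap service.cse().list(q=term, cx=cse_id, num=num_results).execute() and pull out each item's link.

```python
import logging
from typing import Callable, List, Optional


def calculate_score(link: str, term: Optional[str]) -> Optional[int]:
    # Same scoring rules as the question's calculate_score()
    if term and term in link:
        return 100
    elif 'xxx' in link and 'yyy' in link:
        return 75
    elif 'xxx' in link:
        return 50
    elif link:
        return 25
    return None


def search_single_term(activity_var: dict,
                       fetch_links: Callable[[str, int], List[str]]) -> dict:
    """Search one term and return a row in the question's result format.

    `fetch_links(term, num_results)` is a hypothetical hook returning result links.
    """
    term = activity_var.get('search_term')
    num_results = int(activity_var.get('num_results', 5))
    if term is None:
        return {'search_item': term, 'urls': [], 'score': []}
    try:
        links = fetch_links(term, num_results)[:num_results]
        return {'search_item': term,
                'urls': links,
                'score': [calculate_score(link, term) for link in links]}
    except Exception as e:
        # Same 'Error' markers the original search() used for a failed term
        logging.error(f'Search failed for {term!r}: {e}')
        return {'search_item': term, 'urls': 'Error', 'score': 'Error'}


def fake_fetch(term: str, n: int) -> List[str]:
    # Stand-in for the Custom Search API, for local testing only
    return ['https://term1.example.com', 'https://xxx.yyy.com', 'https://other.com']


print(search_single_term({'search_term': 'term1', 'num_results': 2}, fake_fetch))
```

The activity's main(activityVar: dict) -> dict would then just return search_single_term(activityVar, real_fetch), with real_fetch built from the apikey and cseid settings.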

One important point to remember: since this entire process is asynchronous, calling the HTTP starter function immediately returns a dictionary of links while the actual work runs in the background. You need to poll the "statusQueryGetUri" link at fixed or exponential backoff intervals to get the status of the execution. Once runtimeStatus is "Completed", you will find your result in the "output" field.
Below is an example response from the "statusQueryGetUri" link.

{
    "name": "Orchestrator1",
    "instanceId": "1a98f11135494cf88fa1d3241b8cc4f3",
    "runtimeStatus": "Completed",
    "input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
    "customStatus": null,
    "output": [
        "Hello {'search_term': 'term1', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!",
        "Hello {'search_term': 'term2', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!",
        "Hello {'search_term': 'term3', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!"
    ],
    "createdTime": "2023-05-30T12:35:22Z",
    "lastUpdatedTime": "2023-05-30T12:35:24Z"
}
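A sketch of that polling loop with exponential backoff. `get_status` is a hypothetical callable that fetches and parses the statusQueryGetUri response, injected so the loop can be exercised without a running function app. Treating Completed, Failed and Terminated as final states is an assumption to check against the Durable Functions HTTP API documentation.

```python
import time
from typing import Callable


def wait_for_orchestration(get_status: Callable[[], dict],
                           base_delay: float = 1.0,
                           max_delay: float = 30.0,
                           max_attempts: int = 20,
                           sleep: Callable[[float], None] = time.sleep) -> dict:
    """Poll until runtimeStatus is final, doubling the delay after each check."""
    delay = base_delay
    for _ in range(max_attempts):
        status = get_status()
        if status.get("runtimeStatus") in ("Completed", "Failed", "Terminated"):
            return status
        sleep(delay)
        # Exponential backoff, capped so checks never get too far apart
        delay = min(delay * 2, max_delay)
    raise TimeoutError("Orchestration did not finish within max_attempts checks")
```

With the requests library (an assumption; any HTTP client works), usage would look like status = wait_for_orchestration(lambda: requests.get(status_url).json()), after which status["output"] holds the result list.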
Anupam Chand
  • Thank you for pointing out to the fan out fan in pattern. However I still have no idea how to adjust my activity function. Any samples I try fail during the local testing... – Freejack May 30 '23 at 16:22
  • Did you already refer to the link I provided? An example activity would be https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-cloud-backup?tabs=python#e2_copyfiletoblob-activity-function. Whatever you have already written will need to go in this activity. And you need to return the response you get back for that particular search item. Your orchestrator can then merge all the responses together. What error is your activity failing with ? Maybe update your question with your new code and the new error. – Anupam Chand May 30 '23 at 16:41
  • I added the function to the question - followed by other workarounds and outputs. It seems that I simply don't get the point of the durable functions; never dealt with such a tool before and really struggle with the solution. – Freejack May 30 '23 at 17:11
  • As far as I know, you cannot send multiple parameters from Orchestrator to activity. If you refer to my Orchestrator you can see that I'm creating 1 dictionary variable called activity_var and only passing that to the activity. There is no need to change the binding. You can then unpack it in the activity itself. You can refer to this answer on that problem which address it in the same way. https://stackoverflow.com/questions/65231702/how-to-pass-multiple-parameters-to-azure-durable-activity-function. – Anupam Chand May 31 '23 at 00:49
  • Thank you for the provided link. I will try to adjust the activity function somehow. However, as my deadline is in about 2 days, I will probably need to experiment either with sending the requests in chunks to avoid exceeding the time limit, or with some 'keep_alive' function that sends requests every x seconds. Durable Functions seem too hard for me at the moment with the search function I constructed. – Freejack May 31 '23 at 08:25
  • I edited the code but still no success; I get the same error about activity_var missing from function.json. :( – Freejack May 31 '23 at 09:30
  • Have you tried using the default bindings ? I didn't have to do anything on the function.json and was able to pass the variable from orchestrator to activity. – Anupam Chand May 31 '23 at 11:55
  • As the default binding name is "name", it would have worked if I had changed "activityVar" to "name" in my activity function. In fact, all the issues were caused by this detail... – Freejack May 31 '23 at 12:11
  • Yes, understood, and what you did was correct. Since you used activityVar in your activity function, that needs to go in function.json as well. Possibly the binding name doesn't allow underscores. – Anupam Chand May 31 '23 at 12:28
  • Indeed. But figuring these two details took me a while... and like 3 cups of coffee. :) Thank you for help. – Freejack May 31 '23 at 12:32
  • If this is for production work, you also need to consider cleaning up the information in the table storage attached to your function on Azure. Table storage is how Durable Functions maintains its statefulness. There is sample code online on how to do that. – Anupam Chand May 31 '23 at 13:37
  • Does it appear in the linked storage account? For now I'm debugging it locally, so there was no need, so far. The API keys and Function Key for the ordinary App are stored in Key Vault. – Freejack May 31 '23 at 13:47
  • Yes the linked storage account. Even when testing locally durable functions needs to store the state somewhere. Assuming you've set it to an azure storage, you should see the tables there contains data related to the state. So make sure you have a frequently triggered function (weekly or monthly) to clear the old rows from the tables. You can Google for the code snippet. – Anupam Chand May 31 '23 at 13:53
  • Thank you for the info! Just one more question - will this workflow also be good for a scenario when I read the parquet from container/blob and conduct something like matching that may last even couple of hours? I believe the file should be read once so then queries can call it. – Freejack May 31 '23 at 14:15
  • If the file is read only once, you are probably looking for the function chaining pattern. But you need a Premium or Dedicated (App Service) plan to allow an unlimited function time limit. – Anupam Chand May 31 '23 at 14:49
  • I will open a separate topic then :) One last thing: do you happen to know the best logging / error handling practices for Durable Functions? – Freejack May 31 '23 at 17:41
  • Logging for Durable Functions is no different from normal function logging. Just be sure you have Application Insights switched on. – Anupam Chand Jun 01 '23 at 00:24
  • I posted a bit more complex question about chaining pattern here https://stackoverflow.com/questions/76405171/chaining-pattern-activity-function-with-reading-data-from-blob-and-fan-out-fan-i - as your knowledge was extremely helpful - would really appreciate if you could have a look – Freejack Jun 05 '23 at 09:42