I'm stuck with transforming an Azure HTTP triggered function into something more robust that can take more than 230 seconds.
I struggle with dividing the code into functions, not sure how to construct the activity, orchestrator and client function in my case. I would really appreciate some help here.
The google_search module is defined as below:
from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import logging
def calculate_score(link, term):
if term and term in link:
return 100
elif 'xxx' in link and 'yyy' in link:
return 75
elif 'xxx' in link:
return 50
elif link:
return 25
else:
return None
def search(search_terms, api_key, cse_id, num_results=5, country_code='uk'):
service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
results = []
error_values = {key: 'Error' for key in ['urls', 'score']}
success = True
error_code = 0
for term in tqdm(search_terms):
try:
if term is None:
row = {
'search_item': term,
'urls': [],
'score': []
}
else:
result = service.cse().list(q=term, cx=cse_id, num=num_results, gl=country_code).execute()
items = result.get('items', [])
top_results = [item.get('link') for item in items[:num_results]]
scores = [calculate_score(link, term) for link in top_results]
row = {
'search_item': term,
'urls': top_results,
'score': scores
}
logging.info('Search completed successfully')
except Exception as e:
success = False
error_code = 74
row = {'search_item': term,
**error_values}
logging.error(f'An error occurred during calling the Search function. {e}')
results.append(row)
return success, results, error_code
The init.py function:
import azure.functions as func
from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import json
from google_search_scoring_clean import search, calculate_score
import logging
import os
def main(req: func.HttpRequest) -> func.HttpResponse:
try:
logging.info('Python HTTP trigger function processed a request.')
api_key = os.getenv('apikey')
cse_id = os.getenv('cseid')
req_body = req.get_json()
search_terms = req_body.get('search_terms')
num_results = int(req_body.get('num_results', 5))
country_code = req_body.get('country_code', 'uk')
params = req_body.get('params', {})
if not search_terms or not api_key or not cse_id:
logging.error('Missing required parameters')
return func.HttpResponse('Missing required parameters', status_code=400)
success, results, error_code = search(search_terms=search_terms,
num_results=num_results,
country_code=country_code,
api_key=api_key,
cse_id=cse_id)
response_data = {
'success': int(success),
'error_code': int(error_code),
**params,
'results': results
}
response_json = json.dumps(response_data)
logging.info('API Call completed successfully')
return func.HttpResponse(response_json, mimetype='application/json')
except Exception as e:
logging.error(f'An error occurred: {str(e)}')
error_code = 66
response_data = {
'success': 0,
'error_code': int(error_code),
**params
}
response_json = json.dumps(response_data)
return func.HttpResponse(response_json, status_code=500, mimetype='application/json')
And a sample request:
{
"search_terms": ["term1", "term2", "term3"],
"num_results": 3,
"params": {
"search_id": "123",
"engine_name": "Google Search"}
}
Desired output example:
{
"success": 1,
"error_code": 0,
"search_id": "123",
"engine_name": "Google Search",
"results": [
{
"search_item": "term1",
"urls": [
"https://sampleresult3.com",
"https://sampleresult2.com",
"https://sampleresult3.com"
],
"score": [
25,
25,
25
]
},
{
"search_item": "term2",
"urls": [
"https://whatever1.com",
"https://whatever.2.com",
"https://whatever3.com"
],
"score": [
25,
25,
75
]
},
{
"search_item": "term3",
"urls": [
"https://www.link1.com",
"https://link2.com",
"https://www.link3.com"
],
"score": [
25,
25,
25
]
}
]
}
EDIT
I tried with the below activity function:
from google_search_scoring_clean import search
import os
def main(search_terms, num_results, country_code):
api_key = os.getenv('apikey')
cse_id = os.getenv('cseid')
if not search_terms or not api_key or not cse_id:
return False, []
success, results = search(search_terms=search_terms,
num_results=num_results,
country_code=country_code,
api_key=api_key,
cse_id=cse_id)
return success, results
but received an error messege: Result: Failure Exception: FunctionLoadError: cannot load the ActivityFunction function: the following parameters are declared in Python but not in function.json: {'country_code', 'search_terms', 'num_results'}
After editing the function.json to
{
"bindings": [
{
"name": "search_terms",
"type": "string[]",
"direction": "in"
},
{
"name": "num_results",
"type": "int",
"direction": "in"
},
{
"name": "country_code",
"type": "string",
"direction": "in"
}
]
}
however, I receive:
The 'ActivityFunction' function is in error: The binding name country_code is invalid. Please assign a valid name to the binding.
EDIT2:
The below also won't work:
import os
from googleapiclient import discovery
import logging
def main(searchTerm: str) -> str:
api_key = os.getenv('apikey')
cse_id = os.getenv('cseid')
service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
try:
if searchTerm is None:
results = {
'search_term': searchTerm,
'urls': [],
'scores': []
}
else:
result = service.cse().list(q=searchTerm, cx=cse_id, num=3).execute()
items = result.get('items', [])
top_results = [item.get('link') for item in items]
results = {
'search_term': searchTerm,
'urls': top_results,
}
return results
except Exception as e:
error_values = {key: 'Error' for key in ['urls']}
results = {'search_term': searchTerm, **error_values}
logging.error(f'An error occurred during the search: {e}')
return results
I adjusted the name from 'name' to 'searchTerm' in the function.json. The output is:
{
"name": "Orchestrator",
"instanceId": "4de8cc4818554208ad599e8687ca77a7",
"runtimeStatus": "Running",
"input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
"customStatus": null,
"output": null,
"createdTime": "2023-05-31T10:37:24Z",
"lastUpdatedTime": "2023-05-31T10:37:24Z"
}
EDIT3: It worked with the following adjustments
- In function.json of Activity Function I changed 'name' to activityVar - somehow it does not accept activity_var name, have no idea why
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "activityVar",
"type": "activityTrigger",
"direction": "in"
}
]
}
Orchestrator function:
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
requestBody = context.get_input()
search_terms= (requestBody['search_terms'])
print("Orchestrator " + str(search_terms))
tasks = []
for search_term in search_terms:
activity_var = {}
activity_var['search_term'] = search_term
activity_var['num_results'] = requestBody['num_results']
activity_var['params'] = requestBody['params']
print(activity_var)
tasks.append(context.call_activity("ActivityFunction", activity_var))
results = yield context.task_all(tasks)
return results
main = df.Orchestrator.create(orchestrator_function)
Where my activity function folder is named "ActivityFunction".
Activity Function for now, as I have to prettify it:
import os
from googleapiclient import discovery
import logging
def main(activityVar: dict) -> dict:
api_key = os.getenv('apikey')
cse_id = os.getenv('cseid')
service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
try:
if activityVar['search_term'] is None:
results = {
'search_term': activityVar['search_term'],
'urls': [],
'scores': []
}
else:
result = service.cse().list(q=activityVar['search_term'], cx=cse_id, num=3).execute()
items = result.get('items', [])
top_results = [item.get('link') for item in items]
results = {
'search_term': activityVar['search_term'],
'urls': top_results,
}
return results
except Exception as e:
error_values = {key: 'Error' for key in ['urls']}
results = {'search_term': activityVar['search_term'], **error_values}
logging.error(f'An error occurred during the search: {e}')
return results
God, it's been a long day. Gotta wrap my head around it once again.