I'm stuck with transforming an Azure HTTP triggered function into something more robust that can take more than 230 seconds.
I struggle with dividing the code into functions, not sure how to construct the activity, orchestrator and client function in my case. I would really appreciate some help here.
The google_search module is defined as below:
from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import logging
def calculate_score(link, term):
    """Score how well a result URL matches the search term.

    Args:
        link: Result URL as a string; may be ``None`` or empty when a search
            item carried no link.
        term: Search term that produced the result; may be ``None``.

    Returns:
        100 if ``term`` occurs in ``link``, 75 if the link contains both
        'xxx' and 'yyy', 50 if it contains 'xxx' only, 25 for any other
        non-empty link, and ``None`` when there is no link.
    """
    # A missing link cannot match anything; return before the substring
    # tests, which raise TypeError when applied to None.
    if not link:
        return None
    if term and term in link:
        return 100
    if 'xxx' in link:
        return 75 if 'yyy' in link else 50
    return 25
def search(search_terms, api_key, cse_id, num_results=5, country_code='uk'):
    """Run a Google Custom Search for every term and score the returned links.

    Args:
        search_terms: Iterable of query strings. A ``None`` entry produces an
            empty row without calling the API.
        api_key: Developer key passed to the Custom Search client.
        cse_id: Custom search engine id, sent as ``cx``.
        num_results: Number of results requested (and kept) per term.
        country_code: Value sent as the ``gl`` parameter.

    Returns:
        Tuple ``(success, results, error_code)``. ``results`` holds one dict
        per term with the keys 'search_item', 'urls' and 'score'. When any
        term fails, ``success`` is False, ``error_code`` is 74 and that
        term's 'urls' and 'score' are the string 'Error'.
    """
    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
    results = []
    success = True
    error_code = 0
    for term in tqdm(search_terms):
        try:
            if term is None:
                urls, scores = [], []
            else:
                response = service.cse().list(
                    q=term, cx=cse_id, num=num_results, gl=country_code
                ).execute()
                items = response.get('items', [])
                urls = [item.get('link') for item in items[:num_results]]
                scores = [calculate_score(url, term) for url in urls]
            row = {'search_item': term, 'urls': urls, 'score': scores}
            logging.info('Search completed successfully')
        except Exception as e:
            # One failing term must not abort the batch: record an error row
            # and keep going. logging.exception keeps the traceback, which
            # the plain error() call would drop.
            success = False
            error_code = 74
            row = {'search_item': term, 'urls': 'Error', 'score': 'Error'}
            logging.exception('An error occurred during calling the Search function. %s', e)
        results.append(row)
    return success, results, error_code
The `__init__.py` function:
import azure.functions as func
from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import json
from google_search_scoring_clean import search, calculate_score
import logging
import os
def main(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP entry point: run the search for the terms in the JSON body.

    The body is expected to carry 'search_terms' and may carry
    'num_results' (default 5), 'country_code' (default 'uk') and 'params',
    a dict that is echoed back at the top level of the response.

    Returns:
        200 with a JSON body holding 'success', 'error_code', the echoed
        params and 'results'; 400 when the search terms or the 'apikey' /
        'cseid' environment settings are missing; 500 with
        ``error_code`` 66 on any unexpected failure.
    """
    # Bound before the try so the error handler can always echo it back,
    # even when parsing the body fails before 'params' has been read.
    params = {}
    try:
        logging.info('Python HTTP trigger function processed a request.')
        api_key = os.getenv('apikey')
        cse_id = os.getenv('cseid')
        req_body = req.get_json()
        search_terms = req_body.get('search_terms')
        num_results = int(req_body.get('num_results', 5))
        country_code = req_body.get('country_code', 'uk')
        params = req_body.get('params', {})
        if not search_terms or not api_key or not cse_id:
            logging.error('Missing required parameters')
            return func.HttpResponse('Missing required parameters', status_code=400)
        success, results, error_code = search(search_terms=search_terms,
                                              num_results=num_results,
                                              country_code=country_code,
                                              api_key=api_key,
                                              cse_id=cse_id)
        response_data = {
            'success': int(success),
            'error_code': int(error_code),
            **params,
            'results': results
        }
        response_json = json.dumps(response_data)
        logging.info('API Call completed successfully')
        return func.HttpResponse(response_json, mimetype='application/json')
    except Exception as e:
        logging.exception('An error occurred: %s', e)
        error_code = 66
        # A non-dict 'params' may be what failed above; unpacking it again
        # here would raise from inside the handler, so fall back to nothing.
        echoed_params = params if isinstance(params, dict) else {}
        response_data = {
            'success': 0,
            'error_code': int(error_code),
            **echoed_params
        }
        response_json = json.dumps(response_data)
        return func.HttpResponse(response_json, status_code=500, mimetype='application/json')
And a sample request:
{
"search_terms": ["term1", "term2", "term3"],
"num_results": 3,
"params": {
"search_id": "123",
"engine_name": "Google Search"}
}
Desired output example:
{
"success": 1,
"error_code": 0,
"search_id": "123",
"engine_name": "Google Search",
"results": [
{
"search_item": "term1",
"urls": [
"https://sampleresult3.com",
"https://sampleresult2.com",
"https://sampleresult3.com"
],
"score": [
25,
25,
25
]
},
{
"search_item": "term2",
"urls": [
"https://whatever1.com",
"https://whatever.2.com",
"https://whatever3.com"
],
"score": [
25,
25,
75
]
},
{
"search_item": "term3",
"urls": [
"https://www.link1.com",
"https://link2.com",
"https://www.link3.com"
],
"score": [
25,
25,
25
]
}
]
}
EDIT
I tried with the below activity function:
from google_search_scoring_clean import search
import os
def main(search_terms, num_results, country_code):
    """Run the search for the given terms using credentials from the environment.

    Args:
        search_terms: Terms forwarded to ``search``.
        num_results: Number of results requested per term.
        country_code: Country code forwarded to ``search``.

    Returns:
        Tuple ``(success, results)``. ``(False, [])`` is returned when the
        terms or the 'apikey' / 'cseid' environment settings are missing.
    """
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')
    if not search_terms or not api_key or not cse_id:
        return False, []
    # search() returns three values (success, results, error_code);
    # unpacking only two raises ValueError. The error code is not part of
    # this function's return value, so it is discarded.
    success, results, _error_code = search(search_terms=search_terms,
                                           num_results=num_results,
                                           country_code=country_code,
                                           api_key=api_key,
                                           cse_id=cse_id)
    return success, results
but received an error message: Result: Failure Exception: FunctionLoadError: cannot load the ActivityFunction function: the following parameters are declared in Python but not in function.json: {'country_code', 'search_terms', 'num_results'}
After editing the function.json to
{
"bindings": [
{
"name": "search_terms",
"type": "string[]",
"direction": "in"
},
{
"name": "num_results",
"type": "int",
"direction": "in"
},
{
"name": "country_code",
"type": "string",
"direction": "in"
}
]
}
however, I receive:
The 'ActivityFunction' function is in error: The binding name country_code is invalid. Please assign a valid name to the binding.
EDIT2:
The below also won't work:
import os
from googleapiclient import discovery
import logging
def main(searchTerm: str) -> dict:
    """Look up one search term and return the links of its results.

    Args:
        searchTerm: Query string; ``None`` skips the API call.

    Returns:
        A dict with 'search_term' and 'urls'. For a ``None`` term 'urls' is
        empty and an empty 'scores' list is included; if the lookup fails,
        'urls' is the string 'Error'.
    """
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')
    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
    try:
        if searchTerm is None:
            return {'search_term': searchTerm, 'urls': [], 'scores': []}
        response = service.cse().list(q=searchTerm, cx=cse_id, num=3).execute()
        urls = [item.get('link') for item in response.get('items', [])]
        return {'search_term': searchTerm, 'urls': urls}
    except Exception as e:
        # Report the failure in the result instead of raising, and keep the
        # traceback in the log.
        logging.exception('An error occurred during the search: %s', e)
        return {'search_term': searchTerm, 'urls': 'Error'}
I adjusted the name from 'name' to 'searchTerm' in the function.json. The output is:
{
"name": "Orchestrator",
"instanceId": "4de8cc4818554208ad599e8687ca77a7",
"runtimeStatus": "Running",
"input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
"customStatus": null,
"output": null,
"createdTime": "2023-05-31T10:37:24Z",
"lastUpdatedTime": "2023-05-31T10:37:24Z"
}
EDIT3: It worked with the following adjustments
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "activityVar",
"type": "activityTrigger",
"direction": "in"
}
]
}
Orchestrator function:
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
    """Fan out one activity call per search term and gather the results.

    The orchestration input is expected to be a dict with 'search_terms'
    (required) and optionally 'num_results' and 'params'. Each activity
    receives a dict with 'search_term', 'num_results' and 'params'.

    Returns:
        The list of activity results produced by ``context.task_all``.
    """
    import logging  # local import: the surrounding snippet only imports `df`

    request_body = context.get_input()
    search_terms = request_body['search_terms']
    # Optional fields fall back to defaults instead of raising KeyError.
    num_results = request_body.get('num_results', 5)
    params = request_body.get('params', {})

    # Orchestrator code is re-executed on replay; log only on the first pass
    # so the message is not repeated for every replay.
    if not context.is_replaying:
        logging.info('Orchestrator %s', search_terms)

    tasks = [
        context.call_activity("ActivityFunction", {
            'search_term': search_term,
            'num_results': num_results,
            'params': params,
        })
        for search_term in search_terms
    ]
    results = yield context.task_all(tasks)
    return results


main = df.Orchestrator.create(orchestrator_function)
Where my activity function folder is named "ActivityFunction".
Activity Function for now, as I have to prettify it:
import os
from googleapiclient import discovery
import logging
def main(activityVar: dict) -> dict:
    """Activity: look up one search term and return the links of its results.

    Args:
        activityVar: Dict with 'search_term' and optionally 'num_results'
            (defaults to 3, the value previously hard-coded here).

    Returns:
        A dict with 'search_term' and 'urls'. For a missing or ``None`` term
        'urls' is empty and an empty 'scores' list is included; if the
        lookup fails, 'urls' is the string 'Error'.
    """
    api_key = os.getenv('apikey')
    cse_id = os.getenv('cseid')
    service = discovery.build('customsearch', 'v1', developerKey=api_key, cache_discovery=False)
    # Read the term once, outside the try, so the error handler never has to
    # index the input again (which could raise a second time).
    search_term = activityVar.get('search_term')
    try:
        if search_term is None:
            return {'search_term': search_term, 'urls': [], 'scores': []}
        # Use the result count supplied in the input rather than a fixed 3.
        num_results = int(activityVar.get('num_results', 3))
        response = service.cse().list(q=search_term, cx=cse_id, num=num_results).execute()
        urls = [item.get('link') for item in response.get('items', [])]
        return {'search_term': search_term, 'urls': urls}
    except Exception as e:
        # Report the failure in the result instead of raising, and keep the
        # traceback in the log.
        logging.exception('An error occurred during the search: %s', e)
        return {'search_term': search_term, 'urls': 'Error'}
God, it's been a long day. Gotta wrap my head around it once again.
The pattern you need to follow is the fan-out/fan-in pattern. I won't be writing the full code for you, but you can follow the example given here. My response below should guide you in writing the code you need.
The aim is to split the search terms list into separate variables so you can trigger multiple activity functions and each of them can do a search for a single variable independently. Since these activity functions are not http triggered functions they can go beyond the 230s limit.
Your http triggered function will look like this. It needs to pass the request body into the orchestrator so you can split the search terms up there before calling the activity functions.
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    """Start the orchestration named in the route, passing it the request body.

    The JSON body is parsed and handed to the orchestrator as its input; the
    response is the check-status response built by the durable client for
    the new instance.
    """
    durable_client = df.DurableOrchestrationClient(starter)
    raw_body = req.get_body()
    payload = json.loads(raw_body.decode())
    orchestrator_name = req.route_params["functionName"]
    instance_id = await durable_client.start_new(orchestrator_name, client_input=payload)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return durable_client.create_check_status_response(req, instance_id)
Your Orchestrator will now recreate the body as a dictionary and pass that as a variable to the activity functions. Only difference is, each activity function will receive only 1 search term. You will get back a list in results which you can format to what you need before returning back a response.
def orchestrator_function(context: df.DurableOrchestrationContext):
    """Split the request into one activity call per search term and collect the results.

    The orchestration input is expected to be a dict with 'search_terms'
    (required) and optionally 'num_results' and 'params'. Each activity
    receives a dict with 'search_term', 'num_results' and 'params'.

    Returns:
        The list of activity results produced by ``context.task_all``.
    """
    import logging  # local import keeps this snippet self-contained

    request_body = context.get_input()
    search_terms = request_body['search_terms']
    # Optional fields fall back to defaults instead of raising KeyError.
    num_results = request_body.get('num_results', 5)
    params = request_body.get('params', {})

    # Orchestrator code is re-executed on replay; log only on the first pass
    # so the message is not repeated for every replay.
    if not context.is_replaying:
        logging.info('Orchestrator %s', search_terms)

    tasks = []
    for search_term in search_terms:
        activity_var = {
            'search_term': search_term,
            'num_results': num_results,
            'params': params,
        }
        tasks.append(context.call_activity("Activity", activity_var))
    results = yield context.task_all(tasks)
    return results


main = df.Orchestrator.create(orchestrator_function)
Finally your activity function will hold the main logic to do the search and return back results for a single search term.
One important point to remember is that, since this entire process is asynchronous, when you call the HTTP starter function you will immediately get back a dictionary of links while the actual process runs in the background. You will need to implement some kind of polling on the "statusQueryGetUri" link, at fixed or exponential-backoff intervals, to get the status of the execution. Once "runtimeStatus" is "Completed", you will find your result in the "output" field.
Below is an example response from calling the "statusQueryGetUri" link.
{
"name": "Orchestrator1",
"instanceId": "1a98f11135494cf88fa1d3241b8cc4f3",
"runtimeStatus": "Completed",
"input": "{\"search_terms\": [\"term1\", \"term2\", \"term3\"], \"num_results\": 3, \"params\": {\"search_id\": \"123\", \"engine_name\": \"Google Search\"}}",
"customStatus": null,
"output": [
"Hello {'search_term': 'term1', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!",
"Hello {'search_term': 'term2', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!",
"Hello {'search_term': 'term3', 'num_results': 3, 'params': {'search_id': '123', 'engine_name': 'Google Search'}}!"
],
"createdTime": "2023-05-30T12:35:22Z",
"lastUpdatedTime": "2023-05-30T12:35:24Z"
}