I have an Azure Durable Function written in Python with an orchestrator and two activity functions.
The orchestrator calls the first activity function and receives a list in return (a list of names, whose contents can change every time the function runs).
The next step is to call the second activity function once for each item in that list, sequentially, because of a restriction on the API that the second activity function calls.
#dynamically gets generated by the first activity function
payload=[1,2,3,4]
tasks = [context.call_activity("secondfunction",ps) for ps in payload]
output = yield context.task_all(tasks)
What I am using is the fan-out method, which is not serial, but I can't seem to find an alternative for what I am trying to do.
Also, in the host.json file I tried to enforce that only one activity function can run at a time, to avoid parallel processing:
"extensions": {
    "durableTask": {
        "maxConcurrentActivityFunctions": 1,
        "maxConcurrentOrchestratorFunctions": 1
    }
}
It's also worth noting that I cannot pass the whole list to the activity function: if I do, the activity function takes more than 5-10 minutes, which is the timeout limit for the Azure Function. That is why I am trying to iterate over the list in the orchestrator function.
But the result is not sequential.
I would appreciate your feedback.
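You can try the two approaches below. Note that Approach 1 uses context.task_all, which fans the activities out in parallel; Approach 2 yields each call in turn and is the one that processes the list sequentially.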
Approach 1:-
My function_app.py:-
import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# HTTP Starter
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name, None)  # Pass the functionName here
    response = client.create_check_status_response(req, instance_id)
    return response

# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    cities = ["Seattle", "Tokyo", "London"]
    tasks = []
    for city in cities:
        tasks.append(context.call_activity("hello", city))
    # Wait for all tasks to complete
    results = yield context.task_all(tasks)
    return results

# Activity
@myApp.activity_trigger(input_name="city")
def hello(city: str):
    print(f"Processing {city}...")
    # Your activity function logic goes here
    result = f"Hello {city}!"
    return result
Output:-
Function URL:-
http://localhost:7071/api/orchestrators/hello_orchestrator
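To see why collecting the tasks first and waiting on them together does not give serial execution, here is a rough analogy in plain asyncio. It is not the Durable Functions API: asyncio.gather stands in for task_all, a single await inside a loop stands in for yielding one call_activity at a time, and the activity coroutine and both log lists are hypothetical names used only for this illustration.

```python
import asyncio
from typing import List


async def activity(name: str, log: List[str]) -> str:
    """Hypothetical activity: records when it starts and finishes."""
    log.append(f"start {name}")
    await asyncio.sleep(0.01)  # stands in for the external API call
    log.append(f"done {name}")
    return f"Hello {name}!"


async def fan_out(names: List[str], log: List[str]) -> List[str]:
    # All activities are scheduled before any of them is awaited.
    return await asyncio.gather(*(activity(n, log) for n in names))


async def one_by_one(names: List[str], log: List[str]) -> List[str]:
    # Each activity finishes before the next one is started.
    results = []
    for n in names:
        results.append(await activity(n, log))
    return results


cities = ["Seattle", "Tokyo", "London"]

parallel_log: List[str] = []
parallel_results = asyncio.run(fan_out(cities, parallel_log))

serial_log: List[str] = []
serial_results = asyncio.run(one_by_one(cities, serial_log))

print(parallel_log[:3])  # three "start" entries before any "done"
print(serial_log)        # start/done pairs, one city at a time
```

Both variants return the results in the same order; only the scheduling differs, which is what matters when the downstream API accepts one call at a time.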
Approach 2:-
function_app.py:-
import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# HTTP Starter
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name, None)  # Pass the functionName here
    response = client.create_check_status_response(req, instance_id)
    return response

# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    # Call the first activity to get a list of names
    names_list = yield context.call_activity("get_names")
    # Process each name sequentially using the second activity
    results = []
    for name in names_list:
        result = yield context.call_activity("process_name", name)
        results.append(result)
    return results
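# First Activity
# activity_trigger takes an input_name even when the activity needs no real input;
# "unused" is a placeholder parameter name.
@myApp.activity_trigger(input_name="unused")
def get_names(unused):
    # Your logic to retrieve a dynamic list of names goes here
    # For demonstration purposes, returning a hardcoded list
    return ["John", "Alice", "Bob"]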
# Second Activity
@myApp.activity_trigger(input_name="name")
def process_name(name: str):
    print(f"Processing {name}...")
    # Your logic to process each name goes here
    result = f"Hello {name}!"
    return result
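The reason Approach 2 runs one activity at a time is that the orchestrator is a generator: each yield hands a single task to the runtime, and the loop only continues once that task's result is sent back. The sketch below imitates this with the standard library only. FakeContext, FakeTask and run_orchestrator are hypothetical stand-ins written for this illustration, not part of azure.durable_functions, and the real runtime additionally handles replay and checkpointing, which are omitted here.

```python
from typing import Any, Callable, Dict, List


class FakeTask:
    """Hypothetical stand-in for a scheduled activity call."""

    def __init__(self, name: str, payload: Any) -> None:
        self.name = name
        self.payload = payload


class FakeContext:
    """Hypothetical stand-in for the orchestration context."""

    def call_activity(self, name: str, payload: Any = None) -> FakeTask:
        return FakeTask(name, payload)


def run_orchestrator(orchestrator: Callable, activities: Dict[str, Callable],
                     log: List[str]) -> Any:
    """Drive the generator: run each yielded task, then send its result back."""
    gen = orchestrator(FakeContext())
    try:
        task = next(gen)
        while True:
            log.append(f"start {task.name}({task.payload!r})")
            result = activities[task.name](task.payload)
            log.append(f"done {task.name}({task.payload!r})")
            task = gen.send(result)
    except StopIteration as stop:
        return stop.value  # the orchestrator's return value


def sequential_orchestrator(context):
    # Same shape as the orchestrator in Approach 2.
    names = yield context.call_activity("get_names")
    results = []
    for name in names:
        result = yield context.call_activity("process_name", name)
        results.append(result)
    return results


ACTIVITIES = {
    "get_names": lambda _: ["John", "Alice", "Bob"],
    "process_name": lambda name: f"Hello {name}!",
}

log: List[str] = []
output = run_orchestrator(sequential_orchestrator, ACTIVITIES, log)
print(output)
print("\n".join(log))
```

In the log, every "done" entry for one name appears before the "start" entry for the next, which is the ordering the question asks for.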