python azure azure-durable-functions

Azure Durable Functions: processing a list


I have an Azure Durable Function written in Python, with an orchestrator and two activity functions.

The orchestrator calls the first activity function and receives a list in return (a list of names, which can differ every time the function is executed).

The next step is to call the second activity function once for each item in that list. The calls must run sequentially, because of a restriction in the API that the second activity function calls.

#dynamically gets generated by the first activity function
payload=[1,2,3,4]            

tasks = [context.call_activity("secondfunction",ps) for ps in payload]
output = yield context.task_all(tasks)

What I am using is the fan-out method, which is not serial, but I can't seem to find an alternative for what I am trying to do.

Also, in the host.json file I tried to enforce that only one activity function can run at a given time, to avoid parallel processing:

  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 1,
      "maxConcurrentOrchestratorFunctions": 1
    }
  }

It's also worth noting that I cannot pass the whole list to the activity function: if I do, the activity function will take longer than 5-10 minutes, which is the timeout limit for an Azure Function. That is why I am trying to iterate over the list in the orchestrator function.

But the result is not sequential.

I would appreciate your feedback.


Solution

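  • The two approaches below show the difference. Approach 1 is the fan-out/fan-in pattern from the question: every activity is scheduled before the orchestrator waits on task_all, so the calls are not sequential. Approach 2 yields on each call_activity inside the loop, which gives the sequential processing you need. Note that maxConcurrentActivityFunctions in host.json only limits how many activities run at once on a single host instance; it does not make the orchestrator wait for one call to finish before scheduling the next.

    Approach 1 (fan-out/fan-in, runs in parallel):-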

    My function_app.py:-

    import azure.functions as func
    import azure.durable_functions as df
    
    myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
    
    # HTTP Starter
    @myApp.route(route="orchestrators/{functionName}")
    @myApp.durable_client_input(client_name="client")
    async def http_start(req: func.HttpRequest, client):
        function_name = req.route_params.get('functionName')
        instance_id = await client.start_new(function_name, None)  # Pass the functionName here
        response = client.create_check_status_response(req, instance_id)
        return response
    
    # Orchestrator
    @myApp.orchestration_trigger(context_name="context")
    def hello_orchestrator(context):
        cities = ["Seattle", "Tokyo", "London"]
    
        tasks = []
        for city in cities:
            tasks.append(context.call_activity("hello", city))
    
        # Wait for all tasks to complete
        results = yield context.task_all(tasks)
    
        return results
    
    # Activity
    @myApp.activity_trigger(input_name="city")
    def hello(city: str):
        print(f"Processing {city}...")
        # Your activity function logic goes here
        result = f"Hello {city}!"
    
        return result
    
    


    Function URL:-

    http://localhost:7071/api/orchestrators/hello_orchestrator
    

    Approach 2 (sequential, one activity at a time):-

    function_app.py:-

    import azure.functions as func
    import azure.durable_functions as df
    
    myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
    
    # HTTP Starter
    @myApp.route(route="orchestrators/{functionName}")
    @myApp.durable_client_input(client_name="client")
    async def http_start(req: func.HttpRequest, client):
        function_name = req.route_params.get('functionName')
        instance_id = await client.start_new(function_name, None)  # Pass the functionName here
        response = client.create_check_status_response(req, instance_id)
        return response
    
    # Orchestrator
    @myApp.orchestration_trigger(context_name="context")
    def hello_orchestrator(context):
        # Call the first activity to get a list of names
        names_list = yield context.call_activity("get_names")
    
        # Process each name sequentially using the second activity
        results = []
        for name in names_list:
            result = yield context.call_activity("process_name", name)
            results.append(result)
    
        return results
    
    # First Activity
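    # activity_trigger must be called with input_name, and the function needs a
    # matching parameter even though this activity ignores its input
    @myApp.activity_trigger(input_name="unused")
    def get_names(unused):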
        # Your logic to retrieve a dynamic list of names goes here
        # For demonstration purposes, returning a hardcoded list
        return ["John", "Alice", "Bob"]
    
    # Second Activity
    @myApp.activity_trigger(input_name="name")
    def process_name(name: str):
        print(f"Processing {name}...")
        # Your logic to process each name goes here
        result = f"Hello {name}!"
    
        return result
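
To see why yielding inside the loop makes the calls sequential, here is a minimal pure-Python sketch. It does not use the Azure SDK: ToyContext and run are hypothetical stand-ins for the Durable Functions runtime, written only to record the order in which activities are scheduled and executed. In the real runtime the fanned-out activities can run concurrently; this toy runs them one after another and only illustrates that all of them are scheduled before any result comes back.

```python
from typing import Any, Callable, Dict, List, Tuple


class ToyContext:
    """Hypothetical stand-in for the orchestration context (not the Azure API)."""

    def __init__(self, log: List[str]) -> None:
        self.log = log

    def call_activity(self, name: str, payload: Any = None) -> Tuple:
        # Only records that the activity was scheduled; nothing runs yet.
        self.log.append(f"schedule {name}({payload})")
        return ("single", name, payload)

    def task_all(self, tasks: List[Tuple]) -> Tuple:
        return ("all", tasks)


def run(orchestrator: Callable, activities: Dict[str, Callable]):
    """Toy driver: executes each yielded task and sends the result back."""
    log: List[str] = []
    gen = orchestrator(ToyContext(log))

    def execute(task: Tuple) -> Any:
        if task[0] == "all":
            return [execute(t) for t in task[1]]
        _, name, payload = task
        log.append(f"run {name}({payload})")
        return activities[name](payload)

    try:
        task = next(gen)
        while True:
            task = gen.send(execute(task))
    except StopIteration as stop:
        return stop.value, log


def sequential(ctx):
    names = yield ctx.call_activity("get_names")
    results = []
    for name in names:
        # The yield pauses the orchestrator until this one call has finished.
        results.append((yield ctx.call_activity("process_name", name)))
    return results


def fan_out(ctx):
    names = yield ctx.call_activity("get_names")
    # Every call is scheduled here, before anything is awaited.
    tasks = [ctx.call_activity("process_name", n) for n in names]
    return (yield ctx.task_all(tasks))


ACTIVITIES = {
    "get_names": lambda _: ["John", "Alice"],
    "process_name": lambda name: f"Hello {name}!",
}

if __name__ == "__main__":
    for orchestrator in (sequential, fan_out):
        result, log = run(orchestrator, ACTIVITIES)
        print(orchestrator.__name__, result)
        for entry in log:
            print("   ", entry)
```

In the sequential version each "schedule" entry is immediately followed by its "run" entry, while in the fan-out version both process_name calls are scheduled before either one runs.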