
Strange behaviour of Durable Functions in .NET 8.0 Isolated with activity functions


I'm having a very bad experience with Durable Functions. The case is that I have the following code:

public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context, string input)
{         
  int limit = 200;
  int offset = 0;
  bool isLastPage = false;         

  var options = TaskOptions.FromRetryPolicy(new RetryPolicy(
          maxNumberOfAttempts: 3,
          firstRetryInterval: TimeSpan.FromSeconds(5)));

  while (!isLastPage)
  {
      try
      {
          var batchResult = await context.CallActivityAsync<BatchResult>("FetchZaraProductsBatch", new BatchRequest
          {
              Limit = limit,
              Offset = offset,
              Query = input,
              Section = input
          }, options: options);

          isLastPage = batchResult.IsLastPage;
          offset = batchResult.lastPickedPosition;

          ReportStatus(context, batchResult.ClothingItems.Count, offset, input);
      }
      catch (Exception ex)
      {
          _logger.LogError($"Failed to fetch batch after retries. Error: {ex.Message}");
          throw;
      }
  }
}


That orchestrator should handle x invocations of the activity function, yet after it is deployed to the cloud it triggers, for example, one run of a single invocation per 5 minutes, then it runs 6 runs, and then it stops, so it doesn't even run for 5 minutes. My second question is: how can I increase the timeout to 10 minutes? I tried with host.json, but it doesn't work.


Solution

  • First I'll describe some things you seem to have misunderstood, so the rest is easier to follow. Then, at the end, I'll give you an example of how you could rewrite your code.

    Problems with your current approach

    There are several problems.

    Some may contribute to the problem you're seeing, while others will cause your Function to fail in different ways. But they all need addressing. I'll explain later in more detail, but this is the summary:

    In other words, there's nothing wrong with Durable Functions here; rather, you need to rethink how you use them.

    What will help you understand

    You'll understand the above problems better if you read up on how Durable Orchestrations use their replay history.^3

    A brief précis of how Durable Functions work: an orchestrator is unloaded from memory each time it awaits an Activity that hasn't finished yet. When it resumes after that Activity completes, it actually starts again from the very beginning. This is hard to get your head around, because you'd think "that can't be right; it seems to carry on from just after the await!"

    But the magic is this: they check the replay history to see which Activities have already completed. Instead of running those again, they take the recorded results from the history, so it feels as if they have resumed from where they left off.
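    To make the replay idea concrete, here is a deliberately simplified, self-contained toy model. It is not the Durable Task Framework and uses none of its API; `history`, `FetchPage` and `RunOrchestratorOnce` are hypothetical names. Each "wake-up" re-runs the orchestrator body from the top and answers activity calls that already completed from the recorded history:

```csharp
// Toy model of orchestrator replay. This is NOT the Durable Task Framework and uses none of its API.
// It only mimics the idea: re-run the orchestrator body from the top on every wake-up, and answer
// activity calls that already completed from a recorded history instead of running them again.
using System;
using System.Collections.Generic;

var history = new List<int>();  // recorded activity results, in call order
int orchestratorRuns = 0;        // how many times the orchestrator body started
int activityExecutions = 0;      // how many times the activity really executed

// Hypothetical activity: "fetch" a page of 200 items and return the next offset.
int FetchPage(int offset)
{
    activityExecutions++;
    return offset + 200;
}

// Hypothetical orchestrator that fetches three pages in a loop.
// Returns true when it completes, false when it had to stop and wait for a new activity.
bool RunOrchestratorOnce()
{
    orchestratorRuns++;
    int offset = 0;
    for (int callIndex = 0; callIndex < 3; callIndex++)
    {
        if (callIndex < history.Count)
        {
            // Replay: this call already completed, so reuse the recorded result.
            offset = history[callIndex];
        }
        else
        {
            // New call: simulate the activity being scheduled and completed, record the
            // result, then "unload" the orchestrator. The next wake-up starts from the top.
            history.Add(FetchPage(offset));
            return false;
        }
    }
    return true;
}

// Keep waking the orchestrator up until it reports completion.
while (!RunOrchestratorOnce()) { }

Console.WriteLine($"orchestrator runs: {orchestratorRuns}, activity executions: {activityExecutions}, final offset: {history[^1]}");
// prints: orchestrator runs: 4, activity executions: 3, final offset: 600
```

    For three pages the body starts four times while the activity itself executes only three times, so anything in the body that isn't an activity call (a log statement, for example) runs again on every replay. Each wake-up also walks through the whole history recorded so far, and that history keeps growing with every iteration of a loop inside a single orchestration instance.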

    How you can fix your code

    I can see two possible approaches:

    1. Keep your approach of retrieving a page (i.e. 200) at a time, but use the Eternal Orchestrations pattern^4 instead of the while loop.
    2. Switch to having one function that retrieves the full list of work items (let's say there are 10,000) and then processes them with parallelisation, using the fan-out/fan-in pattern.^5

    I'll give a brief re-write of your code for each of those two approaches.

    Note that I've made some assumptions and simplified the examples so it's easier to see the key points (e.g. I removed the try..catch and the retry logic).

    1. Using Eternal Orchestrations

    We can remove your unsafe while loop, and instead use the ContinueAsNew method. This causes the Orchestration to repeat in a safe way, passing the new offset each time:

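    // Assumes: using Microsoft.Azure.Functions.Worker; using Microsoft.DurableTask;
    // Hypothetical input type: ContinueAsNew takes one input object, so the query and offset travel together.
    public record PageRequest(string Query, int Offset);

    [Function(nameof(RunOrchestrator))]
    public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
    {
        // Start the first instance with new PageRequest(query, 0).
        PageRequest request = context.GetInput<PageRequest>()!;

        var batchResult = await context.CallActivityAsync<BatchResult>("FetchZaraProductsBatch", new BatchRequest
        {
            Limit = 200,
            Offset = request.Offset,
            Query = request.Query,
            Section = request.Query
        });

        await context.CallActivityAsync("ReportStatus" /*...etc...*/);

        if (!batchResult.IsLastPage)
        {
            // This is the important bit: restart this instance with a fresh history and the new offset.
            context.ContinueAsNew(new PageRequest(request.Query, batchResult.lastPickedPosition));
        }
    }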
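    A rough way to see why this matters is to count replayed results. Assume, as a simplification that ignores batching and caching, that every wake-up replays every activity result recorded so far in the instance. Your original while loop makes one activity call per page, so fetching $N$ pages in a single instance replays

$$\sum_{k=0}^{N} k = \frac{N(N+1)}{2}$$

    results in total, which grows quadratically. With ContinueAsNew each generation starts from an empty history; the version above makes two activity calls per generation, so each generation replays at most $0 + 1 + 2 = 3$ results and the total is $3N$. For 10,000 products in pages of 200 ($N = 50$), that is 1,275 replayed results versus 150.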
    

    2. Using Fan-Out/Fan-In

    You haven't shown what your FetchZaraProductsBatch is doing, so it's hard to say if this is the right approach. But let's assume your process is something like:

    1. Retrieve a list of all the Zara products
    2. For each one, process it

    Your code at the moment would therefore be processing 200 products in each batch, but sequentially, i.e. one at a time. If each Product takes, say, 10 seconds to process, a single call to your Activity Function would take 2,000 seconds, which would exceed the function timeout, so the process would fail.
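    Spelled out, under the same assumption of 10 seconds per product:

$$200 \times 10\,\text{s} = 2000\,\text{s} \approx 33.3\,\text{min}$$

    That is more than three times the 10-minute timeout you are trying to configure, whereas an Activity that processes a single product needs only about 10 seconds.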

    A better approach might be fan-out/fan-in. You'd retrieve the full list of Products (let's say there are 10,000 of them) and process each one in its own Activity, benefiting from a natural degree of parallelisation.

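    // Assumes: using System.Linq; using Microsoft.Azure.Functions.Worker; using Microsoft.DurableTask;
    [Function(nameof(RunOrchestrator))]
    public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
    {
        string? input = context.GetInput<string>();

        // One Activity only returns the list of products; it doesn't process them.
        ZaraProduct[] itemsToProcess = await context.CallActivityAsync<ZaraProduct[]>("FetchZaraProducts_ButDontProcessThem", input);

        // Fan out: schedule one Activity per product without awaiting them one by one.
        Task<ZaraProduct>[] asParallelTasks = itemsToProcess
            .Select(item => context.CallActivityAsync<ZaraProduct>("ProcessSingleZaraProduct_AndReportStatus", item))
            .ToArray();

        // Fan in: continue only when every Activity has completed.
        await Task.WhenAll(asParallelTasks);
    }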
    

    Note that the Orchestration will stop once it has processed all the current products. You would also need to re-trigger the Orchestration at some point later, either with a timer or by combining this approach with the Eternal Orchestration pattern above.

    3. Combining Eternal Orchestration with Fan-Out/Fan-In

    You don't give enough information about the use case, so I'm having to guess what you want. But let's say you want the process to be:

    1. Get all the products (like the Fan example)
    2. Process them all (also the Fan example)
    3. Wait 5 minutes
    4. Run again to process any further products (like the Eternal example)

    ...then the code might look like this:

    // Assumes: using System.Linq; using System.Threading; using Microsoft.Azure.Functions.Worker; using Microsoft.DurableTask;
    [Function(nameof(RunOrchestrator))]
    public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
    {
        string? input = context.GetInput<string>();

        // Fan-Out/Fan-In
        ZaraProduct[] itemsToProcess = await context.CallActivityAsync<ZaraProduct[]>("FetchZaraProducts_ButDontProcessThem", input);

        Task<ZaraProduct>[] asParallelTasks = itemsToProcess
            .Select(item => context.CallActivityAsync<ZaraProduct>("ProcessSingleZaraProduct_AndReportStatus", item))
            .ToArray();

        await Task.WhenAll(asParallelTasks);

        // Wait 5 minutes. Use the orchestration's replay-safe clock, not DateTime.UtcNow.
        DateTime nextCleanup = context.CurrentUtcDateTime.AddMinutes(5);
        await context.CreateTimer(nextCleanup, CancellationToken.None);

        // Repeat eternally, starting again with a fresh history.
        context.ContinueAsNew(input);
    }
    

    Hope that helps!