I am having a very bad experience with Durable Functions. The case is that I have the following code:
```csharp
public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context, string input)
{
    int limit = 200;
    int offset = 0;
    bool isLastPage = false;
    var options = TaskOptions.FromRetryPolicy(new RetryPolicy(
        maxNumberOfAttempts: 3,
        firstRetryInterval: TimeSpan.FromSeconds(5)));
    while (!isLastPage)
    {
        try
        {
            var batchResult = await context.CallActivityAsync<BatchResult>("FetchZaraProductsBatch", new BatchRequest
            {
                Limit = limit,
                Offset = offset,
                Query = input,
                Section = input
            }, options: options);
            isLastPage = batchResult.IsLastPage;
            offset = batchResult.lastPickedPosition;
            ReportStatus(context, batchResult.ClothingItems.Count, offset, input);
        }
        catch (Exception ex)
        {
            _logger.LogError($"Failed to fetch batch after retries. Error: {ex.Message}");
            throw;
        }
    }
}
```
So that Durable Function should invoke the activity function x times, yet after it is deployed to the cloud it triggers, for example, one run with a single invocation every 5 minutes, then 6 runs, and then it stops, so it doesn't even run for 5 minutes. My second question: how can I increase the timeout to 10 minutes? I tried with host.json but it doesn't work.
First I'll describe some things that you seem to have misunderstood, to make the rest easier to follow. Then at the end I'll give you an example of how you could rewrite your code.
There are several problems.
Some may contribute to the problem you're seeing, while others will cause your Function to fail in different ways. But they all need addressing. I'll explain later in more detail, but this is the summary:
1. Don't use a `while` loop in a Durable Function Orchestration. The replay history will eventually fill up and break the Durable Orchestration entirely.^1
2. The `while` loop means that it's one continuous orchestration run. The Orchestration may choose to re-run the same Activity call with the same parameters... and when it does, it expects the same result.^2 You haven't shown your Activity code, but I suspect it's non-deterministic, i.e. it can return different results each time you call it. This will cause lots of strange problems.
3. Your `ReportStatus` method needs to be inside another Activity Function. I'll explain below, but this is because the Orchestration effectively executes each line of code more than once (the "replay" effect), meaning that it will call `ReportStatus` repeatedly with the same values. It will not behave how you expect.

In other words, there's nothing wrong with Durable Functions here; rather, you need to rethink how you use them.
You'll understand the above problems better if you read up on how Durable Orchestrations use their replay history.^3
A brief précis of how Durable Functions work: they physically exit from memory each time you `await` an Activity. When they resume after an Activity, they actually start again from the very beginning. This is hard to get your head around, because you'd think "that can't be right, it seems to carry on from just after the `await`?"

But the magic is: they check the replay history to see which parts they have already completed. They can therefore skip any Activities that already happened, so it feels as if they have resumed where they left off.
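To make the replay idea concrete, here is a toy simulation in plain C#. It is not the Durable Functions runtime: the `history` list, the `CallActivity` helper and the fixed three-batch loop are simplified assumptions for illustration. The orchestrator body re-runs from the top on every wake-up, takes already-completed Activity results from the history instead of executing them again, and performs a status-reporting side effect directly in orchestrator code, as `ReportStatus` does in the question.

```csharp
using System;
using System.Collections.Generic;

// Toy model of orchestration replay (NOT the Durable Functions runtime).
var history = new List<int>();          // recorded results of completed activities, in order
var statusReports = new List<string>(); // side effects performed directly in orchestrator code
int activityExecutions = 0;             // how often the activity work actually ran

// One "wake-up": the orchestrator body always starts again from the top.
// Returns true if it ran to completion, false if it suspended on a new activity.
bool RunOrchestratorOnce()
{
    int position = 0; // how far through the history this replay has got

    // Stand-in for context.CallActivityAsync: replay a recorded result if there is one,
    // otherwise run the activity (the real runtime schedules it instead), record it and suspend.
    int? CallActivity(Func<int> activity)
    {
        if (position < history.Count)
            return history[position++];
        activityExecutions++;
        history.Add(activity());
        return null;
    }

    for (int batch = 0; batch < 3; batch++)
    {
        int? itemCount = CallActivity(() => 200);
        if (itemCount is null) return false; // suspended: the next wake-up replays from the top

        // Called directly, like ReportStatus in the question, so it repeats on every replay.
        statusReports.Add($"batch {batch}: {itemCount} items");
    }
    return true;
}

int orchestratorRuns = 0;
bool completed;
do
{
    completed = RunOrchestratorOnce();
    orchestratorRuns++;
} while (!completed);

Console.WriteLine($"Activity executions: {activityExecutions}");
Console.WriteLine($"Orchestrator runs:   {orchestratorRuns}");
Console.WriteLine($"Status reports:      {statusReports.Count}");
```

With three batches the activity work runs three times, but the orchestrator body runs four times and the directly-called status report fires six times instead of three. Moving `ReportStatus` into its own Activity means its completion is recorded in the history, so replays skip it like any other Activity.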
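I can see two possible approaches:

1. Eternal Orchestration: keep fetching in batches, but replace the `while` loop with `ContinueAsNew`.
2. Fan-out/fan-in: fetch the full list first, then process each item in its own Activity.

I'll give a brief rewrite of your code for each of those two approaches.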
Note that I've made some assumptions and simplified the examples so it's easier to see the key points (e.g. I removed the try..catch and the retry logic).
We can remove your unsafe `while` loop and instead use the `ContinueAsNew` method. This causes the Orchestration to repeat in a safe way (each repetition starts with a fresh, empty history), passing the new offset each time:
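```csharp
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;

// Hypothetical input type: the query plus the offset this generation starts from.
public record FetchState(string Query, int Offset);

public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
{
    // Orchestrator input arrives via GetInput, not as extra method parameters.
    FetchState state = context.GetInput<FetchState>()!;

    var batchResult = await context.CallActivityAsync<BatchResult>("FetchZaraProductsBatch", new BatchRequest
    {
        Limit = 200,
        Offset = state.Offset,
        Query = state.Query,
        Section = state.Query
    });

    // Side effects such as status reporting go in their own Activity.
    await context.CallActivityAsync("ReportStatus" /*...etc...*/);

    if (!batchResult.IsLastPage)
    {
        // This is the important bit: restart with the next offset and a fresh history.
        context.ContinueAsNew(new FetchState(state.Query, batchResult.lastPickedPosition));
    }
}
```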
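Why `ContinueAsNew` is safer than the `while` loop can be seen with a rough cost model. The sketch below assumes, for illustration only, that each completed Activity adds one entry to the history and that every wake-up replays the whole current history. Real histories record several events per Activity, so real numbers are larger, but the growth pattern is the same. `pages = 50` is a hypothetical page count.

```csharp
using System;

// Rough cost model for illustration only (not real Durable Functions internals):
// every completed activity adds ONE history entry, and after each completion the
// orchestrator wakes up and replays its entire current history.
const int pages = 50; // hypothetical number of 200-item pages

// 1) while loop: a single orchestration instance handles every page.
int whileHistory = 0;
long whileReplayed = 0;
for (int page = 1; page <= pages; page++)
{
    whileHistory++;                // this page's result is appended to the history
    whileReplayed += whileHistory; // the wake-up replays all of it
}

// 2) ContinueAsNew: each page runs in a fresh generation with an empty history.
int continueAsNewMaxHistory = 0;
long continueAsNewReplayed = 0;
for (int page = 1; page <= pages; page++)
{
    int history = 0;                  // ContinueAsNew discarded the previous history
    history++;                        // this page's result
    continueAsNewReplayed += history; // the wake-up replays only this generation
    continueAsNewMaxHistory = Math.Max(continueAsNewMaxHistory, history);
}

Console.WriteLine($"while loop:    final history {whileHistory}, entries replayed {whileReplayed}");
Console.WriteLine($"ContinueAsNew: max history {continueAsNewMaxHistory}, entries replayed {continueAsNewReplayed}");
```

Under this model the single long-running instance ends with 50 history entries and replays 1,275 entries in total (quadratic growth), while the `ContinueAsNew` version never holds more than one entry per generation and replays 50 in total (linear growth).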
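You haven't shown what your `FetchZaraProductsBatch` is doing, so it's hard to say if this is the right approach. But let's assume your process is something like:

1. Fetch a batch of 200 products.
2. Process each product in that batch, one after another, inside the same Activity.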
Your code at the moment would therefore process 200 products in each batch, but only sequentially, i.e. one at a time. If processing each product takes 10 seconds, your Activity Function would need 200 × 10 s = 2,000 seconds (over 33 minutes) per batch, more than three times the 10-minute maximum timeout on the Consumption plan. The Activity would time out and the process would fail.
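The timeout arithmetic can be checked quickly. The 10 seconds per product is the hypothetical figure from above; the 5-minute default and 10-minute maximum are the Consumption plan limits for `functionTimeout` in host.json.

```csharp
using System;

// Hypothetical figures from the example above, not measurements.
const int productsPerBatch = 200;
TimeSpan perProduct = TimeSpan.FromSeconds(10);

// Consumption plan functionTimeout: 5 minutes by default, 10 minutes at most.
TimeSpan defaultTimeout = TimeSpan.FromMinutes(5);
TimeSpan maxTimeout = TimeSpan.FromMinutes(10);

// Sequential processing: one product after another inside a single Activity call.
TimeSpan batchDuration = perProduct * productsPerBatch;

// Upper bound on products per Activity call, ignoring the time spent fetching.
int maxProductsPerCall = (int)(maxTimeout / perProduct);

Console.WriteLine($"One batch takes {batchDuration.TotalMinutes:F1} min " +
                  $"(default limit {defaultTimeout.TotalMinutes} min, max {maxTimeout.TotalMinutes} min)");
Console.WriteLine($"At most {maxProductsPerCall} products fit in one call");
```

The result, roughly 60 products per Activity call at best, is an upper bound that ignores fetch time and any slow responses; keeping each Activity small (one product per call) avoids having to tune that margin at all.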
A better approach might be fan-out/fan-in: you retrieve the full list of products (say there are 10,000 of them) and process each one in its own Activity, benefiting from a natural degree of parallelisation.
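```csharp
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;

public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
{
    string input = context.GetInput<string>()!;

    // One Activity fetches the list of products without processing them.
    ZaraProduct[] itemsToProcess = await context.CallActivityAsync<ZaraProduct[]>("FetchZaraProducts_ButDontProcessThem", input);

    // Fan-out: schedule one Activity per product; the runtime can run them in parallel.
    var asParallelTasks = itemsToProcess
        .Select(item => context.CallActivityAsync<ZaraProduct>("ProcessSingleZaraProduct_AndReportStatus", item))
        .ToArray();

    // Fan-in: wait until every product has been processed.
    await Task.WhenAll(asParallelTasks);
}
```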
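The fan-out/fan-in shape itself is ordinary .NET task composition, shown below without Durable Functions: start all the work first, then await it together with `Task.WhenAll`. `ProcessItemAsync` and its 100 ms delay are hypothetical stand-ins for the `ProcessSingleZaraProduct_AndReportStatus` Activity. Inside a real orchestrator you would keep using `context.CallActivityAsync` as above rather than `Task.Delay` or `Task.Run`, because orchestrator code has to stay deterministic.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Plain .NET illustration of fan-out/fan-in (no Durable Functions involved).
// Hypothetical stand-in for the per-product Activity.
async Task<string> ProcessItemAsync(int productId)
{
    await Task.Delay(100); // simulate I/O-bound work
    return $"product-{productId}: done";
}

int[] productIds = Enumerable.Range(1, 20).ToArray();

// Fan-out: start every task before awaiting any of them.
Task<string>[] tasks = productIds.Select(id => ProcessItemAsync(id)).ToArray();

// Fan-in: wait for all of them; the results array follows the input order.
string[] results = await Task.WhenAll(tasks);

Console.WriteLine($"{results.Length} items processed; first: {results[0]}");
```

`Task.WhenAll` returns results in the same order as the input tasks, regardless of which finished first. In Durable Functions, how many Activities actually run at once is governed by the host (for example the `maxConcurrentActivityFunctions` setting of the `durableTask` extension in host.json) rather than by the orchestrator.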
Note that the Orchestration will stop once it has processed all the current products. You would also need to re-trigger the Orchestration at some point later, either with a timer or by combining this approach with the Eternal Orchestration pattern above.
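You don't give enough information about the use case, so I'm having to guess what you want. But let's say you want the process to be:

1. Fetch the full list of products.
2. Process each product in its own Activity, in parallel (fan-out/fan-in).
3. Wait 5 minutes.
4. Repeat forever.

...then the code might look like this: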
```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;

public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
{
    string input = context.GetInput<string>()!;

    // Fan-Out/Fan-In
    ZaraProduct[] itemsToProcess = await context.CallActivityAsync<ZaraProduct[]>("FetchZaraProducts_ButDontProcessThem", input);
    var asParallelTasks = itemsToProcess
        .Select(item => context.CallActivityAsync<ZaraProduct>("ProcessSingleZaraProduct_AndReportStatus", item))
        .ToArray();
    await Task.WhenAll(asParallelTasks);

    // Wait 5 minutes with a durable timer, based on the replay-safe orchestration clock.
    DateTime nextRun = context.CurrentUtcDateTime.AddMinutes(5);
    await context.CreateTimer(nextRun, CancellationToken.None);

    // Repeat eternally: start the next cycle with a fresh history.
    context.ContinueAsNew(input);
}
```
Hope that helps!