Tags: azure, azure-functions, azure-durable-functions

How to batch 3 incoming HTTP chat messages in Azure Durable Functions (Isolated .NET)?


I'm trying to process chat messages using Azure Durable Functions (in the .NET Isolated Worker). Sometimes 3 or more messages are sent within a short timeframe, and I want to batch or debounce them together before processing.

Here’s what I have so far:

However, I’m not sure if this is the correct approach or if there’s a best practice I’m missing. Also, I’m not certain about how to handle concurrency or if I need any special checks to avoid race conditions.


HTTP Function (OnNewChatMessage.cs)

using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;

public class OnNewChatMessage
{
    [Function("OnNewChatMessage")]
    public async Task<HttpResponseData> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "chats/{chatId}/message")]
        HttpRequestData req,
        string chatId,
        FunctionContext executionContext,
        [DurableClient] DurableTaskClient client)
    {
        var logger = executionContext.GetLogger<OnNewChatMessage>();

        // Parse the message text (assuming raw text in the body)
        string messageText = await req.ReadAsStringAsync() ?? string.Empty;

        // We'll use a consistent instance ID per chat
        string instanceId = $"chat-{chatId}";

        // Start the orchestrator if it isn't already running.
        // Note: in the isolated worker the client type is DurableTaskClient,
        // not the in-process IDurableClient.
        var metadata = await client.GetInstanceAsync(instanceId);
        if (metadata is null || metadata.RuntimeStatus is OrchestrationRuntimeStatus.Completed or OrchestrationRuntimeStatus.Failed or OrchestrationRuntimeStatus.Terminated)
        {
            await client.ScheduleNewOrchestrationInstanceAsync(
                nameof(ChatMessageBatchOrchestrator),
                options: new StartOrchestrationOptions(InstanceId: instanceId));
            logger.LogInformation("Started new orchestrator for ChatId = {chatId}", chatId);
        }

        // Raise an event with the new message text
        await client.RaiseEventAsync(instanceId, "NewMessage", messageText);
        logger.LogInformation("Raised NewMessage event for ChatId = {chatId} with text '{messageText}'", chatId, messageText);

        var response = req.CreateResponse(HttpStatusCode.Accepted);
        await response.WriteStringAsync($"Queued message '{messageText}' for chat '{chatId}'.");
        return response;
    }
}

Orchestrator (ChatMessageBatchOrchestrator.cs)

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;

public class ChatMessageBatchOrchestrator
{
    [Function(nameof(ChatMessageBatchOrchestrator))]
    public async Task Run([OrchestrationTrigger] TaskOrchestrationContext context)
    {
        // Orchestrators must use a replay-safe logger, not an injected ILogger
        var logger = context.CreateReplaySafeLogger<ChatMessageBatchOrchestrator>();
        var messages = context.GetInput<List<string>>() ?? new List<string>();

        // Collect "NewMessage" events until the 2-second batching window expires
        var deadline = context.CurrentUtcDateTime.AddSeconds(2);
        var timerTask = context.CreateTimer(deadline, CancellationToken.None);

        while (true)
        {
            var eventTask = context.WaitForExternalEvent<string>("NewMessage");
            var winner = await Task.WhenAny(eventTask, timerTask);
            if (winner == timerTask)
            {
                break;
            }
            messages.Add(await eventTask);
        }

        logger.LogInformation("Processing batch of {count} messages", messages.Count);
        await context.CallActivityAsync(nameof(ProcessChatBatchActivity.ProcessChatBatch), messages);
    }
}

public class ProcessChatBatchActivity
{
    [Function(nameof(ProcessChatBatch))]
    public void ProcessChatBatch([ActivityTrigger] List<string> messages, FunctionContext context)
    {
        // logic
    }
}

Key points/questions:

  1. Is this the recommended pattern for simply batching?
  2. Do I need to worry about concurrency if multiple HTTP messages come in simultaneously? (It seems the orchestrator handles them in the order they arrive, but is there a race condition possible?)
  3. Should I handle the possibility of the orchestrator finishing before all messages come in? (Right now, I restart a new orchestrator if the status is Completed.)
  4. Is there a better approach to gather N messages and then do the work in one shot?

Solution

  • I'm not sure if it's the best approach, but it doesn't feel like the worst approach either. I'll try to answer questions 2 and 3.

    Do I need to worry about concurrency if multiple HTTP messages come in simultaneously? (It seems the orchestrator handles them in the order they arrive, but is there a race condition possible?)

    No, Durable Functions delivers the events to the orchestrator one at a time. Note, however, that the order in which they are processed can differ from the order in which they were sent, for example if one HTTP request takes longer to handle than another.

    Should I handle the possibility of the orchestrator finishing before all messages come in? (Right now, I restart a new orchestrator if the status is Completed.)

    This part, on the other hand, does look like a race condition. Your status check could observe the instance as still running just as it is executing its final steps; by the time you raise the event, the instance has completed and the event will never be processed because the instance has gone away.

    I'm thinking a Durable Entity could be one option: add each incoming message to an entity, and then start an orchestrator after a delay to process all the messages accumulated on the entity.
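    A minimal sketch of that entity idea, assuming durable entity support in the isolated worker (the name `ChatEntity` and its `Add`/`Drain` operations are illustrative, not part of your code):

    ```csharp
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Microsoft.Azure.Functions.Worker;
    using Microsoft.DurableTask.Entities;

    // Hypothetical entity that accumulates messages for one chat.
    // The entity key would be the chatId.
    public class ChatEntity : TaskEntity<List<string>>
    {
        // Called for each incoming message
        public void Add(string message) => State.Add(message);

        // Returns the accumulated batch and resets the state
        public List<string> Drain()
        {
            var batch = State;
            State = new List<string>();
            return batch;
        }

        protected override List<string> InitializeState(TaskEntityOperation operation)
            => new();

        [Function(nameof(ChatEntity))]
        public Task Run([EntityTrigger] TaskEntityDispatcher dispatcher)
            => dispatcher.DispatchAsync<ChatEntity>();
    }
    ```

    The HTTP function would then signal the entity (e.g. via `client.Entities.SignalEntityAsync`) instead of raising an orchestration event, which sidesteps the "instance already completed" race entirely since entity signals are never lost.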

    Or you could make your orchestrator an "eternal orchestrator" that restarts itself via ContinueAsNew whenever it completes a batch. Waiting for an event does not consume any resources beyond one entry in the instances table and a few rows in the history table. This way you could always just send the event and it would get handled.
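    A rough sketch of that eternal-orchestrator variant, reusing the activity from the question (the orchestrator name and the 2-second quiet window are illustrative):

    ```csharp
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Functions.Worker;
    using Microsoft.DurableTask;

    public class EternalChatBatchOrchestrator
    {
        [Function(nameof(EternalChatBatchOrchestrator))]
        public async Task Run([OrchestrationTrigger] TaskOrchestrationContext context)
        {
            var batch = new List<string>();

            // Wait indefinitely for the first message of the next batch...
            batch.Add(await context.WaitForExternalEvent<string>("NewMessage"));

            // ...then drain further messages until a 2-second window elapses
            var timer = context.CreateTimer(
                context.CurrentUtcDateTime.AddSeconds(2), CancellationToken.None);
            while (true)
            {
                var next = context.WaitForExternalEvent<string>("NewMessage");
                if (await Task.WhenAny(next, timer) == timer)
                {
                    break;
                }
                batch.Add(await next);
            }

            await context.CallActivityAsync(
                nameof(ProcessChatBatchActivity.ProcessChatBatch), batch);

            // Restart as a fresh instance to wait for the next batch;
            // this also truncates the orchestration history.
            context.ContinueAsNew(null);
        }
    }
    ```

    With this shape, the HTTP function would start the instance once and then only ever raise events, so the "completed while raising" race goes away as long as the orchestrator always restarts itself.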