azure, azure-functions, azure-servicebus-queues

How to do throttling for Azure Function through code


We have many Service Bus-triggered Azure Functions within a single Azure Functions instance/project. The problem is that some of the Service Bus queues carry a very high load and cause resource starvation. Is it possible to throttle those high-load functions individually through code? If so, what approaches can we use?


Solution

  • I agree with @Sean Feldman

    Reference here.

    Target-based scaling can add up to four instances at a time, and the scaling decision relies on a simple equation:

    desired instances = event source length / target executions per instance.
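
    As a rough illustration of the equation above, the following sketch computes the instance count for a given backlog. The class and method names are hypothetical, and rounding up to a whole instance is an assumption; the quoted formula does not say how fractions are handled.

    ```csharp
    using System;

    // Hypothetical helper, not part of the Azure Functions SDK.
    public static class TargetBasedScalingSketch
    {
        // desired instances = event source length / target executions per instance.
        // Rounding up to a whole instance is an assumption made for this illustration.
        public static int DesiredInstances(long eventSourceLength, int targetExecutionsPerInstance)
        {
            if (eventSourceLength < 0)
                throw new ArgumentOutOfRangeException(nameof(eventSourceLength));
            if (targetExecutionsPerInstance <= 0)
                throw new ArgumentOutOfRangeException(nameof(targetExecutionsPerInstance));

            // Integer ceiling division.
            return (int)((eventSourceLength + targetExecutionsPerInstance - 1) / targetExecutionsPerInstance);
        }
    }
    ```

    For example, `TargetBasedScalingSketch.DesiredInstances(1000, 16)` evaluates to 63 under this rounding assumption (1000 / 16 = 62.5).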

    The default values for target executions per instance are derived from the SDKs employed by Azure Functions extensions. No adjustments are necessary for target-based scaling to function effectively.

    When several functions within the same function app simultaneously request scaling out, the sum of those requests across functions is calculated to determine the adjustment in desired instances. Requests to scale out take precedence over requests to scale in.

    In cases where there are scale-in requests without corresponding scale-out requests, the maximum scale-in value is utilized.

    Azure Functions also lets you cap the number of messages that a Service Bus trigger processes concurrently on each instance, which helps prevent resource depletion. Keep in mind that host.json settings are host-wide: they apply to every Service Bus trigger in the function app rather than to a single function. With version 5.x of the Service Bus extension, you can configure the limit in your host.json file like this:

    {
      "version": "2.0",
      "logging": {
        "applicationInsights": {
          "samplingSettings": {
            "isEnabled": true,
            "excludedTypes": "Request"
          },
          "enableLiveMetricsFilters": true
        }
      },
      "extensions": {
        "serviceBus": {
          "maxConcurrentCalls": 16
        }
      }
    }
    

    Additionally, you can implement backpressure, rate limiting, and the circuit breaker pattern in your functions to handle high loads. Backpressure slows down or temporarily stops message acceptance when the load exceeds a threshold. Rate limiting controls the message processing rate so that downstream systems are not overwhelmed. The circuit breaker pattern temporarily halts message processing during consistent failures or high loads, which prevents cascading failures.
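
    Rate limiting in the messages-per-second sense is commonly implemented as a token bucket. The sketch below is one minimal way to do that; `TokenBucket` and its members are hypothetical names, not part of the Azure Functions SDK, and the clock is passed in explicitly so the behavior is easy to test. The state is per process, so each scaled-out instance would enforce its own limit.

    ```csharp
    using System;

    // Hypothetical helper: allows ratePerSecond operations per second on average,
    // with bursts of up to `capacity` operations.
    public sealed class TokenBucket
    {
        private readonly object _gate = new object();
        private readonly double _capacity;
        private readonly double _ratePerSecond;
        private double _tokens;
        private DateTime _lastRefillUtc;

        public TokenBucket(double capacity, double ratePerSecond, DateTime nowUtc)
        {
            _capacity = capacity;
            _ratePerSecond = ratePerSecond;
            _tokens = capacity; // Start with a full bucket.
            _lastRefillUtc = nowUtc;
        }

        // Returns true and consumes one token if one is available at nowUtc.
        public bool TryAcquire(DateTime nowUtc)
        {
            lock (_gate)
            {
                // Refill in proportion to the time elapsed since the last refill.
                double elapsedSeconds = (nowUtc - _lastRefillUtc).TotalSeconds;
                if (elapsedSeconds > 0)
                {
                    _tokens = Math.Min(_capacity, _tokens + elapsedSeconds * _ratePerSecond);
                    _lastRefillUtc = nowUtc;
                }

                if (_tokens < 1)
                {
                    return false;
                }

                _tokens -= 1;
                return true;
            }
        }
    }
    ```

    A function could call `TryAcquire(DateTime.UtcNow)` at the start of each invocation and delay or abandon the message when it returns false.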

    Here's my Azure Functions Service Bus queue trigger C# code implementing backpressure handling, rate limiting, and circuit breaker logic:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Extensions.Logging;

    namespace FunctionApp7
    {
        public class Function1
        {
            private const int MaxInFlight = 20; // Backpressure threshold; adjust as needed
            private static readonly TimeSpan BreakDuration = TimeSpan.FromMinutes(5);
            private static readonly Random _random = new Random();

            // UTC ticks until which the circuit breaker stays open; 0 means closed.
            private static long _circuitOpenUntilTicks;
            // Number of messages currently being handled on this instance.
            private static int _inFlight;
            // Caps how many invocations do work at the same time on this instance.
            private static readonly SemaphoreSlim _rateLimitSemaphore = new SemaphoreSlim(10, 10);

            [FunctionName("Function1")]
            public async Task RunAsync([ServiceBusTrigger("myqueue", Connection = "conn")] string myQueueItem, ILogger log)
            {
                // Circuit breaker: throw so the message is abandoned and redelivered.
                // Returning normally would complete the message and lose it.
                // Each redelivery counts toward the queue's max delivery count.
                if (DateTime.UtcNow.Ticks < Interlocked.Read(ref _circuitOpenUntilTicks))
                {
                    log.LogWarning("Circuit breaker is open. Message abandoned for retry.");
                    throw new InvalidOperationException("Circuit breaker is open.");
                }

                // Backpressure: undo the increment, back off, then abandon the message.
                if (Interlocked.Increment(ref _inFlight) > MaxInFlight)
                {
                    Interlocked.Decrement(ref _inFlight);
                    log.LogWarning("Concurrency limit reached. Backing off.");
                    await Task.Delay(TimeSpan.FromSeconds(5)); // Adjust backoff duration as needed
                    throw new InvalidOperationException("Concurrency limit reached.");
                }

                try
                {
                    // Rate limiting: release only what was actually acquired.
                    await _rateLimitSemaphore.WaitAsync();
                    try
                    {
                        await SimulateProcessing();
                        log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
                    }
                    finally
                    {
                        _rateLimitSemaphore.Release();
                    }
                }
                catch (Exception ex)
                {
                    log.LogError(ex, "Error processing message: {Message}", myQueueItem);

                    // Open the circuit breaker for BreakDuration, then rethrow so the
                    // message is abandoned instead of being completed.
                    Interlocked.Exchange(ref _circuitOpenUntilTicks, DateTime.UtcNow.Add(BreakDuration).Ticks);
                    throw;
                }
                finally
                {
                    Interlocked.Decrement(ref _inFlight);
                }
            }

            private static async Task SimulateProcessing()
            {
                // Simulate processing time; Random is not thread-safe, so guard it.
                int delayMs;
                lock (_random)
                {
                    delayMs = _random.Next(100, 1000);
                }
                await Task.Delay(delayMs);
            }
        }
    }
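
    The function above opens the circuit breaker on the first exception. If the breaker should trip only on consistent failures, one option is to count consecutive failures first. The sketch below shows that idea in isolation; `ConsecutiveFailureBreaker` and its members are hypothetical names, and the threshold and break duration are placeholders to tune.

    ```csharp
    using System;

    // Hypothetical helper, not part of the Azure Functions SDK.
    public sealed class ConsecutiveFailureBreaker
    {
        private readonly object _gate = new object();
        private readonly int _failureThreshold;
        private readonly TimeSpan _breakDuration;
        private int _consecutiveFailures;
        private DateTime _openUntilUtc = DateTime.MinValue;

        public ConsecutiveFailureBreaker(int failureThreshold, TimeSpan breakDuration)
        {
            if (failureThreshold <= 0)
                throw new ArgumentOutOfRangeException(nameof(failureThreshold));
            _failureThreshold = failureThreshold;
            _breakDuration = breakDuration;
        }

        // True while the breaker is open and processing should be skipped.
        public bool IsOpen(DateTime nowUtc)
        {
            lock (_gate)
            {
                return nowUtc < _openUntilUtc;
            }
        }

        // A success resets the failure streak.
        public void RecordSuccess()
        {
            lock (_gate)
            {
                _consecutiveFailures = 0;
            }
        }

        // Opens the breaker once the streak reaches the threshold.
        public void RecordFailure(DateTime nowUtc)
        {
            lock (_gate)
            {
                _consecutiveFailures++;
                if (_consecutiveFailures >= _failureThreshold)
                {
                    _openUntilUtc = nowUtc + _breakDuration;
                    _consecutiveFailures = 0;
                }
            }
        }
    }
    ```

    In a function, `IsOpen(DateTime.UtcNow)` would replace the open check, with `RecordSuccess()` after successful processing and `RecordFailure(DateTime.UtcNow)` in the catch block. Like the static fields above, this state is per instance and is not shared across scaled-out instances.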
    
