pythonazurefunctionazureservicebusazure-managed-identity

Azure function with service bus trigger - ManagedIdentityCredential performance


I am working on an Azure Function that uses a ServiceBusTrigger and queries Azure Table Storage. To process multiple messages as quickly as possible, we use the MaxConcurrentCalls setting to enable parallel message processing (for example, we set MaxConcurrentCalls to 200). We use Managed Identity, via DefaultAzureCredential, to access both the Service Bus and Azure Table Storage.

Performance testing shows that multiple instances of the Function app are instantiated and process messages as expected. However, each instance makes a call to the Azure /msi/token endpoint to obtain a ManagedIdentityCredential, and this call is the bottleneck, taking anywhere from 200ms to 5000ms. That is, with the above setting, if 200 messages are dropped onto the Service Bus, then 200 "instances" of the Azure Function start processing them and make 200 calls to get a ManagedIdentityCredential.

  1. What mechanism does Azure Functions use to process messages concurrently? Does it create multiple processes, or multiple threads within the same process?
  2. Is there a way to share or cache the credential once it has been obtained, so that the other message-processing instances can reuse it and the redundant calls to /msi/token are eliminated?

We're using Python.

This is the code that initializes the Azure resources:

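from azure.data.tables import TableServiceClient
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient

# helper function to initialize the global Service Bus and Table Storage clients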
def init_azure_resource_clients(config_settings: EligibilitySettings):
    """get table service client for Azure Table Storage and service bus client"""

    non_aio_credential = DefaultAzureCredential()

    # initialize global Service Bus client
    global _azure_servicebus_client
    _azure_servicebus_client = ServiceBusClient(fully_qualified_namespace=config_settings.serviceBusNamespaceFQ, credential=non_aio_credential)

    # initialize global Table Service Client
    global _azure_table_service_client
    # prefer connection string if available
    if config_settings.tableStorageConnectionString:
        _azure_table_service_client = TableServiceClient.from_connection_string(conn_str=config_settings.tableStorageConnectionString)
    else:
        _azure_table_service_client = TableServiceClient(endpoint=f"https://{config_settings.tableStorageAccount}.table.core.windows.net", credential=non_aio_credential)

And here is some sample code showing how it is called:

import json
import logging
import azure.functions as func

# global reference to the azure resources we need to access
_azure_table_service_client = None
_azure_servicebus_client = None

app = func.FunctionApp()

@app.function_name(name="ServiceBusQueueTrigger1")
@app.service_bus_queue_trigger(arg_name="msg", 
                           queue_name="<QUEUE_NAME>", 
                           connection="<CONNECTION_SETTING>")
def test_function(msg: func.ServiceBusMessage):
    
    logging.info('ServiceBus queue trigger processed message: %s',
             msg.get_body().decode('utf-8'))
            
    # initialize global azure resources
    init_azure_resource_clients(config_settings)

    # parse incoming message
    message_body = msg.get_body().decode('utf-8')
    message_json = json.loads(message_body)

    result = process_message(message_json) 

Solution

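  • In the question's code, init_azure_resource_clients() runs for every message and builds a new DefaultAzureCredential each time, and every new credential starts with an empty token cache. To avoid a call to the /msi/token endpoint per message, create DefaultAzureCredential once and cache it in a module-level global (_credential in the code below). The credential instance keeps the token it obtains in memory, so later invocations handled by the same worker process reuse it instead of requesting a new one.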

    Additionally, ensure that the Function App's managed identity is assigned the Storage Table Data Contributor role (or Storage Table Data Reader if it only needs read access) on the storage account.

    Refer to this link for guidance on using DefaultAzureCredential() in an Azure Function with Python.

    The sample code below uses DefaultAzureCredential() in an Azure Function with Python:

    import azure.functions as func
    import logging
    import json
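    from azure.identity import TokenCachePersistenceOptions
    # The aio clients below expect an async credential, so import it from azure.identity.aio
    from azure.identity.aio import DefaultAzureCredential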
    from azure.servicebus.aio import ServiceBusClient
    from azure.data.tables.aio import TableServiceClient
    
    _credential = None
    _servicebus_client = None
    _table_service_client = None
    
    def get_azure_credential():
        """Initialize and cache the Azure Managed Identity credential."""
        global _credential
        if _credential is None:
            logging.info("Initializing Managed Identity Credential with token caching.")
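            # Tokens are cached in memory by this credential instance; the persistence
            # options are passed through but may have no effect for managed identity.
            _credential = DefaultAzureCredential(cache_persistence_options=TokenCachePersistenceOptions())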
        return _credential
    
    async def init_azure_resource_clients():
        """Initialize Azure Service Bus and Table Storage clients asynchronously."""
        global _servicebus_client, _table_service_client
    
        credential = get_azure_credential()
    
        if _servicebus_client is None:
            _servicebus_client = ServiceBusClient(
                fully_qualified_namespace="SERviceBusName.servicebus.windows.net",
                credential=credential
            )
            logging.info("Initialized Azure Service Bus Client.")
    
        if _table_service_client is None:
            _table_service_client = TableServiceClient(
                endpoint="https://ravitewaja.table.core.windows.net",
                credential=credential
            )
            logging.info("Initialized Azure Table Storage Client.")
    
    app = func.FunctionApp()
    
    @app.function_name(name="ServiceBusQueueTrigger")
    @app.service_bus_queue_trigger(
        arg_name="msg",
        queue_name="queue_name",
        connection="CONNECTION_SETTING"
    )
    async def service_bus_trigger(msg: func.ServiceBusMessage):
        """Function triggered by Service Bus messages."""
        logging.info("Processing Service Bus message.")
    
        await init_azure_resource_clients()
        message_body = msg.get_body().decode("utf-8").strip()
    
        logging.info(f"Received Message: {message_body}")
    
        # Handle empty message
        if not message_body:
            logging.warning("Received an empty message. Skipping processing.")
            return
        
        try:
            message_json = json.loads(message_body)
        except json.JSONDecodeError as e:
            logging.error(f"Failed to decode JSON: {e}")
            return
        await process_message(message_json)
    
    async def process_message(message_json):
        """Processes the Service Bus message and interacts with Azure Table Storage."""
        global _table_service_client
    
        if _table_service_client is None:
            logging.error("Table Service Client is not initialized.")
            return
    
        try:
            table_name = "sampath"
            table_client = _table_service_client.get_table_client(table_name)
            entity = {
                "PartitionKey": "Messages",
                # RowKey must be a string, so convert numeric ids explicitly
                "RowKey": str(message_json.get("id", "default")),
                "Message": json.dumps(message_json)
            }
    
            await table_client.upsert_entity(entity)
            logging.info("Message successfully written to Azure Table Storage.")
    
        except Exception as e:
            logging.error(f"Error processing message: {e}")
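
On the first question: as far as I know, the Python worker runs synchronous functions on a thread pool inside each worker process, so concurrent messages handled by one instance are threads in the same process, while scale-out adds separate instances that cannot share in-memory state. Under that assumption, a lazily created global should be guarded by a lock so that 200 simultaneous first calls build the credential only once. The sketch below shows the pattern with a hypothetical helper, get_or_create, which is not part of any Azure SDK:

```python
import threading
from typing import Any, Callable, Dict

# Process-wide cache of expensive objects (credential, clients), keyed by name.
_cache: Dict[str, Any] = {}
_cache_lock = threading.Lock()


def get_or_create(name: str, factory: Callable[[], Any]) -> Any:
    """Return the cached object for `name`, building it at most once per process."""
    cached = _cache.get(name)
    if cached is not None:
        # Fast path: already built, no locking needed.
        return cached
    with _cache_lock:
        # Re-check inside the lock: another thread may have built it meanwhile.
        if name not in _cache:
            _cache[name] = factory()
        return _cache[name]


# Usage inside the function app (assumes azure-identity is installed):
#   from azure.identity import DefaultAzureCredential
#   credential = get_or_create("credential", DefaultAzureCredential)
```

The same helper can wrap the ServiceBusClient and TableServiceClient constructors, so each worker process builds them once and every later message reuses them.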
    
    

    Output: [two screenshots, both captioned "Azure function"; images not included]
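
To check whether the change actually removes the token bottleneck, it can help to measure how often and how slowly tokens are requested. The sketch below is a hypothetical wrapper, TimedCredential, for synchronous clients. It assumes the SDK clients only need a get_token method on the credential, which should be verified against the SDK versions in use:

```python
import logging
import threading
import time
from typing import Any


class TimedCredential:
    """Hypothetical wrapper: delegates to a real credential and times each get_token call."""

    def __init__(self, inner: Any) -> None:
        self._inner = inner
        self._lock = threading.Lock()
        self.calls = 0             # number of get_token calls seen so far
        self.total_seconds = 0.0   # cumulative time spent inside get_token

    def get_token(self, *scopes: str, **kwargs: Any) -> Any:
        started = time.perf_counter()
        try:
            # Delegate to the wrapped credential unchanged.
            return self._inner.get_token(*scopes, **kwargs)
        finally:
            elapsed = time.perf_counter() - started
            with self._lock:
                self.calls += 1
                self.total_seconds += elapsed
            logging.info("get_token(%s) took %.3f s", ", ".join(scopes), elapsed)


# Usage (assumes azure-identity is installed):
#   from azure.identity import DefaultAzureCredential
#   credential = TimedCredential(DefaultAzureCredential())
```

If the credential is shared correctly, the logged call count per worker process should stay small even when many messages arrive at once.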