I am working on an Azure Function that uses a ServiceBusTrigger and queries Azure Table Storage. To process multiple messages as quickly as possible, we use the MaxConcurrentCalls setting to enable parallel message processing (e.g. we set MaxConcurrentCalls to 200). We use Managed Identity to access both the Service Bus and Azure Table Storage via DefaultAzureCredential.
Performance testing shows that multiple instances of the Function app are instantiated and process messages as expected. However, each instance makes a call to the Azure /msi/token endpoint to obtain a ManagedIdentityCredential, and this call is the bottleneck, taking anywhere from 200 ms to 5000 ms. That is, with the above setting, if 200 messages are dropped onto the Service Bus, then 200 "instances" of the Azure Function start processing them and make 200 calls to obtain a ManagedIdentityCredential.
We're using Python as the programming language.
This is the code that initializes the Azure resources:
# helper function to initialize global table service client
def init_azure_resource_clients(config_settings: EligibilitySettings):
    """Build the module-level Service Bus and Table Storage clients.

    Every call constructs a new ``DefaultAzureCredential`` and new clients and
    rebinds the two module globals; nothing is reused between calls.

    Args:
        config_settings: Settings object supplying ``serviceBusNamespaceFQ``,
            ``tableStorageConnectionString`` and ``tableStorageAccount``.
    """
    global _azure_servicebus_client, _azure_table_service_client

    shared_credential = DefaultAzureCredential()

    # Service Bus always authenticates with the credential.
    _azure_servicebus_client = ServiceBusClient(
        fully_qualified_namespace=config_settings.serviceBusNamespaceFQ,
        credential=shared_credential,
    )

    # Table Storage: a configured connection string wins over the credential.
    connection_string = config_settings.tableStorageConnectionString
    if not connection_string:
        table_endpoint = f"https://{config_settings.tableStorageAccount}.table.core.windows.net"
        _azure_table_service_client = TableServiceClient(
            endpoint=table_endpoint,
            credential=shared_credential,
        )
        return

    _azure_table_service_client = TableServiceClient.from_connection_string(
        conn_str=connection_string
    )
And here is some sample code showing how it is called:
import logging
import azure.functions as func
# Module-level references to the Azure clients this function app needs.
# Both start as None and are expected to be assigned before first use.
_azure_table_service_client = None
_azure_servicebus_client = None
# Function app object that the trigger decorators below register against.
app = func.FunctionApp()
@app.function_name(name="ServiceBusQueueTrigger1")
@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="<QUEUE_NAME>",
    connection="<CONNECTION_SETTING>",
)
def test_function(msg: func.ServiceBusMessage):
    """Handle one Service Bus queue message.

    Logs the decoded body, (re)initializes the global Azure clients, parses
    the body as JSON and passes the result to ``process_message``.
    """
    body_text = msg.get_body().decode('utf-8')
    logging.info('ServiceBus queue trigger processed message: %s', body_text)

    # Client initialization runs on every invocation.
    init_azure_resource_clients(config_settings)

    payload = json.loads(body_text)
    outcome = process_message(payload)
To prevent each function instance from making a call to the `/msi/token` endpoint when obtaining a `ManagedIdentityCredential`, initialize `DefaultAzureCredential` once and cache it in `_credential`. This prevents redundant calls to `/msi/token` for each message.
Additionally, ensure that the `Storage Table Data Contributor` and `Storage Table Data Reader` roles are assigned.
Refer to this link for guidance on using `DefaultAzureCredential()` in an Azure Function with Python.
The sample code below uses `DefaultAzureCredential()` in an Azure Function with Python:
import azure.functions as func
import logging
import json
import asyncio
from azure.identity import DefaultAzureCredential, TokenCachePersistenceOptions
from azure.servicebus.aio import ServiceBusClient
from azure.data.tables.aio import TableServiceClient
# Module-level caches. Each starts as None and is filled in on first use, so
# later calls in the same Python process reuse the existing object.
_credential = None
_servicebus_client = None
_table_service_client = None
def get_azure_credential():
    """Return the shared Azure credential, creating it on the first call.

    The credential is stored in the module-level ``_credential`` so that all
    clients built in this process share one credential object instead of each
    constructing their own.

    Returns:
        The cached async ``DefaultAzureCredential`` instance.
    """
    global _credential
    if _credential is None:
        # The Service Bus and Table clients in this module are imported from
        # the ``.aio`` packages and await ``credential.get_token(...)``, so
        # they need the async credential class. The synchronous class from
        # ``azure.identity`` returns a plain token that cannot be awaited.
        from azure.identity.aio import (
            DefaultAzureCredential as AsyncDefaultAzureCredential,
        )

        logging.info("Initializing Managed Identity Credential with token caching.")
        # NOTE(review): azure-identity's token-caching notes appear to list
        # persistent caching as unsupported for managed identity; confirm
        # whether cache_persistence_options has any effect here.
        _credential = AsyncDefaultAzureCredential(
            cache_persistence_options=TokenCachePersistenceOptions()
        )
    return _credential
async def init_azure_resource_clients():
    """Create the Service Bus and Table Storage clients if they do not exist.

    Each client is built at most once per process and kept in its module-level
    global; calls made after both exist only fetch the cached credential.
    """
    global _servicebus_client, _table_service_client

    azure_credential = get_azure_credential()

    needs_servicebus = _servicebus_client is None
    needs_tables = _table_service_client is None

    if needs_servicebus:
        namespace = "SERviceBusName.servicebus.windows.net"
        _servicebus_client = ServiceBusClient(
            fully_qualified_namespace=namespace,
            credential=azure_credential,
        )
        logging.info("Initialized Azure Service Bus Client.")

    if needs_tables:
        table_endpoint = "https://ravitewaja.table.core.windows.net"
        _table_service_client = TableServiceClient(
            endpoint=table_endpoint,
            credential=azure_credential,
        )
        logging.info("Initialized Azure Table Storage Client.")
app = func.FunctionApp()


@app.function_name(name="ServiceBusQueueTrigger")
@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="queue_name",
    connection="CONNECTION_SETTING",
)
async def service_bus_trigger(msg: func.ServiceBusMessage):
    """Handle one Service Bus queue message.

    Ensures the shared clients exist, decodes the message body as UTF-8 and
    strips surrounding whitespace, then parses it as JSON and passes it to
    ``process_message``. Empty bodies and invalid JSON are logged and the
    function returns without processing them.
    """
    logging.info("Processing Service Bus message.")
    await init_azure_resource_clients()

    raw_text = msg.get_body().decode("utf-8").strip()
    logging.info(f"Received Message: {raw_text}")

    if raw_text:
        try:
            payload = json.loads(raw_text)
        except json.JSONDecodeError as decode_error:
            logging.error(f"Failed to decode JSON: {decode_error}")
        else:
            await process_message(payload)
    else:
        # Nothing to parse once whitespace is stripped.
        logging.warning("Received an empty message. Skipping processing.")
async def process_message(message_json):
    """Upsert one parsed Service Bus message into the ``sampath`` table.

    The entity uses the fixed partition key ``"Messages"``, a row key derived
    from the message's ``id`` field, and stores the whole message as a JSON
    string in the ``Message`` property.

    Args:
        message_json: Parsed message body; expected to be a dict.

    Returns:
        None. Failures are logged rather than raised.
    """
    if _table_service_client is None:
        logging.error("Table Service Client is not initialized.")
        return
    try:
        table_client = _table_service_client.get_table_client("sampath")

        # Table Storage keys must be strings. JSON ids are often numbers, so
        # coerce them; a missing or null id falls back to "default".
        row_id = message_json.get("id")
        if row_id is None:
            row_id = "default"

        entity = {
            "PartitionKey": "Messages",
            "RowKey": str(row_id),
            "Message": json.dumps(message_json),
        }
        await table_client.upsert_entity(entity)
        logging.info("Message successfully written to Azure Table Storage.")
    except Exception as e:
        # Best-effort by design: the error is logged and not re-raised.
        logging.error(f"Error processing message: {e}")
Output: