I have configured an Azure Container App Job with event-based scaling using messages from an Azure Service Bus queue. Is it possible to access the message details (e.g., message body, message ID, session ID, metadata) inside the container app job when it's triggered by these events? If so, how can I achieve this?
Yes, it is possible to access message details such as the message body, message ID, session ID, and metadata inside an Azure Container App Job triggered by Azure Service Bus events.
Here’s how you can achieve this:
Use the Azure Service Bus SDK to connect to the Service Bus queue and process messages. This allows you to retrieve details like the message body, message ID, session ID, and metadata during processing using logging.
Below sample example code is to processes messages in a session-enabled queue and logs the message ID, session ID, body, and application properties.
import os
import asyncio
import logging
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus.exceptions import SessionLockLostError
logging.basicConfig(
level=logging.INFO, # Use INFO or DEBUG level for more verbosity
format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
handlers=[
logging.StreamHandler(), # Logs to stdout
logging.FileHandler("application.log") # Logs to a file
]
)
logger = logging.getLogger("ContainerAppsLogger")
SERVICE_BUS_CONNECTION_STRING = "AzureServiceConnectionString"
QUEUE_NAME = "QUEUENAME"
async def process_message(message):
"""Process a single message."""
try:
if hasattr(message.body, "__iter__") and not isinstance(message.body, (str, bytes)):
body = b"".join(message.body)
else:
body = message.body
logger.info(f"Processing message ID: {message.message_id}")
logger.info(f"Session ID: {message.session_id}")
logger.info(f"Body: {body.decode('utf-8') if isinstance(body, bytes) else body}")
logger.info(f"Application Properties: {message.application_properties}")
except Exception as e:
logger.error(f"Error processing message: {e}")
async def receive_messages():
"""Receive and process messages from the session-enabled queue."""
logger.info("Starting message receiver...")
async with ServiceBusClient.from_connection_string(SERVICE_BUS_CONNECTION_STRING) as client:
while True:
try:
receiver = client.get_queue_receiver(queue_name=QUEUE_NAME, session_id="1")
async with receiver:
session = receiver.session
logger.info(f"Session started with ID: {session.session_id}")
logger.info(f"Session State: {await session.get_state()}")
async for message in receiver:
await process_message(message)
await receiver.complete_message(message)
logger.info("Message processed successfully.")
await session.renew_lock() # Renew the session lock frequently
except SessionLockLostError as e:
logger.warning("Session lock lost, re-establishing the session.")
await asyncio.sleep(1) # Add a short delay before retrying
except Exception as e:
logger.error(f"Unexpected error: {e}")
await asyncio.sleep(1)
if __name__ == "__main__":
logger.info("Application starting...")
try:
asyncio.run(receive_messages())
except KeyboardInterrupt:
logger.info("Application shutting down...")
except Exception as e:
logger.critical(f"Critical error: {e}")
Refer to this documentation for enabling sessions in Azure Service Bus and sending and receiving messages with a session ID.
Build the Image:
docker build -t <registry-name>.azurecr.io/myapp1:v2 .
Log in to an Azure Container Registry (ACR) using the following command, then push the image:
az acr login --name <ACR_NAME>
Use the az containerapp job
command to create a Container App Job, or do it via the Azure portal with the respective docker image.
Start the job and send a message to the Service Bus Queue.
You should be able to see the logs of the Container App Job in the Execution History/Logs section.
To query Azure Log Analytics for logs that include the message body, message ID, session ID, and metadata, you can filter and project the relevant information from the ContainerAppConsoleLogs_CL
table/log source. Here's the query:
ContainerAppConsoleLogs_CL
| where ContainerGroupName_s startswith "ravi56778554teja-"
| where Log_s contains "Message ID" or Log_s contains "Session ID" or Log_s contains "Body" or Log_s contains "Application Properties"
| project TimeGenerated, Log_s
| order by TimeGenerated asc
Refer to this link for deploying an event-driven job with Azure Container Apps.
For more details, refer to this link for Application Logging in Azure Container Apps.