Tags: azure, azureservicebus, azure-container-apps, keda, azure-container-app-jobs

How can I access message details within an Azure Container App Job configured for event-based scaling?


I have configured an Azure Container App Job with event-based scaling using messages from an Azure Service Bus queue. Is it possible to access the message details (e.g., message body, message ID, session ID, metadata) inside the container app job when it's triggered by these events? If so, how can I achieve this?


Solution

  • Yes, it is possible to access message details such as the message body, message ID, session ID, and metadata inside an Azure Container App Job triggered by Azure Service Bus events.

    Here’s how you can achieve this:

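    Use the Azure Service Bus SDK inside the job to connect to the Service Bus queue and receive the messages yourself. The scale rule only starts a job execution; it does not hand the triggering message to the container. Once your code has received a message, it can read the message body, message ID, session ID, and metadata, and write them to the logs.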
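
    As a rough illustration of where those details live on a received message, the sketch below collects them into a plain dictionary. The helper names (message_details, _to_text) are hypothetical, and it assumes, as is typical for the Python SDK, that the body may arrive as an iterable of byte chunks and that application property keys and values may be bytes. The stand-in object at the end only mimics the attribute names of a received message, so the snippet can be tried without a queue.

```python
from types import SimpleNamespace


def _to_text(value):
    """Decode bytes to str; leave every other value untouched."""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).decode("utf-8", errors="replace")
    return value


def message_details(message):
    """Collect the commonly needed fields of a received message into a dict.

    Works on any object exposing message_id, session_id, body and
    application_properties (assumption: same attribute names as the SDK).
    """
    body = message.body
    # The body may be an iterable of byte chunks rather than a single value.
    if not isinstance(body, (str, bytes, bytearray)) and hasattr(body, "__iter__"):
        body = b"".join(body)
    props = message.application_properties or {}
    return {
        "message_id": message.message_id,
        "session_id": message.session_id,
        "body": _to_text(body),
        "application_properties": {_to_text(k): _to_text(v) for k, v in props.items()},
    }


# Stand-in for a received message, for a dry run only.
sample = SimpleNamespace(
    message_id="m-1",
    session_id="1",
    body=iter([b"hel", b"lo"]),
    application_properties={b"source": b"test"},
)
details = message_details(sample)
```

    Returning a dictionary instead of logging directly keeps the extraction separate from the output format, so the same fields can be logged, stored, or passed on to business logic.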

    The sample code below processes messages from a session-enabled queue and logs the message ID, session ID, body, and application properties.

    import os
    import asyncio
    import logging
    from azure.servicebus.aio import ServiceBusClient
    from azure.servicebus.exceptions import SessionLockLostError
    
    logging.basicConfig(
        level=logging.INFO,  # Use INFO or DEBUG level for more verbosity
        format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
        handlers=[
            logging.StreamHandler(),  # Logs to stdout
            logging.FileHandler("application.log")  # Logs to a file
        ]
    )
    logger = logging.getLogger("ContainerAppsLogger")
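    # Placeholders: replace with your own values, or supply them to the job as
    # environment variables/secrets (the variable names here are assumptions).
    SERVICE_BUS_CONNECTION_STRING = os.getenv("SERVICE_BUS_CONNECTION_STRING", "AzureServiceConnectionString")
    QUEUE_NAME = os.getenv("QUEUE_NAME", "QUEUENAME")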
    
    async def process_message(message):
        """Process a single message."""
        try:
            if hasattr(message.body, "__iter__") and not isinstance(message.body, (str, bytes)):
                body = b"".join(message.body)
            else:
                body = message.body
            logger.info(f"Processing message ID: {message.message_id}")
            logger.info(f"Session ID: {message.session_id}")
            logger.info(f"Body: {body.decode('utf-8') if isinstance(body, bytes) else body}")
            logger.info(f"Application Properties: {message.application_properties}")
    
        except Exception as e:
            logger.error(f"Error processing message: {e}")
    
    async def receive_messages():
        """Receive and process messages from the session-enabled queue."""
        logger.info("Starting message receiver...")
        async with ServiceBusClient.from_connection_string(SERVICE_BUS_CONNECTION_STRING) as client:
            while True:
                try:
                    receiver = client.get_queue_receiver(queue_name=QUEUE_NAME, session_id="1")
                    async with receiver:
                        session = receiver.session
                        logger.info(f"Session started with ID: {session.session_id}")
                        logger.info(f"Session State: {await session.get_state()}")
    
                        async for message in receiver:
                            await process_message(message)
                            await receiver.complete_message(message)
                            logger.info("Message processed successfully.")
                            await session.renew_lock()  # Renew the session lock frequently
                except SessionLockLostError as e:
                    logger.warning("Session lock lost, re-establishing the session.")
                    await asyncio.sleep(1)  # Add a short delay before retrying
                except Exception as e:
                    logger.error(f"Unexpected error: {e}")
                    await asyncio.sleep(1)
    
    if __name__ == "__main__":
        logger.info("Application starting...")
        try:
            asyncio.run(receive_messages())
        except KeyboardInterrupt:
            logger.info("Application shutting down...")
        except Exception as e:
            logger.critical(f"Critical error: {e}")
    
    

    Refer to this documentation for enabling sessions in Azure Service Bus and sending and receiving messages with a session ID.
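
    One thing to keep in mind: the sample above loops with while True, so the execution keeps running until it is stopped or hits the job's replica timeout. A job execution is normally expected to exit once its work is done. The sketch below shows one possible shape for that, assuming a receiver object that offers receive_messages(max_message_count, max_wait_time) and complete_message(message) as the async Service Bus receiver does. The drain function and the FakeReceiver stand-in are hypothetical names used only for illustration.

```python
import asyncio


async def drain(receiver, handle, batch_size=10, max_wait_time=5):
    """Process messages until a receive call comes back empty, then return.

    Returns the number of messages that were handled and completed.
    """
    processed = 0
    while True:
        batch = await receiver.receive_messages(
            max_message_count=batch_size, max_wait_time=max_wait_time
        )
        if not batch:
            # Nothing arrived within max_wait_time: let the job execution end.
            return processed
        for message in batch:
            await handle(message)
            # Complete only after handle() succeeded; if it raises, the
            # message is left uncompleted and can be delivered again.
            await receiver.complete_message(message)
            processed += 1


class FakeReceiver:
    """Stand-in for a real receiver, for a dry run without a queue."""

    def __init__(self, messages):
        self._pending = list(messages)
        self.completed = []

    async def receive_messages(self, max_message_count=1, max_wait_time=None):
        batch = self._pending[:max_message_count]
        self._pending = self._pending[max_message_count:]
        return batch

    async def complete_message(self, message):
        self.completed.append(message)
```

    With the real SDK, the receiver from client.get_queue_receiver(...) in the sample above could be passed in place of FakeReceiver, and process_message could serve as handle. Note that the original process_message catches its own exceptions, so a failed message would still be completed; letting the exception propagate is one way to avoid that.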

    Build the Image:

    docker build -t <registry-name>.azurecr.io/myapp1:v2 .
    
    


    Log in to your Azure Container Registry (ACR) with the following command, then push the image (docker push <registry-name>.azurecr.io/myapp1:v2):

    az acr login --name <ACR_NAME>
    


    Use the az containerapp job create command to create a Container App Job from that Docker image, or create the job in the Azure portal.


    Start the job and send a message to the Service Bus Queue.
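
    For the test message, something along these lines could be used. It is a sketch, not the author's method: build_test_message and send_test_message are hypothetical helper names, the "source" property is made up for illustration, and the default session ID "1" is chosen to match the session_id="1" that the receiver sample listens on. The SDK import sits inside the send function so the pure helper can be tried without the SDK installed.

```python
import json
import uuid


def build_test_message(payload, session_id="1"):
    """Return keyword arguments for ServiceBusMessage (no network access)."""
    return {
        "body": json.dumps(payload),
        "session_id": session_id,  # required for a session-enabled queue
        "message_id": str(uuid.uuid4()),
        "application_properties": {"source": "manual-test"},  # illustrative only
    }


def send_test_message(connection_string, queue_name, payload, session_id="1"):
    """Send one message to the queue using the synchronous SDK client."""
    # Imported lazily so build_test_message works without azure-servicebus.
    from azure.servicebus import ServiceBusClient, ServiceBusMessage

    message = ServiceBusMessage(**build_test_message(payload, session_id))
    with ServiceBusClient.from_connection_string(connection_string) as client:
        with client.get_queue_sender(queue_name=queue_name) as sender:
            sender.send_messages(message)


fields = build_test_message({"order": 42})
```

    Calling send_test_message with your connection string and queue name should place one message in session "1", which the scale rule can then pick up to start an execution.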


    You should be able to see the logs of the Container App Job in the Execution History/Logs section.


    To find the logged message body, message ID, session ID, and metadata in Azure Log Analytics, filter the ContainerAppConsoleLogs_CL table and project the relevant columns. In the query below, replace the ContainerGroupName_s prefix with the name of your own job:

    ContainerAppConsoleLogs_CL
    | where ContainerGroupName_s startswith "ravi56778554teja-"
    | where Log_s contains "Message ID" or Log_s contains "Session ID" or Log_s contains "Body" or Log_s contains "Application Properties"
    | project TimeGenerated, Log_s
    | order by TimeGenerated asc
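
    The query above matches on substrings of free-form log text. If the job instead writes one JSON object per log line, the fields become easier to pick apart on the query side. The following is a minimal sketch of such a formatter using only the standard logging module; JsonFormatter, build_logger, and the field names are assumptions made for illustration, not part of the original sample.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    # Hypothetical field names, passed via logger.info(..., extra={...}).
    FIELDS = ("message_id", "session_id", "body", "application_properties")

    def format(self, record):
        entry = {"level": record.levelname, "event": record.getMessage()}
        for name in self.FIELDS:
            if hasattr(record, name):
                entry[name] = getattr(record, name)
        # default=str keeps the formatter from failing on unusual values.
        return json.dumps(entry, default=str)


def build_logger(name="ContainerAppsLogger"):
    """Create a logger that writes JSON lines to the console stream."""
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    logger.propagate = False
    return logger


# Dry run: format a record by hand instead of emitting it.
record = logging.makeLogRecord(
    {"msg": "message received", "levelname": "INFO", "message_id": "m-1", "session_id": "1"}
)
line = JsonFormatter().format(record)
```

    In the job, a call such as logger.info("message received", extra={"message_id": message.message_id, "session_id": message.session_id}) would then produce one JSON line per message, and a query could apply parse_json(Log_s) to read individual fields.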
    


    Refer to this link for deploying an event-driven job with Azure Container Apps.

    For more details, refer to this link for Application Logging in Azure Container Apps.