I am using Azure Container Apps Jobs with an event-driven trigger through Azure Event Hubs, with blobMetadata as the checkpoint strategy. The job gets triggered as it should, and the checkpoint store gets updated by the job as it should. The problem is that the job gets triggered again immediately after finishing, in an endless loop, even though there are no new events. I have confirmed this through the job's logs: the first run logs all events, while every subsequent run logs no events at all.
Here is the eventTriggerConfig of the job:
eventTriggerConfig: {
  parallelism: 1
  replicaCompletionCount: 1
  scale: {
    rules: [
      {
        name: 'event-hub-trigger'
        type: 'azure-eventhub'
        auth: [
          {
            secretRef: 'event-hub-connection-string'
            triggerParameter: 'connection'
          }
          {
            secretRef: 'storage-account-connection-string'
            triggerParameter: 'storageConnection'
          }
        ]
        metadata: {
          blobContainer: containerName
          checkPointStrategy: 'blobMetadata'
          consumerGroup: eventHubConsumerGroupName
          eventHubName: eventHubName
          connectionFromEnv: 'EVENT_HUB_CONNECTION_STRING'
          storageConnectionFromEnv: 'STORAGE_ACCOUNT_CONNECTION_STRING'
          activationUnprocessedEventThreshold: 1
          unprocessedEventThreshold: 5
        }
      }
    ]
  }
}
Here is the Python-based job logic:
import asyncio
from datetime import datetime, timedelta, timezone
import logging
import os

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from azure.identity.aio import DefaultAzureCredential

BLOB_STORAGE_ACCOUNT_URL = os.getenv("BLOB_STORAGE_ACCOUNT_URL")
BLOB_CONTAINER_NAME = os.getenv("BLOB_CONTAINER_NAME")
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = os.getenv("EVENT_HUB_FULLY_QUALIFIED_NAMESPACE")
EVENT_HUB_NAME = os.getenv("EVENT_HUB_NAME")
EVENT_HUB_CONSUMER_GROUP = os.getenv("EVENT_HUB_CONSUMER_GROUP")

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

credential = DefaultAzureCredential()

# Global variable to track the last event time
last_event_time = None
WAIT_DURATION = timedelta(seconds=30)


async def on_event(partition_context, event):
    global last_event_time

    if event is not None:
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    else:
        print(f"Received a None event from partition ID: {partition_context.partition_id}")

    # Update the last event time
    last_event_time = datetime.now(timezone.utc)

    await partition_context.update_checkpoint(event)


async def receive():
    global last_event_time

    checkpoint_store = BlobCheckpointStore(
        blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
        container_name=BLOB_CONTAINER_NAME,
        credential=credential,
    )
    client = EventHubConsumerClient(
        fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
        eventhub_name=EVENT_HUB_NAME,
        consumer_group=EVENT_HUB_CONSUMER_GROUP,
        checkpoint_store=checkpoint_store,
        credential=credential,
    )

    # Initialize the last event time
    last_event_time = datetime.now(timezone.utc)

    async with client:
        # client.receive keeps running until the client is closed,
        # so we schedule it as a background task.
        receive_task = asyncio.create_task(
            client.receive(
                on_event=on_event,
                starting_position="-1",
            )
        )

        # Wait until no events are received for the specified duration
        while True:
            await asyncio.sleep(1)
            if datetime.now(timezone.utc) - last_event_time > WAIT_DURATION:
                break

        # Close the client and the receive task
        await client.close()
        receive_task.cancel()
        try:
            await receive_task
        except asyncio.CancelledError:
            pass

    # Close credential when no longer needed.
    await credential.close()


def run():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())
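The stop condition in the job (exit once no event has arrived for a set duration) can be tried out without any Azure resources. The following is only a minimal sketch of that idle-timeout pattern: `run_until_idle` and `fake_receive` are hypothetical names, and `fake_receive` merely stands in for `client.receive`.

```python
import asyncio
import time


async def run_until_idle(receive, idle_timeout: float, poll_interval: float = 0.05) -> int:
    """Run `receive(on_event)` in the background until no event arrives for `idle_timeout` seconds.

    Returns the number of events handled.
    """
    last_event_time = time.monotonic()
    handled = 0

    async def on_event(event) -> None:
        # Count the event and reset the idle timer.
        nonlocal last_event_time, handled
        handled += 1
        last_event_time = time.monotonic()

    receive_task = asyncio.create_task(receive(on_event))
    try:
        while time.monotonic() - last_event_time <= idle_timeout:
            if receive_task.done():
                break
            await asyncio.sleep(poll_interval)
    finally:
        # Stop the background receiver and wait for it to finish.
        receive_task.cancel()
        try:
            await receive_task
        except asyncio.CancelledError:
            pass
    return handled


async def fake_receive(on_event) -> None:
    """Hypothetical event source: delivers three events, then stays silent."""
    for i in range(3):
        await on_event(f"event-{i}")
        await asyncio.sleep(0.01)
    await asyncio.Event().wait()  # blocks until the task is cancelled


if __name__ == "__main__":
    print(asyncio.run(run_until_idle(fake_receive, idle_timeout=0.2)))
```

The sketch uses `time.monotonic()` instead of wall-clock timestamps so that the idle check is unaffected by clock adjustments; the job above uses `datetime.now(timezone.utc)`, which works the same way for this purpose.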
The job depends on azure-eventhub-checkpointstoreblob-aio (1.2.0) and azure-identity (1.21.0).
I finally found the root cause: it was the casing of "checkpoint". checkPointStrategy should be checkpointStrategy. If anyone stumbles upon this, this configuration worked for me:
metadata: {
  blobContainer: containerName
  checkpointStrategy: 'blobMetadata'
  consumerGroup: eventHubConsumerGroupName
  eventHubName: eventHubName
  connectionFromEnv: 'EVENT_HUB_CONNECTION_STRING'
  storageConnectionFromEnv: 'STORAGE_ACCOUNT_CONNECTION_STRING'
  activationUnprocessedEventThreshold: '0'
  unprocessedEventThreshold: '5'
}
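A casing mistake like this is easy to miss, since the job deployed and ran with the misspelled key. As a rough sketch, a small pre-deployment check could flag metadata keys that differ from an expected key only by case. The helper `find_miscased_keys` is hypothetical, and `EXPECTED_KEYS` is simply the set of keys from the working configuration above, not an exhaustive list of scaler settings.

```python
# Keys copied from the working configuration above (assumption: not exhaustive).
EXPECTED_KEYS = {
    "blobContainer",
    "checkpointStrategy",
    "consumerGroup",
    "eventHubName",
    "connectionFromEnv",
    "storageConnectionFromEnv",
    "activationUnprocessedEventThreshold",
    "unprocessedEventThreshold",
}


def find_miscased_keys(metadata: dict) -> list:
    """Return (given, expected) pairs for keys that match an expected key only when case is ignored."""
    by_lower = {key.lower(): key for key in EXPECTED_KEYS}
    problems = []
    for key in metadata:
        expected = by_lower.get(key.lower())
        if expected is not None and expected != key:
            problems.append((key, expected))
    return problems


if __name__ == "__main__":
    broken = {"checkPointStrategy": "blobMetadata", "eventHubName": "my-hub"}
    for given, expected in find_miscased_keys(broken):
        print(f"'{given}' should be '{expected}'")
```

Run against the metadata from the question, this reports only `checkPointStrategy`; keys that are entirely unknown are deliberately left alone, because the expected set here is incomplete.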
Here is a working example project.