I can programmatically receive telemetry from Azure IoT Hub using Event Hub, like so:
async def receive_events(consumer_client: EventHubConsumerClient):
    async def on_event(partition_context: Any, event: EventData) -> None:
        print(event.body_as_str())
    ...
consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONN_STR,
    consumer_group="$Default"
)
asyncio.run(receive_events(consumer_client))
...
But - the device_id doesn't seem to be there? How can I find it programmatically?
If that's not possible, the only workaround I can think of is to list all devices and create a separate event stream for each one, so that I know the device_id when an event comes in. Is there a simpler solution? How do I solve this?
You can programmatically extract the device ID when receiving telemetry from Azure IoT Hub using Event Hub, without manually adding it to the message payload.
When a device sends telemetry to Azure IoT Hub and the message is routed to Event Hub, the message metadata (system properties) automatically includes the device ID, as shown in the image below.
If these properties are present, you don't need to add the device ID to the message payload explicitly. The device ID is included in the system properties under the key:
b'iothub-connection-device-id'
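As a minimal sketch of that lookup, the helper below takes a system-properties mapping and returns the device ID as a string. The name `extract_device_id` and the sample dictionary are made up for illustration; the dictionary stands in for `event.system_properties`. It assumes the keys arrive as bytes, as shown above, and tolerates the value being either bytes or str.

```python
from typing import Any, Mapping, Optional

# Key under which IoT Hub stores the sending device's ID (see above).
DEVICE_ID_KEY = b"iothub-connection-device-id"


def extract_device_id(system_properties: Mapping[Any, Any]) -> Optional[str]:
    """Return the device ID from a system-properties mapping, or None if absent."""
    value = system_properties.get(DEVICE_ID_KEY)
    if value is None:
        return None
    # Values are assumed to be bytes; decode them, otherwise fall back to str().
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return str(value)


# Hypothetical sample standing in for event.system_properties.
sample_properties = {DEVICE_ID_KEY: b"my-device-01"}
print(extract_device_id(sample_properties))
print(extract_device_id({}))
```

Returning `None` instead of an empty string makes a missing property easy to tell apart from a device whose ID could not be decoded.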
Modify your event handler to extract and print the device ID:
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

BLOB_STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=sampath56;AccountKey=YOUR_STORAGE_KEY"
BLOB_CONTAINER_NAME = "sampath"
EVENT_HUB_CONNECTION_STR = "Endpoint=sb://sampath4.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_EVENT_HUB_KEY"
EVENT_HUB_NAME = "rav55"


async def on_event(partition_context, event):
    """Handles received events and extracts device ID."""
    print(f"\nReceived event from partition: {partition_context.partition_id}")
    message_body = event.body_as_str(encoding='UTF-8')
    print(f"Message Body: {message_body}")
    device_id = event.system_properties.get(b'iothub-connection-device-id', b'').decode()
    print(f"Device ID: {device_id}")
    print("\nSystem Properties:")
    for key, value in event.system_properties.items():
        print(f" - {key}: {value}")
    # Print event metadata
    print("\nEvent Metadata:")
    print(f" - Offset: {event.offset}")
    print(f" - Sequence Number: {event.sequence_number}")
    print(f" - Enqueued Time: {event.enqueued_time}")
    await partition_context.update_checkpoint(event)


async def main():
    """Starts the Event Hub consumer and listens for messages."""
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        BLOB_STORAGE_CONNECTION_STRING, BLOB_CONTAINER_NAME
    )
    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR,
        consumer_group="$Default",
        eventhub_name=EVENT_HUB_NAME,
        checkpoint_store=checkpoint_store,
    )
    async with client:
        print("Listening for messages...")
        await client.receive(on_event=on_event, starting_position="-1")


if __name__ == "__main__":
    asyncio.run(main())
**Alternative Workaround (If System Properties Are Missing)**
If, for some reason, `b'iothub-connection-device-id'` is missing, you can manually include the device ID in the telemetry payload like this:
import json

message_payload = json.dumps({
    "deviceId": device_id,  # Manually adding device ID
    "temperature": 22.5,
    "humidity": 60
})
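On the receiving side, the two approaches can be combined: prefer the system property and fall back to the payload only when it is missing. The sketch below is one way to do that, not part of any SDK. The function name `resolve_device_id` is hypothetical, and it assumes the body is a JSON object carrying the `deviceId` field from the payload above.

```python
import json
from typing import Any, Mapping, Optional

DEVICE_ID_KEY = b"iothub-connection-device-id"


def resolve_device_id(system_properties: Mapping[Any, Any], body: str) -> Optional[str]:
    """Prefer the IoT Hub system property; fall back to "deviceId" in the JSON body."""
    raw = system_properties.get(DEVICE_ID_KEY)
    if raw is not None:
        return raw.decode("utf-8") if isinstance(raw, bytes) else str(raw)
    # System property missing: try the payload (assumed to be a JSON object).
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return None
    if isinstance(payload, dict) and payload.get("deviceId") is not None:
        return str(payload["deviceId"])
    return None


# Hypothetical inputs standing in for event.system_properties and event.body_as_str().
print(resolve_device_id({}, '{"deviceId": "sensor-7", "temperature": 22.5}'))
```

Inside the event handler this would be called as `resolve_device_id(event.system_properties, event.body_as_str())`.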