We are sending Avro data encoded with azure.schemaregistry.encoder.avroencoder to Event Hubs using a standalone Python job, and we can deserialize it with the same encoder in another standalone Python consumer. The schema registry client is supplied to the Avro encoder in both cases.
This is the standalone producer I use:
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
os.environ["AZURE_CLIENT_ID"] = ''
os.environ["AZURE_TENANT_ID"] = ''
os.environ["AZURE_CLIENT_SECRET"] = ''
token_credential = DefaultAzureCredential()
fully_qualified_namespace = ""
group_name = "testSchemaReg"
eventhub_connection_str = ""
eventhub_name = ""
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
# auto_register=True registers the schema definition under group_name on first use
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    # encode() serializes dict_content against the Avro schema and stamps the
    # registry schema ID into the EventData content type
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)
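If it helps to see what actually goes on the wire: the body is schemaless Avro binary, and the schema is referenced by ID via the content type rather than embedded in the payload. A quick sanity check you can drop inside the with block above (the exact ID string will differ per registry):

    # The encoder stores the registry schema ID in the content type;
    # the body itself contains only the Avro-encoded bytes, no schema.
    print(event_data.content_type)  # e.g. "avro/binary+<schema ID>"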
I was able to deserialize it using this standalone consumer:
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

async def on_event(partition_context, event):
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(
        event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    print("message type is:")
    print(type(event))
    # avro_encoder is built exactly as on the producer side
    dec = avro_encoder.decode(event)
    print("decoded msg:\n")
    print(dec)
    await partition_context.update_checkpoint(event)

async def main():
    client = EventHubConsumerClient.from_connection_string(
        "connection str",
        consumer_group="$Default",
        eventhub_name="")
    async with client:
        await client.receive(on_event=on_event, starting_position="-1")

asyncio.run(main())
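One detail that matters for the Spark step below: decode() does not strictly need an EventData object. It also accepts a MessageContent-style dict of raw bytes plus content type, which is usually all you can recover once another framework has consumed the event. A minimal sketch, assuming both fields were obtained some other way:

    # Equivalent decode without the Event Hubs SDK object: the raw payload plus
    # the content type (which carries the schema ID). Both values are assumed
    # to have been captured elsewhere.
    message_content = {"content": raw_payload_bytes, "content_type": payload_content_type}
    print(avro_encoder.decode(message_content))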
As a next step, I replaced the standalone Python consumer with a PySpark consumer running in a Synapse notebook. Below are the problems I faced:
from pyspark.sql.functions import udf

@udf
def decode(row_msg):
    # schema_registry_client is captured from the driver here, so Spark has to
    # serialize it out to the executors; note also that decode() wants the
    # content type alongside the bytes, and the result has to be returned
    encoder = AvroEncoder(client=schema_registry_client)
    return encoder.decode(bytes(row_msg))
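For completeness, the obvious next attempt is to build every client inside the UDF so nothing has to be serialized from the driver. A sketch of that attempt, with placeholder connection values (per the answer below, even this is not a supported pattern today, and the service-principal environment variables would also have to exist on the executors):

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    @udf(returnType=StringType())
    def decode_self_contained(content, content_type):
        # Hypothetical attempt: credential, registry client and encoder are all
        # created on the executor, so nothing non-serializable crosses over.
        from azure.identity import DefaultAzureCredential
        from azure.schemaregistry import SchemaRegistryClient
        from azure.schemaregistry.encoder.avroencoder import AvroEncoder

        client = SchemaRegistryClient("", DefaultAzureCredential())  # namespace elided
        with AvroEncoder(client=client, group_name="testSchemaReg") as encoder:
            return str(encoder.decode({"content": bytes(content),
                                       "content_type": content_type}))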
Answering my own question: the Azure Event Hubs schema registry doesn't support Spark, or any other distributed system, yet.
They are working on it and trying to add this support to Spark: https://github.com/Azure/azure-event-hubs-spark/pull/573
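Until that lands, one way to sidestep the limitation is to keep decoding off the executors entirely: collect small batches back to the driver and reuse the standalone AvroEncoder there. A rough sketch; the column names and where the Spark connector surfaces the AMQP content type are assumptions, not verified:

    # Hypothetical driver-side decode: bring a (small) batch of raw events back
    # to the driver, then decode with the same AvroEncoder as the standalone
    # consumer. "body"/"systemProperties"/"content-type" are assumed names.
    for row in df.select("body", "systemProperties").collect():
        message_content = {
            "content": bytes(row["body"]),
            "content_type": row["systemProperties"]["content-type"],
        }
        print(avro_encoder.decode(message_content))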