apache-spark, azure-eventhub, azure-synapse, spark-avro

Avro bytes from Event Hub cannot be deserialized with PySpark


We are sending Avro data encoded with azure.schemaregistry.encoder.avroencoder to Event Hub using a standalone Python job, and we can deserialize it with the same encoder in another standalone Python consumer. The schema registry client is supplied to the Avro encoder in this case as well.

This is the standalone producer I use:

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential


os.environ["AZURE_CLIENT_ID"] = ''
os.environ["AZURE_TENANT_ID"] = ''
os.environ["AZURE_CLIENT_SECRET"] = ''
token_credential = DefaultAzureCredential()

fully_qualified_namespace = ""
group_name = "testSchemaReg"
eventhub_connection_str = ""
eventhub_name = ""

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)
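One detail that matters later: as far as I can tell, the encoder puts only the Avro-encoded payload into the EventData body and carries the schema ID in the event's content type (something like avro/binary+<schema id>). A quick way to check that on the producer side, assuming EventData exposes content_type as shown (run it inside the with block above, before the encoder is closed):

# Inspect what the encoder actually produces: content_type carries the schema ID,
# while the body is plain Avro binary with no schema embedded.
inspect_event = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
print(inspect_event.content_type)   # e.g. "avro/binary+<schema id>"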

I was able to deserialize it using the standalone consumer:

import asyncio

from azure.eventhub.aio import EventHubConsumerClient

# avro_encoder is constructed the same way as in the producer above
# (an AvroEncoder wrapping a SchemaRegistryClient).


async def on_event(partition_context, event):
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(
        event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    print("message type is:")
    print(type(event))                  # azure.eventhub.EventData
    dec = avro_encoder.decode(event)    # the decoder accepts the EventData directly
    print("decoded msg:\n")
    print(dec)
    await partition_context.update_checkpoint(event)


async def main():
    client = EventHubConsumerClient.from_connection_string(
        "connection str",
        consumer_group="$Default",
        eventhub_name="")
    async with client:
        await client.receive(on_event=on_event, starting_position="-1")


asyncio.run(main())

As a next step, I replaced the standalone Python consumer with a PySpark consumer running in a Synapse notebook. Below are the problems I faced:

  1. The from_avro function in Spark is not able to deserialize the Avro message encoded with the Azure encoder.
  2. As a workaround, I tried creating a UDF that makes use of the Azure encoder, but the Azure encoder expects the event to be of type EventData, whereas when Spark reads the data via the Event Hubs API we only get a byte array (see the sketch after this list).
from pyspark.sql.functions import udf

@udf
def decode(row_msg):
    encoder = AvroEncoder(client=schema_registry_client)
    # Fails: decode() expects an EventData-like message, not raw bytes.
    return encoder.decode(bytes(row_msg))
  3. I don't see any proper documentation on a deserializer that can be used with Spark or any other distributed system. All the examples use standalone clients. Is there a connector we can use with Spark/Flink?
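For reference, the direction that workaround was heading: besides an EventData, AvroEncoder.decode also accepts a plain dict holding the raw payload and the content type that carries the schema ID (the MessageContent shape). The sketch below rests on assumptions of mine: it presumes the content type somehow ends up in a column next to the body (for example because the producer copies it into the event's application properties), which the stock Event Hubs Spark connector does not do for you, and raw_df, FULLY_QUALIFIED_NAMESPACE and GROUP_NAME are placeholders.

from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

FULLY_QUALIFIED_NAMESPACE = ""  # "<namespace>.servicebus.windows.net"
GROUP_NAME = "testSchemaReg"


def decode_avro(body, content_type):
    # Recreated on every call to keep the sketch simple; a real job would
    # cache the client and encoder per executor.
    client = SchemaRegistryClient(FULLY_QUALIFIED_NAMESPACE, DefaultAzureCredential())
    encoder = AvroEncoder(client=client, group_name=GROUP_NAME)
    # decode() also takes a {"content": ..., "content_type": ...} dict in place
    # of an EventData, as long as the content type still carries the schema ID.
    decoded = encoder.decode({"content": bytes(body), "content_type": content_type})
    return str(decoded)


decode_avro_udf = udf(decode_avro, StringType())

# raw_df: the dataframe read with the Event Hubs connector; the content_type
# column is assumed to have been forwarded by the producer (see above).
decoded_df = raw_df.withColumn("decoded", decode_avro_udf(col("body"), col("content_type")))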

Solution

  • Answering my own question: the Azure Event Hubs schema registry doesn't support Spark or any other distributed system.

    They are working on it and trying to add this support to Spark: https://github.com/Azure/azure-event-hubs-spark/pull/573