apache-kafka avro masstransit confluent-kafka-dotnet

Configure Avro with MassTransit and Kafka


How do I configure MassTransit to serialize and deserialize with Avro when producing to and consuming from Confluent Kafka topics? I saw that the Avro serializer and deserializer are in the Confluent.SchemaRegistry.Serdes package. Some code examples would be welcome.


Solution

  • The way I've configured MassTransit to use Avro is to generate the message classes with avrogen, and then configure the producer and topic endpoint as shown below:

    First, you need to create the client for the schema registry:

    var schemaRegistryClient = new CachedSchemaRegistryClient(new Dictionary<string, string>
    {
        {"schema.registry.url", "localhost:8081"},
    });
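
    As a sketch of an equivalent setup, the same client can be built from the typed SchemaRegistryConfig class in Confluent.SchemaRegistry instead of a raw dictionary. The URL is the same local placeholder used above, and the using directive is what this snippet is assumed to need:

    ```csharp
    using Confluent.SchemaRegistry;

    // Typed alternative to the dictionary form; Url maps to "schema.registry.url".
    // The address is a local placeholder, not a value from a real deployment.
    var schemaRegistryClient = new CachedSchemaRegistryClient(new SchemaRegistryConfig
    {
        Url = "localhost:8081"
    });
    ```

    Either form yields a client that can be shared by all four serializers and deserializers configured for the rider.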
    

    Then, you can configure the rider:

    services.AddMassTransit(x =>
    {
        x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
        x.AddRider(rider =>
        {
            rider.AddConsumer<KafkaMessageConsumer>();
    
            rider.AddProducer<string, KafkaMessage>(Topic, context => context.MessageId.ToString())
                .SetKeySerializer(new AvroSerializer<string>(schemaRegistryClient).AsSyncOverAsync())
                .SetValueSerializer(new AvroSerializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());
    
            rider.UsingKafka((context, k) =>
            {
                k.Host("localhost:9092");
    
                k.TopicEndpoint<string, KafkaMessage>("topic-name", "consumer-group", c =>
                {
                    c.SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistryClient).AsSyncOverAsync());
                    c.SetValueDeserializer(new AvroDeserializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());
                    c.AutoOffsetReset = AutoOffsetReset.Earliest;
                    c.ConfigureConsumer<KafkaMessageConsumer>(context);
    
                    c.CreateIfMissing(m =>
                    {
                        m.NumPartitions = 2;
                    });
                });
            });
        });
    });
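
    The configuration above refers to a KafkaMessageConsumer and registers a producer, but shows neither in use. A minimal sketch of both follows, assuming KafkaMessage is the avrogen-generated class and that ITopicProducer<string, KafkaMessage> can be resolved from the container (in MassTransit v8 it lives in the MassTransit namespace; older versions may differ). KafkaMessageSender is a hypothetical helper name, not part of the original answer:

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using MassTransit;

    // Receives messages from the topic endpoint. By the time Consume runs,
    // the value has already been deserialized from Avro into KafkaMessage.
    public class KafkaMessageConsumer : IConsumer<KafkaMessage>
    {
        public Task Consume(ConsumeContext<KafkaMessage> context)
        {
            Console.WriteLine($"Received {context.Message.GetType().Name}");
            return Task.CompletedTask;
        }
    }

    // Hypothetical helper: wraps the producer registered with AddProducer.
    // Assumes the keyed producer interface is available for injection.
    public class KafkaMessageSender
    {
        private readonly ITopicProducer<string, KafkaMessage> _producer;

        public KafkaMessageSender(ITopicProducer<string, KafkaMessage> producer)
        {
            _producer = producer;
        }

        public Task Send(string key, KafkaMessage message, CancellationToken cancellationToken = default)
        {
            // Key and value are serialized by the Avro serializers set on the producer.
            return _producer.Produce(key, message, cancellationToken);
        }
    }
    ```

    Because the serializers are attached in the rider configuration, neither class needs to know about Avro or the schema registry.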
    

    You can review a working unit test to see further details. I should probably add this to the documentation.

    I wrote this just now to answer this question; I hadn't used Avro until an hour ago.

    Also, I used this article from Confluent to get things up and running. The docker-compose.yml in the linked unit test project configures all the required services.

    Update

    There is also a complete sample that uses Avro with Kafka and MassTransit.