How to configure MassTransit to serialize/deserialize using Avro when producing to, and consuming from Confluent Kafka topics? I saw that Avro serializer/deserializer are in the package Confluent.SchemaRegistry.Serdes
. Some code examples would be welcomed.
To configure MassTransit to use Avro, the way I've done it is using the generated class files (avrogen
), and then configuring the producer and topic endpoint as shown below:
First, you need to create the client for the schema registry:
var schemaRegistryClient = new CachedSchemaRegistryClient(new Dictionary<string, string>
{
{"schema.registry.url", "localhost:8081"},
});
Then, you can configure the rider:
services.AddMassTransit(x =>
{
x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
x.AddRider(rider =>
{
rider.AddConsumer<KafkaMessageConsumer>();
rider.AddProducer<string, KafkaMessage>(Topic, context => context.MessageId.ToString())
.SetKeySerializer(new AvroSerializer<string>(schemaRegistryClient).AsSyncOverAsync())
.SetValueSerializer(new AvroSerializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
k.TopicEndpoint<string, KafkaMessage>("topic-name", "consumer-group", c =>
{
c.SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistryClient).AsSyncOverAsync());
c.SetValueDeserializer(new AvroDeserializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.ConfigureConsumer<KafkaMessageConsumer>(context);
c.CreateIfMissing(m =>
{
m.NumPartitions = 2;
});
});
});
});
});
You can review a working unit test to see further details. I should probably add this to the documentation.
I wrote this just now to answer this question, I hadn't used Avro until an hour ago.
Also, I used this article from Confluent to get things up and running. The docker-compose.yml
in the linked unit test project configures all the required services.
There is also a complete sample that uses Avro with Kafka and MassTransit.