asp.net.net.net-coredotnet-httpclientdotnet-aspire

How to connect to kafka port and do integration testing by reading topic and message


I am trying to connect to Kafka port exposed by me in apphost in integrationtest.cs file after connecting to port, go and read all kafka topics and open a specific topic and read the messages in it

Apphost.cs

var builder = DistributedApplication.CreateBuilder(args);
"
"
"
"
var kafka = builder.AddKafka("kafka", port: 9200)
    .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100))
    .WithLifetime(ContainerLifetime.Session);

builder.AddProject<Projects.O_WP_PIT_KafkaPublisher_Service>("kafkaPublisher", "Aspire")
    .WithReference(apiService)
    .WaitFor(apiService)
    .WithReference(kafka)
    .WaitFor(kafka)
    .WaitForCompletion(dacpac);

builder.Build().Run();

**IntegrationTest.cs **

_appHost = await DistributedApplicationTestingBuilder.CreateAsync<Projects.PI_AspireSolution_AppHost>();
_appHost.Services.ConfigureHttpClientDefaults(clientBuilder =>
{
    clientBuilder.AddStandardResilienceHandler();
});
_app = await _appHost.BuildAsync();
await _app.StartAsync();
        await _resourceNotificationService.WaitForResourceAsync("kafka", KnownResourceStates.Running).WaitAsync(TimeSpan.FromSeconds(120));

after this im not sure how to connect to kafka and read all the topics and messages in each topic


Solution

  • first of all Make sure you have the Confluent.Kafka and XUnit NuGet packages installed in your test project:

    dotnet add package Confluent.Kafka
    
    dotnet add package XUnit 
    

    Once you ensure Kafka is running (WaitForResourceAsync completes successfully), you need Get List of Topics as follows :

    public async Task<List<string>> GetAllTopicsAsync(string bootstrapServers)
    {
        using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
        var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
        return metadata.Topics.Select(t => t.Topic).ToList();
    }
    

    Once you have the topic list, you can create a Kafka Consumer to read messages.

    public async Task ReadMessagesFromTopicAsync(string bootstrapServers, string topicName)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = "test-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest, // Read from the beginning
            EnableAutoCommit = false // Don't auto-commit offsets
        };
    
        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe(topicName);
    
        try
        {
            for (int i = 0; i < 5; i++) // Read 5 messages for testing
            {
                var consumeResult = consumer.Consume(TimeSpan.FromSeconds(10));
                if (consumeResult != null)
                {
                    Console.WriteLine($"Received message: {consumeResult.Message.Value}");
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error while consuming: {ex.Message}");
        }
        finally
        {
            consumer.Close();
        }
    }
    

    The complete code of IntegrationTest.cs will be as follows :

    using Xunit;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Confluent.Kafka.Admin;
    
    public class KafkaIntegrationTest
    {
        private readonly string _kafkaBootstrapServers = "localhost:9200"; // Change if needed
    
        [Theory]
        [InlineData("my-topic")]   //Replace this with your desired topic
        [InlineData("another-topic")] //Replace this with your desired topic  
        public async Task Kafka_Should_Contain_Topic_And_Read_Messages(string topicName)
        {
            // Wait for Kafka to be up
            await _resourceNotificationService.WaitForResourceAsync("kafka", KnownResourceStates.Running)
                                              .WaitAsync(TimeSpan.FromSeconds(120));
    
            // Get all topics
            var topics = await GetAllTopicsAsync(_kafkaBootstrapServers);
            
            // Assert topic exists
            Assert.Contains(topicName, topics);
    
            // Read messages from the topic
            var messages = await ReadMessagesFromTopicAsync(_kafkaBootstrapServers, topicName);
    
            // Assert messages are not empty
            Assert.NotEmpty(messages);
    
            // (Optional) Assert specific expected message content
            Assert.Contains(messages, msg => msg.Contains("expected-message-content"));
        }
    
        private async Task<List<string>> GetAllTopicsAsync(string bootstrapServers)
        {
            using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
            return metadata.Topics.Select(t => t.Topic).ToList();
        }
    
        private async Task<List<string>> ReadMessagesFromTopicAsync(string bootstrapServers, string topicName)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = bootstrapServers,
                GroupId = "test-consumer-group",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false
            };
    
            var messages = new List<string>();
    
            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe(topicName);
    
            try
            {
                for (int i = 0; i < 5; i++) // Read up to 5 messages
                {
                    var consumeResult = consumer.Consume(TimeSpan.FromSeconds(5));
                    if (consumeResult != null)
                    {
                        messages.Add(consumeResult.Message.Value);
                    }
                }
            }
            finally
            {
                consumer.Close();
            }
    
            return messages;
        }
    }