How to connect to a Kafka port and do integration testing by reading topics and messages

I am trying to connect, from my IntegrationTest.cs file, to the Kafka port that I exposed in the AppHost. After connecting to the port, I want to list all Kafka topics, open a specific topic, and read the messages in it.


var builder = DistributedApplication.CreateBuilder(args);

// Kafka broker resource named "kafka" on host port 9200, with the Kafka UI on host port 9100.
var kafka = builder.AddKafka("kafka", port: 9200)
    .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

// Publisher service project, registered as "kafkaPublisher" and started with the "Aspire" launch profile.
// NOTE(review): the excerpt ends here; any reference from this project to `kafka` and the final Build/Run call are not shown — confirm.
builder.AddProject<Projects.O_WP_PIT_KafkaPublisher_Service>("kafkaPublisher", "Aspire");


**IntegrationTest.cs**

// Create a testing builder for the AppHost project.
_appHost = await DistributedApplicationTestingBuilder.CreateAsync<Projects.PI_AspireSolution_AppHost>();
// NOTE(review): the lambda passed to ConfigureHttpClientDefaults is cut off in this excerpt; its body is not shown.
_appHost.Services.ConfigureHttpClientDefaults(clientBuilder =>
// Build the distributed application and start it.
_app = await _appHost.BuildAsync();
await _app.StartAsync();
        // Wait until the "kafka" resource reports the Running state; WaitAsync gives up after 120 seconds.
        // NOTE(review): _resourceNotificationService is not assigned anywhere in this excerpt — confirm where it is obtained.
        await _resourceNotificationService.WaitForResourceAsync("kafka", KnownResourceStates.Running).WaitAsync(TimeSpan.FromSeconds(120));

After this, I'm not sure how to connect to Kafka and read all the topics and the messages in each topic.


  • First of all, make sure you have the Confluent.Kafka and xUnit NuGet packages installed in your test project:

    dotnet add package Confluent.Kafka
    dotnet add package XUnit 

    Once you have ensured that Kafka is running (WaitForResourceAsync completes successfully), you need to get the list of topics as follows:

    /// <summary>
    /// Returns the names of all topics reported by the Kafka cluster.
    /// </summary>
    /// <param name="bootstrapServers">Broker address list, e.g. <c>localhost:9200</c>.</param>
    /// <returns>The topic names found in the cluster metadata.</returns>
    public async Task<List<string>> GetAllTopicsAsync(string bootstrapServers)
    {
        // GetMetadata is a blocking call (up to the 10 s timeout), so it is run on the
        // thread pool to keep this method genuinely asynchronous for the caller.
        return await Task.Run(() =>
        {
            using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
            return metadata.Topics.Select(t => t.Topic).ToList();
        });
    }

    Once you have the topic list, you can create a Kafka Consumer to read messages.

    /// <summary>
    /// Reads up to five messages from <paramref name="topicName"/> and writes each value to the console.
    /// </summary>
    /// <param name="bootstrapServers">Broker address list, e.g. <c>localhost:9200</c>.</param>
    /// <param name="topicName">Topic to subscribe to and read from.</param>
    public async Task ReadMessagesFromTopicAsync(string bootstrapServers, string topicName)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = "test-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest, // Read from the beginning
            EnableAutoCommit = false // Don't auto-commit offsets
        };

        // Consume blocks the calling thread for up to 10 s per attempt, so the loop runs on the thread pool.
        await Task.Run(() =>
        {
            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            try
            {
                // Without a subscription the consumer has no partitions assigned and never receives anything.
                consumer.Subscribe(topicName);

                for (int i = 0; i < 5; i++) // Read 5 messages for testing
                {
                    // Returns null when the timeout elapses without a message.
                    var consumeResult = consumer.Consume(TimeSpan.FromSeconds(10));
                    if (consumeResult != null)
                    {
                        Console.WriteLine($"Received message: {consumeResult.Message.Value}");
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error while consuming: {ex.Message}");
            }
            finally
            {
                // Leave the consumer group cleanly instead of waiting for the session to time out.
                consumer.Close();
            }
        });
    }

    The complete code of IntegrationTest.cs will be as follows :

    using Xunit;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Confluent.Kafka.Admin;
    /// <summary>
    /// Integration test that verifies a Kafka topic exists on the broker and that
    /// messages can be read from it.
    /// </summary>
    public class KafkaIntegrationTest
    {
        // Must match the host port passed to AddKafka in the AppHost.
        // NOTE(review): the Aspire testing builder may assign a different host port; if the
        // connection fails, resolve the broker address from the running app instead — confirm.
        private readonly string _kafkaBootstrapServers = "localhost:9200"; // Change if needed

        /// <summary>
        /// Checks that <paramref name="topicName"/> exists and that it contains the expected message.
        /// </summary>
        /// <param name="topicName">Name of the topic to verify and read from.</param>
        [Theory] // Required: [InlineData] rows are only run for theories.
        [InlineData("my-topic")]   //Replace this with your desired topic
        [InlineData("another-topic")] //Replace this with your desired topic
        public async Task Kafka_Should_Contain_Topic_And_Read_Messages(string topicName)
        {
            // Wait for Kafka to be up, but fail instead of hanging if it never starts.
            // NOTE(review): _resourceNotificationService is not declared in this class; it is
            // assumed to be provided by the test set-up that builds and starts the AppHost — confirm.
            await _resourceNotificationService.WaitForResourceAsync("kafka", KnownResourceStates.Running)
                .WaitAsync(TimeSpan.FromSeconds(120));

            // Get all topics
            var topics = await GetAllTopicsAsync(_kafkaBootstrapServers);

            // Assert topic exists
            Assert.Contains(topicName, topics);

            // Read messages from the topic
            var messages = await ReadMessagesFromTopicAsync(_kafkaBootstrapServers, topicName);

            // Assert messages are not empty
            Assert.NotEmpty(messages);

            // (Optional) Assert specific expected message content
            Assert.Contains(messages, msg => msg.Contains("expected-message-content"));
        }

        /// <summary>
        /// Returns the names of all topics reported by the Kafka cluster.
        /// </summary>
        /// <param name="bootstrapServers">Broker address list.</param>
        /// <returns>The topic names found in the cluster metadata.</returns>
        private async Task<List<string>> GetAllTopicsAsync(string bootstrapServers)
        {
            // GetMetadata is a blocking call (up to the 10 s timeout), so it runs on the thread pool.
            return await Task.Run(() =>
            {
                using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
                var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
                return metadata.Topics.Select(t => t.Topic).ToList();
            });
        }

        /// <summary>
        /// Reads up to five message values from <paramref name="topicName"/>, starting at the earliest offset.
        /// </summary>
        /// <param name="bootstrapServers">Broker address list.</param>
        /// <param name="topicName">Topic to subscribe to and read from.</param>
        /// <returns>The values of the messages that were received; empty if none arrived in time.</returns>
        private async Task<List<string>> ReadMessagesFromTopicAsync(string bootstrapServers, string topicName)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = bootstrapServers,
                GroupId = "test-consumer-group",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false
            };

            // Consume blocks the calling thread for up to 5 s per attempt, so the loop runs on the thread pool.
            return await Task.Run(() =>
            {
                var messages = new List<string>();
                using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

                // Without a subscription the consumer has no partitions assigned and never receives anything.
                consumer.Subscribe(topicName);
                try
                {
                    for (int i = 0; i < 5; i++) // Read up to 5 messages
                    {
                        // Returns null when the timeout elapses without a message; keep polling,
                        // because the first attempts may elapse while the group is still joining.
                        var consumeResult = consumer.Consume(TimeSpan.FromSeconds(5));
                        if (consumeResult != null)
                        {
                            messages.Add(consumeResult.Message.Value);
                        }
                    }
                }
                finally
                {
                    // Leave the consumer group cleanly instead of waiting for the session to time out.
                    consumer.Close();
                }

                return messages;
            });
        }
    }