I am trying to connect, from my IntegrationTest.cs file, to the Kafka port that I expose in the AppHost. After connecting, I want to list all Kafka topics, then open a specific topic and read the messages in it.
Apphost.cs
var builder = DistributedApplication.CreateBuilder(args);
"
"
"
"
// Register a Kafka resource named "kafka"; 9200 is passed as its port argument.
var kafka = builder.AddKafka("kafka", port: 9200)
// Add the Kafka UI and set its host port to 9100.
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100))
// Use the Session container lifetime for the Kafka container.
.WithLifetime(ContainerLifetime.Session);
// Register the Kafka publisher service project as "kafkaPublisher".
// NOTE(review): the second argument ("Aspire") presumably selects the launch profile — confirm.
builder.AddProject<Projects.O_WP_PIT_KafkaPublisher_Service>("kafkaPublisher", "Aspire")
// apiService and dacpac are not declared in the visible lines; presumably they come from the elided part above.
.WithReference(apiService)
.WaitFor(apiService)
// The publisher references Kafka and waits for it before starting.
.WithReference(kafka)
.WaitFor(kafka)
// The publisher additionally waits for the dacpac resource to run to completion.
.WaitForCompletion(dacpac);
// Build the distributed application and run it.
builder.Build().Run();
**IntegrationTest.cs**
// Create the testing builder for the AppHost project.
_appHost = await DistributedApplicationTestingBuilder.CreateAsync<Projects.PI_AspireSolution_AppHost>();

// Every HttpClient created through the host gets the standard resilience handler.
_appHost.Services.ConfigureHttpClientDefaults(http => http.AddStandardResilienceHandler());

// Build and start the distributed application.
_app = await _appHost.BuildAsync();
await _app.StartAsync();

// Wait until the "kafka" resource reports Running, giving up after 120 seconds.
var kafkaRunning = _resourceNotificationService.WaitForResourceAsync("kafka", KnownResourceStates.Running);
await kafkaRunning.WaitAsync(TimeSpan.FromSeconds(120));
After this, I'm not sure how to connect to Kafka and read all the topics and the messages in each topic.
First of all, make sure the Confluent.Kafka and xUnit NuGet packages are installed in your test project:
dotnet add package Confluent.Kafka
dotnet add package XUnit
Once Kafka is running (that is, once WaitForResourceAsync completes successfully), get the list of topics as follows:
/// <summary>
/// Lists the names of all topics reported by the Kafka cluster's metadata.
/// </summary>
/// <param name="bootstrapServers">Broker address list, e.g. <c>host:port</c>.</param>
/// <returns>A task that yields the topic names.</returns>
public Task<List<string>> GetAllTopicsAsync(string bootstrapServers)
{
    // GetMetadata is a synchronous, blocking call. Run it on the thread pool so the
    // caller is not blocked, instead of declaring the method `async` with nothing to
    // await (compiler warning CS1998). Exceptions still surface through the task.
    return Task.Run(() =>
    {
        using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();

        // Wait at most 10 seconds for the cluster metadata.
        var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
        return metadata.Topics.Select(t => t.Topic).ToList();
    });
}
Once you have the topic list, you can create a Kafka Consumer to read messages.
/// <summary>
/// Subscribes to <paramref name="topicName"/> and prints up to five messages to the console.
/// </summary>
/// <param name="bootstrapServers">Broker address list, e.g. <c>host:port</c>.</param>
/// <param name="topicName">Topic to read from.</param>
/// <returns>A task that completes when the read loop has finished and the consumer is closed.</returns>
public Task ReadMessagesFromTopicAsync(string bootstrapServers, string topicName)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = bootstrapServers,
        GroupId = "test-consumer-group",
        AutoOffsetReset = AutoOffsetReset.Earliest, // Read from the beginning
        EnableAutoCommit = false // Don't auto-commit offsets
    };

    // Consume(TimeSpan) blocks for up to the timeout on every call. Run the loop on the
    // thread pool so the caller is not blocked, instead of declaring the method `async`
    // with nothing to await (compiler warning CS1998).
    return Task.Run(() =>
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe(topicName);
        try
        {
            for (int i = 0; i < 5; i++) // Read 5 messages for testing
            {
                // A null result means no message arrived within the 10-second timeout.
                var consumeResult = consumer.Consume(TimeSpan.FromSeconds(10));
                if (consumeResult != null)
                {
                    Console.WriteLine($"Received message: {consumeResult.Message.Value}");
                }
            }
        }
        catch (Exception ex)
        {
            // Deliberately best-effort: report the failure and finish instead of throwing.
            Console.WriteLine($"Error while consuming: {ex.Message}");
        }
        finally
        {
            // Close the consumer explicitly before it is disposed.
            consumer.Close();
        }
    });
}
The complete code of IntegrationTest.cs is as follows:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Aspire.Hosting;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Testing;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
/// <summary>
/// Integration tests that start the Aspire AppHost, find the Kafka broker it runs,
/// and verify that the expected topics exist and contain messages.
/// </summary>
/// <remarks>
/// xUnit creates a new instance of this class for every theory case, so the AppHost
/// is started once per case. Move the start-up into a class fixture if that is too slow.
/// </remarks>
public class KafkaIntegrationTest : IAsyncLifetime
{
    // Must match the resource name passed to AddKafka in the AppHost.
    private const string KafkaResourceName = "kafka";

    private static readonly TimeSpan KafkaStartupTimeout = TimeSpan.FromSeconds(120);

    private DistributedApplication? _app;

    // Resolved from the running AppHost in InitializeAsync. The Aspire testing builder
    // randomizes host ports by default, so the address must not be hard-coded.
    private string _kafkaBootstrapServers = string.Empty;

    /// <summary>
    /// Starts the AppHost, waits for Kafka to reach the Running state and resolves
    /// its bootstrap server address.
    /// </summary>
    public async Task InitializeAsync()
    {
        var appHost = await DistributedApplicationTestingBuilder.CreateAsync<Projects.PI_AspireSolution_AppHost>();
        _app = await appHost.BuildAsync();
        await _app.StartAsync();

        // Wait for Kafka to be up.
        var notifications = _app.Services.GetRequiredService<ResourceNotificationService>();
        await notifications.WaitForResourceAsync(KafkaResourceName, KnownResourceStates.Running)
            .WaitAsync(KafkaStartupTimeout);

        // For a Kafka resource the connection string is the bootstrap server list.
        _kafkaBootstrapServers = await _app.GetConnectionStringAsync(KafkaResourceName)
            ?? throw new InvalidOperationException($"No connection string is available for resource '{KafkaResourceName}'.");
    }

    /// <summary>
    /// Stops and disposes the AppHost started by <see cref="InitializeAsync"/>.
    /// </summary>
    public async Task DisposeAsync()
    {
        if (_app is not null)
        {
            await _app.DisposeAsync();
        }
    }

    /// <summary>
    /// Verifies that <paramref name="topicName"/> exists on the broker and that
    /// messages can be read from it.
    /// </summary>
    /// <param name="topicName">Name of the topic expected on the broker.</param>
    [Theory]
    [InlineData("my-topic")] //Replace this with your desired topic
    [InlineData("another-topic")] //Replace this with your desired topic
    public async Task Kafka_Should_Contain_Topic_And_Read_Messages(string topicName)
    {
        // Get all topics
        var topics = await GetAllTopicsAsync(_kafkaBootstrapServers);

        // Assert topic exists
        Assert.Contains(topicName, topics);

        // Read messages from the topic
        var messages = await ReadMessagesFromTopicAsync(_kafkaBootstrapServers, topicName);

        // Assert messages are not empty
        Assert.NotEmpty(messages);

        // (Optional) Assert specific expected message content
        Assert.Contains(messages, msg => msg.Contains("expected-message-content"));
    }

    /// <summary>
    /// Lists the names of all topics reported by the Kafka cluster's metadata.
    /// </summary>
    /// <param name="bootstrapServers">Broker address list, e.g. <c>host:port</c>.</param>
    /// <returns>A task that yields the topic names.</returns>
    private Task<List<string>> GetAllTopicsAsync(string bootstrapServers)
    {
        // GetMetadata is a synchronous, blocking call. Run it on the thread pool
        // instead of declaring the method `async` with nothing to await (CS1998).
        return Task.Run(() =>
        {
            using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
            return metadata.Topics.Select(t => t.Topic).ToList();
        });
    }

    /// <summary>
    /// Reads up to five messages from <paramref name="topicName"/>, starting at the
    /// earliest available offset.
    /// </summary>
    /// <param name="bootstrapServers">Broker address list, e.g. <c>host:port</c>.</param>
    /// <param name="topicName">Topic to read from.</param>
    /// <returns>A task that yields the message values that were read; the list may be empty.</returns>
    private Task<List<string>> ReadMessagesFromTopicAsync(string bootstrapServers, string topicName)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = "test-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false
        };

        // Consume(TimeSpan) blocks for up to the timeout on every call, so the whole
        // loop runs on the thread pool rather than on the test's own thread.
        return Task.Run(() =>
        {
            var messages = new List<string>();
            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe(topicName);
            try
            {
                for (int i = 0; i < 5; i++) // Read up to 5 messages
                {
                    // A null result means no message arrived within the timeout.
                    var consumeResult = consumer.Consume(TimeSpan.FromSeconds(5));

                    // Skip null payloads so that callers never see a null entry in the list.
                    if (consumeResult?.Message?.Value is { } value)
                    {
                        messages.Add(value);
                    }
                }
            }
            finally
            {
                // Close the consumer explicitly before it is disposed.
                consumer.Close();
            }

            return messages;
        });
    }
}