.netapache-kafkamasstransitconfluent-cloud

Using MassTransit to connect to Confluent Cloud not working


I tried with the following set up to connect securely to the Confluent Cloud environment through MassTransit, but it seems not be working. I'm trying to make this work in .NET Core:

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((busRegistryContext, rabbitMQBusFactory) => rabbitMQBusFactory.ConfigureEndpoints(busRegistryContext));

    x.AddRider(rider =>
    {
        rider.AddProducer<UserEvent>(topicName: "UserCreated");
        rider.AddProducer<UserEvent>(topicName: "UserUpdated");
        rider.AddProducer<UserEvent>(topicName: "UserDeleted");

        rider.AddConsumer<UserCreatedEventConsumer>();

        rider.UsingKafka((riderContext, kafkaFactory) => 
        {
            kafkaFactory.SecurityProtocol = Confluent.Kafka.SecurityProtocol.SaslSsl;

            kafkaFactory.Host(server: "[hided...].westeurope.azure.confluent.cloud:9092", configureHost =>
            {
                configureHost.UseSasl(saslConfig =>
                {
                saslConfig.Mechanism = Confluent.Kafka.SaslMechanism.Plain;
                saslConfig.Username = "...................";
                saslConfig.Password = "...................";
                });
            });

            var consumerConfig = new ConsumerConfig()
            {
                GroupId = "dotnet-example-group-1",
                AutoOffsetReset = AutoOffsetReset.Latest,
                EnableAutoCommit = false
            };

            kafkaFactory.TopicEndpoint<UserCreatedEvent>(
                topicName: "UserCreated", consumerConfig, kafkaTopicReceiveEndpointConfig =>
            {
                kafkaTopicReceiveEndpointConfig.ConfigureConsumer<UserCreatedEventConsumer>(riderContext);
            });
        });
    });
});

I get the following error:

MassTransit: Warning: Connection Failed: rabbitmq://localhost/

RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
 ---> System.AggregateException: One or more errors occurred. (Connection failed)
 ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed
 ---> System.Net.Sockets.SocketException (10061): No connection could be made because the target machine actively refused it.
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource.GetResult(Int16 token)
   at System.Threading.Tasks.ValueTask.ValueTaskSourceAsTask.<>c.<.cctor>b__4_0(Object state)
--- End of stack trace from previous location ---
   at RabbitMQ.Client.Impl.TcpClientAdapter.ConnectAsync(String host, Int32 port)
   at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, TimeSpan timeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan timeout, AddressFamily family)
   at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
   at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   at MassTransit.RabbitMqTransport.ConnectionContextFactory.CreateConnection(ISupervisor supervisor)

I actually expected to just connect to my Confluent Cloud Kafka cluster, but I'm new to MassTransit and Confluent Cloud and don't get it why this is happening (Although I followed some tutorials and read through MassTransit documentation carefully)


Solution

  • The way I solved it is as follows:

    P.S. keep in mind that if you want to load balance between various consumers, you might want to use different consumer group ids. Partitions will than be divided within these consumer groups. Please see: Consumer Groups

    One more thing: you do not have to inject the IHostedService anymore with MassTransit, since services.AddMassTransit()... does this for you.

    For people who are new to the concept of 'messaging' like me...

    You can choose whether or not to include the consumer within Program.cs. If you do so, you will just receive the payload within the same project. Let's say you define the producer in one project and the consumer in another, then the producer will send the message to the Kafka topic and the consumer within that other project will read from this topic and will receive the message.

    Program.cs

    using Confluent.Kafka;
    using MassTransit;
    
    services.AddMassTransit(x =>
    {
        x.UsingInMemory((context, cfg) =>
        {
            cfg.ConfigureEndpoints(context);
        });
    
        x.AddRider(rider =>
        {
            // Producers
            rider.AddProducer<UserCreatedEvent>(topicName: "UserCreated");//Example 1
            rider.AddProducer<UserUpdatedEvent>(topicName: "UserUpdated");//Example 2
            rider.AddProducer<UserDeletedEvent>(topicName: "UserDeleted");//Example 3
    
            // Consumers
            rider.AddConsumer<UserCreatedEventConsumer>();
    
            // Apache Kafka configuration
            rider.UsingKafka((riderContext, kafkaFactory) => 
            {
                kafkaFactory.SecurityProtocol = SecurityProtocol.SaslSsl;
    
                kafkaFactory.Host("[hided...].westeurope.azure.confluent.cloud:9092", configureHost =>
                {
                    configureHost.UseSasl(saslConfig =>
                    {
                        saslConfig.Mechanism = SaslMechanism.Plain;
                        saslConfig.Username = ".....PUT HERE YOUR API KEY.........";
                        saslConfig.Password = ".....PUT HERE YOUR API SECRET......";
                    });
                });
    
                var consumerConfig = new ConsumerConfig()
                {
                    // If you have 4 consumers and you want to divide them within for example 8 partitions -> use the same group id for the consumers you want the partitions assigned to
                    // If you, for instance use the same group id for 3 consumers and the 4th has another group id, then the partitions will be divided in such a way that 3 consumers will be 
                    // assigned from 0 to 6 partitions and the 4th will only consume from the 7th partition.. 
                    // Using partitions enables consumers to read in parallel which is convenient for consumers to consume at the same time..
                    GroupId = "dotnet-example-group-2",
    
                    //Keep in mind that this will start from where the offset was left (offset is a unique number for each message)
                    // You can also use 'Earliest', but this will always pull everything in!
                    // In case of down time of an application, you could do additional check to see whether or not something was already processed
                    // Just to be sure (within your custom class that consumes messages)
                    AutoOffsetReset = AutoOffsetReset.Latest,
    
                    EnableAutoCommit = false,
                    // Use Property for Sasl Username for Consumer (only if needed)
                    // Use Property Sasl Password for Consumer (only if needed)
                };
    
                // Put here your consumer endpoints (Configuration of how to receive messages and from which topic incl. settings from consumerConfig)
                kafkaFactory.TopicEndpoint<UserCreatedEvent>(
                topicName: "UserCreated", consumerConfig, kafkaTopicReceiveEndpointConfig =>
                {
                    kafkaTopicReceiveEndpointConfig.ConfigureConsumer<UserCreatedEventConsumer>(riderContext);
                });
            });
        });
    });
    

    Controller

    public class TestController 
    {
        private readonly ITopicProducer<UserCreatedEvent> userCreatedProducer;
        private readonly ITopicProducer<UserUpdatedEvent> userUpdatedProducer;
        private readonly ITopicProducer<UserDeletedEvent> userDeletedProducer;
    
        public AuthController(
            ITopicProducer<UserCreatedEvent> userCreatedProducer,
            ITopicProducer<UserUpdatedEvent> userUpdatedProducer,
            ITopicProducer<UserDeletedEvent> userDeletedProducer)
       {
           this.userCreatedProducer = userCreatedProducer;
           this.userUpdatedProducer = userUpdatedProducer;
           this.userDeletedProducer = userDeletedProducer;
       }
    
        [HttpGet(nameof(TestKafka))]
        public async Task<IActionResult> TestKafka()
        {
            await userCreatedProducer.Produce(new UserCreatedEvent()
            {
                EventId = Guid.NewGuid(),
                Value = new User() 
                { 
                    Id = 1, 
                    Username = "Apache_Kafka_Test_User"
                }
            });
    
            await userUpdatedProducer.Produce(new UserUpdatedEvent()
            {
                EventId = Guid.NewGuid(),
                Value = new User()
                {
                    Id = 1,
                    Username = "Apache_Kafka_Test_User_Updated"
                }
            });
    
            return Ok();
        }
    }
    

    Custom classes: Message Classes

    public class UserCreatedEvent
    {
        public Guid EventId { get; set; }
    
        public object Value { get; set; }
    
        public UserCreatedEvent()
        {
    
        }
    }
    
    public class UserUpdatedEvent
    {
        public Guid EventId { get; set; }
    
        public object Value { get; set; }
    
        public UserUpdatedEvent()
        {
            
        }
    }
    

    Custom classes: Consumer Message Classes for MassTransit

    public class UserCreatedEventConsumer : IConsumer<UserCreatedEvent>
    {
        public Task Consume(ConsumeContext<UserCreatedEvent> context)
        {
            _ = context.Message.EventId;
            _ = context.Message.Value;
    
            Debug.WriteLine($"EventId: {context.Message.EventId}");
            Debug.WriteLine(context.Message.Value);
            return Task.CompletedTask;
        }
    }
    
    public class UserUpdatedEventConsumer : IConsumer<UserUpdatedEvent>
    {
        public Task Consume(ConsumeContext<UserUpdatedEvent> context)
        {
            _ = context.Message.EventId;
            _ = context.Message.Value;
    
            Debug.WriteLine($"EventId: {context.Message.EventId}");
            Debug.WriteLine(context.Message.Value);
            return Task.CompletedTask;
        }
    }