java, spring-boot, docker, apache-kafka

Kafka consumer always tries 127.0.0.1:9092


I have this image of kafka running in a container:

 # Kafka broker service exactly as posted in the question (no broker environment is set).
 service-kafka:
    container_name: pepito-kafka-server
    hostname: pepito
    image: apache/kafka:3.7.1
    networks: 
      - pepito-network
    ports:
        # Host port 8484 is mapped to port 9092 inside the container.
        - 8484:9092
    # NOTE(review): no listener / advertised-listener settings are given in this service,
    # so the broker presumably falls back to the image defaults — confirm what address
    # it hands back to clients.
    extra_hosts:
      - "docker.internal:127.0.0.1"
    healthcheck:
      # Probes the broker by requesting its cluster id through pepito:9092
      # (the container-side port of the mapping above); any failure exits with 1.
      test: /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server pepito:9092 || exit 1
      interval: 1s
      timeout: 60s
      retries: 60

As you can see, I have a custom port for Kafka.

Now, from my Spring Boot app, I'm trying to use it like this:

/**
 * Spring configuration that registers the {@link KafkaAdmin} bean.
 */
@Configuration
public class KafkaTopicConfig {

    /** Broker list injected from the {@code spring.kafka.bootstrap-servers} property. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Creates the admin bean; the bootstrap server list is the only setting passed in.
     *
     * @return a {@link KafkaAdmin} built from the injected bootstrap address
     */
    @Bean
    KafkaAdmin kafkaAdmin() {
        final Map<String, Object> adminSettings = new HashMap<>();
        adminSettings.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        final KafkaAdmin admin = new KafkaAdmin(adminSettings);
        return admin;
    }
}

and,

/**
 * Spring configuration for the producing side: a String/String producer factory
 * and the {@link KafkaTemplate} built on top of it.
 */
@Configuration
public class KafkaProducerConfig {

    /** Broker list injected from the {@code spring.kafka.bootstrap-servers} property. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Creates the producer factory with the bootstrap servers and
     * {@link StringSerializer} for both keys and values.
     *
     * @return the producer factory bean
     */
    @Bean
    ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> producerSettings = new HashMap<>();
        producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        final ProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(producerSettings);
        return factory;
    }

    /**
     * Creates the template used to send messages, backed by {@link #producerFactory()}.
     *
     * @return the Kafka template bean
     */
    @Bean
    KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}

and,

/**
 * Spring configuration for the consuming side: a String/String consumer factory,
 * the listener container factory that uses it, and a listener that prints every
 * received message.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Consumer group id injected from the {@code pepito.group-id} property. */
    @Value("${pepito.group-id}")
    private String groupId;

    /** Broker list injected from the {@code spring.kafka.bootstrap-servers} property. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Creates the consumer factory with the bootstrap servers, the group id and
     * {@link StringDeserializer} for both keys and values.
     *
     * @return the consumer factory bean
     */
    @Bean
    ConsumerFactory<String, String> consumerFactory() {
        final Map<String, Object> consumerSettings = new HashMap<>();
        consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        final ConsumerFactory<String, String> factory =
                new DefaultKafkaConsumerFactory<>(consumerSettings);
        return factory;
    }

    /**
     * Creates the listener container factory and wires {@link #consumerFactory()} into it.
     *
     * @return the listener container factory bean
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        final ConsumerFactory<String, String> consumers = consumerFactory();
        containerFactory.setConsumerFactory(consumers);
        return containerFactory;
    }

    /**
     * Listener for the topic named by {@code spring.application.name}; writes each
     * received message, together with the group id, to standard output.
     *
     * @param message the payload of the received record
     */
    @KafkaListener(topics = "${spring.application.name}", groupId = "${pepito.group-id}")
    public void listenGroupFoo(String message) {
        final String line = "Received Message in group " + groupId + ": " + message;
        System.out.println(line);
    }

}

I have both a producer and a consumer, because this app is meant to send messages as well as receive and process them.

Now I'm getting these errors:

org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Cluster ID: 5L6g3nShT-eMCtK--X85sw
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] (Re-)joining group
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 2147483646 disconnected.
2025-04-21T15:43:36.068-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 2147483646 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid due to cause: null. isDisconnected: true. Rediscovery will be attempted.
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Request joining group due to: rebalance failed due to 'null' (DisconnectException)
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:36.087-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:36.199-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:36.302-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:36.548-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:36.998-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:37.800-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.

It's trying to connect to 127.0.0.1:9092, the default host/port for Spring Boot:

Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.

and this is what I have in my Spring configuration file (YAML syntax, so application.yml):

spring:
  kafka:
    bootstrap-servers: "pepito:8484" # in my hosts pepito is pointing to where docker is running

Where else in my configuration do I have to indicate that it should connect ONLY to pepito:8484, and to no other IP/port? Also, I did not change anything in my Kafka container image; do I need to set something there? Any help is welcome.


Solution

  • So basically, your Spring Boot app is like, "Hey Kafka, I'm knocking on pepito:8484, let’s chat!"

    Kafka opens the door and goes, "Cool, nice to meet you! But if you wanna keep talking, hit me up at localhost:9092 from now on."

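    And your poor app is like, "Uh... okay, I guess..."
    Then it tries to call localhost:9092, but obviously there's no one home, hence the connection errors. (The bootstrap address is only used for the first metadata request; after that, the client connects to whatever address the broker advertises.)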

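    In your service-kafka container, add the environment variable KAFKA_ADVERTISED_LISTENERS (together with KAFKA_LISTENERS) so that the broker advertises an address your app can actually reach: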

    # Same broker service as in the question, with explicit listener settings added.
    service-kafka:
      container_name: pepito-kafka-server
      hostname: pepito
      image: apache/kafka:3.7.1
      networks: 
        - pepito-network
      ports:
        # Host port 8484 is mapped to port 9092 inside the container.
        - 8484:9092
      # NOTE(review): the apache/kafka image reportedly stops applying its built-in default
      # configuration once any KAFKA_* variable is set — confirm whether the remaining KRaft
      # settings (node id, process roles, controller quorum/listeners) must be supplied too.
      environment:
        # Socket the broker binds to inside the container.
        KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
        # Address handed back to clients after bootstrap; it has to be reachable from the
        # machine running the Spring Boot app (host name pepito, published port 8484).
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://pepito:8484 #here
      extra_hosts:
        - "docker.internal:127.0.0.1"
      healthcheck:
        # NOTE(review): this check bootstraps via pepito:9092 from inside the container, but
        # the broker now advertises pepito:8484 — verify the check still succeeds there.
        test: /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server pepito:9092 || exit 1
        interval: 1s
        timeout: 60s
        retries: 60
    

    This time, Kafka will tell the app, "It's okay, now we can talk."