docker apache-kafka docker-compose docker-network stream-processing

Docker connection between Kafka and Bytewax refused


I want to consume a stream from Kafka using Bytewax to perform aggregations. Unfortunately, I am not able to connect to Kafka: the connection is always refused. I assume something in the port setup is incorrect, but I could not figure out what. It's even more confusing to me that another KafkaConsumer (consumer.py), running in a different container, consumes and prints the stream without any errors.

docker-compose.yml

services:
  kafka:
    image: apache/kafka
    ports:
      - "9092:9092"
    environment:
      # Configure listeners for both docker and host communication
      KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://kafka:9093
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT

      # Settings required for KRaft mode
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091

      # Listener to use for broker-to-broker communication
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER

      # Required for a single node cluster
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    healthcheck:
      test: ["CMD", "bash", "-c", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - app-network

  kafka-ui:
    image: ghcr.io/kafbat/kafka-ui:latest
    ports:
      - 8080:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
    depends_on:
      - kafka
    networks:
      - app-network
  
  consumer:
    build:
      context: ./kafka_consumer
      dockerfile: Dockerfile
    container_name: consumer
    depends_on:
      factory-service:
        condition: service_started
      kafka:
        condition: service_healthy
    ports:
      - "8099:80"
    networks:
      - app-network

  bytewax:
    build:
      context: ./consumer
      dockerfile: Dockerfile
    container_name: bytewax
    depends_on:
      - kafka
    networks:
      - app-network

networks:
  app-network:
    driver: bridge

consumer.py (prints datastream)

import json  # needed by value_deserializer below

from kafka import KafkaConsumer


KAFKA_BROKER = "kafka:9093"
KAFKA_TOPIC = ["factory_001","factory_002"]


consumer = KafkaConsumer(
      *KAFKA_TOPIC,
      group_id='my-group',
      bootstrap_servers=KAFKA_BROKER,
      value_deserializer=lambda x: json.loads(x.decode("utf-8"))
      )

Stream_process.py (not working)

from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSource
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow


KAFKA_BROKER = ["kafka:9093"]
KAFKA_TOPIC = ["factory_001"]


flow = Dataflow("Average Aggregation")
stream = op.input("kafka-in", flow, KafkaSource(KAFKA_BROKER, KAFKA_TOPIC))


op.output("out", stream, StdOutSink())

Log message:

%3|1742203770.748|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 1ms in state CONNECT)
%3|1742203771.749|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
thread '<unnamed>' panicked at src/run.rs:128:17:
Box<dyn Any>
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts
    return list(_list_parts(client, self._topics))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 172, in _list_parts
    raise RuntimeError(msg)
RuntimeError: error listing partitions for Kafka topic `'factory_001'`: Broker: Unknown topic or partition

The above exception was the direct cause of the following exception:

bytewax.errors.BytewaxRuntimeError: (src/inputs.rs:252:47): error calling `FixedPartitionSource.list_parts` in step "Average Aggregation.kafka-in"

The above exception was the direct cause of the following exception:

bytewax.errors.BytewaxRuntimeError: (src/worker.rs:354:34): error building FixedPartitionedSource

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/usr/local/lib/python3.11/site-packages/bytewax/run.py", line 355, in <module>
    cli_main(**kwargs)
bytewax.errors.BytewaxRuntimeError: (src/worker.rs:149:10): error building production dataflow

Solution

  • In docker-compose.yml, port 9093 is not mapped:

        ports:
          - "9092:9092"
          - "9093:9093" # add this
    

    This should map port 9093 on your host to the DOCKER listener, which you advertise as kafka:9093:

    KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
    

    You should then be able to connect to kafka:9093 from your host, and the mapped port forwards the connection to that listener.

    Also, the kafka hostname has to be resolvable, for example through an entry in /etc/hosts. You have probably done this already, since a missing entry would produce a DNS error rather than a refused connection.
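
To verify the change, it can help to test the TCP connection directly, independent of Bytewax. The log shows "Connection refused" against 172.19.0.2:9093, which means the name kafka resolved but nothing accepted the connection at that moment. Below is a minimal sketch using only the Python standard library. The helper names can_connect and wait_for_port are hypothetical and not part of Bytewax or kafka-python, and the retry count and delay are arbitrary assumptions.

```python
import socket
import time


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # OSError covers refused connections, DNS failures, and timeouts.
        return False


def wait_for_port(host: str, port: int, attempts: int = 10, delay: float = 1.0) -> bool:
    """Poll host:port until it accepts a connection or the attempts run out."""
    for attempt in range(attempts):
        if can_connect(host, port):
            return True
        # Pause between attempts, but not after the final one.
        if attempt < attempts - 1:
            time.sleep(delay)
    return False
```

Calling wait_for_port("kafka", 9093) from the machine or container that runs Stream_process.py shows whether the listener accepts connections there. A False result that turns True after a few seconds suggests the broker was simply not listening yet when the client started.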