I want to consume a stream from Kafka using Bytewax to perform aggregations. Unfortunately, I cannot connect to Kafka: the connection is always refused. I assume something in the port setup is incorrect, but I could not figure out what. It's even more confusing to me that another KafkaConsumer (consumer.py), running in another container, consumes and prints the stream without any errors.
docker-compose.yml
services:
  kafka:
    image: apache/kafka
    ports:
      - "9092:9092"
    environment:
      # Configure listeners for both docker and host communication
      KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://kafka:9093
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT
      # Settings required for KRaft mode
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091
      # Listener to use for broker-to-broker communication
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
      # Required for a single node cluster
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    healthcheck:
      test: ["CMD", "bash", "-c", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - app-network
  kafka-ui:
    image: ghcr.io/kafbat/kafka-ui:latest
    ports:
      - 8080:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
    depends_on:
      - kafka
    networks:
      - app-network
  consumer:
    build:
      context: ./kafka_consumer
      dockerfile: Dockerfile
    container_name: consumer
    depends_on:
      factory-service:
        condition: service_started
      kafka:
        condition: service_healthy
    ports:
      - "8099:80"
    networks:
      - app-network
  bytewax:
    build:
      context: ./consumer
      dockerfile: Dockerfile
    container_name: bytewax
    depends_on:
      - kafka
    networks:
      - app-network
networks:
  app-network:
    driver: bridge
consumer.py (prints datastream)
import json

from kafka import KafkaConsumer

KAFKA_BROKER = "kafka:9093"
KAFKA_TOPIC = ["factory_001", "factory_002"]

consumer = KafkaConsumer(
    *KAFKA_TOPIC,
    group_id="my-group",
    bootstrap_servers=KAFKA_BROKER,
    # Decode each message value from UTF-8 JSON
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)
Stream_process.py (not working)
from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSource
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
KAFKA_BROKER = ["kafka:9093"]
KAFKA_TOPIC = ["factory_001"]
flow = Dataflow("Average Aggregation")
stream = op.input("kafka-in", flow, KafkaSource(KAFKA_BROKER, KAFKA_TOPIC))
op.output("out", stream, StdOutSink())
Log message:
%3|1742203770.748|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 1ms in state CONNECT)
%3|1742203771.749|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
thread '<unnamed>' panicked at src/run.rs:128:17:
Box<dyn Any>
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts
return list(_list_parts(client, self._topics))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 172, in _list_parts
raise RuntimeError(msg)
RuntimeError: error listing partitions for Kafka topic `'factory_001'`: Broker: Unknown topic or partition
The above exception was the direct cause of the following exception:
bytewax.errors.BytewaxRuntimeError: (src/inputs.rs:252:47): error calling `FixedPartitionSource.list_parts` in step "Average Aggregation.kafka-in"
The above exception was the direct cause of the following exception:
bytewax.errors.BytewaxRuntimeError: (src/worker.rs:354:34): error building FixedPartitionedSource
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "/usr/local/lib/python3.11/site-packages/bytewax/run.py", line 355, in <module>
cli_main(**kwargs)
bytewax.errors.BytewaxRuntimeError: (src/worker.rs:149:10): error building production dataflow
In docker-compose.yml you forgot to map port 9093:
ports:
  - "9092:9092"
  - "9093:9093" # add this
This maps port 9093 on your host to the DOCKER listener, which you advertise as kafka:9093:
KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
You should then be able to connect to kafka:9093 from the host, and the connection will be forwarded to that listener.
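To make it easier to see which address each listener hands back to clients, here is a minimal sketch that splits the KAFKA_ADVERTISED_LISTENERS value into its parts. The helper name parse_listeners is made up for illustration and is not part of Kafka or Bytewax; it assumes the simple NAME://host:port format used in the compose file above.

```python
def parse_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Parse a listener string such as 'HOST://localhost:9092,DOCKER://kafka:9093'.

    Hypothetical helper for illustration only; assumes NAME://host:port entries.
    """
    listeners = {}
    for entry in value.split(","):
        # Split "DOCKER://kafka:9093" into the listener name and "kafka:9093"
        name, _, address = entry.strip().partition("://")
        # Split on the last colon so the port is always the final component
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


advertised = parse_listeners("HOST://localhost:9092,DOCKER://kafka:9093")
# Address a client is told to use after bootstrapping through the DOCKER listener
print(advertised["DOCKER"])  # prints ('kafka', 9093)
```

Whatever address a client bootstraps with, it must also be able to reach the advertised host and port for the listener it connected to.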
Also, just in case, although I believe you have already done this (otherwise you would get a DNS error instead of a refused connection): the kafka hostname must be resolvable, for example through an entry in /etc/hosts.
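If it helps to tell a DNS problem apart from a refused connection, the following sketch uses only the Python standard library to probe a host and port. The function name probe and the returned labels are my own choices for illustration, not part of any Kafka client.

```python
import socket


def probe(host: str, port: int, timeout: float = 2.0) -> str:
    """Try a plain TCP connection and report what went wrong, if anything.

    Hypothetical diagnostic helper; it does not speak the Kafka protocol.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "reachable"
    except socket.gaierror:
        # The hostname could not be resolved (the DNS case mentioned above)
        return "dns error"
    except ConnectionRefusedError:
        # The host resolved, but nothing accepts connections on that port
        return "connection refused"
    except OSError as exc:
        # Timeouts, unreachable networks, and similar failures
        return f"other error: {exc}"
```

Calling probe("kafka", 9093) from the machine or container where the dataflow runs shows whether the name resolves and whether the port accepts connections there. A "reachable" result only confirms the TCP port is open, not that the broker is fully ready or that the topic exists.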