I am new to Kafka and am trying to send data from an Airflow task to a Kafka broker, but the producer fails with a DNS lookup error for the broker.
Broker service in docker-compose.yml:
broker:
  image: confluentinc/cp-server:7.4.0
  hostname: broker
  container_name: broker
  depends_on:
    zookeeper:
      condition: service_healthy
  ports:
    - "9092:9092"
    - "9101:9101"
    - "29092:29092"
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_JMX_PORT: 9101
    KAFKA_JMX_HOSTNAME: localhost
    KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'false'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  networks:
    - confluent
  healthcheck:
    test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
    interval: 10s
    timeout: 5s
    retries: 5
kafka_stream.py:
from airflow import DAG
from airflow.operators.python import PythonOperator

# get_data(), format_data() and default_args are defined elsewhere (not shown here)

def stream_data():
    from kafka import KafkaProducer
    res = get_data()
    res = format_data(res)
    print("Function Started")
    print(res)
    try:
        # 'broker:29092' is the PLAINTEXT listener advertised for clients inside Docker
        producer = KafkaProducer(bootstrap_servers=['broker:29092'], api_version=(2, 5, 0), max_block_ms=500000)
        producer.send('user_created', res)
    except Exception as e:
        print("Failed to connect to Kafka broker : ", e)

with DAG('user_automation',
         default_args=default_args,
         schedule_interval='@daily',
         ) as dag:
    streaming_task = PythonOperator(
        task_id='stream_data_from_api',
        python_callable=stream_data
    )
Error:
[2024-07-28, 16:04:19 UTC] {conn.py:1276} WARNING - DNS lookup failed for broker:29092, exception was [Errno -3] Temporary failure in name resolution. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
[2024-07-28, 16:04:19 UTC] {conn.py:297} ERROR - DNS lookup failed for broker:29092 (0)
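The warning means the hostname broker could not be resolved from inside the container running the task, so the client never reached Kafka at all. One way to confirm this is to attempt the same kind of name lookup with Python's standard socket module from inside an Airflow container (for example via docker exec). This is a minimal diagnostic sketch; the function name can_resolve is hypothetical.

```python
import socket


def can_resolve(host: str, port: int) -> bool:
    """Return True if host:port resolves via this container's DNS, else False."""
    try:
        # getaddrinfo performs a name lookup similar to the one the Kafka client does
        socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
        return True
    except socket.gaierror as exc:
        # e.g. [Errno -3] Temporary failure in name resolution
        print(f"Cannot resolve {host}:{port}: {exc}")
        return False


if __name__ == "__main__":
    # 'broker' and 29092 come from KAFKA_ADVERTISED_LISTENERS in the compose file
    print(can_resolve("broker", 29092))
```

If this prints False inside the Airflow container but True inside the broker container, the problem is container networking rather than Kafka configuration.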
Here is my complete code: https://github.com/dhainiksuthar/airflow-kafka
Containers can only resolve each other by name when they are attached to the same network.
Looking at your repo, the Airflow services are on Compose's default network rather than the isolated confluent network the broker uses.
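Compose attaches any service without a networks key to the project's default network, so a service that lists only confluent and a service that lists nothing end up on different networks. The sketch below encodes that rule over a compose file already loaded into a Python dict (for example with a YAML parser). It ignores network_mode and other advanced options, and the service names in the example are illustrative, not copied from the repo.

```python
from typing import Any


def service_networks(service: dict[str, Any]) -> set[str]:
    """Networks a Compose service joins; no 'networks' key means 'default'."""
    nets = service.get("networks")
    if not nets:
        return {"default"}
    # 'networks' may be a list of names or a mapping of name -> options
    return set(nets.keys()) if isinstance(nets, dict) else set(nets)


def unreachable_from(compose: dict[str, Any], target: str) -> list[str]:
    """Services sharing no network with `target`, so they cannot resolve it by name."""
    services = compose["services"]
    target_nets = service_networks(services[target])
    return sorted(
        name
        for name, svc in services.items()
        if name != target and not (service_networks(svc) & target_nets)
    )


# Hypothetical, trimmed-down compose structure mirroring the situation described
compose = {
    "services": {
        "zookeeper": {"networks": ["confluent"]},
        "broker": {"networks": ["confluent"]},
        "airflow-webserver": {},
        "airflow-scheduler": {},
    }
}
print(unreachable_from(compose, "broker"))  # ['airflow-scheduler', 'airflow-webserver']
```

Dropping the networks entries from zookeeper and broker puts every service on default, and the list comes back empty.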
Remove the networks list from the services, and they will all join the same default network.
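Once the Airflow containers can resolve broker, the producer itself may still need adjusting: kafka-python only accepts bytes as message values unless a value_serializer is configured, and send() is asynchronous, so the task should wait for delivery before returning. Below is a sketch under the assumption that format_data returns a JSON-serializable dict. The helper names serialize_value and send_record, and the shorter max_block_ms, are my own choices, not from the repo.

```python
import json
from typing import Any


def serialize_value(value: dict[str, Any]) -> bytes:
    """Encode a dict as UTF-8 JSON bytes, the type kafka-python puts on the wire."""
    return json.dumps(value).encode("utf-8")


def send_record(record: dict[str, Any], bootstrap: str = "broker:29092") -> None:
    # Imported lazily, as in the question, so the module loads without kafka-python
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=[bootstrap],
        api_version=(2, 5, 0),
        value_serializer=serialize_value,
        max_block_ms=5000,  # assumption: fail fast rather than block for ~8 minutes
    )
    try:
        # send() only enqueues the record; get() waits for the broker's acknowledgement
        producer.send("user_created", record).get(timeout=10)
    finally:
        producer.close()
```

Unlike the original try/except, which prints the error and lets the task report success, this lets exceptions propagate so Airflow marks the task as failed.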