Tags: docker, apache-kafka, fastapi, consumer, kafka-python

Kafka + FastAPI + Docker template


Introduction

I am experimenting with Kafka and FastAPI and building a template that lets me quickly write programs in a microservice pattern.

Goal - Vision

I want to build a repository of design patterns that implement very simple microservice infrastructures. The examples should only demonstrate how messages are sent between services, and should let users integrate their own code without spending a lot of time on setup.

Motivation

I searched a lot but I was not able to find simple examples. Most examples are highly customized and do not really generalize.

Tech Stack

Open to other implementations

Please let me know if you have any other recommendations. I am quite new to microservice architectures and would be very happy to explore further designs.

Current Problem

My current template builds four services: Zookeeper, Kafka, a consumer, and a producer. However, my consumer cannot consume the messages generated by my producer. The producer seems to work fine and publishes messages successfully, which I confirmed with this command: docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my-topic --from-beginning

My consumer appears to not do anything at all.

Thank you in advance for all your suggestions on this issue.

my folder structure:

[Image: folder structure]

my docker-compose file:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  kafka:
    image: confluentinc/cp-kafka:latest
    restart: "no"
    links:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL


  producer:
    build: ./producer
    ports:
      - '8000:8000'
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
    depends_on:
      - kafka

  consumer:
    build: ./consumer
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
      - KAFKA_GROUP_ID=my-group
    depends_on:
      - kafka

  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
    ports:
      - 9000:9000
    depends_on:
      - kafka

my producer docker file:

FROM python:3.8-slim-buster


COPY . /app
WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

my producer req file:

fastapi
uvicorn
confluent-kafka

my producer main.py:

import json
from confluent_kafka import Producer
from fastapi import FastAPI


app = FastAPI()

producer_conf = {
    'bootstrap.servers': 'kafka:9092',
    'client.id': 'my-app'
}

producer = Producer(producer_conf)

def produce(data: dict):
    try:
        data = json.dumps(data).encode('utf-8')
        producer.produce('my-topic', value=data)
        producer.flush()
        return {"status": "success", "message": data}
    except Exception as e:
        return {"status": "error", "message": str(e)}

my consumer docker file:

FROM python:3.8-slim-buster

COPY . /app
WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt



CMD [ "python", "main.py" ]

my consumer req file:

confluent-kafka

my consumer main.py:

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'kafka:9092',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'group.id': 'my-group',
    'api.version.request': True,
    'api.version.fallback.ms': 0
}

def consume_messages():
    consumer = Consumer(conf)

    consumer.subscribe(['my-topic'])

    try:
        while True:
            msg = consumer.poll(1.0)

            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print(f'Reached end of partition: {msg.topic()}[{msg.partition()}]')
                else:
                    print(f'Error while consuming messages: {msg.error()}')
            else:
                print(f"Received message: {msg.value().decode('utf-8')}")

    except Exception as e:
        print(f"Exception occurred while consuming messages: {e}")
    finally:
        consumer.close()

def startup():
    consume_messages()

if __name__ == "__main__":
    try:
        print("Starting consumer...")
        startup()
    except Exception as e:
        print(f"Exception occurred: {e}")

Build system via:

docker-compose up

You can activate the producer with this curl:

 curl -X POST http://localhost:8000/produce -H "Content-Type: application/json" -d '{"key": "nice nice nice"}'

I have rewritten the consumer multiple times and changed the ports and the Docker Compose configuration. Unfortunately, I am unable to pinpoint the issue.


Solution

  • Special thanks to @OneCricketeer for helping me get this up and running!

    Repository for micro service templates

    (contains this solution)

    Please feel free to contribute here:

    https://github.com/maxmekiska/micro-templates

    my folder structure:

    [Image: folder structure]

    my docker-compose file (Kafdrop now uses KAFKA_BROKERCONNECT instead of KAFKA_BOOTSTRAP_SERVERS):

    version: '3'
    
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        ports:
          - 2181:2181
          - 2888:2888
          - 3888:3888
    
      kafka:
        image: confluentinc/cp-kafka:latest
        restart: "no"
        links:
          - zookeeper
        ports:
          - 9092:9092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    
      kafdrop:
        image: obsidiandynamics/kafdrop
        restart: "no"
        environment:
          KAFKA_BROKERCONNECT: "kafka:29092"
        ports:
          - 9000:9000
        depends_on:
          - kafka
    
      producer:
        build: ./producer
        ports:
          - '8000:8000'
        environment:
          - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
        depends_on:
          - kafka
    
      consumer:
        build: ./consumer
        environment:
          - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
          - KAFKA_GROUP_ID=my-group
        depends_on:
          - kafka
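
The compose file above advertises two listeners: INTERNAL://kafka:29092 for clients on the Compose network and EXTERNAL://localhost:9092 for clients on the host machine. A Kafka client only uses its bootstrap address for the first connection and then reconnects to whatever address the broker advertises for that listener. A container that bootstraps through kafka:9092 is therefore told to use localhost:9092, which inside a container points back at the container itself. The following is a minimal sketch of that mapping. The helper name parse_advertised_listeners is hypothetical and not part of any Kafka library.

```python
def parse_advertised_listeners(value: str) -> dict:
    """Map listener name -> (host, port) from a KAFKA_ADVERTISED_LISTENERS string."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the compose file above.
ADVERTISED = "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
listeners = parse_advertised_listeners(ADVERTISED)

# Containers on the Compose network should bootstrap against the INTERNAL listener.
internal_host, internal_port = listeners["INTERNAL"]
print(f"in-network clients: {internal_host}:{internal_port}")

# Tools running on the host machine should use the EXTERNAL listener.
external_host, external_port = listeners["EXTERNAL"]
print(f"host clients: {external_host}:{external_port}")
```

This is why the producer, consumer, and Kafdrop services all point at kafka:29092, while localhost:9092 is only useful from outside Docker.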
    

    my producer docker file:

    FROM python:3.8-slim-buster
    
    
    COPY . /app
    WORKDIR /app
    
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
    

    my producer req file:

    fastapi
    uvicorn
    confluent-kafka
    

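    my producer main.py (bootstrap.servers now points at the internal listener kafka:29092, and the /produce route is registered):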

    import json
    from confluent_kafka import Producer
    from fastapi import FastAPI
    
    
    app = FastAPI()
    
    producer_conf = {
        'bootstrap.servers': 'kafka:29092',
        'client.id': 'my-app'
    }
    
    producer = Producer(producer_conf)
    
    @app.post("/produce")
    def produce(data: dict):
        producer.produce('my-topic', value=json.dumps(data).encode('utf-8'))
        producer.flush()
        return {"status": "success"}
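
The compose file already passes KAFKA_BOOTSTRAP_SERVERS=kafka:29092 to the producer and consumer containers, but the code above hardcodes the address. One possible way to keep the two in sync is to read the variable at startup, as in this minimal sketch. The helper build_kafka_conf and the KAFKA_CLIENT_ID variable are assumptions for illustration and are not part of the template.

```python
import os


def build_kafka_conf(env=os.environ) -> dict:
    """Build a client config dict, preferring values injected by docker-compose."""
    return {
        # Falls back to the internal listener used elsewhere in this template.
        "bootstrap.servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092"),
        # KAFKA_CLIENT_ID is a hypothetical variable; the compose file does not set it.
        "client.id": env.get("KAFKA_CLIENT_ID", "my-app"),
    }


producer_conf = build_kafka_conf()
print(producer_conf)
```

The resulting dict could be passed to Producer(...) in place of the hardcoded producer_conf. The consumer could reuse the same idea with KAFKA_GROUP_ID.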
    

    my consumer docker file:

    FROM python:3.8-slim-buster
    
    COPY . /app
    WORKDIR /app
    
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    
    
    CMD [ "python", "main.py" ]
    

    my consumer req file:

    confluent-kafka
    

    my consumer main.py (bootstrap.servers now points at kafka:29092; logging and a startup delay were added):

    import logging
    import time
    
    from confluent_kafka import Consumer, KafkaError
    
    logging.basicConfig(level=logging.DEBUG)
    
    
    conf = {
        'bootstrap.servers': 'kafka:29092',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': True,
        'group.id': 'my-group',
        'api.version.request': True,
        'api.version.fallback.ms': 0
    }
    
    def consume_messages():
        consumer = Consumer(conf)
    
        consumer.subscribe(['my-topic'])
    
        try:
            while True:
                msg = consumer.poll(1.0)
                logging.info("Polling")
                logging.info(msg)
    
                if msg is None:
                    logging.info("No message")
                    continue
    
                if msg.error():
                    logging.info("Error")
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        print(f'Reached end of partition: {msg.topic()}[{msg.partition()}]')
                    else:
                        print(f'Error while consuming messages: {msg.error()}')
                        logging.info(msg.error())
                else:
                    print(f"Received message: {msg.value().decode('utf-8')}")
                    logging.info(msg.value().decode('utf-8'))
    
        except Exception as e:
            print(f"Exception occurred while consuming messages: {e}")
            logging.info(e)
        finally:
            consumer.close()
            logging.info("Consumer closed")
    
    
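    def startup():
        logging.info("Starting consumer...")
        # depends_on only waits for the Kafka container to start, not for the
        # broker to be ready, so give it time before connecting.
        time.sleep(30)
        consume_messages()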
    
    if __name__ == "__main__":
        try:
            startup()
        except Exception as e:
            print(f"Exception occurred: {e}")
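
The fixed time.sleep(30) in startup() works, but it always waits the full 30 seconds and can still be too short on a slow machine. A possible alternative is to poll the broker port until it accepts TCP connections. This is only a sketch under assumptions: wait_for_broker is a hypothetical helper, and an open port shows that the listener is reachable, not that Kafka is fully ready to serve requests.

```python
import socket
import time


def wait_for_broker(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # Open and immediately close a connection to probe the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Covers refused connections, DNS failures, and socket timeouts.
            time.sleep(interval)
    return False
```

In startup(), the sleep could then be replaced by a call such as wait_for_broker("kafka", 29092), with consume_messages() running only if it returns True.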
    

    Build system via:

    docker-compose up
    

    You can activate the producer with this curl:

     curl -X POST http://localhost:8000/produce -H "Content-Type: application/json" -d '{"key": "nice nice nice"}'
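
For scripted testing, the same request as the curl command above can be built with the Python standard library. This is a minimal sketch: build_produce_request is a hypothetical helper, and the base URL assumes the producer port mapping 8000:8000 from the compose file.

```python
import json
import urllib.request


def build_produce_request(payload: dict, base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build the same POST request as the curl command, without sending it."""
    return urllib.request.Request(
        url=f"{base_url}/produce",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_produce_request({"key": "nice nice nice"})
print(request.get_method(), request.full_url)

# Sending requires the stack to be running, so it is left commented out:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```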