Tags: fastapi, aiokafka

Kafka consumer is blocking event loop in FastAPI


I am running two Kafka consumers in a FastAPI application.

I found that when the line `await startup_consumer()` is present, there are no logs from `startup_sync_consumer()`, and vice versa. It looks like each consumer blocks the event loop. How do I fix it?

from fastapi import FastAPI
from contextlib import asynccontextmanager
from aiokafka import AIOKafkaConsumer


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: start Kafka components in a background task, clean up on exit.

    NOTE(review): ``start_kafka`` awaits the three ``startup_*()`` calls one after
    another. If ``startup_consumer()`` does not return (its ``start()`` awaits the
    consume loop), ``startup_sync_consumer()`` is never reached and
    ``app.consumer`` is never assigned, which matches the "no logs" symptom.
    """
    # Start Kafka components in the background
    async def start_kafka():
        logger.info("Starting Kafka initialization...")
        try:
            app.producer = await startup_producer()
            # Each await below must return before the next component is started.
            app.consumer = await startup_consumer()
            app.sync_consumer = await startup_sync_consumer()
        except Exception as e:
            logger.error(f"Failed to start Kafka components: {str(e)}")
            # Don't raise the exception, let the app continue running
        else:
            logger.info("Finished Kafka initialization")
    
    FastAPICache.init(InMemoryBackend())
    # Start Kafka in the background
    # NOTE(review): the Task returned here is discarded. The asyncio docs advise
    # keeping a reference (the loop holds only weak references to tasks), and a
    # reference is also needed to cancel the task at shutdown.
    asyncio.create_task(start_kafka())
    
    yield
    
    # Cleanup Kafka components if they were started
    # NOTE(review): start_kafka() may still be running here; it is not cancelled.
    if hasattr(app, 'producer'):
        await shutdown_producer()
    if hasattr(app, 'consumer'):
        await shutdown_consumer(app.consumer)
    # Called unconditionally, unlike the two guarded calls above; presumably it
    # handles the "never started" case itself (TODO confirm).
    await shutdown_sync_consumer()


# FastAPI application; the lifespan handler above manages Kafka start-up and shutdown.
application = FastAPI(
    lifespan=lifespan,
    title="Data Provider Service",
    version="1.0.0",
    description="Service for managing geodata and geoservices",
)


Consumers code

    # Module-level consumer instance used by startup_consumer() below.
    # NOTE(review): as posted, this line appears before the KafkaConsumer class
    # definition and would raise NameError in that order; presumably the real
    # module orders them differently.
    consumer = KafkaConsumer()
    
    async def startup_consumer():
        """Start the module-level Kafka consumer and return it.

        NOTE(review): ``consumer.start()`` awaits ``consume_messages()``, so this
        coroutine does not return while messages are being consumed. Anything the
        caller awaits after it (e.g. ``startup_sync_consumer()``) is not reached.
        """
        logger.info("Starting Kafka consumer")
        await consumer.start()
        return consumer
    # Consumers code
    class KafkaConsumer:
        """Kafka consumer wrapper as posted in the question (parts elided with ``...``).

        NOTE(review): ``start()`` ends by awaiting ``consume_messages()``, an
        ``async for`` loop over the consumer, so ``start()`` does not return while
        that loop runs. The event loop itself presumably keeps running other tasks;
        it is the awaiting coroutine that never advances past this call.
        """
        def __init__(self):
            ...
    
        # NOTE(review): the def below is indented one column deeper than its
        # siblings as posted, which Python rejects with an IndentationError.
         async def start(self):
            """Start the Kafka consumer.

            NOTE(review): the final ``await self.consume_messages()`` keeps this
            coroutine suspended for as long as messages are consumed. Spawn it with
            ``asyncio.create_task()`` instead if ``start()`` should return.
            """
            try:            
                # Create and start consumer
                self.consumer = AIOKafkaConsumer()
                await self.consumer.start()
                await self.consume_messages()
            # NOTE(review): a bare ``except:`` also catches asyncio.CancelledError,
            # so cancelling this coroutine is swallowed instead of propagating.
            except:
                ...
    
        async def consume_messages(self):
            """Consume messages from Kafka, decoding each value as UTF-8 JSON.

            NOTE(review): ``data`` is parsed but not used in this excerpt. Because
            the ``try`` wraps the whole loop, the first message that fails to
            decode ends consumption.
            """
            try:
                async for message in self.consumer:
                    data_str = message.value.decode("utf-8")
                    data = json.loads(data_str)
            # NOTE(review): ``e`` is not bound by this bare ``except:``, so building
            # the log message raises NameError (unless a global ``e`` exists).
            # ``traceback.print_exc()`` prints to stderr and returns None. Prefer
            # ``except Exception:`` with ``logger.exception(...)``.
            except:
                logger.error(f"Error in consume_messages: {e} [{traceback.print_exc()}] ")

Sync consumer

async def startup_sync_consumer():
    """Return the process-wide sync Kafka consumer, creating and starting it once.

    The instance is stored in the module-level ``sync_consumer_instance`` and
    reused by later calls.

    Raises:
        Exception: whatever creating or starting the consumer raised. The global
            is reset to ``None`` before re-raising, so a later call retries
            instead of returning the instance whose start failed.
    """
    logger.info("Get Sync Kafka consumer")
    global sync_consumer_instance
    if sync_consumer_instance is None:
        try:
            # Assigned before start(), so a call made while start() is still
            # pending gets this same instance rather than creating a second one.
            sync_consumer_instance = SyncKafkaComsumer()
            logger.info("Starting sync kafka consumer")
            await sync_consumer_instance.start()
            logger.info("Sync kafka consumer started successfully")
        except Exception as e:
            logger.error(f"Failed to start sync kafka consumer: {e}")
            # Don't leave a consumer that failed to start cached as the singleton.
            sync_consumer_instance = None
            raise
    return sync_consumer_instance



class SyncKafkaComsumer:
    """Second Kafka consumer from the question (excerpt, truncated as posted).

    NOTE(review): despite its name it is asynchronous (it wraps
    AIOKafkaConsumer). "Comsumer" is a misspelling of "Consumer"; renaming would
    require updating its callers. As posted, the ``try`` in ``start`` has no
    ``except``/``finally`` and the ``async for`` has no body.
    """
    def __init__(self):
        ...
    async def start(self):
        """Start the Kafka consumer.

        NOTE(review): like KafkaConsumer.start, this awaits consume_messages(),
        so it does not return while messages are being consumed.
        """
        try:
            # Create and start consumer
            self.consumer = AIOKafkaConsumer()
            logger.info("Kafka consumer object created")
            
            await self.consumer.start()
            await self.consume_messages()
            ...
     
    async def consume_messages(self):
        """Consume messages from Kafka"""
        try:
            async for message in self.consumer:
                # (the excerpt in the original post ends here)

Solution

  • Instead of awaiting consume_messages(), spawn it via asyncio.create_task() and store the task so you can cancel it on shutdown.

    Consumer Class - Updated:

    import asyncio
    import json
    import logging
    from aiokafka import AIOKafkaConsumer
    
    logger = logging.getLogger(__name__)

    class KafkaConsumer:
        """Async Kafka consumer whose consume loop runs as a background task.

        ``start()`` starts the underlying AIOKafkaConsumer and returns; messages
        are consumed by a task created with ``asyncio.create_task``, so the caller
        (e.g. a FastAPI lifespan) is not stuck inside the ``async for`` loop.
        Call ``stop()`` on shutdown to cancel that task and close the consumer.
        """

        def __init__(self, topic, bootstrap_servers, group_id):
            self.topic = topic
            self.bootstrap_servers = bootstrap_servers
            self.group_id = group_id
            self.consumer = None  # AIOKafkaConsumer, created in start()
            self._consume_task = None  # background task running consume_messages()

        async def start(self):
            """Create and start the consumer, then launch the consume loop in the background.

            Returns as soon as ``AIOKafkaConsumer.start()`` has completed; it does
            not wait for any messages.

            Raises:
                Exception: whatever creating or starting the consumer raised. It is
                    logged and re-raised so the caller knows startup failed.
            """
            try:
                self.consumer = AIOKafkaConsumer(
                    self.topic,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=self.group_id,
                    enable_auto_commit=True,  # adjust per your needs
                )
                await self.consumer.start()
            except Exception:
                logger.exception("Failed to start KafkaConsumer")
                # Re-raise: swallowing here would let the caller report a
                # successful initialization for a consumer that is not running.
                raise
            logger.info("KafkaConsumer started for topic=%s group_id=%s", self.topic, self.group_id)
            # Run the long-lived consume loop in the background and return immediately
            self._consume_task = asyncio.create_task(self.consume_messages())

        async def stop(self):
            """Cancel the consume task (if any), then stop the consumer (if any).

            Errors other than cancellation are logged rather than raised. Can be
            called even if ``start()`` was never called; a second call does not
            touch the already-finished task again.
            """
            task, self._consume_task = self._consume_task, None
            if task is not None:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass  # expected: the task was just cancelled
                except Exception:
                    logger.exception("Error during KafkaConsumer stop")
            # Kept separate from the block above so the consumer is still stopped
            # even if waiting for the consume task failed.
            try:
                if self.consumer is not None:
                    await self.consumer.stop()
                logger.info("KafkaConsumer stopped for topic=%s group_id=%s", self.topic, self.group_id)
            except Exception:
                logger.exception("Error during KafkaConsumer stop")

        async def consume_messages(self):
            """Consume messages in a long-running loop, decoding each value as UTF-8 JSON.

            A message that fails to decode/process is logged and skipped. On
            cancellation the CancelledError is re-raised so the task ends in the
            cancelled state.
            """
            try:
                async for message in self.consumer:
                    try:
                        data = json.loads(message.value.decode("utf-8"))
                        # TODO: handle data
                    except Exception:
                        logger.exception("Failed to process message")
            except asyncio.CancelledError:
                # Task was cancelled during shutdown; propagate per asyncio convention.
                logger.debug("consume_messages cancelled")
                raise
            except Exception:
                logger.exception("Error in consume_messages")
    

    The same needs to be done for the sync consumer. Even though you call it SyncKafkaComsumer, it's still async (it uses AIOKafkaConsumer), so mirror the same structure: don't wait for the consume loop inside start(); spawn a task instead and add a stop() method.

    class SyncKafkaComsumer:
        """Second Kafka consumer; asynchronous despite its name (it wraps AIOKafkaConsumer).

        Same structure as KafkaConsumer: ``start()`` starts the consumer and spawns
        the consume loop as a background task; ``stop()`` cancels that task and
        closes the consumer.
        """

        def __init__(self, topic, bootstrap_servers, group_id):
            self.topic = topic
            self.bootstrap_servers = bootstrap_servers
            self.group_id = group_id
            self.consumer = None  # AIOKafkaConsumer, created in start()
            self._consume_task = None  # background task running consume_messages()

        async def start(self):
            """Create and start the consumer, then run the consume loop in the background.

            Raises:
                Exception: whatever creating or starting the consumer raised. It is
                    logged and re-raised so the caller can tell startup failed.
            """
            try:
                self.consumer = AIOKafkaConsumer(
                    self.topic,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=self.group_id,
                )
                await self.consumer.start()
            except Exception:
                logger.exception("Failed to start SyncKafkaComsumer")
                # Re-raise so the caller does not treat a dead consumer as started.
                raise
            logger.info("SyncKafkaComsumer started")
            self._consume_task = asyncio.create_task(self.consume_messages())

        async def stop(self):
            """Cancel the consume task (if any), then stop the consumer (if any).

            Errors other than cancellation are logged rather than raised. Can be
            called even if ``start()`` was never called.
            """
            task, self._consume_task = self._consume_task, None
            if task is not None:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass  # expected: the task was just cancelled
                except Exception:
                    logger.exception("Error stopping SyncKafkaComsumer")
            # Separate from the block above so the consumer is still stopped even
            # if waiting for the consume task failed.
            try:
                if self.consumer is not None:
                    await self.consumer.stop()
                logger.info("SyncKafkaComsumer stopped")
            except Exception:
                logger.exception("Error stopping SyncKafkaComsumer")

        async def consume_messages(self):
            """Consume messages until cancelled; re-raises CancelledError on shutdown."""
            try:
                async for message in self.consumer:
                    try:
                        # process sync messages here
                        ...
                    except Exception:
                        # One bad message should not end the consume loop.
                        logger.exception("Failed to process message in SyncKafkaComsumer")
            except asyncio.CancelledError:
                logger.debug("SyncKafkaComsumer consume cancelled")
                # Propagate so the task ends in the cancelled state.
                raise
            except Exception:
                logger.exception("Error in SyncKafkaComsumer.consume_messages")
    

    Run both consumers concurrently and shut them down properly. If you want the app to start quickly, keep your asyncio.create_task(start_kafka()) and keep a reference to the task it returns. Inside start_kafka(), you can await each start(), because start() now returns as soon as its consumer has started instead of running the consume loop itself. On shutdown, call each .stop() to cancel its consume task and stop the consumer.

    from fastapi import FastAPI
    from contextlib import asynccontextmanager
    import asyncio
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """FastAPI lifespan: initialize Kafka in the background and clean it up on exit.

        Initialization runs in a background task so application startup does not
        wait for Kafka. On shutdown that task is cancelled if it is still running
        (for example, still waiting on a start() call). Then every component that
        was created is stopped.
        """
        FastAPICache.init(InMemoryBackend())

        async def start_kafka():
            logger.info("Starting Kafka initialization...")
            try:
                app.producer = await startup_producer()

                # Create both consumers before starting either, so shutdown can
                # find (and stop) them even if a start() fails or is cancelled.
                app.consumer = KafkaConsumer(
                    topic="topic1",
                    bootstrap_servers="localhost:9092",
                    group_id="group-async",
                )
                app.sync_consumer = SyncKafkaComsumer(
                    topic="topic2",
                    bootstrap_servers="localhost:9092",
                    group_id="group-sync",
                )

                # start() returns once its consumer is up (non-blocking now), so
                # both can be started concurrently. return_exceptions=True keeps
                # one failing consumer from cancelling the other's startup.
                results = await asyncio.gather(
                    app.consumer.start(),
                    app.sync_consumer.start(),
                    return_exceptions=True,
                )
            except Exception:
                logger.exception("Failed to start Kafka components")
                return

            failures = [r for r in results if isinstance(r, BaseException)]
            for exc in failures:
                logger.error("Failed to start Kafka components", exc_info=exc)
            if not failures:
                logger.info("Finished Kafka initialization")

        # Start Kafka init without blocking app startup. The reference is kept
        # both because the event loop only holds weak references to tasks and so
        # the task can be cancelled on shutdown.
        kafka_task = asyncio.create_task(start_kafka())

        yield

        # Stop the initializer first so it cannot create or start components while
        # they are being torn down.
        if not kafka_task.done():
            kafka_task.cancel()
            try:
                await kafka_task
            except asyncio.CancelledError:
                pass

        # Cleanup Kafka components if they were started
        try:
            if hasattr(app, "consumer"):
                await app.consumer.stop()
            if hasattr(app, "sync_consumer"):
                await app.sync_consumer.stop()
            if hasattr(app, "producer"):
                await shutdown_producer()
        except Exception:
            logger.exception("Error during Kafka shutdown")
    

    Your consumers don't actually block the event loop. start() awaits a loop that never ends, so start_kafka() never gets past the first consumer's start() call. Make start() return once the consumer has started, and run the loop in a background task with asyncio.create_task(). Cancel that task on shutdown. If you can share your AIOKafkaConsumer configuration (topics and group IDs), we can help make sure the two consumers won't step on each other during rebalances and that each receives exactly the messages you expect.

    I hope this gives you a clear direction.