I am running two Kafka consumers in a FastAPI application.
I found out that if the line await startup_consumer() is present, then there are no logs from startup_sync_consumer(), and vice versa. It looks like each consumer blocks the event loop. How do I fix it?
from fastapi import FastAPI
from contextlib import asynccontextmanager
from aiokafka import AIOKafkaConsumer


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start Kafka components in the background
    async def start_kafka():
        logger.info("Starting Kafka initialization...")
        try:
            app.producer = await startup_producer()
            app.consumer = await startup_consumer()
            app.sync_consumer = await startup_sync_consumer()
        except Exception as e:
            logger.error(f"Failed to start Kafka components: {str(e)}")
            # Don't raise the exception, let the app continue running
        else:
            logger.info("Finished Kafka initialization")

    FastAPICache.init(InMemoryBackend())

    # Start Kafka in the background
    asyncio.create_task(start_kafka())

    yield

    # Cleanup Kafka components if they were started
    if hasattr(app, 'producer'):
        await shutdown_producer()
    if hasattr(app, 'consumer'):
        await shutdown_consumer(app.consumer)
    await shutdown_sync_consumer()


application = FastAPI(
    title="Data Provider Service",
    description="Service for managing geodata and geoservices",
    version="1.0.0",
    lifespan=lifespan
)
Consumers code:

consumer = KafkaConsumer()


async def startup_consumer():
    """Start the Kafka consumer"""
    logger.info("Starting Kafka consumer")
    await consumer.start()
    return consumer


# Consumers code
class KafkaConsumer:
    def __init__(self):
        ...

    async def start(self):
        """Start the Kafka consumer"""
        try:
            # Create and start consumer
            self.consumer = AIOKafkaConsumer()
            await self.consumer.start()
            await self.consume_messages()
        except:
            ...

    async def consume_messages(self):
        """Consume messages from Kafka"""
        try:
            async for message in self.consumer:
                data_str = message.value.decode("utf-8")
                data = json.loads(data_str)
        except:
            logger.error(f"Error in consume_messages: {e} [{traceback.print_exc()}] ")
Sync consumer:

async def startup_sync_consumer():
    logger.info("Get Sync Kafka consumer")
    global sync_consumer_instance
    if sync_consumer_instance is None:
        try:
            sync_consumer_instance = SyncKafkaComsumer()
            logger.info("Starting sync kafka consumer")
            await sync_consumer_instance.start()
            logger.info("Sync kafka consumer started successfully")
        except Exception as e:
            logger.error(f"Failed to start sync kafka consumer: {e}")
            raise
    return sync_consumer_instance


class SyncKafkaComsumer:
    def __init__(self):
        ...

    async def start(self):
        """Start the Kafka consumer"""
        try:
            # Create and start consumer
            self.consumer = AIOKafkaConsumer()
            logger.info("Kafka consumer object created")
            await self.consumer.start()
            await self.consume_messages()
        ...

    async def consume_messages(self):
        """Consume messages from Kafka"""
        try:
            async for message in self.consumer:
Instead of awaiting consume_messages(), spawn it via asyncio.create_task() and store the task so you can cancel it on shutdown.
Consumer Class - Updated:
import asyncio
import json
import logging

from aiokafka import AIOKafkaConsumer

logger = logging.getLogger(__name__)


class KafkaConsumer:
    def __init__(self, topic, bootstrap_servers, group_id):
        self.topic = topic
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        self.consumer = None
        self._consume_task = None

    async def start(self):
        """Start the Kafka consumer and launch the consume loop in the background."""
        try:
            self.consumer = AIOKafkaConsumer(
                self.topic,
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.group_id,
                enable_auto_commit=True,  # adjust per your needs
            )
            await self.consumer.start()
            logger.info("KafkaConsumer started for topic=%s group_id=%s", self.topic, self.group_id)
            # Run the long-lived consume loop in the background and return immediately
            self._consume_task = asyncio.create_task(self.consume_messages())
        except Exception:
            logger.exception("Failed to start KafkaConsumer")

    async def stop(self):
        """Stop the consume loop and the consumer."""
        try:
            if self._consume_task:
                self._consume_task.cancel()
                try:
                    await self._consume_task
                except asyncio.CancelledError:
                    pass
            if self.consumer:
                await self.consumer.stop()
            logger.info("KafkaConsumer stopped for topic=%s group_id=%s", self.topic, self.group_id)
        except Exception:
            logger.exception("Error during KafkaConsumer stop")

    async def consume_messages(self):
        """Consume messages in a long-running loop."""
        try:
            async for message in self.consumer:
                try:
                    data_str = message.value.decode("utf-8")
                    data = json.loads(data_str)
                    # TODO: handle data
                except Exception:
                    logger.exception("Failed to process message")
        except asyncio.CancelledError:
            # Task was cancelled during shutdown
            logger.debug("consume_messages cancelled")
        except Exception:
            logger.exception("Error in consume_messages")
The same needs to be done for the sync consumer. Even though you call it SyncKafkaComsumer, it is still async (it uses AIOKafkaConsumer), so mirror the same structure: don't wait for the consume loop inside start(); spawn a task and add a stop() method.
class SyncKafkaComsumer:
    def __init__(self, topic, bootstrap_servers, group_id):
        self.topic = topic
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        self.consumer = None
        self._consume_task = None

    async def start(self):
        try:
            self.consumer = AIOKafkaConsumer(
                self.topic,
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.group_id,
            )
            await self.consumer.start()
            logger.info("SyncKafkaComsumer started")
            self._consume_task = asyncio.create_task(self.consume_messages())
        except Exception:
            logger.exception("Failed to start SyncKafkaComsumer")

    async def stop(self):
        try:
            if self._consume_task:
                self._consume_task.cancel()
                try:
                    await self._consume_task
                except asyncio.CancelledError:
                    pass
            if self.consumer:
                await self.consumer.stop()
            logger.info("SyncKafkaComsumer stopped")
        except Exception:
            logger.exception("Error stopping SyncKafkaComsumer")

    async def consume_messages(self):
        try:
            async for message in self.consumer:
                # process sync messages here
                ...
        except asyncio.CancelledError:
            logger.debug("SyncKafkaComsumer consume cancelled")
        except Exception:
            logger.exception("Error in SyncKafkaComsumer.consume_messages")
Start both consumers concurrently and shut them down properly. If you want the app to start quickly, keep your asyncio.create_task(start_kafka()). Inside start_kafka(), await each start(); they now return quickly because the consume loops run as background tasks. On shutdown, call each .stop() to cancel the tasks and stop the consumers.
from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio


@asynccontextmanager
async def lifespan(app: FastAPI):
    FastAPICache.init(InMemoryBackend())

    async def start_kafka():
        logger.info("Starting Kafka initialization...")
        try:
            app.producer = await startup_producer()

            # Initialize instances and start (non-blocking now)
            app.consumer = KafkaConsumer(
                topic="topic1",
                bootstrap_servers="localhost:9092",
                group_id="group-async",
            )
            await app.consumer.start()

            app.sync_consumer = SyncKafkaComsumer(
                topic="topic2",
                bootstrap_servers="localhost:9092",
                group_id="group-sync",
            )
            await app.sync_consumer.start()

            logger.info("Finished Kafka initialization")
        except Exception:
            logger.exception("Failed to start Kafka components")

    # Start Kafka init without blocking app startup
    kafka_task = asyncio.create_task(start_kafka())

    yield

    # Cleanup Kafka components if they were started
    try:
        if hasattr(app, "consumer"):
            await app.consumer.stop()
        if hasattr(app, "sync_consumer"):
            await app.sync_consumer.stop()
        if hasattr(app, "producer"):
            await shutdown_producer()
    except Exception:
        logger.exception("Error during Kafka shutdown")
Your consumers block because start() awaits a never-ending consume loop. Make start() return immediately and run the loop in a background task with asyncio.create_task(), then cancel that task on shutdown. If you can share your AIOKafkaConsumer configs (topics, group IDs), it will be easier to make sure the two consumers won't step on each other during rebalances and that each one gets exactly the messages you expect.
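If the pattern is still hard to picture, here is a tiny self-contained asyncio demo with no Kafka at all: awaiting a never-ending coroutine starves everything after it, while wrapping it in a task lets both loops run side by side.

```python
# Tiny demo of the blocking pattern, independent of Kafka.
import asyncio


async def forever(name):
    while True:
        print(f"{name} is alive")
        await asyncio.sleep(1)


async def blocking_startup():
    await forever("consumer")       # never returns...
    await forever("sync_consumer")  # ...so this line is never reached


async def non_blocking_startup():
    t1 = asyncio.create_task(forever("consumer"))
    t2 = asyncio.create_task(forever("sync_consumer"))
    await asyncio.sleep(3)          # both tasks print concurrently
    for t in (t1, t2):
        t.cancel()
    await asyncio.gather(t1, t2, return_exceptions=True)


# asyncio.run(blocking_startup()) would only ever print "consumer is alive"
asyncio.run(non_blocking_startup())
```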
Hope this gives you a clear direction.