python, apache-kafka, fastapi, aiokafka

Kafka consumers in FastAPI -> different services, different consumers, same producer


I have gone through these questions and some others, but unfortunately, I could not find an answer for my specific case even after reading the linked documents. Therefore, I believe this question is not a duplicate.

Can multiple Kafka consumers read same message from the partition

How multiple consumer group consumers work across partition on the same topic in Kafka?

How does Kafka achieve its parallelism with multiple consumption on the same topic same partition?

My question

We have a PHP side that handles Kafka producer operations and broadcasts messages related to product insert, update, and delete. This side has one topic named 'products' with three partitions: partition 0 for insert messages, partition 1 for update messages, and partition 2 for delete messages.

On the other hand, we have additional services that need to listen to this topic. Each service performs operations on its own database independently, without relying on the other services, and each handles its own insert, update, and delete operations.

My challenges are the following:

1- auto-commit: True or False?

One of the consumer configuration options is the enable_auto_commit setting. If I set it to False, all the messages in the partitions remain marked as unread even after my consumers have read them. Initially, I thought this was the correct approach, since my consumers are independent of each other: if one reads a message more quickly than the others, it should not mark it as committed, so the others can still read it. However, after reading some questions and documentation, I realized that I may not fully understand the concept of auto-commit. So I am unsure whether I should set it to True or False in order to receive messages in all my consumers.
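For what it's worth, Kafka stores committed offsets per (consumer group, partition), not per message, so one group committing never hides a message from another group. A toy model of that bookkeeping (the class and names are made up for illustration, not an aiokafka API):

```python
# Toy model of Kafka's offset bookkeeping: committed offsets live per
# (consumer group, partition), so groups never interfere with each other.
class OffsetStore:
    def __init__(self):
        self._offsets = {}  # (group, partition) -> next offset to read

    def committed(self, group, partition):
        return self._offsets.get((group, partition), 0)

    def commit(self, group, partition, offset):
        self._offsets[(group, partition)] = offset


log = ["insert#1", "update#1", "delete#1"]  # messages in one partition
store = OffsetStore()

# Service A reads the whole partition and commits its position...
store.commit("service-a", 0, len(log))

# ...yet Service B, using a different group, still sees every message.
unread_by_b = log[store.committed("service-b", 0):]
print(unread_by_b)
```

In other words, committing does not "consume" the message globally; it only records how far one particular group has progressed.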

2- group_id: what is its role here?

Now that each service has only one consumer, what should I name their group_ids? Should they all have different group_ids, and how does that affect message retrieval here? In other words, do these choices have any effect on how my consumers receive the messages?
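If it helps to make the question concrete, the usual pattern is one stable group_id per service, since consumers in different groups each receive a full copy of the topic. A sketch of per-service settings (the service names and the helper function are hypothetical):

```python
# Hypothetical per-service consumer settings. Because each service uses its
# own group_id, every service receives its own full copy of the topic.
SERVICE_GROUPS = {
    "search-service": "search-service",
    "billing-service": "billing-service",
}


def consumer_config(service, bootstrap_servers="host:port"):
    return {
        "bootstrap_servers": bootstrap_servers,
        "group_id": SERVICE_GROUPS[service],  # unique and stable per service
        "enable_auto_commit": False,          # commit after processing instead
        "auto_offset_reset": "earliest",      # start from the oldest message
    }


print(consumer_config("search-service")["group_id"])
```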

3- Partitions: Should I listen to all of them with one consumer?

As the partitions on the Kafka producer side carry different action messages, I am not sure whether I should listen to each one with a separate consumer, or listen to all three with one single consumer. The message headers contain the action name (insert, update, or delete), so I can tell what each message is after receiving it.
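Since the action already travels in the headers, a single consumer reading all three partitions can route on that header. A sketch of the routing part (the header key "action" and the handlers are assumptions about the producer; aiokafka exposes msg.headers as an iterable of (key, bytes) tuples):

```python
def action_from_headers(headers):
    # aiokafka-style headers: an iterable of (str, bytes) tuples
    for key, value in headers or []:
        if key == "action":
            return value.decode("utf-8")
    return None


def dispatch(action, payload):
    # Hypothetical per-action handlers; each service would plug in its own.
    handlers = {
        "insert": lambda p: f"inserted {p['id']}",
        "update": lambda p: f"updated {p['id']}",
        "delete": lambda p: f"deleted {p['id']}",
    }
    handler = handlers.get(action)
    return handler(payload) if handler else None


print(dispatch(action_from_headers([("action", b"update")]), {"id": 7}))
```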

Overall, I have described my main concerns and the use case here. Any help, tip, or guide is deeply appreciated.

By the way, this is the test consumer that I have so far:

import json, asyncio
from aiokafka import AIOKafkaConsumer, TopicPartition


def decode_json(msg):
    # Kafka delivers values as raw bytes; decode and parse them as JSON
    return json.loads(msg.value.decode("utf-8"))


async def main():
    topic_partition = TopicPartition(topic="products", partition=1)
    consumer = AIOKafkaConsumer(
        bootstrap_servers="host:port",
        enable_auto_commit=False,
        group_id="update",
        auto_offset_reset="earliest",
    )
    consumer.assign([topic_partition])
    await consumer.start()
    try:
        async for msg in consumer:
            print(
                msg.topic,
                msg.key.decode("utf-8"),
                msg.headers,
                decode_json(msg),
            )
    finally:
        await consumer.stop()  # stop() is a coroutine and must be awaited


asyncio.run(main())

Solution

    1. You can auto-commit, but that might skip events. You should commit manually after processing each event to ensure that doesn't happen. Note: for ordering purposes, deletes or updates could be processed before the corresponding insert when you use more than one partition, so you'll need to be careful about partitioning your data like this.

    2. If you manually assign partitions, a group id shouldn't be needed. Similarly, if there's only one group member per service, you'll only need to ensure that each service's value is unique. If you run multiple Python processes (for high availability, for example), then you'd end up with multiple consumers in that group, and the topic's partitions would be balanced across them.

    3. For a higher consumption rate, one consumer per partition would be best, but reading all three (with the same consumer group) and parsing the event type would work as well.