
Kafka consumer is missing messages during deployment


My consumers inherit from BasicKafkaConsumerV2. During deployments, while the pods are rotating, a few messages are missed, which is visible from the offsets logged after the manual commit(). Kafka is not supposed to skip messages whose offsets have not been committed. What could be the problem here?

run command:

    - name: order-consumer
      image: KUSTOMIZE_PRIMARY
      imagePullPolicy: Always
      command:
        [
        # Invoking wait for pgbouncer script
          "/wait-for.sh",
          "localhost:6432",
          "-s",
          "-t",
          "30",
          "--",
        # Starting main process
          "ddtrace-run",
          "python",
          "manage.py",
          "run_order-consumer",
        ]

Consumer:

class BasicKafkaConsumerV2:
    group_id = None  # str
    consumer_name = None  # str
    newrelic_application = None

    topic_handlers = {}  # dict

    DB_EXCEPTION_RETRY_TIMEOUT = 5  # seconds
    DLQ_TOPIC = None

    def __init__(self, latest_offset=False):
        """Inits the Consumer and subscribes to the topics"""
        # Keep the reset policy on the instance so it can be logged below.
        # Note: latest_offset is accepted but not used in the code as posted.
        self.auto_offset_reset = "latest"
        self.consumer = KafkaConsumer(
            bootstrap_servers=["broker1", "broker2"],
            group_id=self.group_id,
            enable_auto_commit=False,
            auto_offset_reset=self.auto_offset_reset,
        )
        self.topics_list = list(self.topic_handlers.keys())
        self.consumer.subscribe(self.topics_list)
        self.newrelic_application = newrelic.agent.application()
        logger.info(
            f"[{self.consumer_name}] subscribed to {self.topics_list} with auto_offset_reset {self.auto_offset_reset}"
        )

    def message_handler_wrapped(
        self,
        topic: str,
        kafka_msg_value: bytes,
        headers: dict,
        consumed_message=None,
    ):
        """Processes the message
        Also handles any DB exceptions by retrying the event after a period
        """
        with tracer.trace(
            settings.DD_KAFKA_RESOURCE_NAME,
            service=settings.DD_SERVICE,
            resource=self.group_id,
            span_type="consumer",
        ) as span:
            try:
                json_data = json.loads(kafka_msg_value)
                dict_headers = convert_tuple_to_dict(headers)

                span.set_tag("topic", topic)
                span.set_tag("event", self.get_event_name(json_data))

                self.message_handler(topic, json_data, dict_headers)

            except (InterfaceError, OperationalError) as e:
                # Sleep for some time to allow the DB to heal.
                # This retries indefinitely (further processing of events is blocked).
                logger.info(f"[{self.consumer_name}] DB Exception: {e}")
                span.set_tag("type", "retry")
                span.set_exc_info(type(e), e, e.__traceback__)
                time.sleep(self.DB_EXCEPTION_RETRY_TIMEOUT)
                self.message_handler_wrapped(
                    topic, kafka_msg_value, headers, consumed_message
                )

            except Exception as e:
                logger.exception(f"[{self.consumer_name}] Exception: {e}")
                span.set_tag("type", "error")
                span.set_exc_info(type(e), e, e.__traceback__)
                sentry_sdk.capture_exception(e)

    def message_handler(self, topic: str, data: dict, headers: dict):
        """Handles the message"""

        event = self.get_event_name(data)
        topic_handler = self.topic_handlers.get(topic)
        topic_handler.handle_message(event, data, headers)

    def start_consumer(self):
        """Starts consuming messages on the topic"""

        logger.info(f"Consumer [{self.consumer_name}] is starting consuming")

        for msg in self.consumer:
            with LogGuidSetter() as _:
                self.message_handler_wrapped(
                    msg.topic, msg.value, msg.headers, msg
                )
                self.consumer.commit()
                logger.info(
                    f"[{self.consumer_name}] Consumed message from partition: {msg.partition} offset: {msg.offset} with key: {msg.key}"
                )

    def get_event_name(self, data):
        return data.get("event") or data.get("event_name")


class TopicEventHandler:
    topic = None
    event_handler_mapping = {}  # event name and their fn handlers

    def handle_message(self, event, data, headers):
        """Handles the message"""

        event_handler = getattr(
            self, self.event_handler_mapping.get(event, ""), None
        )
        if event_handler is None:
            logger.info(f"Topic <{self.topic}> unhandled event : {event}")
            return

        event_handler(data, headers)

Solution

  • Could you provide the log entries that show messages being missed?

    It seems that the consumer commits the current offset, and then logs that it has consumed a message from the partition.

    What could be happening is that, after the commit, the pod is terminated (by Kubernetes, say) before your program has had time to log that it consumed the message.

    You can configure terminationGracePeriodSeconds as part of your pod deployment specification.

    As part of your python program, you can also capture the SIGTERM event when your pod is asked to stop.

    signal.signal(signal.SIGTERM, graceful_shutdown)
    

    graceful_shutdown would be a function that instructs your consumer to finish handling any messages it has already received from Kafka, commit its offsets, log that it has handled those messages, and finally stop the Kafka consumer gracefully.

    At that point it can then exit cleanly.
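A minimal sketch of that idea, not a drop-in replacement for the class in the question. It assumes the consumer object behaves like kafka-python's KafkaConsumer (poll, commit, close). The names run_consumer, handle, log and install_signal_handlers are hypothetical and chosen only for illustration. The handler just sets a flag; the loop finishes the message in flight, commits, logs, and then closes the consumer. Polling with max_records=1 is a design choice made here so that an argument-less commit() covers only the message that was actually handled.

```python
import signal
import threading

# Set by the signal handler; checked by the loop between messages.
shutdown_requested = threading.Event()


def graceful_shutdown(signum, frame):
    """Signal handler: only records the request; the loop does the cleanup."""
    shutdown_requested.set()


def run_consumer(consumer, handle, log=print, poll_timeout_ms=1000):
    """Poll until shutdown is requested, finishing the in-flight message first.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer;
    `handle` is a hypothetical per-message callback.
    """
    try:
        while not shutdown_requested.is_set():
            # One record per poll keeps the committed position aligned with
            # the single message that was actually processed.
            batches = consumer.poll(timeout_ms=poll_timeout_ms, max_records=1)
            for records in batches.values():
                for msg in records:
                    handle(msg)
                    consumer.commit()
                    log(f"Consumed partition: {msg.partition} offset: {msg.offset}")
    finally:
        # Leave the group cleanly so partitions can be reassigned promptly.
        consumer.close()
        log("Consumer closed")


def install_signal_handlers():
    """Register the handler; must be called from the main thread."""
    signal.signal(signal.SIGTERM, graceful_shutdown)
    signal.signal(signal.SIGINT, graceful_shutdown)
```

In the management command you would call install_signal_handlers() once and then run_consumer(self.consumer, ...). This only helps if SIGTERM actually reaches the Python process (the command in the question is started through wait-for.sh and ddtrace-run, which may or may not forward it), and if terminationGracePeriodSeconds is longer than the slowest message handler.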