kubernetesapache-kafkaconcurrencyknative

KNative service not handling requests concurrently when using eventing with KafkaSource


Summary:

I'm trying to use KNative eventing to expose a simple web application via a Kafka topic. The server should be able to handle multiple requests concurrently, but, unfortunately, it seems to handle them sequentially when I send them via Kafka. When making simple HTTP requests directly to the service, though, the concurrency is working fine.

Setup:

The setup only uses a KafkaSource which points to my KNative Service, and is using a Kafka instance deployed using the bitnami/kafka helm chart.

The version I'm using is v1.7.1 for KNative serving and eventing, and v1.7.0 for the Kafka eventing integration (from knative-sandbox/eventing-kafka).

Code:

The service I'm trying to deploy is a python FastAPI application that, upon receiving a request (with an ID of sorts), logs the received request, sleeps for 5 seconds, then returns a dummy message:

import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
import logging

logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.DEBUG, datefmt="%Y-%m-%d %H:%M:%S",
)

app = FastAPI()

class Item(BaseModel):
    id: str

@app.post("/")
async def root(item: Item):
    logging.debug(f"Request received with ID: {item.id}")
    await asyncio.sleep(5)
    logging.debug(f"Request complete for ID: {item.id}")
    return {"message": "Hello World"}

The app is served using uvicorn:

FROM python:3.9-slim

RUN pip install fastapi uvicorn

ADD main.py .

ENTRYPOINT uvicorn --host 0.0.0.0 --port 8877 main:app

The service deployment spec shows that I'm setting a containerConcurrency value that's greater than 1:

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: concurrency-test
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/class: "kpa.autoscaling.knative.dev"
        autoscaling.knative.dev/metric: "concurrency"
        autoscaling.knative.dev/target: "5"
    spec:
      containerConcurrency: 5
      containers:
        - name: app
          image: dev.local/concurrency-test:latest
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8877
---
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: concurrency-test
spec:
  consumerGroup: concurrency-test-group
  bootstrapServers:
  - kafka.default.svc.cluster.local:9092
  topics:
  - concurrency-test-requests
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: concurrency-test

Note: I also tried with spec.consumers: 2 in the KafkaSource but the behavior was the same.

Logs:

When sending two concurrent requests to the service directly with HTTP, the logs to look like this (both requests finish within 6 seconds, so concurrency is in effect):

2022-09-12 02:14:36 DEBUG    Request received with ID: abc
2022-09-12 02:14:37 DEBUG    Request received with ID: def
2022-09-12 02:14:41 DEBUG    Request complete for ID: abc
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:14:42 DEBUG    Request complete for ID: def
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK

When sending requests via Kafka, though, the logs look like this (the requests are being processed one after the other):

2022-09-12 02:14:55 DEBUG    Request received with ID: 111
2022-09-12 02:15:00 DEBUG    Request complete for ID: 111
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:15:00 DEBUG    Request received with ID: 222
2022-09-12 02:15:05 DEBUG    Request complete for ID: 222
INFO:     10.42.0.7:0 - "POST / HTTP/1.1" 200 OK

Please let me know if this sequential request handling is the expected behavior when using eventing with just a KafkaSource, and I hope there are ways for enabling concurrency in this setup.


Solution

  • Kafka provides ordering within a partition (the implementation is a distributed log). You may need to change the number of partitions on your Kafka topic to achieve higher parallelism; you may be able to also use the spec.consumers value to increase the throughput (untested).

    I'd also encourage filing an issue in the eventing-kafka repo with your problem and any additional knobs if there is other behavior you're looking for.