I'm trying to use Knative Eventing to expose a simple web application via a Kafka topic. The server should be able to handle multiple requests concurrently, but unfortunately it seems to handle them sequentially when I send them via Kafka. When I make HTTP requests directly to the service, though, concurrency works fine.
The setup uses only a `KafkaSource` that points to my Knative `Service`, plus a Kafka instance deployed with the `bitnami/kafka` Helm chart. I'm using v1.7.1 of Knative Serving and Eventing, and v1.7.0 of the Kafka eventing integration (from `knative-sandbox/eventing-kafka`).
The service I'm trying to deploy is a Python FastAPI application that, upon receiving a request (with an ID of sorts), logs the received request, sleeps for 5 seconds, and then returns a dummy message:
```python
import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
import logging

logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.DEBUG, datefmt="%Y-%m-%d %H:%M:%S",
)

app = FastAPI()


class Item(BaseModel):
    id: str


@app.post("/")
async def root(item: Item):
    logging.debug(f"Request received with ID: {item.id}")
    await asyncio.sleep(5)
    logging.debug(f"Request complete for ID: {item.id}")
    return {"message": "Hello World"}
```
The app is served using uvicorn:
```dockerfile
FROM python:3.9-slim
RUN pip install fastapi uvicorn
ADD main.py .
ENTRYPOINT uvicorn --host 0.0.0.0 --port 8877 main:app
```
As the deployment spec shows, I'm setting `containerConcurrency` to a value greater than `1`:
```yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: concurrency-test
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/class: "kpa.autoscaling.knative.dev"
        autoscaling.knative.dev/metric: "concurrency"
        autoscaling.knative.dev/target: "5"
    spec:
      containerConcurrency: 5
      containers:
        - name: app
          image: dev.local/concurrency-test:latest
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8877
---
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: concurrency-test
spec:
  consumerGroup: concurrency-test-group
  bootstrapServers:
    - kafka.default.svc.cluster.local:9092
  topics:
    - concurrency-test-requests
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: concurrency-test
```
Note: I also tried setting `spec.consumers: 2` in the `KafkaSource`, but the behavior was the same.
When I send two concurrent requests to the service directly over HTTP, the logs look like this (both requests finish within 6 seconds, so concurrency is in effect):
```text
2022-09-12 02:14:36 DEBUG Request received with ID: abc
2022-09-12 02:14:37 DEBUG Request received with ID: def
2022-09-12 02:14:41 DEBUG Request complete for ID: abc
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:14:42 DEBUG Request complete for ID: def
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
```
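To reproduce the direct-HTTP comparison, here is a minimal client sketch using only the standard library: it fires one POST per ID at the same time and reports the status codes and total wall-clock time. The `{"id": ...}` payload matches the `Item` model; the helper names `post_item` and `post_concurrently` are hypothetical, and the target URL is an assumption you supply. If the service really handles requests concurrently, two 5-second requests should complete in roughly 5 seconds rather than 10.

```python
import json
import time
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from typing import List, Sequence, Tuple


def post_item(url: str, item_id: str, timeout: float = 30.0) -> int:
    """POST one {"id": ...} payload (the shape of the FastAPI Item model); return the HTTP status."""
    body = json.dumps({"id": item_id}).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        response.read()  # drain the body before the connection is closed
        return response.status


def post_concurrently(url: str, ids: Sequence[str]) -> Tuple[List[int], float]:
    """Send one request per ID at the same time; return the statuses and elapsed seconds."""
    start = time.monotonic()
    # One worker thread per request, so the client never serializes the calls itself.
    with ThreadPoolExecutor(max_workers=max(1, len(ids))) as pool:
        statuses = list(pool.map(lambda item_id: post_item(url, item_id), ids))
    return statuses, time.monotonic() - start
```

For example, `post_concurrently("http://localhost:8877/", ["abc", "def"])` would return the two status codes and the elapsed time; `localhost:8877` is only a hypothetical address (for instance, a port-forwarded pod), so substitute however the `Service` is reachable in your cluster.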
When sending requests via Kafka, though, the logs look like this (the requests are being processed one after the other):
```text
2022-09-12 02:14:55 DEBUG Request received with ID: 111
2022-09-12 02:15:00 DEBUG Request complete for ID: 111
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:15:00 DEBUG Request received with ID: 222
2022-09-12 02:15:05 DEBUG Request complete for ID: 222
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
```
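The difference between the two runs can be checked mechanically by turning the DEBUG lines into per-request (received, completed) intervals and testing whether any interval starts before an earlier one ends. This is a small illustrative sketch; the regex assumes the `logging.basicConfig` format used in the app, and the function names are hypothetical. Because timestamps have one-second resolution, a request that starts in the same second another finishes is counted as non-overlapping.

```python
import re
from datetime import datetime
from typing import Dict, Tuple

# Matches the app's DEBUG lines; uvicorn's "INFO: ..." access lines don't match and are skipped.
LOG_LINE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+DEBUG\s+"
    r"Request (?P<event>received with|complete for) ID: (?P<id>\S+)$"
)

Interval = Tuple[datetime, datetime]

DIRECT_HTTP_LOG = """\
2022-09-12 02:14:36 DEBUG Request received with ID: abc
2022-09-12 02:14:37 DEBUG Request received with ID: def
2022-09-12 02:14:41 DEBUG Request complete for ID: abc
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:14:42 DEBUG Request complete for ID: def
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
"""

KAFKA_LOG = """\
2022-09-12 02:14:55 DEBUG Request received with ID: 111
2022-09-12 02:15:00 DEBUG Request complete for ID: 111
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
2022-09-12 02:15:00 DEBUG Request received with ID: 222
2022-09-12 02:15:05 DEBUG Request complete for ID: 222
INFO: 10.42.0.7:0 - "POST / HTTP/1.1" 200 OK
"""


def request_intervals(log_text: str) -> Dict[str, Interval]:
    """Map each request ID to its (received, completed) timestamps."""
    starts: Dict[str, datetime] = {}
    ends: Dict[str, datetime] = {}
    for line in log_text.splitlines():
        match = LOG_LINE.match(line.strip())
        if match is None:
            continue
        ts = datetime.strptime(match["ts"], "%Y-%m-%d %H:%M:%S")
        target = starts if match["event"] == "received with" else ends
        target[match["id"]] = ts
    return {rid: (starts[rid], ends[rid]) for rid in starts if rid in ends}


def overlapped(intervals: Dict[str, Interval]) -> bool:
    """True if some request started before an earlier-started request had finished."""
    spans = sorted(intervals.values())  # sorted by start time
    return any(later_start < earlier_end
               for (_, earlier_end), (later_start, _) in zip(spans, spans[1:]))


def total_seconds(intervals: Dict[str, Interval]) -> float:
    """Wall-clock time from the first request received to the last one completed."""
    first = min(start for start, _ in intervals.values())
    last = max(end for _, end in intervals.values())
    return (last - first).total_seconds()


if __name__ == "__main__":
    for name, text in (("direct HTTP", DIRECT_HTTP_LOG), ("via Kafka", KAFKA_LOG)):
        spans = request_intervals(text)
        print(f"{name}: overlapped={overlapped(spans)}, total={total_seconds(spans):.0f}s")
    # direct HTTP: overlapped=True, total=6s
    # via Kafka: overlapped=False, total=10s
```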
Is this sequential request handling the expected behavior when using eventing with just a `KafkaSource`? If so, is there a way to enable concurrency in this setup?
Kafka provides ordering within a partition (the implementation is a distributed log), so events from the same partition are dispatched in order, one after another. You may need to increase the number of partitions on your Kafka topic to achieve higher parallelism; you may also be able to use the `spec.consumers` value to increase throughput (untested).
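To make the partition argument concrete, here is a toy model, not the KafkaSource implementation. It assumes events within one partition are dispatched strictly one after another while different partitions are dispatched independently. The round-robin spreading of messages and the partition-to-consumer assignment are simplifications: real producers pick a partition from the message key's hash (or their own strategy for keyless messages), and Kafka's group assignors are configurable. It does reflect the consumer-group rule that each partition is consumed by at most one member of a group, which would explain why `spec.consumers: 2` alone made no difference if the topic has a single partition (Kafka's broker default `num.partitions` is 1). All function names are hypothetical.

```python
from typing import Dict, List, Sequence


def spread_over_partitions(message_ids: Sequence[str], num_partitions: int) -> List[List[str]]:
    """Round-robin messages over partitions (a simplification of real producer partitioning)."""
    if num_partitions < 1:
        raise ValueError("a topic needs at least one partition")
    partitions: List[List[str]] = [[] for _ in range(num_partitions)]
    for index, message_id in enumerate(message_ids):
        partitions[index % num_partitions].append(message_id)
    return partitions


def assign_partitions(num_partitions: int, num_consumers: int) -> Dict[int, List[int]]:
    """Give each partition to exactly one consumer in the group; extra consumers get nothing."""
    if num_consumers < 1:
        raise ValueError("a consumer group needs at least one member")
    assignment: Dict[int, List[int]] = {consumer: [] for consumer in range(num_consumers)}
    for partition in range(num_partitions):
        assignment[partition % num_consumers].append(partition)
    return assignment


def wall_time(partitions: List[List[str]], seconds_per_request: float) -> float:
    """Each partition is a sequential lane; lanes run in parallel, so the longest lane decides."""
    return max((len(lane) * seconds_per_request for lane in partitions), default=0.0)


if __name__ == "__main__":
    ids = ["111", "222"]
    for n in (1, 2):
        print(f"{n} partition(s): {wall_time(spread_over_partitions(ids, n), 5.0)} s")
    print(assign_partitions(num_partitions=1, num_consumers=2))
    # 1 partition(s): 10.0 s
    # 2 partition(s): 5.0 s
    # {0: [0], 1: []}
```

With one partition the two 5-second requests take 10 seconds, matching the Kafka log; with two partitions the model drops to 5 seconds, and a second consumer with no partition assigned contributes nothing.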
I'd also encourage filing an issue in the `eventing-kafka` repo describing your problem, along with any additional knobs you'd want if you're looking for other behavior.