dockerfastapi

FastAPI inside docker stopped receiving any requests after a while


I have a FastAPI app running inside Docker, deployed using Portainer. It works fine for a few minutes, but then it suddenly stops receiving any requests. I don't see any requests in the logs; instead, when I curl the Docker bridge port, the request just hangs forever.

The portainer setup is basic, with only randomized port mappings.

FROM python:3.10-slim

# Use the `ENV key=value` form: the space-separated `ENV key value` form is
# legacy and discouraged by the Dockerfile reference.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    POETRY_VERSION=1.8.2

WORKDIR /app

# System packages: curl for downloads, build tools and libpq headers for
# building the Postgres client libraries.
RUN apt-get update && apt-get install -y \
    curl \
    build-essential \
    postgresql-client \
    libpq-dev \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

# The installer honours the POETRY_VERSION environment variable set above.
RUN curl -sSL https://install.python-poetry.org | python3 -

ENV PATH="${PATH}:/root/.local/bin"

RUN mkdir -p $HOME/.postgresql

# CockroachDB cluster CA certificate for TLS-verified database connections.
RUN curl --create-dirs -o $HOME/.postgresql/root.crt 'https://cockroachlabs.cloud/clusters/.../cert'

# Copy only the dependency manifests first so this layer stays cached until
# pyproject.toml / poetry.lock change.
COPY pyproject.toml poetry.lock* ./

RUN poetry config virtualenvs.create false \
    && poetry install --no-interaction --no-ansi

COPY . .

EXPOSE 8121

# NOTE(review): main.py uses package-relative imports (`from .config import`),
# so running it as the bare module `main:app` may fail — confirm whether the
# app should be addressed as `<package>.main:app` instead.
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8121"]

The project is also split into a module.

main.py file looks like this

import secrets
from datetime import datetime
from typing import List

import fastapi
import uvicorn
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

from gpt_proxy.utils import mask_token

from .config import ADMIN_KEY, log
from .db import USERS, add_user, db_close, db_init, del_user
from .firebase_manager import Firebase
from .models import TokenCreate, TokenResponse, UserToken
from .openai_forward import OpenAiForward

# FastAPI application, the single shared proxy instance, and the header-based
# admin-key scheme (auto_error=True rejects requests missing X-Admin-Key).
app = fastapi.FastAPI()
forwarder = OpenAiForward()
api_key_header = APIKeyHeader(name="X-Admin-Key", auto_error=True)


def verify_admin_key(api_key: str = Security(api_key_header)):
    """Dependency: allow the request only when X-Admin-Key matches ADMIN_KEY.

    Raises HTTPException(403) on a mismatch; returns the key on success.
    """
    if api_key == ADMIN_KEY:
        return api_key
    raise HTTPException(status_code=403, detail="Invalid admin key")


# NOTE(review): @app.on_event is deprecated in current FastAPI in favour of
# lifespan handlers — confirm the installed version still supports it.
@app.on_event("startup")
async def startup():
    # Initialize the database (and the USERS cache it populates) before
    # the app starts serving requests.
    log.info("Starting up OpenAI Forward application")
    await db_init()
    log.info("Application startup complete")


@app.on_event("shutdown")
async def shutdown():
    # Close the shared aiohttp session only if it was ever lazily created,
    # then close the database connections.
    log.info("Shutting down OpenAI Forward application")
    if forwarder.client:
        await forwarder.client.close()
    await db_close()
    log.info("Application shutdown complete")


@app.post("/tokens", response_model=TokenResponse)
async def create_token(
    token_request: TokenCreate, api_key: str = Depends(verify_admin_key)
):
    """Mint a fresh `mn-`-prefixed API token for the given username (admin only)."""
    generated = secrets.token_urlsafe(32)
    return await add_user(username=token_request.username, token=f"mn-{generated}")


@app.delete("/tokens/{username}")
async def delete_token(username: str, api_key: str = Depends(verify_admin_key)):
    """Remove the stored token for *username* (admin only)."""
    await del_user(username)
    confirmation = f"Token for user {username} has been deleted"
    return {"message": confirmation}


@app.get("/tokens", response_model=List[UserToken])
async def list_users(api_key: str = Depends(verify_admin_key)):
    """Return all users with their tokens masked (admin only).

    Builds fresh dicts instead of mutating the shared records: the previous
    implementation wrote the masked token back into the module-level USERS
    cache, permanently discarding the real tokens after the first call and
    re-masking already-masked values on subsequent calls.
    """
    return [{**user, "token": mask_token(user["token"])} for user in USERS]


# Catch-all: forward any request not matched by the admin routes above.
# Uses @app.api_route — the documented FastAPI decorator for multi-method
# path operations — instead of @app.route, which is an undocumented,
# deprecated Starlette-style decorator.
@app.api_route(
    "/{api_path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "PATCH", "TRACE"],
)
async def _handle_openai_request(request: fastapi.Request):
    """Proxy the raw request to the OpenAI forwarder and stream its response."""
    return await forwarder.reverse_proxy(request)


if __name__ == "__main__":
    # NOTE(review): this dev entry point listens on 8010, but the Dockerfile
    # EXPOSEs and runs uvicorn on 8121 — confirm which port is intended.
    log.info("Starting OpenAI Forward server")
    uvicorn.run(app, host="0.0.0.0", port=8010)

Database initialization:

import time

from tortoise import Tortoise

from ..config import log
from .models import FirebaseToken, User

USERS = []


async def _do_migration() -> None:
    """Seed the User table from a hard-coded legacy mapping (one-time migration)."""
    # NOTE(review): `....` is a redaction placeholder from the post, not valid
    # Python — the real {username: token} pairs were elided.
    old_users = {
        ....
    }

    # get_or_create keeps the migration idempotent if rows already exist.
    for k, v in old_users.items():
        _ = await User.get_or_create(username=k, token=v, created_at=time.time())


async def _get_all_users(_seed_if_empty: bool = True) -> None:
    """Populate the module-level USERS cache from the database.

    If the table is empty, seed it via _do_migration() and reload exactly once.

    Fixes two defects in the original: it fell through after the recursive
    reload and iterated the (empty) local `users` list, and it would recurse
    forever if the migration inserted no rows. The `_seed_if_empty` flag
    (default True, so existing callers are unaffected) bounds the retry.
    """
    users = await User.all()
    if not users:
        if not _seed_if_empty:
            return  # migration already attempted; give up rather than recurse
        await _do_migration()
        await _get_all_users(_seed_if_empty=False)
        return
    for user in users:
        USERS.append(dict(user))


async def db_init() -> None:
    """Initialize Tortoise ORM, create missing tables, and warm the USERS cache."""
    # NOTE(review): DB_URI is not imported in this snippet — as shown this
    # would raise NameError at startup; confirm it comes from the config module.
    await Tortoise.init(db_url=DB_URI, modules={"models": ["gpt_proxy.db.models"]})

    await Tortoise.generate_schemas()
    await _get_all_users()

OpenAI forward class code. I'm not sure whether this is relevant, since previous requests work just fine — so I'm thinking the blocking happens somewhere else.

import asyncio
import time
from functools import wraps
from typing import Any, AsyncGenerator, Callable, Tuple, Type, TypeVar

import aiohttp
import anyio
import fastapi
from fastapi import HTTPException
from starlette.responses import BackgroundTask, StreamingResponse

from .config import log
from .firebase_manager import Firebase
from .models import ClientConfig
from .utils import get_token_user, header_cloudflare_safe, mask_token

T = TypeVar("T")


def async_retry(
    max_retries: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
) -> Callable:
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> T | None:
            current_delay = delay
            for attempt in range(max_retries + 1):
                try:
                    if attempt > 0:
                        log.info(
                            f"Retrying {func.__name__}, attempt {attempt}/{max_retries} "
                            f"after {current_delay:.2f}s delay"
                        )
                        await anyio.sleep(current_delay)
                        current_delay *= backoff

                    return await func(*args, **kwargs)

                except exceptions as e:
                    log.warning(
                        f"Attempt {attempt + 1}/{max_retries + 1} failed for {func.__name__}: "
                        f"{type(e).__name__}: {str(e)}"
                    )

                    if attempt == max_retries:
                        log.error(
                            f"All retry attempts failed for {func.__name__}. "
                            f"Final exception: {type(e).__name__}: {str(e)}"
                        )
                        raise

            return None

        return wrapper

    return decorator


class OpenAiForward:
    """Reverse proxy that forwards incoming FastAPI requests to api.openai.com.

    Holds one shared aiohttp ClientSession (created lazily on first use) and,
    for tokens issued by this service ("mn-" prefix), swaps the caller's
    bearer token for a Firebase-derived one before forwarding upstream.
    """

    def __init__(self) -> None:
        log.info("Initializing OpenAI Forward")
        # Upstream base URL; prepare_config builds all forwarded URLs from it.
        self.base_url = "https://api.openai.com/"
        # Created lazily in _init_client so the session is bound to the
        # running event loop, not the import-time one.
        self.client: aiohttp.ClientSession | None = None
        self.firebase = Firebase()

    async def _init_client(self) -> None:
        """Create the shared aiohttp session on first use."""
        if self.client is None:
            log.info("Initializing aiohttp client session")
            tcp_connector = aiohttp.TCPConnector(
                limit=500, limit_per_host=0, force_close=False
            )
            # NOTE(review): no explicit ClientTimeout is configured, so
            # aiohttp's default applies; consider setting one so a stalled
            # upstream cannot pin pooled connections for long.
            self.client = aiohttp.ClientSession(connector=tcp_connector)
            log.info("aiohttp client session initialized")

    async def _get_token(self, token: str):
        """Resolve the caller's bearer token.

        Returns (username, outbound_token): for an "mn-" token known to the
        user store, the outbound token comes from Firebase; otherwise the
        caller's own token is passed through unchanged with username None.
        """
        log.info(f"Processing token {mask_token(token)}")
        if token.startswith("mn-"):
            username = await get_token_user(token)
            if not username:
                log.info("Using direct token")
                return None, token
            fb_token = await self.firebase.get_token()
            return username, fb_token
        else:
            log.info("Using direct token")
            return None, token

    async def iter_bytes(
        self, response: aiohttp.ClientResponse, request: fastapi.Request
    ) -> AsyncGenerator[bytes, Any]:
        """Yield the upstream response body chunk by chunk.

        Fix: release the pooled connection in a ``finally`` so that a client
        disconnect mid-stream (which aborts the generator and skips the
        StreamingResponse background task) no longer leaks the connection.
        Leaked connections accumulate in the TCPConnector pool and can
        eventually make the proxy stop serving requests entirely.
        """
        log.info(f"Streaming response for {request.url.path}")
        try:
            async for chunk, _ in response.content.iter_chunks():
                yield chunk
        finally:
            # release() returns an awaitable on some aiohttp versions;
            # await it only when it does. Double-release (here plus the
            # BackgroundTask in reverse_proxy) is harmless.
            maybe_coro = response.release()
            if maybe_coro is not None and hasattr(maybe_coro, "__await__"):
                await maybe_coro

    @async_retry(
        max_retries=3,
        delay=0.2,
        # Fix: the factor was 0.2, which *shrank* each subsequent delay
        # (0.2s, 0.04s, ...); a multiplier > 1 gives the exponential
        # backoff the parameter name promises.
        backoff=2.0,
        exceptions=(
            aiohttp.ServerTimeoutError,
            aiohttp.ServerConnectionError,
            aiohttp.ServerDisconnectedError,
            asyncio.TimeoutError,
            anyio.EndOfStream,
            RuntimeError,
        ),
    )
    async def send(
        self, client_config: ClientConfig, data: dict | None = None
    ) -> aiohttp.ClientResponse | Any | None:
        """Issue the outbound request; retried on transient connection errors.

        Returns the aiohttp response object (the previous annotation said
        ClientRequest, which was wrong — callers read ``.status`` and
        ``.headers`` from it), or None if no session could be created.
        """
        if not self.client:
            await self._init_client()

        log.info(f"Sending {client_config.method} request to {client_config.url}")
        if self.client:
            return await self.client.request(
                method=client_config.method,
                url=client_config.url,
                data=data,
                headers=client_config.headers,
            )
        return None

    async def prepare_config(self, request: fastapi.Request) -> ClientConfig:
        """Build the outbound request config: swap the auth token, copy method/URL.

        Raises HTTPException(401) when token resolution yields no usable token.
        """
        headers: dict = header_cloudflare_safe(request)
        original_bearer: str = headers.get(
            "Authorization", headers.get("authorization")
        )

        if original_bearer:
            token: str = original_bearer.split()[-1].strip()
            user, replacement_token = await self._get_token(token)

            if replacement_token is None:
                raise HTTPException(status_code=401, detail="Invalid token")

            # Preserve whichever capitalization the client used.
            auth_header = f"Bearer {replacement_token}"
            if "Authorization" in headers:
                headers["Authorization"] = auth_header
            elif "authorization" in headers:
                headers["authorization"] = auth_header

            log.info(
                f"Token processing: User={user or 'direct'}, "
                f"Using={'Firebase' if user else 'direct'} token"
            )

        # Fix: request.url.path already begins with "/", so appending it to a
        # base ending in "/" produced "https://api.openai.com//v1/...". Also
        # use self.base_url instead of a second hard-coded copy of the host.
        url = f"{self.base_url}{request.url.path.lstrip('/')}"
        if request.url.query:
            url = f"{url}?{request.url.query}"

        return ClientConfig(
            headers=headers,
            method=request.method,
            url=url,
        )

    async def reverse_proxy(self, request: fastapi.Request) -> StreamingResponse:
        """Forward *request* upstream and stream the upstream response back.

        Maps aiohttp client failures to 502 and anything unexpected to 500.
        """
        request_id = str(time.time())
        log.info(
            f"[{request_id}] Incoming request: {request.method} {request.url.path}"
        )

        config = await self.prepare_config(request)
        body = await request.body()
        data = body if body else None

        try:
            log.info(f"[{request_id}] Forwarding request to OpenAI")
            response = await self.send(config, data=data)
            log.info(f"[{request_id}] OpenAI response received: {response.status}")

            # The background release is kept as belt-and-braces for the
            # normal completion path; iter_bytes also releases on abort.
            return StreamingResponse(
                self.iter_bytes(response, request),
                status_code=response.status,
                media_type=response.headers.get("content-type"),
                background=BackgroundTask(response.release),
            )

        except aiohttp.ClientError as e:
            log.exception(f"[{request_id}] Failed to forward request to OpenAI")
            raise fastapi.HTTPException(
                status_code=fastapi.status.HTTP_502_BAD_GATEWAY,
                detail=f"Failed to forward request: {str(e)}",
            )
        except Exception as e:
            log.exception(f"[{request_id}] Unexpected error during request forwarding")
            raise fastapi.HTTPException(
                status_code=fastapi.status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Internal server error: {str(e)}",
            )

Solution

  • I went through a refactor and the problem seems to be gone. I have no idea what fixed it — either launching the web app with the built-in `fastapi` command, or explicitly specifying the number of workers FastAPI can use.

    Now my dockerfile looks like this

    FROM python:3.10-slim
    
    # NOTE(review): the space-separated `ENV key value` form is legacy;
    # `ENV key=value` is the documented modern form.
    ENV PYTHONDONTWRITEBYTECODE 1
    ENV PYTHONUNBUFFERED 1
    ENV POETRY_VERSION=1.8.2
    
    WORKDIR /app
    
    # System packages: curl for downloads, build tools and libpq headers for
    # building the Postgres client libraries.
    RUN apt-get update && apt-get install -y \
        curl \
        build-essential \
        postgresql-client \
        libpq-dev \
        ca-certificates \
        && rm -rf /var/lib/apt/lists/*
    
    # The installer honours the POETRY_VERSION environment variable set above.
    RUN curl -sSL https://install.python-poetry.org | python3 -
    
    ENV PATH="${PATH}:/root/.local/bin"
    
    RUN mkdir -p $HOME/.postgresql
    
    # CockroachDB cluster CA certificate for TLS database connections.
    RUN curl --create-dirs -o $HOME/.postgresql/root.crt 'https://cockroachlabs.cloud/clusters/1234/cert'
    
    # Copy manifests first so dependency installation caches between builds.
    COPY pyproject.toml poetry.lock* ./
    
    RUN poetry config virtualenvs.create false \
        && poetry install --no-interaction --no-ansi
    
    COPY . .
    
    EXPOSE 8121
    
    # `fastapi run` serves the app with uvicorn under the hood; --workers 3
    # starts three worker processes.
    CMD ["fastapi", "run", "main", "--port", "8121", "--workers", "3"]