I have a FastAPI app running inside Docker, deployed using Portainer. It works fine for a few minutes, but then it suddenly stops receiving any requests. I don't see any requests in the logs; instead, a curl against the Docker bridge port simply hangs forever.
The Portainer setup is basic, with only randomized port mappings.
FROM python:3.10-slim

# Make a RUN step fail when any command in a pipeline fails (e.g. the
# `curl | python3 -` installer below); with plain /bin/sh only python3's exit
# status would be checked, so a failed download would go unnoticed.
SHELL ["/bin/bash", "-o", "pipefail", "-c"]

# key=value form; the whitespace-separated `ENV key value` form is legacy.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    POETRY_VERSION=1.8.2

WORKDIR /app

RUN apt-get update && apt-get install -y \
    curl \
    build-essential \
    postgresql-client \
    libpq-dev \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

# The Poetry installer reads POETRY_VERSION from the environment.
# -f: fail on HTTP errors instead of piping an error page into python3.
RUN curl -fsSL https://install.python-poetry.org | python3 -
ENV PATH="${PATH}:/root/.local/bin"

# -f: fail on HTTP errors instead of saving the error page as the certificate.
RUN mkdir -p $HOME/.postgresql
RUN curl -f --create-dirs -o $HOME/.postgresql/root.crt 'https://cockroachlabs.cloud/clusters/.../cert'

COPY pyproject.toml poetry.lock* ./
RUN poetry config virtualenvs.create false \
    && poetry install --no-interaction --no-ansi

COPY . .

EXPOSE 8121
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8121"]
The project is also split into a package.
The main.py file looks like this:
import secrets
from datetime import datetime
from typing import List
import fastapi
import uvicorn
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader
from gpt_proxy.utils import mask_token
from .config import ADMIN_KEY, log
from .db import USERS, add_user, db_close, db_init, del_user
from .firebase_manager import Firebase
from .models import TokenCreate, TokenResponse, UserToken
from .openai_forward import OpenAiForward
# ASGI application instance served by uvicorn / `fastapi run`.
app = fastapi.FastAPI()
# One shared forwarder for the whole process; its aiohttp session is only
# created on the first proxied request (OpenAiForward._init_client).
forwarder = OpenAiForward()
# Admin endpoints authenticate via the `X-Admin-Key` header. With
# auto_error=True, FastAPI itself rejects requests that omit the header.
api_key_header = APIKeyHeader(name="X-Admin-Key", auto_error=True)
def verify_admin_key(api_key: str = Security(api_key_header)):
    """Dependency that authorizes admin-only endpoints.

    Args:
        api_key: Value of the ``X-Admin-Key`` header, injected by FastAPI.

    Returns:
        The validated key.

    Raises:
        HTTPException: 403 when the key does not match ``ADMIN_KEY`` or no
            admin key is configured.
    """
    # Constant-time comparison, so response timing does not reveal how much of
    # the admin key a caller guessed correctly. Comparing bytes lets
    # compare_digest accept non-ASCII header values as well.
    # NOTE(review): assumes ADMIN_KEY is a str (or empty/None when unset).
    if not ADMIN_KEY or not secrets.compare_digest(
        api_key.encode("utf-8"), ADMIN_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Invalid admin key")
    return api_key
@app.on_event("startup")
async def startup():
    """Startup hook: bring the database layer up before serving traffic."""
    log.info("Starting up OpenAI Forward application")
    # db_init() must complete before startup is reported as finished.
    await db_init()
    log.info("Application startup complete")
@app.on_event("shutdown")
async def shutdown():
    """Shutdown hook: close the outbound HTTP session and the database.

    The database is closed in ``finally`` so that an error while closing the
    aiohttp session cannot leave the database connections open.
    """
    log.info("Shutting down OpenAI Forward application")
    try:
        if forwarder.client:
            await forwarder.client.close()
            # Drop the closed session so it cannot be reused by accident.
            forwarder.client = None
    finally:
        await db_close()
    log.info("Application shutdown complete")
@app.post("/tokens", response_model=TokenResponse)
async def create_token(
    token_request: TokenCreate, api_key: str = Depends(verify_admin_key)
):
    """Issue and store a new random ``mn-``-prefixed token for a user.

    ``api_key`` is unused in the body; it exists only to enforce the
    admin-key dependency.
    """
    random_part = secrets.token_urlsafe(32)
    created = await add_user(
        username=token_request.username,
        token="mn-" + random_part,
    )
    return created
@app.delete("/tokens/{username}")
async def delete_token(username: str, api_key: str = Depends(verify_admin_key)):
    """Delete the stored token for ``username`` (admin only)."""
    await del_user(username)
    confirmation = "Token for user " + username + " has been deleted"
    return {"message": confirmation}
@app.get("/tokens", response_model=List[UserToken])
async def list_users(api_key: str = Depends(verify_admin_key)):
    """Return all cached users with their tokens masked (admin only).

    Each returned entry is a shallow copy of the cached dict. The previous
    version assigned the masked token back into the shared ``USERS`` dicts,
    which overwrote the real tokens in the in-memory cache and re-masked the
    already-masked value on every subsequent call.
    """
    return [{**user, "token": mask_token(user["token"])} for user in USERS]
@app.route(
    "/{api_path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "PATCH", "TRACE"],
)
async def _handle_openai_request(request: fastapi.Request):
    """Catch-all: hand any request not matched by an earlier route to the proxy."""
    proxied_response = await forwarder.reverse_proxy(request)
    return proxied_response
if __name__ == "__main__":
    # Direct-run entry point; the Docker image starts the server via its CMD.
    # NOTE(review): port 8010 differs from the 8121 the Dockerfile exposes —
    # confirm this is intentional for local runs.
    log.info("Starting OpenAI Forward server")
    uvicorn.run(app, port=8010, host="0.0.0.0")
Database initialization:
import time
from tortoise import Tortoise
from ..config import log
from .models import FirebaseToken, User
# Module-level cache of user rows (as dicts), filled by _get_all_users().
USERS: list = list()
async def _do_migration() -> None:
old_users = {
....
}
for k, v in old_users.items():
_ = await User.get_or_create(username=k, token=v, created_at=time.time())
async def _get_all_users() -> None:
    """Load every ``User`` row into the module-level ``USERS`` cache.

    If the table is empty, the legacy migration runs first and the table is
    queried once more. The previous version recursed into itself after the
    migration, which recursed without bound (RecursionError at startup) when
    the migration inserted no rows.
    """
    users = await User.all()
    if not users:
        await _do_migration()
        users = await User.all()
    USERS.extend(dict(user) for user in users)
async def db_init() -> None:
    """Connect Tortoise ORM, generate the schema, and fill the USERS cache."""
    # NOTE(review): DB_URI is not imported in this excerpt — presumably it
    # comes from the config module; confirm.
    model_modules = {"models": ["gpt_proxy.db.models"]}
    await Tortoise.init(db_url=DB_URI, modules=model_modules)
    await Tortoise.generate_schemas()
    await _get_all_users()
The OpenAI forwarding class. I'm not sure it is relevant, since earlier requests work just fine, so I suspect the blocking is somewhere else.
import asyncio
import time
from functools import wraps
from typing import Any, AsyncGenerator, Callable, Tuple, Type, TypeVar
import aiohttp
import anyio
import fastapi
from fastapi import HTTPException
from starlette.responses import BackgroundTask, StreamingResponse
from .config import log
from .firebase_manager import Firebase
from .models import ClientConfig
from .utils import get_token_user, header_cloudflare_safe, mask_token
# Generic type variable available for annotations in this module.
T = TypeVar("T")
def async_retry(
    max_retries: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
) -> Callable:
    """Build a decorator that retries a coroutine function on selected errors.

    Args:
        max_retries: Extra attempts after the first call. Negative values are
            treated as 0, so the wrapped coroutine always runs at least once.
        delay: Seconds to wait before the first retry.
        backoff: Factor applied to the delay after each retry (a value below
            1 makes the delay shrink).
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Returns:
        A decorator for ``async def`` functions. The wrapped coroutine returns
        whatever the original returns, or re-raises the last exception once
        every attempt has failed.
    """
    # A negative max_retries used to produce an empty range(), so the wrapped
    # coroutine was never awaited and the wrapper silently returned None.
    retries = max(0, max_retries)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            current_delay = delay
            for attempt in range(retries + 1):
                try:
                    if attempt > 0:
                        log.info(
                            f"Retrying {func.__name__}, attempt {attempt}/{retries} "
                            f"after {current_delay:.2f}s delay"
                        )
                        await anyio.sleep(current_delay)
                        current_delay *= backoff
                    return await func(*args, **kwargs)
                except exceptions as e:
                    log.warning(
                        f"Attempt {attempt + 1}/{retries + 1} failed for {func.__name__}: "
                        f"{type(e).__name__}: {str(e)}"
                    )
                    if attempt == retries:
                        log.error(
                            f"All retry attempts failed for {func.__name__}. "
                            f"Final exception: {type(e).__name__}: {str(e)}"
                        )
                        raise
            # Not reached: the final attempt either returns or re-raises.

        return wrapper

    return decorator
class OpenAiForward:
    """Reverse proxy that forwards incoming requests to the OpenAI API.

    ``mn-`` bearer tokens that map to a known user are swapped for a token
    obtained from Firebase; all other tokens are forwarded unchanged. Upstream
    responses are streamed back to the caller.
    """

    def __init__(self) -> None:
        log.info("Initializing OpenAI Forward")
        self.base_url = "https://api.openai.com/"
        # Created lazily on the first request, presumably so the session is
        # built inside the running event loop.
        self.client: aiohttp.ClientSession | None = None
        self.firebase = Firebase()

    async def _init_client(self) -> None:
        """Create the shared aiohttp session if it does not exist yet."""
        if self.client is None:
            log.info("Initializing aiohttp client session")
            # At most 500 concurrent upstream connections (no per-host cap).
            # Every response must be released back to this pool; otherwise
            # the pool eventually fills and new requests wait for a free slot.
            tcp_connector = aiohttp.TCPConnector(
                limit=500, limit_per_host=0, force_close=False
            )
            self.client = aiohttp.ClientSession(connector=tcp_connector)
            log.info("aiohttp client session initialized")

    async def _get_token(self, token: str):
        """Map a client token to ``(username, upstream_token)``.

        A ``mn-`` token that resolves to a user is replaced by a Firebase
        token. Any other token, including an unknown ``mn-`` token, is
        returned unchanged with ``username`` set to None.
        """
        log.info(f"Processing token {mask_token(token)}")
        if token.startswith("mn-"):
            username = await get_token_user(token)
            if not username:
                log.info("Using direct token")
                return None, token
            fb_token = await self.firebase.get_token()
            return username, fb_token
        else:
            log.info("Using direct token")
            return None, token

    @staticmethod
    async def _release_response(response: aiohttp.ClientResponse) -> None:
        """Return the upstream connection to the pool.

        This is a coroutine on purpose: Starlette awaits async background tasks
        on the event loop, whereas a plain sync callable (such as the bound
        ``response.release``) is run in a threadpool thread, and aiohttp
        objects are not safe to touch from another thread.
        """
        response.release()

    async def iter_bytes(
        self, response: aiohttp.ClientResponse, request: fastapi.Request
    ) -> AsyncGenerator[bytes, Any]:
        """Yield the upstream body chunk by chunk, then release the response.

        The ``finally`` releases the connection even when streaming stops
        early (for example, when the downstream client disconnects and the
        generator is closed). Releasing twice is harmless.
        """
        log.info(f"Streaming response for {request.url.path}")
        try:
            async for chunk, _ in response.content.iter_chunks():
                yield chunk
        finally:
            response.release()

    @async_retry(
        max_retries=3,
        delay=0.2,
        backoff=0.2,
        exceptions=(
            aiohttp.ServerTimeoutError,
            aiohttp.ServerConnectionError,
            aiohttp.ServerDisconnectedError,
            asyncio.TimeoutError,
            anyio.EndOfStream,
            RuntimeError,
        ),
    )
    async def send(
        self, client_config: ClientConfig, data: dict | None = None
    ) -> aiohttp.client.ClientRequest | Any | None:
        """Send the prepared request upstream and return the aiohttp response.

        Transient connection/timeout errors are retried by ``async_retry``.
        Returns None only if no client session could be created.
        """
        if not self.client:
            await self._init_client()
        log.info(f"Sending {client_config.method} request to {client_config.url}")
        if self.client:
            return await self.client.request(
                method=client_config.method,
                url=client_config.url,
                data=data,
                headers=client_config.headers,
            )
        return None

    async def prepare_config(self, request: fastapi.Request) -> ClientConfig:
        """Build the upstream request config (headers, method, URL).

        Raises:
            HTTPException: 401 if a bearer token is present but no upstream
                token could be resolved for it.
        """
        headers: dict = header_cloudflare_safe(request)
        original_bearer: str = headers.get(
            "Authorization", headers.get("authorization")
        )
        if original_bearer:
            token: str = original_bearer.split()[-1].strip()
            user, replacement_token = await self._get_token(token)
            if replacement_token is None:
                raise HTTPException(status_code=401, detail="Invalid token")
            auth_header = f"Bearer {replacement_token}"
            if "Authorization" in headers:
                headers["Authorization"] = auth_header
            elif "authorization" in headers:
                headers["authorization"] = auth_header
            log.info(
                f"Token processing: User={user or 'direct'}, "
                f"Using={'Firebase' if user else 'direct'} token"
            )
        # request.url.path is absolute ("/v1/..."); strip the base URL's
        # trailing slash so the result is not "https://api.openai.com//v1/...".
        url = self.base_url.rstrip("/") + request.url.path
        if request.url.query:
            url = f"{url}?{request.url.query}"
        return ClientConfig(
            headers=headers,
            method=request.method,
            url=url,
        )

    async def reverse_proxy(self, request: fastapi.Request) -> StreamingResponse:
        """Forward ``request`` to OpenAI and stream the upstream response back.

        Raises:
            HTTPException: 401 from token resolution, 502 when the upstream
                request fails at the aiohttp level, 500 for any other error.
        """
        request_id = str(time.time())
        log.info(
            f"[{request_id}] Incoming request: {request.method} {request.url.path}"
        )
        config = await self.prepare_config(request)
        body = await request.body()
        data = body if body else None
        try:
            log.info(f"[{request_id}] Forwarding request to OpenAI")
            response = await self.send(config, data=data)
            if response is None:
                raise RuntimeError("HTTP client session is not initialized")
            log.info(f"[{request_id}] OpenAI response received: {response.status}")
            return StreamingResponse(
                self.iter_bytes(response, request),
                status_code=response.status,
                media_type=response.headers.get("content-type"),
                # Covers the case where the body iterator is never started.
                background=BackgroundTask(self._release_response, response),
            )
        except aiohttp.ClientError as e:
            log.exception(f"[{request_id}] Failed to forward request to OpenAI")
            raise fastapi.HTTPException(
                status_code=fastapi.status.HTTP_502_BAD_GATEWAY,
                detail=f"Failed to forward request: {str(e)}",
            ) from e
        except Exception as e:
            log.exception(f"[{request_id}] Unexpected error during request forwarding")
            raise fastapi.HTTPException(
                status_code=fastapi.status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Internal server error: {str(e)}",
            ) from e
I went through a refactor and the problem seems to be gone. I have no idea what fixed it: either launching the web app with the built-in `fastapi` command, or specifying the number of workers FastAPI uses.
Now my Dockerfile looks like this:
FROM python:3.10-slim

# Make a RUN step fail when any command in a pipeline fails (e.g. the
# `curl | python3 -` installer below); with plain /bin/sh only python3's exit
# status would be checked, so a failed download would go unnoticed.
SHELL ["/bin/bash", "-o", "pipefail", "-c"]

# key=value form; the whitespace-separated `ENV key value` form is legacy.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    POETRY_VERSION=1.8.2

WORKDIR /app

RUN apt-get update && apt-get install -y \
    curl \
    build-essential \
    postgresql-client \
    libpq-dev \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

# The Poetry installer reads POETRY_VERSION from the environment.
# -f: fail on HTTP errors instead of piping an error page into python3.
RUN curl -fsSL https://install.python-poetry.org | python3 -
ENV PATH="${PATH}:/root/.local/bin"

# -f: fail on HTTP errors instead of saving the error page as the certificate.
RUN mkdir -p $HOME/.postgresql
RUN curl -f --create-dirs -o $HOME/.postgresql/root.crt 'https://cockroachlabs.cloud/clusters/1234/cert'

COPY pyproject.toml poetry.lock* ./
RUN poetry config virtualenvs.create false \
    && poetry install --no-interaction --no-ansi

COPY . .

EXPOSE 8121
# NOTE(review): each of the 3 workers is a separate process with its own
# module-level state (e.g. the in-memory USERS cache), so token changes made
# through one worker may not be visible to the others. Extra workers can also
# hide, rather than fix, a single worker whose event loop or upstream
# connection pool has stalled — confirm the root cause is addressed.
CMD ["fastapi", "run", "main", "--port", "8121", "--workers", "3"]