I'm currently writing unit tests for a FastAPI service using pytest. I found that fastapi.testclient.TestClient can only handle one POST request; any subsequent call to TestClient.post fails. Here is a minimal reproducible example:
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sse_starlette import EventSourceResponse

app = FastAPI()


@app.post("/hello")
def query_hello_1():
    def hello():
        yield "hello"

    return EventSourceResponse(
        hello(),
    )


@app.post("/hello_2")
def query_hello_2():
    def hello():
        yield "hello_2"

    return EventSourceResponse(
        hello(),
    )


client = TestClient(app)


def test_hello_1():
    print(list(client.post("/hello").iter_lines()))


def test_hello_2():
    print(list(client.post("/hello_2").iter_lines()))
Running this script with pytest fails in the second test at raise RuntimeError(f'{self!r} is bound to a different event loop'). However, everything works fine if I return plain text instead of an SSE response. Is there a better approach to testing APIs that return SSE responses using TestClient?
Using this fixture solves your issue:
import pytest


@pytest.fixture
def reset_sse_starlette_appstatus_event():
    """
    Fixture that resets the appstatus event in the sse_starlette app.

    Should be used on any test that uses sse_starlette to stream events.
    """
    # See https://github.com/sysid/sse-starlette/issues/59
    from sse_starlette.sse import AppStatus

    AppStatus.should_exit_event = None
Hope this helps!