The final goal is to cancel request on server side if client exceeds its timeout.
The code related to start the server:
def run_server_loop(
routes: web.RouteTableDef,
shutdown_state: ShutdownState,
logger: Logger,
*,
port: int,
periodic_callback: Callable[[], None] | None = None,
shutdown_callback: Callable[[], None] | None = None,
) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_run_server(
routes,
shutdown_state,
logger,
port=port,
periodic_callback=periodic_callback,
)
)
finally:
if shutdown_callback is not None:
shutdown_callback()
logger.info('Server stopped')
flush_logs()
async def _run_server(
routes: web.RouteTableDef,
shutdown_state: ShutdownState,
logger: Logger,
*,
port: int,
periodic_callback: Callable[[], None] | None = None,
) -> None:
try:
app = web.Application()
app.add_routes(routes)
runner = web.AppRunner(
app,
access_log_format=(
'%a %t [%D μs] "%r" %{Content-Length}i %s '
'%b "%{Referer}i" "%{User-Agent}i"'
),
)
await runner.setup()
site = web.TCPSite(runner, port=port)
await site.start()
logger.info(f'Listening {site.name}')
while not shutdown_state.is_shutdown_requested:
await asyncio.sleep(0.1)
if periodic_callback is not None:
periodic_callback()
await runner.cleanup()
except: # noqa
logger.critical('Unhandled exception', exc_info=True)
raise
Here is my endpoint code:
@routes.get('/ping')
async def handle_ping(_) -> web.Response:
try:
import time
import asyncio
for i in range(10):
await asyncio.sleep(1)
return web.json_response(
data=PingResult(
service_name=service_name,
version=SERVICE_VERSION,
storage_path=str(storage_dir.path),
daemon_pid=daemon.pid,
daemon_status=str(daemon.status.value),
).dict()
)
except asyncio.CancelledError as ce:
print('Request was cancelled')
return HTTPBadRequest(ErrorResult(error='Request was cancelled'))
Client code
async def ping(timeout=10) -> PingResult:
async with aiohttp.ClientSession(timeout=ClientTimeout(total=timeout)) as session:
async with session.get('http://localhost:5002/ping') as resp:
body = await resp.json()
return PingResult.parse_obj(body)
Models
from aiohttp import web
from pydantic import BaseModel
class ErrorResult(TypedDict):
error: str
class HTTPBadRequest(web.HTTPBadRequest):
def __init__(self, error: Mapping) -> None:
super().__init__(text=dumps(error), content_type='application/json')
class PingResult(BaseModel):
service_name: str
version: str
storage_path: str
daemon_pid: int
daemon_status: str
Even if I call ping(timeout=2)
I can see that the request on the server wasn't cancelled. Or if I call curl http://localhost:5002/ping
and terminate the command in less than 2-3 second I'm getting the same behavior (the server side code works without any termination).
It seems like I'm misunderstanding the whole idea of cancelling request but I can figure out how can I achieve my main goal.
In current versions of aiohttp, handler cancellation is an opt-in feature (due to it causing issues for developers coming from other frameworks).
It can be enabled by passing handler_cancellation=True
to web.run_app()
or the AppRunner
.
https://docs.aiohttp.org/en/stable/web_advanced.html#peer-disconnection