I am having some issues related to the twisted http framework. Specifically, I am trying to implement the usage of threads.deferToThread
with callbacks, to allow for better concurrency, which works fine until you have a higher load of traffic trying to access the server at once. In that case, the callbacks often fail to execute at all, and nothing is being written to the transport.
To put it short, on each request, the server will process packets sent by the client inside a deferred thread. The server then writes response packets to a queue, which the on_request_done
callback then dequeues and returns as a response. As previously mentioned, this works perfectly fine with 1 or 2 clients.
Here is a simplified version of the code I am working with:
class HttpPlayer(Player):
def __init__(self, address: str, port: int) -> None:
super().__init__(address, port)
self.queue = Queue()
self.token = ""
def enqueue(self, data: bytes):
self.queue.put(data)
def dequeue(self, max: int = 4096) -> bytes:
data = b""
while not self.queue.empty():
data += self.queue.get()
return data
class HttpBanchoProtocol(Resource):
isLeaf = True
def __init__(self) -> None:
self.player: HttpPlayer | None = None
self.children = {}
def handle_login_request(self, request: Request) -> bytes:
username, password, client = (
request.content.read().decode().splitlines()
)
deferred = threads.deferToThread(
self.player.login_received,
username,
password,
client
)
deferred.addCallbacks(
lambda _: self.on_request_done(request),
lambda f: self.on_request_error(request, f)
)
return NOT_DONE_YET
def handle_request(self, request: Request) -> bytes:
deferred = threads.deferToThread(
self.process_packets,
request.content.read()
)
deferred.addCallbacks(
lambda _: self.on_request_done(request),
lambda f: self.on_request_error(request, f)
)
return NOT_DONE_YET
def login_received(self, username: str, password: str, client: str) -> None:
# Processing login here
...
def process_packets(self, request: bytes):
# Processing packets here
...
def on_request_done(self, request: Request) -> None:
if request._disconnected:
self.player.logger.warning('Client disconnected before response')
return
if request.finished:
self.player.logger.warning('Request finished before response')
return
request.write(self.player.dequeue())
request.finish()
def on_request_error(
self,
request: Request,
failiure: Failure
) -> None:
request.setResponseCode(500)
self.player.send_error()
self.player.logger.error(
f'Failed to process request: {failiure.getErrorMessage()}',
exc_info=failiure.value
)
self.on_login_done(request)
def render_POST(self, request: Request) -> bytes:
request.setResponseCode(200)
if not (token := request.getHeader('token')):
return self.handle_login_request(request)
if not (player := app.session.players.by_token(token)):
request.setResponseCode(401)
return b""
self.player = player
return self.handle_request(request)
Here is the code I am working on for more reference if needed: https://github.com/osuTitanic/anchor/blob/044a9b75dee8d36816bbb5d7af67dc8902f2d34e/app/http.py#L99
The solution here was to remove the player
attribute from the class, ensuring that each request operated with its own instance of the player. This change improved thread safety and allowed the server to handle multiple requests concurrently without issues, resulting in correct responses being sent back to clients.