pythonwebsocketpython-asyncio

Stuck in await for async.Event after event was set


I create a websocket connection using python websockets.

I define an infinite background task to listen for server messages and pass them to the handler as they come.

To subscribe to a live quote I first need to send out a channel opening request. I send the request and to keep track of the channel status I create an asyncio.Event.

As soon as a CHANNEL_OPENED message from the server is received, I set the event in my message handler.

Meanwhile my main execution is waiting on this event but it never makes progress. My log chain tells me that websocket connection was successful and the channel is open. If I add breakpoints to the message handler and inspect runtime variables at the CHANNEL_OPENED point I can see the event as set but still am unable to proceed. I wonder what I am doing wrong here?

My main is basically this

import asyncio

from api import ApiClient


async def main():
    api = ApiClient(
        uid=_uid,
        remember_token=_remember_token
    )

    asyncio.create_task(api.message_handler())

    api.request_channel(1)  # sends the open channel request and creates the event in api.open_channels (a dictionary of channel numbers to asyncio.Event)

    event = api.open_channels.get(channel)
    await event.wait()  # Stuck here
    log.info("Subscribing options quotes for tracked symbols.")  # this point never reached
    for symbol in symbols:  # Symbols is a list of strings
        response = api.get(f"/option-chains/{symbol}")
        options = [OptionInstrument(**option) for option in response.json()["data"]["items"]]

        for option in options:
            subscription = asyncio.create_task(api.subscribe(option, channel))
            await subscription


if __name__ == "__main__":
    asyncio.run(main())

The channel was requested like this:

    def request_channel(self, channel: int):
        if self.open_channels.get(channel):
            log.info(f"Channel {channel} was already requested. Skipping request.")
            return

        self.open_channels[channel] = asyncio.Event()
        self.streamer.send(
            dumps(
                {
                    "type": "CHANNEL_REQUEST",
                    "channel": channel,
                    "service": "FEED",
                    "parameters": {"contract": "AUTO"},
                }
            )
        )
        log.debug(f"Requested opening of channel {channel}")

In my ApiClient definition I have the message handling loop

    async def message_handler(self):
        while True:
            await self._process_message(self.streamer.recv())

    async def _process_message(self, message: str):
        message = loads(message)
        log.debug(message)
        match message["type"]:
            case "AUTH_STATE" | "SETUP":
                if message.get("state") == "AUTHORIZED":
                    log.debug("Websocket streamer setup and authorized by the server.")

            case "CHANNEL_OPENED":
                self.open_channels[message["channel"]].set()
                log.debug(f"Channel {message["channel"]} open.")

            case "KEEPALIVE":
                self.streamer.send(dumps({"type": "KEEPALIVE", "channel": message["channel"]}))
                log.debug("Extended keep alive")

            case "ERROR":
                log.error(f"Streamer had an error. Restarting. Error message:\n\t{message}")
                self.streamer.close()
                self.open_channels = dict()
                self._streamer = None

            case _:
                log.warning(f"Unexpected message type received:\n\t{message}")

the api streamer object was configured with websockets:


    @property
    def streamer(self) -> ClientConnection:
        if not self._streamer:
            self._streamer = self._setup_streamer_websocket()
        return self._streamer

    def _setup_streamer_websocket(self) -> ClientConnection:
        websocket = connect(self.quote_streamer_url)

        setup_message_payload = {
            "type": "SETUP",
            "channel": 0,
            "keepaliveTimeout": 60,
            "acceptKeepaliveTimeout": 60,
            "version": "0.1",
        }

        websocket.send(message=dumps(setup_message_payload))

        auth_message_payload = {
            "type": "AUTH",
            "channel": 0,
            "token": self.quote_streamer_token,
        }

        websocket.send(dumps(auth_message_payload))

        return websocket

I get the following runtime log messages, including all server messages logged by api._process_message:

2024-08-03 11:57:02,405 - [INFO] [api.login:95]: Logged in.
2024-08-03 11:57:03,593 - [DEBUG] [api._get_streamer_credentials:152]: Got credentials.
2024-08-03 11:57:04,586 - [DEBUG] [api.request_channel:246]: Requested opening of channel 1
2024-08-03 11:57:04,715 - [DEBUG] [api._process_message:202]: {'type': 'SETUP', 'channel': 0, 'keepaliveTimeout': 60, 'acceptKeepaliveTimeout': 60, 'version': '1.0-1.0.1-20240426-113644'}
2024-08-03 11:57:04,716 - [DEBUG] [api._process_message:202]: {'type': 'AUTH_STATE', 'channel': 0, 'state': 'UNAUTHORIZED'}
2024-08-03 11:57:04,717 - [DEBUG] [api._process_message:202]: {'type': 'AUTH_STATE', 'channel': 0, 'state': 'AUTHORIZED', 'userId': <redacted>}
2024-08-03 11:57:04,717 - [DEBUG] [api._process_message:206]: Websocket streamer setup and authorized by the server.
2024-08-03 11:57:04,717 - [DEBUG] [api._process_message:202]: {'type': 'CHANNEL_OPENED', 'channel': 1, 'service': 'FEED', 'parameters': {'contract': 'AUTO', 'subFormat': 'LIST'}}
2024-08-03 11:57:04,718 - [DEBUG] [api._process_message:210]: Channel 1 open.
2024-08-03 11:57:34,715 - [DEBUG] [api._process_message:202]: {'type': 'KEEPALIVE', 'channel': 0}
2024-08-03 11:57:34,716 - [DEBUG] [api._process_message:214]: Extended keep alive
2024-08-03 11:58:04,715 - [DEBUG] [api._process_message:202]: {'type': 'KEEPALIVE', 'channel': 0}
2024-08-03 11:58:04,716 - [DEBUG] [api._process_message:214]: Extended keep alive

After this last log I am stuck in the keep alive loop waiting on the Event.

I have no idea what I am doing wrong, have tried with defining new event loops, changing async logic and adding awaits in all server interactions but it didnt change my results for the better.

Miniconda environment information:


Solution

  • Since your self.streamer.recv() is blocking, message_handler never yields control back to asyncio event loop and it cannot resume the main task.