pythonwebsocketpython-asyncioreconnect

Python asyncio loop.run_forever()


I try to do a trading bot and the most things work fine. But everytime the internet connection is gone for a short time, the code fails. I use asyncio run_forever() function and I think that the code should run forever until it gets stopped, but it does not work.

Here is my code:

import json


async def listen():
    url = "wss://phemex.com/ws"
    async with websockets.connect(url) as ws:
        sub_msg = json.dumps({
            "id": sequence,
            "method": "kline.subscribe",
            "params": [symbol, kline_interval],
        })
        await ws.send(sub_msg)
        while True:
            msg = await ws.recv()
            msg = json.loads(msg)["kline"]

And I call the loop like this:

loop = asyncio.get_event_loop()
try: 
    loop.create_task(listen())
    asyncio.ensure_future(listen())
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    print("Closing Loop")
    loop.close()

As soon as the connection is lost, there is the following error:

Task exception was never retrieved
future: <Task finished coro=<listen() done, defined at c:\Users\danis\Desktop\AWS\Dateien\Crypto_Bot_Telegram_PSAR_PHEMEX_inkl_websocket_reconnect.py:351> exception=ConnectionClosedError('code = 1006 (connection closed abnormally [internal]), no reason',)>
Traceback (most recent call last):
  File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 750, in transfer_data
    message = await self.read_message()
  File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 819, in read_message
    frame = await self.read_data_frame(max_size=self.max_size)
  File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 895, in read_data_frame
    frame = await self.read_frame(max_size)
  File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 975, in read_frame
    extensions=self.extensions,
  File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\framing.py", line 55, in read
    data = await reader(2)
  File "C:\Users\danis\.conda\envs\python36\lib\asyncio\streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "C:\Users\danis\.conda\envs\python36\lib\asyncio\streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "C:\Users\danis\.conda\envs\python36\lib\asyncio\selector_events.py", line 714, in _read_ready
    data = self._sock.recv(self.max_size)
ConnectionResetError: [WinError 10054] Eine vorhandene Verbindung wurde vom Remotehost geschlossen

How can I run this code forever?


Solution

  • In asyncio the event loop is responsible of run asynchronous tasks, but it doesn't handle errors they can throw. There are different ways of run the loop, you can run it for execute specific tasks or run it forever, so it keeps running awaiting for new tasks.

    In your case the task listen is throwing an uncaught exception (ConnectionResetError), the event loop noitifies you about that with the traceback but it keeps running, maybe with other tasks (that means run forever). You are just notified, and like an error happened in your coroutine, the task stops running.

    Solution: Handle the error of the traceback in your courutine, you must make sure it will run forever, the event loop doesn't do that.

    async def listen():
        url = "wss://phemex.com/ws"
        while True:
            try:
                async with websockets.connect(url) as ws:
                    sub_msg = "{\"id\":" + str(
                        sequence) + ", \"method\": \"kline.subscribe\", \"params\":[\"" + symbol + "\"," 
                        + str(kline_interval) + "]}"
                    await ws.send(sub_msg)
                    while True:
                        msg = await ws.recv()
                        msg = json.loads(msg)["kline"]
            except ConnectionResetError:
                print("ConnectionResetError, reconnecting...")