python multithreading socket.io aiohttp

AttributeError: '_asyncio.Task' object has no attribute 'is_alive'


Whenever the socket client connects to the socket server, I get the following error:

Client connected:  3uRG6PtVuta3wHsbAAAD
message async handler error
Traceback (most recent call last):
  File "/home/ahmed/.local/lib/python3.10/site-packages/engineio/async_server.py", line 483, in run_async_handler
    return await self.handlers[event](*args)
  File "/home/ahmed/.local/lib/python3.10/site-packages/socketio/async_server.py", line 669, in _handle_eio_message
    await self._handle_connect(eio_sid, pkt.namespace, pkt.data)
  File "/home/ahmed/.local/lib/python3.10/site-packages/socketio/async_server.py", line 552, in _handle_connect
    success = await self._trigger_event(
  File "/home/ahmed/.local/lib/python3.10/site-packages/socketio/async_server.py", line 638, in _trigger_event
    ret = handler(*args)
  File "/home/ahmed/PycharmProjects/decimal-trade/prices.py", line 27, in connect
    if not thread.is_alive():
AttributeError: '_asyncio.Task' object has no attribute 'is_alive'

The socketio server

from aiohttp import web
from threading import Thread
import socketio
import requests 

sio = socketio.AsyncServer(async_mode='aiohttp', logger=True, engineio_logger=True, cors_allowed_origins='*')
app = web.Application()
sio.attach(app)

thread = Thread()
url = 'https://financialmodelingprep.com/api/v3/stock/real-time-price?apikey=yfmtiz4PGwIlCalGtRjsLcrQ9rF6NAHt'

async def server(request):
    return web.Response(text='Server Running...')

async def getPrices():
    while True:
        data = requests.get(url)
        data = data.json()
        data = data['stockList']
        await sio.emit('stocks', {'data': data}, namespace='/prices')
        await sio.sleep(60)

@sio.on('connect', namespace='/prices')
def connect(sid, environ):
    global thread
    print('Client connected: ', sid)
    if not thread.is_alive():
        thread = sio.start_background_task(getPrices)

@sio.on('disconnect', namespace='/prices')
def disconnect(sid):
    print('Client disconnected: ', sid)

app.router.add_static('/static', 'static')
app.router.add_get('/', server)

if __name__ == '__main__':
    web.run_app(app, host='127.0.0.1', port=8080)

The socket.io-client in React

import { io } from 'socket.io-client';

const URL_ = 'http://127.0.0.1:8080/prices';

export const socket = io.connect(URL_, {
    cors: {
        origin: 'http://127.0.0.1:8080',
        credentials: true,
    }
});

Solution

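  • The error occurs because, with async_mode='aiohttp', sio.start_background_task() returns an asyncio Task (the '_asyncio.Task' in your traceback), not a threading.Thread. A Task has no is_alive() method, so the check fails as soon as the connect handler runs again after the task has been started.

    Since you want a background task that periodically sends the prices to all connected clients, the code below should work better.

    The approach for creating a background task is documented at: https://docs.aiohttp.org/en/stable/web_advanced.html

    Because we are sending to all connected clients, we don't need to track connection/disconnection events and can remove those hooks.

    I've also replaced requests with aiohttp for the network request. requests is not async, so each call would block the event loop, and with it your entire server, while waiting for the response.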

    import asyncio
    import contextlib

    import aiohttp
    import socketio
    from aiohttp import web

    sio = socketio.AsyncServer(async_mode='aiohttp', logger=True, engineio_logger=True, cors_allowed_origins='*')
    app = web.Application()
    sio.attach(app)
    
    url = 'https://financialmodelingprep.com/api/v3/stock/real-time-price?apikey=yfmtiz4PGwIlCalGtRjsLcrQ9rF6NAHt'
    
    async def server(request):
        return web.Response(text='Server Running...')
    
    async def get_prices():
        async with aiohttp.ClientSession() as sess:
            while True:
                async with sess.get(url) as resp:
                    data = await resp.json()
                await sio.emit('stocks', {'data': data['stockList']}, namespace='/prices')
                await sio.sleep(60)
    
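    async def run_prices_loop(_app):
        # Startup: everything before the yield runs once, before the app starts serving.
        task = asyncio.create_task(get_prices())
    
        yield
    
        # Cleanup: everything after the yield runs once, when the app shuts down.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task  # Ensure any exceptions etc. are raised.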
    
    app.router.add_static('/static', 'static')
    app.router.add_get('/', server)
    app.cleanup_ctx.append(run_prices_loop)
    
    if __name__ == '__main__':
        web.run_app(app, host='127.0.0.1', port=8080)
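
If you would rather keep the "start the loop on first connect" pattern from the question, the check itself can be adapted to asyncio: a Task exposes done() instead of is_alive(). The following is only a minimal sketch of that idea using plain asyncio; get_prices_stub, ensure_background_task and the state dict are hypothetical names for illustration, and asyncio.create_task stands in for sio.start_background_task.

```python
import asyncio

# Hypothetical holder for the task; None means "not started yet".
# This replaces the placeholder Thread() from the question.
state = {"task": None}


async def get_prices_stub():
    # Hypothetical stand-in for the real price loop; finishes quickly
    # so the restart path can be demonstrated.
    await asyncio.sleep(0.01)


def ensure_background_task():
    """Start the task if it was never started or has already finished."""
    task = state["task"]
    if task is None or task.done():  # done() is the Task counterpart of "not is_alive()"
        state["task"] = asyncio.create_task(get_prices_stub())
    return state["task"]


async def main():
    first = ensure_background_task()
    second = ensure_background_task()  # still running, so the same task is reused
    await first
    third = ensure_background_task()   # finished, so a new task is created
    await third
    return first is second, first is third


same_while_running, same_after_done = asyncio.run(main())
```

In the connect handler this would amount to checking `task is None or task.done()` before starting the loop. The cleanup_ctx version above is still simpler, because the loop is started exactly once and is cancelled cleanly on shutdown.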