
Socketio background task issues with pubsub subscription


I am running a gunicorn server with a single eventlet worker, through which I communicate with the front end over websockets.

However, I also need to listen to an external pod that runs some code and emits a Redis pubsub event. My approach was to start a socketio background task that initializes the Redis pubsub listener.

It works fine for a while, but at some point it just stops listening and I cannot determine why. This is how I start socketio in extensions.py:

socketio = SocketIO(logger=False, engineio_logger=False, path='/socket.io', cors_allowed_origins='*', async_mode='eventlet', ping_timeout=10, ping_interval=60)

This is my listener code:

class RedisConnection:
    def __init__(self):
        ...
    def subscription_listener(self):
        while True:
            try:
                pubsub = self.redis.pubsub()
                pubsub.subscribe(self.websocket_channel) 
                try:
                    for message in pubsub.listen():
                        try:
                            m = TypeAdapter(RedisEnvelopeMessage).validate_python(message)
                            self.logger.debug(f"Received from REDIS: {message}")
                            if m.type == 'message':
                                d = m.data
                                self.logger.debug(f"Received message from REDIS: {d}")
                                with self.app.app_context():
                                    # use d.module instead of RedisWS
                                    WSS = self.app.extensions.get(d.module, "RedisWS").ws_services.get(d.company_id)
                                    # TODO the response model should route to a WSService or to something different
                                    handler = getattr(WSS, d.method)
                                    if d.message is None:
                                        handler()
                                    elif isinstance(d.message, list):
                                        # a list is unpacked into positional arguments
                                        handler(*d.message)
                                    elif isinstance(d.message, (dict, str)):
                                        # a dict or str is passed as a single argument
                                        handler(d.message)
                            elif m.type == 'subscribe':
                                self.logger.info(f"Subscribed to REDIS channel: {m.channel}")
                            else:
                                self.logger.info(f"Received message from REDIS but NOT PROCESSED: {message}")
                        except Exception as e:
                            self.logger.catch_exception(e)
                            self.logger.error(f"Pubsub parsing error: {e}").save("pubsub_listener")
                except Exception as e:
                    self.logger.catch_exception(e, level="critical")
                    self.logger.error(f"Pubsub listener error: {e}").save("pubsub_listener")
                    #socketio.sleep(2)
            except Exception as e:
                self.logger.catch_exception(e, level="critical")
                self.logger.error(f"Pubsub loop error: {e}").save("pubsub_listener")
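One way to make a silent stall easier to notice is to poll with a timeout rather than block in `pubsub.listen()`, and to wait between reconnect attempts (the `socketio.sleep(2)` above is commented out, so a failing connection would be retried in a tight loop). The sketch below is only an illustration, not a confirmed fix for the problem described here. `listen_with_timeout`, `make_pubsub`, `handle` and `should_stop` are hypothetical names; the only redis-py calls it assumes are `subscribe()`, `get_message(timeout=...)` and `close()` on the pubsub object.

```python
import time
from typing import Any, Callable


def listen_with_timeout(
    make_pubsub: Callable[[], Any],
    channel: str,
    handle: Callable[[dict], None],
    sleep: Callable[[float], None] = time.sleep,
    should_stop: Callable[[], bool] = lambda: False,
    poll_timeout: float = 5.0,
    max_backoff: float = 30.0,
) -> None:
    """Poll a pub/sub object with a timeout and reconnect with backoff."""
    backoff = 1.0
    while not should_stop():
        pubsub = None
        try:
            pubsub = make_pubsub()
            pubsub.subscribe(channel)
            while not should_stop():
                # Returns None when nothing arrived within poll_timeout, so the
                # loop regains control regularly instead of blocking forever.
                message = pubsub.get_message(timeout=poll_timeout)
                backoff = 1.0  # the connection answered, reset the delay
                if message is None:
                    continue
                try:
                    handle(message)
                except Exception as exc:
                    # A bad message should not tear down the subscription.
                    print(f"pubsub handler error: {exc!r}")
        except Exception as exc:
            print(f"pubsub listener error, retrying in {backoff}s: {exc!r}")
            sleep(backoff)
            backoff = min(backoff * 2, max_backoff)
        finally:
            if pubsub is not None:
                pubsub.close()
```

In the class above this could be started with something like `socketio.start_background_task(listen_with_timeout, self.redis.pubsub, self.websocket_channel, handler, sleep=socketio.sleep)`, where `handler` stands for the per-message dispatch code.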

In my flask application init:

def create_app(config_class=Config):
....
    socketio.init_app(app)
    with app.app_context():
        app.extensions["pubsub"] = socketio.start_background_task(redis_db_static.subscription_listener)

I don't know if someone has struggled with something similar. Many thanks!

It does work, but after some time it just stops, and I don't see any exception or anything else. Restarting the server brings it back online. I was expecting the thread to run continuously until the main gunicorn process gets killed and restarted, which would restart the listener as well.
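Since nothing is logged when the listener stops, one option for narrowing this down is a small heartbeat that records when the listener last made progress, so a stall can at least be detected and logged. This is a minimal sketch under assumptions: `Heartbeat` and its methods are hypothetical names, and it only helps if `beat()` is called regularly, for example on every received message while the publisher sends periodic keep-alive messages.

```python
import time
from typing import Callable


class Heartbeat:
    """Records when a long-running loop last made progress."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._last_beat = clock()

    def beat(self) -> None:
        # Call this from the listener each time it handles a message.
        self._last_beat = self._clock()

    def seconds_since_beat(self) -> float:
        return self._clock() - self._last_beat

    def is_stalled(self, max_silence: float) -> bool:
        # True when no beat was recorded for longer than max_silence seconds.
        return self.seconds_since_beat() > max_silence
```

A second background task, or a health-check route, could then log a critical error whenever `is_stalled(...)` returns true.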


Solution

  • It is now working, solved thanks to @miguel-grinberg!

    First I switched from eventlet to gevent and made some changes to my code: instead of running the pubsub listener myself as a background thread, I let socketio handle it through its built-in message queue support.

    # extensions.py
    socketio = SocketIO(path='/socket.io', async_mode='gevent', cors_allowed_origins='*', ping_timeout=15, ping_interval=60)
    
    # __init__.py
    def create_app(config_class=Config):
    ....
        socketio.init_app(app, message_queue=redis_db_static.redis_url, channel=app.config.get("WEBSOCKET_CHANNEL"))
    

    Then I changed my redis publish method so that it works both with websockets and with other channels/services, while keeping my websocket dispatcher services class (bear in mind that this code runs in a celery worker):

        def publish(self, pubsub_message: RedisPubSubMessage):
            try:
                if pubsub_message.module == "RedisWS":
                    WSS = self.app.extensions.get("RedisWS").ws_services.get(pubsub_message.company_id)
                    # TODO the response model should route to a WSService or to something different
                    handler = getattr(WSS, pubsub_message.method)
                    if pubsub_message.message is None:
                        handler()
                    elif isinstance(pubsub_message.message, list):
                        # a list is unpacked into positional arguments
                        handler(*pubsub_message.message)
                    elif isinstance(pubsub_message.message, (dict, str)):
                        # a dict or str is passed as a single argument
                        handler(pubsub_message.message)
                    self.logger.debug(f"Event emitted in socketio {self.socketio}: {pubsub_message.model_dump()}")
                    return "emitted to sockets"
                else:
                    # GENERIC PUBLISH
                    return self.redis.publish(self.channel, pubsub_message.model_dump_json())
            except Exception as e:
                self.logger.error(f"Pubsub publish error: {e}").save("pubsub_published")
    
    class WSService:
        def __init__(self, company, socketio):
            self._version = '2.2'
            self.socket = socketio
            self.logger = logger
    ...
        def new_message(self, message):
            if message.tracking_status != "hidden":
                message_payload = message.to_dict()
                self.socket.emit('new_message', message_payload, room=message.user.id)
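For readers wondering how the celery worker gets a `socketio` object that can emit, here is a minimal sketch of the pattern, assuming Flask-SocketIO's support for emitting from an external process through the message queue. `build_external_emitter` and `emit_new_message` are hypothetical helpers, not part of my actual code, and the payload is assumed to be a plain dict. The Redis URL and channel must match the ones passed to `socketio.init_app(...)` on the server.

```python
from typing import Any, Protocol


class Emitter(Protocol):
    def emit(self, event: str, *args: Any, **kwargs: Any) -> Any: ...


def build_external_emitter(redis_url: str, channel: str) -> Emitter:
    """Create a SocketIO handle for a process that is not the web server."""
    # Imported lazily so this module loads even without Flask-SocketIO installed.
    from flask_socketio import SocketIO

    # No app is passed: this instance only publishes to the message queue,
    # and the server process delivers the event to the connected clients.
    return SocketIO(message_queue=redis_url, channel=channel)


def emit_new_message(emitter: Emitter, payload: dict, user_id: Any) -> bool:
    """Emit 'new_message' to the user's room unless the message is hidden."""
    if payload.get("tracking_status") == "hidden":
        return False
    emitter.emit("new_message", payload, to=user_id)
    return True
```

In the worker, the emitter would be built once (for example at worker start-up) and then handed to each `WSService`, which is what `self.socket` refers to above.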