I am running a gunicorn server with 1 eventlet worker through which I communicate with the FE via websockets.
However, I also need to listen to an external pod running some code that will emit a redis pubsub event. Thus, my approach was to start a socketio background thread which inits the redis pubsub listener.
It apparently works fine until some point in which it just stops listening and I cannot determine how. This is how I start socketio in an extensions.py:
socketio = SocketIO(logger=False, engineio_logger=False, path='/socket.io', cors_allowed_origins='*', async_mode='eventlet', ping_timeout=10, ping_interval=60)
This is my listener code:
class RedisConnection:
def __init__()
....
def subscription_listener(self):
while True:
try:
pubsub = self.redis.pubsub()
pubsub.subscribe(self.websocket_channel)
try:
for message in pubsub.listen():
try:
m = TypeAdapter(RedisEnvelopeMessage).validate_python(message)
self.logger.debug(f"Received from REDIS: {message}")
if m.type == 'message':
d = m.data
self.logger.debug(f"Received message from REDIS: {d}")
with self.app.app_context():
# use d.module instead of RedisWS
WSS = self.app.extensions.get(d.module, "RedisWS").ws_services.get(d.company_id)
# TODO the reponse model should route to a WSService or to something different
if d.message is not None:
if isinstance(d.message, list):
getattr(WSS, d.method)(*d.message)
elif isinstance(d.message, dict):
getattr(WSS, d.method)(d.message)
elif isinstance(d.message, str):
getattr(WSS, d.method)(d.message)
else:
getattr(WSS, d.method)()
elif m.type == 'subscribe':
self.logger.info(f"Subscribed to REDIS channel: {m.channel}")
else:
self.logger.info(f"Received message from REDIS but NOT PROCESSED: {message}")
except Exception as e:
self.logger.catch_exception(e)
self.logger.error(f"Pubsub parsing error: {e}").save("pubsub_listener")
except Exception as e:
self.logger.catch_exception(e, level="critical")
self.logger.error(f"Pubsub listener error: {e}").save("pubsub_listener")
#socketio.sleep(2)
except Exception as e:
self.logger.catch_exception(e, level="critical")
self.logger.error(f"Pubsub loop error: {e}").save("pubsub_listener")
In my flask application init:
def create_app(config_class=Config):
....
socketio.init_app(app)
with app.app_context():
app.extensions["pubsub"] = socketio.start_background_task(redis_db_static.subscription_listener)
I don't know if someone has struggled with something similar. Many thanks!
It actually works, but after some time it just stops and I dont see any exception nor nothing. Restarting the server sets it back online. I was expecting the thread to run continously until the main gunicorn gets killed and restarted, which would actually restart it.
now it is working and solved thanks to @miguel-grinberg!
First I switched to gevent
instead of using eventlet
and made some changes to my code, so instead of running the pubsub in the background as a thread I am running it with socketio default's.
# extensions.py
socketio = SocketIO(path='/socket.io', async_mode='gevent', cors_allowed_origins='*', ping_timeout=15, ping_interval=60)
# __init__.py
def create_app(config_class=Config):
....
socketio.init_app(app, message_queue=redis_db_static.redis_url, channel=app.config.get("WEBSOCKET_CHANNEL"))
Then within my redis publish method I did so that it could work both with websockets or with other channels/services and keep my websocket dispatcher services class (think that this code is running in a celery worker):
def publish(self, pubsub_message: RedisPubSubMessage):
try:
if pubsub_message.module == "RedisWS":
WSS = self.app.extensions.get("RedisWS").ws_services.get(pubsub_message.company_id)
# TODO the reponse model should route to a WSService or to something different
if pubsub_message.message is not None:
if isinstance(pubsub_message.message, list):
getattr(WSS, pubsub_message.method)(*pubsub_message.message)
elif isinstance(pubsub_message.message, dict):
getattr(WSS, pubsub_message.method)(pubsub_message.message)
elif isinstance(pubsub_message.message, str):
getattr(WSS, pubsub_message.method)(pubsub_message.message)
else:
getattr(WSS, pubsub_message.method)()
self.logger.debug(f"Event emitted in socketio {self.socketio}: {pubsub_message.model_dump()}")
return "emitted to sockets"
else:
# GENERIC PUBLISH
return self.redis.publish(self.channel, pubsub_message.model_dump_json())
except Exception as e:
self.logger.error(f"Pubsub publish error: {e}").save("pubsub_published")
class WSService:
def __init__(self, company, socketio):
self._version = '2.2'
self.socket = socketio
self.logger = logger
...
def new_message(self, message):
if message.tracking_status != "hidden":
message_payload = message.to_dict()
self.socket.emit('new_message', message_payload, room=message.user.id)