I currently have a C++ web sockets server using uWebSockets. I want to scale it horizontally using Redis. It means that I'll use this Redis client. However, I'm facing a problem with the implementation of pub/sub channels. Indeed, since the Redis channel subscription needs its own event loop (according to this example), and obviously the same for the uWebSockets app (see this example), I end up with two event loops. And my problem is that I don't know how to manage running these two loops properly.
I tried running them on two different threads, which works if they are totally independent of each other. However, since I want to broadcast the upcoming Redis message to all web sockets client, I need the uWebSockets app instance (see this example) in the Redis thread to broadcast it:
Subscriber sub = redis->subscriber();
sub.on_message([](std::string channel, std::string msg){
app->publish("broadcast", msg, (uWS::OpCode)1);
});
Therefore the two event loops are not independant of each other and when I received a message from Redis, it takes about 5 seconds before it is handled by the uWebSockets app.
Does someone know how to properly set up this Redis pus/sub feature ? Thank you for your help.
I managed to solve my problem.
I found that calling app->publish(...)
in my second thread was not thread-safe. Indeed, an interesting post showed me that in order to access the app from another thread, we have to use the method defer
on the event loop. Therefore, the structure becomes:
...
uWS::SSLApp *app = nullptr;
uWS::Loop *loop = nullptr;
Redis *redis = nullptr;
...
void redisEventLoopThread(Subscriber *sub) {
sub->on_message([](string channel, string msg) {
loop->defer([msg]() {
app->publish(channel, msg, ...);
});
});
sub->subscribe("channel_name");
while (true) {
try {
sub->consume();
} catch (const Error &err) {...}
}
}
...
int main() {
app = new uWS::SSLApp();
loop = uWS::Loop::get();
redis = new Redis(...);
Subscriber sub = redis->subscriber();
thread redisThread(redisEventLoopThread, &sub);
app->ws<...>(...).listen(...).run();
...
}