I am issuing a Redis subscribe from within a WebSocket (WS) handler. When a WS request comes in, I start a thread for it and instantiate the Redis client. Inside the open handler, I start a second thread for Redis and issue the subscription from it.
This all works fine until I receive an unexpected WS close. At that point, the thread running the Redis subscription is gone. If I issue an unsubscribe, it hangs. If I don't unsubscribe, I leave behind a phantom subscription that causes me trouble the next time around.
Is there some way to delete a subscription once the thread that issued it has terminated? I have noticed that the Redis instance has a mon variable for that terminated thread. Sample Ruby code:
require 'faye/websocket'
require 'redis'
require 'uri'

class Backend
  include MInit

  def initialize(app)
    setup
    @app = app
  end

  def run!(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, [], ping: KEEPALIVE_TIME)
      ws_thread = Thread.fork(env) do
        credis = Redis.new(host: @redis_uri.host, port: @redis_uri.port, password: @redis_uri.password)
        channel = nil # set in :open, read again in :close

        ws.on :open do |event|
          # Channel name is the URL path without its leading slash
          channel = URI.parse(event.target.url).path[1..-1]
          redis_thread = Thread.fork do
            # subscribe blocks this thread until the subscription ends
            credis.subscribe(channel) do |on|
              on.message do |message_channel, message|
                sent = ws.send(message)
              end
              on.unsubscribe do |message_channel|
                puts "Unsubscribe on channel:#{channel};"
              end
            end
          end
        end

        ws.on :message do |event|
          handoff(ws: ws, event: event)
        end

        ws.on :close do |event|
          # Hang occurs here
          unsubscribed = credis.unsubscribe(channel)
        end

        ws.on :error do |event|
          ws.close
        end

        # Return async Rack response
        ws.rack_response
      end
    else
      @app.call(env)
    end
  end

  private

  def handoff(ws: nil, event: nil, source: nil, message: nil)
    # processing
  end
end
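The mon variable mentioned above points at a monitor, Ruby's re-entrant lock. The following is a minimal sketch, not redis-rb code, of why an unsubscribe from a second thread could block. It assumes the client guards every call with a per-instance monitor and that subscribe keeps holding it while it waits for messages. FakeClient and all of its methods are hypothetical names made up for the illustration.

```ruby
require 'monitor'

# Hypothetical stand-in for a client that serializes calls with a monitor.
class FakeClient
  include MonitorMixin

  def initialize
    super()                 # lets MonitorMixin set up its lock
    @stop = Queue.new
    @subscribed = Queue.new
  end

  # Holds the monitor until told to stop, like a blocking subscribe loop.
  def subscribe
    synchronize do
      @subscribed << true   # signal that the monitor is now held
      @stop.pop             # block here, still inside the monitor
    end
  end

  def wait_until_subscribed
    @subscribed.pop
  end

  # Non-blocking probe: true only if the calling thread can take the monitor now.
  def can_enter?
    return false unless mon_try_enter
    mon_exit
    true
  end

  def stop
    @stop << true
  end
end

client = FakeClient.new
subscriber = Thread.new { client.subscribe }
client.wait_until_subscribed

# While the subscriber thread holds the monitor, this thread cannot enter.
blocked_while_subscribed = !client.can_enter?

client.stop
subscriber.join

# Once the subscriber thread has left the monitor, entry succeeds.
free_after_stop = client.can_enter?

puts "blocked while subscribed: #{blocked_while_subscribed}"
puts "free after stop: #{free_after_stop}"
```

If the real client behaves like this stand-in, an unsubscribe issued from another thread would wait until the subscribing thread gives the monitor up.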
The fix turned out to be rather simple once I really understood the problem. The Redis thread actually still exists, but the unsubscribe hangs anyway because it is waiting for that thread to get control. To let that happen, the WS close handler needs to cede control by wrapping the unsubscribe in EM.next_tick, as follows:
ws.on :close do |event|
  EM.next_tick do
    # The hang used to occur here; deferring the call avoids it
    unsubscribed = credis.unsubscribe(channel)
  end
end
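To make the deferral concrete, here is a minimal sketch of what next_tick-style scheduling does. It uses a hypothetical TinyReactor written in plain Ruby as a stand-in, not EventMachine itself, and the symbols pushed onto log are placeholders for the real close and unsubscribe work. The only point is the ordering: the deferred block runs after the close handler has returned.

```ruby
# Hypothetical single-threaded loop for illustration; not EventMachine.
class TinyReactor
  def initialize
    @queue = []
  end

  # Schedule a block to run on a later pass of the loop.
  def next_tick(&block)
    @queue << block
  end

  # Run queued blocks in order; a block queued during a call
  # runs only after that call has returned.
  def run
    @queue.shift.call until @queue.empty?
  end
end

log = []
reactor = TinyReactor.new

# Close handler that defers the unsubscribe instead of running it inline.
on_close = lambda do
  log << :close_started
  reactor.next_tick { log << :unsubscribe }
  log << :close_finished
end

reactor.next_tick(&on_close)
reactor.run

p log # => [:close_started, :close_finished, :unsubscribe]
```

In the real handler, this ordering means the close callback finishes and hands control back to the event loop before the unsubscribe call is attempted.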