java rest jersey server-sent-events persistent-connection

Server sent event with Jersey: EventOutput is not closed after client drops


I am using Jersey to implement an SSE scenario.

The server keeps connections alive and pushes data to clients periodically.

In my scenario there is a connection limit: only a certain number of clients can be subscribed to the server at the same time.

So when a new client tries to subscribe, I check EventOutput.isClosed() to see whether any old connections are no longer active, so they can make room for new connections.

But EventOutput.isClosed() always returns false unless the client explicitly calls close() on its EventSource. This means that if a client drops accidentally (power outage or internet cutoff), it still hogs the connection, and new clients cannot subscribe.

Is there a workaround for this?


Solution

  • @CuiPengFei,

    While trying to find an answer to this myself, I stumbled upon a repository that shows how to gracefully clean up the connections of disconnected clients.

    They encapsulate all of the SSE EventOutput logic in a service/manager. In it they spin up a thread that periodically checks whether each EventOutput has been closed by the client. If so, they remove it from the pool. If not, they try to write to the stream. If the write throws an exception, the client has disconnected without closing, so the manager closes the connection (EventOutput#close()) and removes it. If the write succeeds, the EventOutput stays in the pool, since it is still an active connection.

    The repo (and the actual class) are available here. I've also included the class, without imports, below in case the repo is ever removed.

    Note that they bind this class as a singleton. The connection store should be globally unique.

    public class SseWriteManager {

        // Active connections, keyed by a caller-supplied id.
        private final ConcurrentHashMap<String, EventOutput> connectionMap = new ConcurrentHashMap<>();

        private final ScheduledExecutorService messageExecutorService;

        private final Logger logger = LoggerFactory.getLogger(SseWriteManager.class);

        public SseWriteManager() {
            // Sweep all connections every 5 seconds on a single background thread.
            messageExecutorService = Executors.newScheduledThreadPool(1);
            messageExecutorService.scheduleWithFixedDelay(new MessageProcessor(), 0, 5, TimeUnit.SECONDS);
        }

        public void addSseConnection(String id, EventOutput eventOutput) {
            logger.info("adding connection for id={}.", id);
            connectionMap.put(id, eventOutput);
        }

        private class MessageProcessor implements Runnable {
            @Override
            public void run() {
                try {
                    Iterator<Map.Entry<String, EventOutput>> iterator = connectionMap.entrySet().iterator();
                    while (iterator.hasNext()) {
                        boolean remove = false;
                        Map.Entry<String, EventOutput> entry = iterator.next();
                        EventOutput eventOutput = entry.getValue();
                        if (eventOutput != null) {
                            if (eventOutput.isClosed()) {
                                remove = true;
                            } else {
                                try {
                                    // A failed write is the only sign that the client dropped without closing.
                                    logger.info("writing to id={}.", entry.getKey());
                                    eventOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, "EOM").build());
                                } catch (Exception ex) {
                                    logger.info(String.format("write failed to id=%s.", entry.getKey()), ex);
                                    remove = true;
                                }
                            }
                        }
                        if (remove) {
                            // We are removing the eventOutput. Close it if it is not already closed.
                            if (!eventOutput.isClosed()) {
                                try {
                                    eventOutput.close();
                                } catch (Exception ex) {
                                    // do nothing.
                                }
                            }
                            iterator.remove();
                        }
                    }
                } catch (Exception ex) {
                    logger.error("MessageProcessor.run threw exception.", ex);
                }
            }
        }

        public void shutdown() {
            if (messageExecutorService != null && !messageExecutorService.isShutdown()) {
                logger.info("SseWriteManager.shutdown: calling messageExecutorService.shutdown.");
                messageExecutorService.shutdown();
            } else {
                logger.info("SseWriteManager.shutdown: messageExecutorService == null || messageExecutorService.isShutdown().");
            }
        }
    }
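
To tie this back to the connection limit in the question, here is a minimal sketch of how the same probe-write idea could be used when a new client subscribes while the server is full. It uses only the JDK so it can run on its own. `Channel`, `LimitedRegistry`, `FakeChannel` and the method names are hypothetical stand-ins I made up for illustration; they are not part of Jersey or of the repo above. In a real resource, `Channel` would presumably be a thin wrapper around `EventOutput`.

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConnectionLimitSketch {

    /** Hypothetical stand-in for EventOutput: only the calls the sweep needs. */
    interface Channel {
        boolean isClosed();
        void writeHeartbeat() throws IOException;
        void close() throws IOException;
    }

    /** Hypothetical registry that enforces a maximum number of subscribers. */
    static class LimitedRegistry {
        private final int maxConnections;
        private final Map<String, Channel> connections = new ConcurrentHashMap<>();

        LimitedRegistry(int maxConnections) {
            this.maxConnections = maxConnections;
        }

        /** Probes every connection; drops those that are closed or fail a write. */
        synchronized int evictDead() {
            int removed = 0;
            Iterator<Map.Entry<String, Channel>> it = connections.entrySet().iterator();
            while (it.hasNext()) {
                Channel ch = it.next().getValue();
                boolean dead = ch.isClosed();
                if (!dead) {
                    try {
                        ch.writeHeartbeat();
                    } catch (IOException ex) {
                        dead = true; // write failed: treat the client as gone
                    }
                }
                if (dead) {
                    try {
                        ch.close();
                    } catch (IOException ignored) {
                        // nothing useful to do if closing fails
                    }
                    it.remove();
                    removed++;
                }
            }
            return removed;
        }

        /** Admits a new subscriber, sweeping dead connections first if the registry is full. */
        synchronized boolean trySubscribe(String id, Channel ch) {
            if (connections.size() >= maxConnections) {
                evictDead();
            }
            if (connections.size() >= maxConnections) {
                return false; // every slot is held by a live client
            }
            connections.put(id, ch);
            return true;
        }

        int size() {
            return connections.size();
        }
    }

    /** Test double: writes start failing once the simulated client has dropped. */
    static class FakeChannel implements Channel {
        volatile boolean dropped;
        private volatile boolean closed;

        public boolean isClosed() { return closed; }

        public void writeHeartbeat() throws IOException {
            if (dropped || closed) throw new IOException("broken pipe");
        }

        public void close() { closed = true; }
    }

    public static void main(String[] args) {
        LimitedRegistry registry = new LimitedRegistry(1);
        FakeChannel first = new FakeChannel();
        System.out.println(registry.trySubscribe("first", first));              // true
        System.out.println(registry.trySubscribe("second", new FakeChannel())); // false
        first.dropped = true; // simulate power outage on the first client
        System.out.println(registry.trySubscribe("third", new FakeChannel()));  // true
    }
}
```

One caveat with this approach: on a real socket the first write after a client vanishes may still succeed, because the data only reaches the local send buffer. It can take more than one sweep before the write fails, so the periodic heartbeat from the class above is still worth keeping alongside an on-demand sweep like this one.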