Tags: java, multithreading, websocket, tyrus

Threading in javax.websocket / Tyrus


I'm writing a Java app that sends and receives messages from a WebSocket server. When the app receives a message it may take some time to process it, so I'm trying to use multiple threads to receive messages. My understanding is that Grizzly has selector threads as well as worker threads; by default there is 1 selector thread and 2 worker threads, and in the example below I'm trying to increase those to 5 and 10 respectively. To simulate processing of the incoming information, I pause the thread that calls the onMessage method for 10 seconds. Messages arrive every second, so 10 worker threads should be able to keep up with the traffic.

When I profile the run, only 1 selector thread and 2 worker threads are running. Furthermore, messages are received only at 10-second intervals, indicating that only 1 thread is handling the traffic, which I find very odd. During profiling, one worker thread, e.g. Grizzly(1), receives the first message sent. Then 10 seconds later Grizzly(2) receives the second message; after that Grizzly(2) keeps receiving the messages, and Grizzly(1) does not perform any further work.

Can someone please explain this odd behavior, and how to change it so that, for example, 10 threads are constantly waiting to handle incoming messages?

Main:

    public static void main(String[] args) {
        WebsocketTextClient client = new WebsocketTextClient();
        client.connect();
        for (int i = 0; i < 60; i++) {
            client.send("Test message " + i);
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println("Error sleeping!");
            }
        }
    }

WebsocketTextClient.java:

    import java.net.URI;
    import javax.websocket.ClientEndpointConfig;
    import javax.websocket.EndpointConfig;
    import javax.websocket.Session;
    import javax.websocket.Endpoint;
    import javax.websocket.MessageHandler;
    import org.glassfish.tyrus.client.ClientManager;
    import org.glassfish.tyrus.client.ThreadPoolConfig;
    import org.glassfish.tyrus.container.grizzly.client.GrizzlyClientProperties;

    public class WebsocketTextClient {

        private ClientManager client;
        private ClientEndpointConfig clientConfig;
        WebsocketTextClientEndpoint endpoint;

        public WebsocketTextClient() {
            client = ClientManager.createClient();
            // Attempt to raise Grizzly's selector and worker thread pool sizes to 5 and 10
            client.getProperties().put(GrizzlyClientProperties.SELECTOR_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(5));
            client.getProperties().put(GrizzlyClientProperties.WORKER_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(10));
        }

        public boolean connect() {
            try {
                clientConfig = ClientEndpointConfig.Builder.create().build();
                endpoint = new WebsocketTextClientEndpoint();
                client.connectToServer(endpoint, clientConfig, new URI("wss://echo.websocket.org"));
            } catch (Exception e) {
                return false;
            }
            return true;
        }

        public boolean disconnect() {
            return false;
        }

        public boolean send(String message) {
            endpoint.session.getAsyncRemote().sendText(message);
            return true;
        }

        private class WebsocketTextClientEndpoint extends Endpoint {
            Session session;

            @Override
            public void onOpen(Session session, EndpointConfig config) {
                System.out.println("Connection opened");
                this.session = session;
                session.addMessageHandler(new WebsocketTextClientMessageHandler());
            }
        }

        private class WebsocketTextClientMessageHandler implements MessageHandler.Whole<String> {

            @Override
            public void onMessage(String message) {
                System.out.println("Message received from " + Thread.currentThread().getName() + " " + message);
                // Pause for 10 seconds to simulate processing of the incoming message
                try {
                    Thread.sleep(10000);
                } catch (Exception e) {
                    System.out.println("Error sleeping!");
                }
                System.out.println("Resuming");
            }
        }
    }

Solution

  • What you appear to be asking is for WebSockets to be able to receive multiple messages sent by the same client connection, to process those messages in separate threads, and to send the responses when they are ready - which means, potentially out of order. This scenario can only happen if the client is multi-threaded.

    To deal with multiple threads on the same WebSocket session would generally require the ability for WebSockets to multiplex the data going to and from the client. This is not currently a feature of WebSockets, but could certainly be built on top of it. However, multiplexing those client and server threads on a single channel introduces a fair bit of complexity, because you need to stop all the client and server threads from inadvertently overwriting or starving one another.

    The Java spec for MessageHandler is perhaps a little ambiguous about the threading model.

    https://docs.oracle.com/javaee/7/api/javax/websocket/MessageHandler.html says:

    Each web socket session uses no more than one thread at a time to call its MessageHandlers.

    But the important term here is "web socket session". If your client is sending multiple messages within the same WebSocket session, the server-side handler will execute within a single thread. This doesn't mean you can't do lots of interesting stuff within that thread, particularly if you're using Input/OutputStreams (or Writers) on both ends. It does mean that communication with the client is mediated by just one thread. If you want to multiplex the communication, you'd have to write something on top of the socket to do so, and that would include developing your own threading model for dispatching the requests.
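
    For illustration only, here is a rough sketch of the dispatch side of such a model on the client: each incoming message is handed straight to your own worker pool so the single session thread is never blocked. The handler class name and the pool size of 10 are assumptions made up for this example, and the sketch deliberately ignores the harder part of coordinating the responses:

        private class DispatchingMessageHandler implements MessageHandler.Whole<String> {

            // Hypothetical worker pool; 10 threads mirror the traffic estimate in the question.
            private final java.util.concurrent.ExecutorService workers =
                    java.util.concurrent.Executors.newFixedThreadPool(10);

            @Override
            public void onMessage(String message) {
                // Return immediately so the single WebSocket thread can read the next frame;
                // the slow processing happens on a worker thread instead.
                workers.submit(() -> {
                    System.out.println("Processing on " + Thread.currentThread().getName() + ": " + message);
                    try {
                        Thread.sleep(10000); // simulate the 10s of processing from the question
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }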

    An easier solution would be to create a new Session for each client request. Each client request starts a session (i.e. a TCP connection), sends the data, and waits for the result. This gives you multiple MessageHandler threads - one per session, per the spec.

    This is the most straightforward way to get multi-threading on the server side; any other approach will tend to need a multiplexing mechanism - which, depending on your use case, is perhaps not worth the effort, and certainly carries some complexity and risk.
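
    As a rough sketch of that approach, reusing the WebsocketTextClientEndpoint class from the question (the method name sendInNewSession is made up for illustration), each request opens its own session:

        // One Session (i.e. one TCP connection) per request; each session gets its own
        // MessageHandler thread, so slow processing of one reply does not delay the others.
        public Session sendInNewSession(String message) throws Exception {
            ClientManager client = ClientManager.createClient();
            ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();
            Session session = client.connectToServer(new WebsocketTextClientEndpoint(), config,
                    new URI("wss://echo.websocket.org"));
            session.getAsyncRemote().sendText(message);
            // Remember to call session.close() once the response has been handled,
            // e.g. from onMessage or after an application-level acknowledgement.
            return session;
        }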

    If you are concerned about the number of sessions (TCP/HTTP connections) between client/s and server/s, you could consider creating a pool of Sessions on the client side, using each client Session one at a time and returning it to the pool when the client is done with it.
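
    A minimal sketch of such a client-side pool, assuming the Sessions have already been connected elsewhere (the class and method names are made up for illustration):

        // Hypothetical client-side Session pool: borrow a session, use it, return it.
        public class SessionPool {

            private final java.util.concurrent.BlockingQueue<Session> idle;

            public SessionPool(java.util.Collection<Session> sessions) {
                this.idle = new java.util.concurrent.LinkedBlockingQueue<>(sessions);
            }

            public Session borrow() throws InterruptedException {
                return idle.take();   // blocks until a session is free
            }

            public void release(Session session) {
                idle.offer(session);  // hand the session back for reuse
            }
        }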

    Finally, perhaps not directly relevant: I found that when I used Payara Micro to serve the WebSocket endpoint, I needed to set this:

      <resources>
        ...
        <managed-executor-service maximum-pool-size="200" core-pool-size="10" long-running-tasks="true" keep-alive-seconds="300" hung-after-seconds="300" task-queue-capacity="20000" jndi-name="concurrent/__defaultManagedExecutorService" object-type="system-all"></managed-executor-service>
      </resources>

    The default ManagedExecutorService only provides a single thread. This appears to be the case in Glassfish as well. This had me running around for hours thinking that I didn't understand the threading model, when it was just the pool size that was confusing me.