java vert.x

Vert.x RecordParser exception handling


I am using a NetServer in Vert.x. When a new client connects, a Flowable is created that emits the packets received from that client. To figure out how to do proper exception handling, I have added some random exceptions to the flow:

private RecordParser createParser(NetState state, FlowableEmitter<Buffer> emitter) {
    RecordParser parser = RecordParser.newFixed(4);
    parser.handler(buffer -> {
        if(Math.random() > 0.8) {
            logger.error("I am dying now");
            throw new RuntimeException();
        }

        // protocol handling
    }).exceptionHandler((throwable) -> {
        logger.error("Got an exception: ", throwable);
        emitter.onError(throwable);
    });
    return parser;
}

I use it like this:

NetState state = new NetState();
// client -> NetClient
return Flowable.create(emitter -> client.handler(createParser(state, emitter)), BackpressureStrategy.ERROR);

The problem is that the exception thrown in the record parser's handler never reaches any exception handler: neither the one on the RecordParser itself nor the one I attached to the client. It propagates all the way up to the root context. How do I catch exceptions that occur during record parsing (for example, in order to close the client that sent invalid data)?


Solution

  • Apparently exceptionHandler is only invoked for exceptions that happen on the socket / IO, not for exceptions thrown inside a handler.

    RecordParser extends ReadStream, whose Javadoc does not make that clear:

    Set an exception handler on the read stream.

    But the user guide mentions it in a few places:

    handling_exceptions

    You can set an exceptionHandler to receive any exceptions that happen on the socket

    handling_exceptions_2

    You can set an exceptionHandler to receive any exceptions that happens before the connection is passed to the requestHandler

    readstream

    exceptionHandler: called when an exception occurs on the ReadStream.

    I have implemented a small handler to help with that:

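    import io.vertx.core.Handler;
    import io.vertx.core.Promise;
    import io.vertx.core.streams.ReadStream;

    // Wraps a handler so that anything it throws fails the promise
    // instead of escaping to the context.
    public class StreamCatchingHandler<P, T> implements Handler<T> {

      private final Promise<P> promise;
      private final ReadStream<T> stream;
      private final Handler<T> handler;

      public StreamCatchingHandler(Promise<P> promise, ReadStream<T> stream, Handler<T> handler) {
        this.promise = promise;
        this.stream = stream;
        this.handler = handler;
      }

      @Override
      public void handle(T event) {
        try {
          handler.handle(event);
        } catch (Throwable ex) {
          stream.pause();    // stop delivering further events
          promise.fail(ex);  // report the failure to whoever holds the promise
        }
      }
    }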
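
The catch-and-route idea behind the wrapper can be tried without a running Vert.x instance. The following is only a minimal sketch under stated assumptions: `EventHandler`, `CatchingHandler`, and `CatchingHandlerDemo` are hypothetical stand-ins written for illustration, not Vert.x classes, and a plain `Consumer<Throwable>` plays the role of the promise. The `failed` flag roughly imitates what `stream.pause()` is meant to achieve in the handler above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a Vert.x-style handler; NOT io.vertx.core.Handler.
interface EventHandler<T> {
    void handle(T event);
}

// Wraps a delegate and routes anything it throws to an error callback.
final class CatchingHandler<T> implements EventHandler<T> {
    private final EventHandler<T> delegate;
    private final Consumer<Throwable> onFailure;
    private boolean failed;

    CatchingHandler(EventHandler<T> delegate, Consumer<Throwable> onFailure) {
        this.delegate = delegate;
        this.onFailure = onFailure;
    }

    @Override
    public void handle(T event) {
        if (failed) {
            return; // drop events after the first failure (stands in for pausing the stream)
        }
        try {
            delegate.handle(event);
        } catch (Throwable ex) {
            failed = true;
            onFailure.accept(ex);
        }
    }
}

final class CatchingHandlerDemo {
    // Feeds the records through a wrapped handler and returns the reported failures.
    static List<Throwable> run(List<String> records) {
        List<Throwable> failures = new ArrayList<>();
        // Assumed "protocol handling": an empty record counts as invalid data.
        EventHandler<String> protocol = record -> {
            if (record.isEmpty()) {
                throw new IllegalArgumentException("empty record");
            }
        };
        EventHandler<String> safe = new CatchingHandler<>(protocol, failures::add);
        records.forEach(safe::handle);
        return failures;
    }

    public static void main(String[] args) {
        List<Throwable> failures = run(List.of("ping", "", "pong", ""));
        System.out.println(failures.size() + " failure(s): " + failures.get(0).getMessage());
    }
}
```

In the setup from the question, the failure callback would presumably be the place to call `emitter.onError(...)` and close the offending connection, since that is where exceptions from the protocol handling now end up.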