I am using a NetServer in Vert.x. When a new client connects, a Flowable is created that emits the packets received on that NetClient. To figure out how to do proper exception handling, I added some random exceptions to the flow:
    private RecordParser createParser(NetState state, FlowableEmitter<Buffer> emitter) {
        RecordParser parser = RecordParser.newFixed(4);
        parser.handler(buffer -> {
            if (Math.random() > 0.8) {
                logger.error("I am dying now");
                throw new RuntimeException();
            }
            // protocol handling
        }).exceptionHandler((throwable) -> {
            logger.error("Got an exception: ", throwable);
            emitter.onError(throwable);
        });
        return parser;
    }
I use it like this:
    NetState state = new NetState();
    // client -> NetClient
    return Flowable.create(
        emitter -> client.handler(createParser(state, emitter)),
        BackpressureStrategy.ERROR);
The problem is that the exception thrown in the record parser never reaches any exception handler: neither the one on the RecordParser itself nor the one I attached to the client. It propagates all the way up to the root context. How do I catch exceptions that occur during record parsing (for example, in order to close the client that sent invalid data)?
Apparently exceptionHandler is only invoked for exceptions that happen on the socket / IO, not for exceptions thrown inside a handler.
RecordParser extends ReadStream, whose Javadoc does not make this clear:
Set an exception handler on the read stream.
But the user guide mentions it in a few places:
You can set an exceptionHandler to receive any exceptions that happen on the socket
You can set an exceptionHandler to receive any exceptions that happens before the connection is passed to the requestHandler
exceptionHandler: called when an exception occurs on the ReadStream.
I have implemented a small handler to help with that:
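    import io.vertx.core.Handler;
    import io.vertx.core.Promise;
    import io.vertx.core.streams.ReadStream;

    public class StreamCatchingHandler<P, T> implements Handler<T> {
        private final Promise<P> promise;
        private final ReadStream<T> stream;
        private final Handler<T> handler;

        // The final fields need a constructor for the class to compile
        public StreamCatchingHandler(Promise<P> promise, ReadStream<T> stream, Handler<T> handler) {
            this.promise = promise;
            this.stream = stream;
            this.handler = handler;
        }

        @Override
        public void handle(T event) {
            try {
                handler.handle(event);
            } catch (Throwable ex) {
                // Stop further events and report the failure through the promise
                stream.pause();
                promise.fail(ex);
            }
        }
    }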
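
If you do not need the Promise, the same idea can be reduced to a small wrapper that forwards whatever the handler throws to an error callback. The following is only a minimal sketch under stated assumptions: java.util.function.Consumer stands in for Vert.x's Handler so that it compiles without Vert.x on the classpath, and the names CatchingHandlerSketch and catching are made up for illustration. In the question's code the error callback could be something like emitter::onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CatchingHandlerSketch {

    // Wraps a handler so that anything it throws is passed to onError
    // instead of escaping to the caller (the event loop, in Vert.x).
    // Consumer is a stand-in for io.vertx.core.Handler (assumption).
    public static <T> Consumer<T> catching(Consumer<T> delegate, Consumer<Throwable> onError) {
        return event -> {
            try {
                delegate.accept(event);
            } catch (Throwable t) {
                onError.accept(t);
            }
        };
    }

    public static void main(String[] args) {
        List<Throwable> errors = new ArrayList<>();
        Consumer<String> handler = catching(packet -> {
            if (packet.isEmpty()) {
                throw new IllegalArgumentException("empty packet");
            }
            // protocol handling would go here
        }, errors::add);

        handler.accept("data"); // handled normally
        handler.accept("");     // exception is captured, not rethrown
        System.out.println(errors.size()); // prints 1
    }
}
```

With Vert.x the wrapper would implement Handler<Buffer> instead, and the error callback is also a reasonable place to close the socket that sent the invalid data.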