java pipeline netty decoder

How to send a message inside a netty decoder to the next handler/decoder in the pipeline?


My channel pipeline contains several decoders, all of them operating on TextWebSocketFrame messages. My problem is that I have to choose the right decoder based on some content of the message.

Essentially, I have to parse a certain field in the message and then decide whether to handle the message myself or pass it to the next decoder/handler.

Most people suggest using a single decoder to decode all messages in such a case, but some of my decoders are added dynamically, and it would be a mess to put all the logic in a single decoder.

Currently the code looks like this:

@Override
protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame msg, List<Object> out) throws Exception {
    String messageAsJson = msg.text();
    JsonObject jsonObject = JSON_PARSER.fromJson(messageAsJson, JsonObject.class);

    JsonObject messageHeader = jsonObject.getAsJsonObject(MESSAGE_HEADER_FIELD);
    String serviceAsString = messageHeader.get(MESSAGE_SERVICE_FIELD).getAsString();
    String inboundTypeAsString = messageHeader.get(MESSAGE_TYPE_FIELD).getAsString();

    Service service = JSON_PARSER.fromJson(serviceAsString, Service.class);
    InboundType inboundType = JSON_PARSER.fromJson(inboundTypeAsString, InboundType.class);

    if (service == Service.STREAMING) {
        out.add(decodeQuotesMessage(inboundType, messageAsJson));
    } else {
        
    }
}

So basically I'd need some logic in the else branch to pass the message to the next handler in the pipeline.

I am aware that this approach is not the most efficient one, but the architecture of my service has a fast path and a slow path (running on a different thread pool), and this logic sits on the slow path. I can accept some slow code here.


Solution

  • In general, you need something like this:

    if (service == Service.STREAMING) {
        ctx.pipeline().addLast(new StreamingHandler());
    } else {
        ctx.pipeline().addLast(new OtherHandler());
    }
    out.add(decodeQuotesMessage(inboundType, messageAsJson));
    ctx.pipeline().remove(this);
    

    The logic is as follows:

    1. You have decoded the header, so you now know which flow to follow.
    2. You add the handler that matches the header to the pipeline.
    3. You add the decoded message to the 'out' list, which sends it to the next handler in the pipeline. If the current handler is last in the pipeline, that next handler is the one added in step 2.
    4. You remove the current handler from the pipeline so that handlers are not duplicated if your protocol sends the same header again and again. This step is specific to your protocol and may not be necessary.
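The four steps can be traced in a small model that does not depend on Netty. This is only a sketch: MiniPipeline, Stage and HeaderRouter are hypothetical stand-ins for ChannelPipeline, a handler and the header decoder, and the "SERVICE:payload" message format is an assumption made for illustration. Because the model emits a message by returning it, the router removes itself just before the return rather than after out.add(...).

```java
import java.util.ArrayList;
import java.util.List;

class DynamicRoutingSketch {
    /** Hypothetical stand-in for a handler; the return value goes to the next stage. */
    interface Stage {
        Object handle(MiniPipeline pipeline, Object msg);
    }

    /** Hypothetical stand-in for a pipeline: an ordered list of stages. */
    static final class MiniPipeline {
        private final List<Stage> stages = new ArrayList<>();

        MiniPipeline addLast(Stage stage) { stages.add(stage); return this; }
        void remove(Stage stage) { stages.remove(stage); }
        int size() { return stages.size(); }

        /** Passes msg through every stage, tolerating a stage that removes itself. */
        Object fire(Object msg) {
            int i = 0;
            while (i < stages.size()) {
                Stage current = stages.get(i);
                msg = current.handle(this, msg);
                // If the stage removed itself, its successor now sits in slot i.
                if (i < stages.size() && stages.get(i) == current) {
                    i++;
                }
            }
            return msg;
        }
    }

    /** Reads the header once, installs the matching stage, then steps aside. */
    static final class HeaderRouter implements Stage {
        @Override
        public Object handle(MiniPipeline pipeline, Object msg) {
            String text = (String) msg;
            int colon = text.indexOf(':');
            String service = text.substring(0, colon);            // step 1: decode the header
            if (service.equals("STREAMING")) {                    // step 2: add the matching stage
                pipeline.addLast((p, m) -> "quotes[" + m + "]");
            } else {
                pipeline.addLast((p, m) -> "other[" + m + "]");
            }
            pipeline.remove(this);                                // step 4: avoid duplicate stages
            return text.substring(colon + 1);                     // step 3: emit to the next stage
        }
    }

    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline().addLast(new HeaderRouter());
        System.out.println(pipeline.fire("STREAMING:EURUSD"));
        // The router is gone now, so later messages go straight to the installed stage.
        System.out.println(pipeline.fire("EURUSD"));
    }
}
```

After the first message the pipeline holds only the stage chosen from the header, which is why step 4 only fits protocols where the header does not change during the connection.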

    This is just the general approach; the details depend on your protocol flow.
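If the decoders should stay in the pipeline and each one should simply skip messages that are not meant for it, the empty else branch from the question can forward the original frame. With Netty's MessageToMessageDecoder this is usually written as out.add(msg.retain()), because the base class releases the inbound message once decode returns. The model below shows the same idea without Netty; Decoder, Decoded, forService and run are hypothetical names, and the "SERVICE:payload" message format is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class PassThroughSketch {
    /** Hypothetical stand-in for a decoder: whatever lands in out goes to the next stage. */
    interface Decoder {
        void decode(String msg, List<Object> out);
    }

    /** Hypothetical decoded result; a distinct type so later decoders leave it alone. */
    static final class Decoded {
        final String service;
        final String payload;

        Decoded(String service, String payload) {
            this.service = service;
            this.payload = payload;
        }
    }

    /** Builds a decoder that handles one service and forwards every other message unchanged. */
    static Decoder forService(String service) {
        String prefix = service + ":";
        return (msg, out) -> {
            if (msg.startsWith(prefix)) {
                out.add(new Decoded(service, msg.substring(prefix.length())));
            } else {
                out.add(msg); // not ours: hand the original message to the next decoder
            }
        };
    }

    /** Runs msg through the decoders in order; only raw strings are offered to a decoder. */
    static List<Object> run(List<Decoder> decoders, String msg) {
        List<Object> current = new ArrayList<>();
        current.add(msg);
        for (Decoder decoder : decoders) {
            List<Object> next = new ArrayList<>();
            for (Object o : current) {
                if (o instanceof String) {
                    decoder.decode((String) o, next);
                } else {
                    next.add(o); // already decoded, skip this decoder
                }
            }
            current = next;
        }
        return current;
    }

    public static void main(String[] args) {
        List<Decoder> chain = new ArrayList<>();
        chain.add(forService("STREAMING"));
        chain.add(forService("ORDERS"));
        Object result = run(chain, "ORDERS:buy").get(0);
        System.out.println(((Decoded) result).service + " -> " + ((Decoded) result).payload);
    }
}
```

A message that no decoder recognizes reaches the end of the chain unchanged, so a final catch-all handler is a reasonable place to log or reject it.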