java netty

Data corruption with high throughput


I wrote a small client-server system with Netty. The clients send messages to a server, and the server responds with an ACK once a message is processed. The ACK contains an ever-incrementing offset, so I can remove the acknowledged message from my list of pending messages.

If the throughput is low (~1000 messages per second), all is good. However, at higher throughput (above 2000 messages per second), the decoder sometimes reads a very large offset (a long), something like 4755801206503331182, when the usual offset is in the 6-digit range. The error rate seems proportional to the throughput: the higher the throughput, the more frequent the errors. I'd estimate it at about 2 errors per 1000 messages.

This is my decoder:

public class HeartbeatAckDecoder extends ByteToMessageDecoder {

  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    byte channelId = in.readByte();

    in.markReaderIndex();
    if (channelId == CHANNEL_ACK) {
      if (in.readableBytes() < 8) { // long minimum
        in.resetReaderIndex();
      } else {
        long offset = in.readLong();
        out.add(ChannelMessage.ack(offset));
      }
    } else {
      out.add(ChannelMessage.heartbeat());
    }
  }
}

This decoder handles 2 types of messages: ACKs and heartbeats. The first byte gives the message type. If it's a heartbeat, there's nothing more to read.

If it's an ACK, however, the next 8 bytes (a long) are the offset. I believe this logic is correct, so I don't know why I get this issue. The throughput doesn't seem too high for Netty, because I'm sending the same number of messages from the client to the server (in fact, double that, because I have 2 clients), and the server reads those messages perfectly.

Any ideas?


Solution

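  • The problem is that you call in.markReaderIndex() after reading the first byte (channelId). When fewer than 8 bytes are available, resetReaderIndex() rewinds only to just after that byte, so the channelId stays consumed. On the next decode() call, the first byte of the offset is read as a channelId, and from then on the decoder is misaligned with the stream, which is how a garbage long ends up being read as an offset. TCP is a byte stream, so a 9-byte ACK can be split across reads at any point, and this likely happens more often under load.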

    To fix, call in.markReaderIndex() before reading anything. Then if there aren’t enough bytes for the full message (1 byte channelId + 8 bytes offset), reset to the start and wait for more data.

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
      // Mark the start of the message, before the channelId byte is consumed.
      in.markReaderIndex();
      if (in.readableBytes() < 1) return; // need channelId byte

      byte channelId = in.readByte();
      if (channelId == HEARTBEAT) { // HEARTBEAT: assumed constant for the heartbeat type
        out.add(ChannelMessage.heartbeat());
      } else if (channelId == CHANNEL_ACK) {
        if (in.readableBytes() < 8) {
          // Not enough bytes for the offset: rewind to the channelId and wait for more data.
          in.resetReaderIndex();
          return;
        }
        out.add(ChannelMessage.ack(in.readLong()));
      } else {
        throw new Exception("Unsupported channelId " + channelId);
      }
    }
    

    This ensures you only read the offset once the full 9 bytes are available, preventing corrupted values at high throughput.
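To see the misalignment without a Netty setup, here is a minimal sketch that simulates both mark placements with a plain java.nio.ByteBuffer standing in for Netty's cumulation buffer. It is not Netty code: the class name AckFramingDemo, the type values 0 and 1, and the 5-byte split point are all assumptions chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AckFramingDemo {
    // Hypothetical type values; the real constants are not shown in the question.
    static final byte CHANNEL_HEARTBEAT = 0;
    static final byte CHANNEL_ACK = 1;

    /** Encodes one ACK as 1 type byte + 8-byte big-endian offset. */
    static byte[] ack(long offset) {
        return ByteBuffer.allocate(1 + Long.BYTES).put(CHANNEL_ACK).putLong(offset).array();
    }

    /**
     * Feeds the chunks one by one into an accumulating buffer and decodes
     * whatever is complete. markBeforeTypeByte = false mimics the decoder
     * from the question, true mimics the corrected one.
     */
    static List<Long> decodeAll(byte[][] chunks, boolean markBeforeTypeByte) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.flip(); // start empty, in read mode
        List<Long> acks = new ArrayList<>();
        for (byte[] chunk : chunks) {
            buf.compact(); // keep unread bytes, switch to write mode
            buf.put(chunk);
            buf.flip();    // back to read mode
            while (buf.hasRemaining()) {
                if (markBeforeTypeByte) buf.mark();
                byte type = buf.get();
                if (!markBeforeTypeByte) buf.mark();
                if (type == CHANNEL_ACK) {
                    if (buf.remaining() < Long.BYTES) {
                        buf.reset(); // wait for the rest of the message
                        break;
                    }
                    acks.add(buf.getLong());
                }
                // Any other type is treated as a heartbeat with no payload.
            }
        }
        return acks;
    }

    public static void main(String[] args) {
        // Two ACKs back to back, with the first one split after 5 bytes.
        byte[] stream = ByteBuffer.allocate(18).put(ack(100000L)).put(ack(100001L)).array();
        byte[][] chunks = {
            Arrays.copyOfRange(stream, 0, 5),
            Arrays.copyOfRange(stream, 5, stream.length)
        };
        System.out.println("mark after type byte:  " + decodeAll(chunks, false));
        System.out.println("mark before type byte: " + decodeAll(chunks, true));
    }
}
```

With the mark placed after the type byte, the leftover offset bytes are reinterpreted as message types and a bogus long comes out. With the mark placed first, both offsets decode as sent. The same reasoning should carry over to ByteBuf's markReaderIndex() and resetReaderIndex(), though the sketch does not exercise Netty itself.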