I have the following problem:
A certain producer sends Protobuf messages as binary data (byte array).
These binary data enter a misconfigured Kafka cluster, which deserializes the byte array as a string.
That cluster then serializes the data as a string and sends it to the consumer.
The unsuspecting consumer expects to receive a binary byte array, but instead gets a UTF-8 encoded mess.
I tried to reproduce it in a JUnit test.
Let's say we have the following proto file:
syntax = "proto3";
import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";
option java_package = "com.mycompany.proto";
option java_multiple_files = true;
package com.mycompany;
enum MessageType {
NOT_SET = 0;
TYPE_A = 1;
TYPE_B = 2;
}
message MyMessagePart {
string someValue = 1;
}
message MyMessage {
// Numeric (integer) variable
int32 myNumber = 1;
// Text value
string myText = 2;
// Enum value
MessageType mType = 3;
// Message parts
repeated MyMessagePart messagePart = 4;
// Uint32 value
google.protobuf.UInt32Value uint32Value = 5;
// Timestamp
google.protobuf.Timestamp timestamp = 6;
}
Then I wrote the following test.
public class EncodingTest {
    @Test
    public void dealWithCorruptedBinaryData() throws InvalidProtocolBufferException {
        // 1. Create a Protobuf message
        final MyMessage msg = MyMessage.newBuilder()
                .setMyNumber(42)
                .setMyText("Hello")
                .setMType(MessageType.TYPE_A)
                .setUint32Value(UInt32Value.newBuilder()
                        .setValue(2067)
                        .build())
                .addMessagePart(MyMessagePart.newBuilder()
                        .setSomeValue("message part value")
                        .build())
                .build();

        // 2. Convert it to bytes
        final byte[] bytesSentByProducer = msg.toByteArray();

        // 3. Now bytesSentByProducer enters the misconfigured Kafka cluster,
        // where it is deserialized using StringDeserializer
        final StringDeserializer deserializer = new StringDeserializer();
        final String dataReceivedInsideMisconfiguredKafka = deserializer.deserialize("inputTopic",
                bytesSentByProducer);

        // 4. Then, the misconfigured Kafka cluster serializes the data as a String
        final StringSerializer serializer = new StringSerializer();
        final byte[] dataSentToConsumer = serializer.serialize("outputTopic", dataReceivedInsideMisconfiguredKafka);

        // Because dataSentToConsumer has been corrupted during deserialization
        // or serialization as a string, conversion back to Protobuf does not work.
        final MyMessage receivedMessage = MyMessage.parseFrom(dataSentToConsumer);
    }
}
The producer creates a Protobuf message msg and encodes it as the byte array bytesSentByProducer.
The misconfigured Kafka cluster receives that byte array, deserializes it into the string dataReceivedInsideMisconfiguredKafka, serializes that string back into the byte array dataSentToConsumer and sends it to the consumer.
Because UTF-8 encoding has corrupted the binary data, the call
final MyMessage receivedMessage = MyMessage.parseFrom(dataSentToConsumer);
results in an exception:
com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:107)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawByte(CodedInputStream.java:1245)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint64SlowPath(CodedInputStream.java:1130)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint32(CodedInputStream.java:1024)
at com.google.protobuf.CodedInputStream$ArrayDecoder.readUInt32(CodedInputStream.java:954)
at com.google.protobuf.UInt32Value.<init>(UInt32Value.java:58)
at com.google.protobuf.UInt32Value.<init>(UInt32Value.java:14)
The conversion of the byte array back to the message works with the uncorrupted byte array bytesSentByProducer (MyMessage.parseFrom(bytesSentByProducer)).
Questions:
Is it possible to convert dataSentToConsumer back to bytesSentByProducer?
If yes, how can I fix this issue if the only part under my control is the consumer? How can I undo the UTF-8 encoding which happens inside the misconfigured Kafka cluster?
Note: The obvious solution is to configure the Kafka cluster properly. The same consumer works fine in another environment where there is a normal Kafka cluster which doesn't do any strange transformation. This obvious and easiest solution is not available for bureaucratic reasons.
What I tried
Approach 1
private byte[] convertToOriginalBytes(final byte[] bytesAfter) throws CharacterCodingException {
    final Charset charset = StandardCharsets.UTF_8;
    final CharsetDecoder decoder = charset.newDecoder();
    final CharsetEncoder encoder = charset.newEncoder();

    final ByteBuffer byteBuffer = ByteBuffer.wrap(bytesAfter);
    final CharBuffer charBuffer = CharBuffer.allocate(bytesAfter.length);
    final CoderResult result = decoder.decode(byteBuffer, charBuffer, true);
    result.throwException();

    final ByteBuffer reversedByteBuffer = encoder.encode(charBuffer);
    final byte[] reversedBytes = new byte[reversedByteBuffer.remaining()];
    reversedByteBuffer.get(reversedBytes);
    return reversedBytes;
}
The result is an exception.
java.nio.BufferUnderflowException
at java.base/java.nio.charset.CoderResult.throwException(CoderResult.java:272)
at com.mycompany.EncodingTest.convertToOriginalBytes(EncodingTest.java:67)
at com.mycompany.EncodingTest.dealWithCorruptedBinaryData(EncodingTest.java:54)
Approach 2
As far as I know, UTF-8 uses various byte patterns: 0xxxxxxx for single-byte characters, 110xxxxx 10xxxxxx for two-byte characters, and so on. I assume that somewhere inside StringDeserializer and/or StringSerializer the binary data are modified to conform to these UTF-8 rules.
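To make that assumption concrete, here is a minimal sketch. The equivalence to plain new String/getBytes is my guess (I have only compared the observable behaviour, not the broker configuration), and the class name and sample bytes are just for illustration:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch of my assumption: with the default encoding, the string round trip
// behaves like new String(bytes, UTF_8) followed by String.getBytes(UTF_8).
public class StringRoundTripAssumption {
    public static void main(String[] args) {
        // A handful of bytes in the style of the message above
        // (field 1 = 42, then field 5 = UInt32Value 2067);
        // the 0x93 in the varint for 2067 is not valid UTF-8 on its own.
        final byte[] original = {0x08, 0x2A, 0x2A, 0x03, 0x08, (byte) 0x93, 0x10};

        final StringDeserializer deserializer = new StringDeserializer();
        final StringSerializer serializer = new StringSerializer();

        final byte[] viaKafkaClasses = serializer.serialize("topic",
                deserializer.deserialize("topic", original));
        final byte[] viaPlainJava = new String(original, StandardCharsets.UTF_8)
                .getBytes(StandardCharsets.UTF_8);

        System.out.println(Arrays.equals(viaKafkaClasses, viaPlainJava)); // true: same transformation
        System.out.println(Arrays.equals(original, viaKafkaClasses));     // false: 0x93 did not survive
    }
}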
Provided that this transformation is reversible, one could manipulate bits to get the original message.
Hate to be the bearer of bad news, but what you want is impossible.
The key point is completeness: is there a complete mapping from one domain (here, raw bytes) to the target domain (here, UTF-8 text), and the same in reverse?
Said differently, here's the challenge: given an arbitrarily chosen sequence of bytes, produce some text such that, if you serialize that text using the UTF-8 charset encoding, it yields exactly those bytes. Is there a sequence of bytes you can choose for which this job is not possible?
Unfortunately the answer is yes, which trivially proves that bytes -> text-via-UTF-8 -> bytes is fatal unless you're very, very lucky and the bytes happen not to include anything that UTF-8 cannot represent.
Many decoders will accept non-valid UTF-8 (because, if certain byte sequences can never appear when converting text to bytes with UTF-8, it follows that there are byte sequences which, when converted to text via UTF-8, are invalid) and, instead of erroring out, will either take a stab at it or drop in the 'corrupted data' glyph, the replacement character U+FFFD. So whoever manages that Kafka cluster never saw an error. This act (turning data that was never UTF-8 to begin with into a 'uhh, wha?' symbol) is destructive.
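You can watch that happen in a few lines of plain Java, no Kafka required. This is just a sketch with one hand-picked byte; 0xFF can never appear in the UTF-8 encoding of any text:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// An invalid UTF-8 byte is replaced by U+FFFD during decoding;
// the original value is gone for good.
public class Utf8RoundTripIsLossy {
    public static void main(String[] args) {
        final byte[] original = {(byte) 0xFF}; // never valid in UTF-8

        final String decoded = new String(original, StandardCharsets.UTF_8);
        System.out.println((int) decoded.charAt(0)); // 65533, i.e. U+FFFD

        final byte[] reEncoded = decoded.getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(reEncoded)); // [-17, -65, -67], i.e. EF BF BD

        System.out.println(Arrays.equals(original, reEncoded)); // false: 0xFF cannot be recovered
    }
}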
Some charset encodings do make this possible. The most commonly used one is no doubt ISO-8859-1. That one is complete, because it is simply a mapping that maps every byte value, all the way from 0 to 255, to some unique character. Hence, you can go two-way on that all day.
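For contrast, a minimal sketch (class name is just for illustration) showing that the ISO-8859-1 round trip really is lossless for every possible byte value:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// ISO-8859-1 maps every byte value 0x00..0xFF to exactly one character and back,
// so bytes -> String -> bytes is the identity.
public class Latin1RoundTrip {
    public static void main(String[] args) {
        final byte[] allByteValues = new byte[256];
        for (int i = 0; i < 256; i++) {
            allByteValues[i] = (byte) i;
        }

        final String asText = new String(allByteValues, StandardCharsets.ISO_8859_1);
        final byte[] roundTripped = asText.getBytes(StandardCharsets.ISO_8859_1);

        System.out.println(Arrays.equals(allByteValues, roundTripped)); // true
    }
}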
Thus, we get to some fixes:
"Provided that this transformation is reversible"

Yup. You correctly identified the key requirement for all of this: the transformation has to be reversible. And it isn't, unfortunately.