My Spark application receives binary data from Kafka. The data is sent as a byte proto message to Kafka. The proto message is:
message Any {
string type_url=1;
bytes value=2;
}
With the ScalaPB Library I can deserialize the Any message to its original format. How do I deserialize the value from bytes to a readable format? SerializationUtils does not work. This is how the Any message looks after being deserialized.
#+-----------------------------------------|
#| type_url | value |
#+-----------------------------------------|
#|type.googleapis.c...|[0A 8D D8 04 0A 1...|
#+-----------------------------------------|
The value is still in its byte format. After deserializing it with SerializationUtils the data is not correct.
#+-----------------+
#|value |
#+-----------------+
#|2020-09-04T10:...|
#+-----------------+
Is there another alternative? Ist there a way to deserialize the bytes into a String, Struct or String Json?
I use ScalaPBs example with udf to deserialize the bytes into the Any message.
val parseCloud = ProtoSQL.udf { bytes: Array[Byte] => CloudEvent.parseFrom(bytes) }
The udf with SerializationUtils for the bytes value looks following.
val parseBytes = ProtoSQL.udf {bytes: Array[Byte] => deserialize(bytes)}
If you know the message type in the Any
, you can deserialize using the unpack method.
val unpackAny = ProtoSQL.udf { any: com.google.protobuf.any.Any => any.unpack[MyMessage] }