scala, apache-spark, apache-kafka, protocol-buffers, scalapb

Spark unmarshal proto bytes into readable format


My Spark application receives binary data from Kafka. The data is sent to Kafka as a serialized proto message. The proto message is:

message Any {
  string type_url = 1;
  bytes value = 2;
}
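
For context, the bytes arrive in Spark roughly like this (a minimal sketch; the streaming read, topic name, and bootstrap server are assumptions, not part of the original setup):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("proto-from-kafka").getOrCreate()
import spark.implicits._

// Kafka source; bootstrap servers and topic name are placeholders.
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .select($"value") // binary column holding the serialized proto message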

With the ScalaPB library I can deserialize the Any message to its original format. How do I deserialize the value field from bytes into a readable format? SerializationUtils does not work. This is how the Any message looks after being deserialized:

#+--------------------+--------------------+
#| type_url           | value              |
#+--------------------+--------------------+
#|type.googleapis.c...|[0A 8D D8 04 0A 1...|
#+--------------------+--------------------+

The value field is still in its byte format. After deserializing it with SerializationUtils, the data is not correct:

#+-----------------+
#|value            |
#+-----------------+
#|2020-09-04T10:...|
#+-----------------+

Is there another alternative? Is there a way to deserialize the bytes into a String, a Struct, or a JSON string?

I use ScalaPB's example with a UDF to deserialize the bytes into the Any message.

val parseCloud = ProtoSQL.udf { bytes: Array[Byte] => CloudEvent.parseFrom(bytes) }
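
Applied to the Kafka DataFrame, the UDF would be used roughly like this (a sketch; the DataFrame name raw and its value column come from the Kafka read above):

// Apply the parsing UDF to the binary payload; the result is a struct column
// whose fields include the Any shown above.
val parsed = raw.select(parseCloud($"value").alias("event"))
parsed.printSchema()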

The UDF with SerializationUtils for the value bytes looks like this:

val parseBytes = ProtoSQL.udf { bytes: Array[Byte] => deserialize(bytes) }

Solution

  • If you know the message type contained in the Any, you can deserialize it using the unpack method, as in the UDF below.

    val unpackAny = ProtoSQL.udf { any: com.google.protobuf.any.Any => any.unpack[MyMessage] }
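
    For example, applied to the parsed DataFrame from the question, it could look like this (a sketch: the column path and MyMessage are placeholders for wherever the Any sits in the parsed event and for its actual ScalaPB-generated class):

    // Unpack the Any column into a readable struct column. The column path is
    // a placeholder, not a known field name.
    val readable = parsed.select(unpackAny($"event.proto_data").alias("payload"))
    readable.select($"payload.*").show()
    // If a JSON string is needed instead, scalapb.json4s.JsonFormat.toJsonString
    // could be applied to the unpacked message inside a UDF.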