Tags: scala, apache-spark, protocol-buffers, scalapb

How to generically unpack google.protobuf.Any on Spark with ScalaPB at runtime?


I have the following protobuf setup:

import "google/protobuf/any.proto";

message EntityEnvelope {
  string id = 1;
  string entity_type = 2;
  google.protobuf.Any entity = 3;
}

message EntityABC {
  string name = 1;
}

message EntityXYZ {
  string desc = 1;
}

where EntityEnvelope.entity can be any type packed as google.protobuf.Any. Each protobuf message is stored on disk, encoded as Base64.

When reading these messages, everything works perfectly if I specify the entity type at compile time when unpacking:

import java.util.Base64
import scalapb.spark.Implicits._

spark.read.format("text")
    .load("/tmp/entity").as[String]
    .map { s => Base64.getDecoder.decode(s) }
    .map { bytes => EntityEnvelope.parseFrom(bytes) }
    .map { envelope => envelope.getEntity.unpack[EntityXYZ]}
    .show()

But I want to use the same code to read any kind of entity at runtime, without having to specify its type at compile time. The closest I got was the following, which doesn't even compile:

  val entityClass = Class.forName(qualifiedEntityClassNameFromRuntime)

  spark.read.format("text")
    .load("/tmp/entity").as[String]
    .map { s => Base64.getDecoder.decode(s) }
    .map { bytes => EntityEnvelope.parseFrom(bytes) }
    .map { envelope => toJavaProto(envelope.getEntity).unpack(entityClass)} // ERROR: No implicits found for Encoder[Any]
    .show()

Since Any contains the typeUrl, would it be possible to find the correct descriptor and unpack it automatically at runtime, without having to specify the type at compile time?


Solution

  • To unpack Anys whose type you don't know at compile time, you need to build a map from typeUrl to the companion object of every type you might expect, and use it to call unpack. It can be done like this:

    val typeUrlToCompanion = Map[String, scalapb.GeneratedMessageCompanion[_ <: scalapb.GeneratedMessage]](
        "type.googleapis.com/myexample.Person2" -> Person2
        // more types
      )
    

    Then unpack with

    envelope.getEntity.unpack(typeUrlToCompanion(envelope.getEntity.typeUrl))
    

    which gives you back a GeneratedMessage, the parent trait of all ScalaPB messages.
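The lookup above can be sketched a little more defensively. This is only a sketch under stated assumptions: the generated classes for the question's EntityABC and EntityXYZ are imported from a hypothetical package `myexample.entities`, the helper names (`DynamicUnpack`, `typeUrlFor`, `unpackDynamic`) are made up for illustration, and the producer is assumed to have packed the messages with the default `type.googleapis.com/` prefix.

```scala
import com.google.protobuf.any.{Any => PbAny}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}
// Hypothetical package: use whatever package ScalaPB generated for your protos.
import myexample.entities.{EntityABC, EntityXYZ}

object DynamicUnpack {
  type Companion = GeneratedMessageCompanion[_ <: GeneratedMessage]

  // Assumption: messages were packed with the default prefix used by Any.pack.
  private val Prefix = "type.googleapis.com/"

  // Derive the type URL from the message descriptor instead of typing it by hand.
  def typeUrlFor(cmp: Companion): String =
    Prefix + cmp.scalaDescriptor.fullName

  // Every type you expect to find inside the envelope has to be listed here.
  val typeUrlToCompanion: Map[String, Companion] =
    Seq[Companion](EntityABC, EntityXYZ).map(c => typeUrlFor(c) -> c).toMap

  // Returns None for an unregistered type URL instead of throwing
  // NoSuchElementException from the map lookup.
  def unpackDynamic(any: PbAny): Option[GeneratedMessage] =
    typeUrlToCompanion.get(any.typeUrl).map(cmp => any.unpack(cmp))
}
```

With this in place, `DynamicUnpack.unpackDynamic(envelope.getEntity)` yields an `Option[GeneratedMessage]`. The static type is still only GeneratedMessage, so this helps outside of Spark (or inside a single map step) but does not by itself give Spark a schema.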

    However, after you do that, you are going to hit another problem. Spark dataframes are structured: they have a schema which consists of named columns and types, like a table in a relational database. The schema needs to be known at runtime, when the dataframe is constructed. However, you seem to want to create a dataframe where each row has a different type (whatever the Any happens to be), and the collection of types only becomes known when unpacking, so that's incompatible with the design of dataframes.

    Depending on what you want to do, there are a few options to consider:

    1. Instead of using Any, use the oneof feature of protobuf, so that all the possible types are known and a schema can be created automatically (and you won't have to deal with unpacking Anys).
    2. Alternatively, partition the dataframe by typeUrl, so that you end up with several dataframes in which all the items are of the same type. Then unpack each one with its known type. This can be done generically with the typeUrlToCompanion approach above.
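Option 2 could look roughly like the following. It is a sketch rather than a tested recipe: the package `myexample.entities` and the names `SplitByType` and `entitiesOf` are hypothetical, and it assumes sparksql-scalapb is on the classpath so that `scalapb.spark.Implicits._` can supply the encoders at the call site. Each call keeps only the envelopes whose Any holds the requested type, so every resulting Dataset has a single, known schema.

```scala
import org.apache.spark.sql.{Dataset, Encoder}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}
// Hypothetical package: use whatever package ScalaPB generated for your protos.
import myexample.entities.EntityEnvelope

object SplitByType {
  // Filters the envelopes down to those whose Any contains an A (is[A] compares
  // the typeUrl with A's full name), then unpacks them into a typed Dataset[A].
  def entitiesOf[A <: GeneratedMessage: GeneratedMessageCompanion: Encoder](
      envelopes: Dataset[EntityEnvelope]
  ): Dataset[A] =
    envelopes
      .filter((e: EntityEnvelope) => e.getEntity.is[A])
      .map((e: EntityEnvelope) => e.getEntity.unpack[A])
}
```

At the call site you would `import scalapb.spark.Implicits._` and write, for example, `SplitByType.entitiesOf[EntityXYZ](envelopes)` and `SplitByType.entitiesOf[EntityABC](envelopes)`. The type parameter still has to be chosen at compile time for each resulting Dataset; a runtime type name can only select which of these calls to run, for instance with a match expression over the names you support.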