apache-spark apache-spark-sql scalapb

SparkSQL with ScalaPB: Using MapType in output proto format gives scala.MatchError while calling toByteString


The following is my output message format:

message EditorialTextAdEnforcementData {
  int32 customerId = 1;
  int32 source = 2;
  DecisionDetails decisionDetails = 3;
  int32 flagsEnforceOption = 4;
  int32 categoryEnforceOption = 5;
  int32 applyBypass = 6;
  map<int32, string> categories = 7;
  bcl.DateTime rowDateSource = 8;
  int32 accountId = 9;
  int64 adId = 10;
  int64 orderId = 11;
  int32 adType = 12;
  int32 campaignType = 13;
  bool hasImage = 14;
  bool isNewAdType = 15;
}

While preparing the output dataset, I use the following code to include a dummy map:

 val output =...
        .withColumn(UC.Categories, map(lit("1"), lit("test"))).as[EditorialTextAdEnforcementData]
 output.show()

It works fine till this step. However, when I call:

output.map(_.toByteString).show()

I get the following error:

Exception in thread "main" scala.MatchError: MapType(IntegerType,StringType,false) (of class org.apache.spark.sql.types.MapType)
    at org.apache.spark.sql.catalyst.expressions.objects.MapObjects.doGenCode(objects.scala:836)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
    at scala.Option.getOrElse(Option.scala:121)

If I remove the categories field, it works fine. How can I write MapType fields as protobuf messages?


Solution

  • This is an open issue tracked at https://github.com/scalapb/sparksql-scalapb/issues/79.

    As a workaround, replace the map field with a repeated field of a key/value message. The protobuf documentation describes this form as the wire-compatible equivalent of a map (https://developers.google.com/protocol-buffers/docs/proto3#backwards_compatibility):

    message Categories {
      int32 key = 1;
      string value = 2;
    }
    
    message EditorialTextAdEnforcementData {
      int32 customerId = 1;
      int32 source = 2;
      repeated DecisionDetails decisionDetails = 3;
      int32 flagsEnforceOption = 4;
      int32 categoryEnforceOption = 5;
      int32 applyBypass = 6;
      repeated Categories categories = 7;
      bcl.DateTime rowDateSource = 8;
      int32 accountId = 9;
      int64 adId = 10;
      int64 orderId = 11;
      int32 adType = 12;
      int32 campaignType = 13;
      bool hasImage = 14;
      bool isNewAdType = 15;
    }
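
With this layout, code that still works with a Scala Map has to convert to and from the repeated key/value form. The following is a minimal sketch of that conversion, not taken from the original answer. The `Categories` case class here is a hand-written stand-in for what ScalaPB would generate from `message Categories` (the generated class may carry extra members), and `CategoriesSketch`, `toEntries` and `toMap` are hypothetical helper names.

```scala
object CategoriesSketch {
  // Hypothetical stand-in for the ScalaPB-generated class of `message Categories`.
  final case class Categories(key: Int, value: String)

  // Map -> repeated entries. Sorting by key only makes the output order stable.
  def toEntries(m: Map[Int, String]): Seq[Categories] =
    m.toSeq.sortBy(_._1).map { case (k, v) => Categories(k, v) }

  // Repeated entries -> Map. If a key occurs more than once, the last entry wins,
  // which mirrors how protobuf parsers treat duplicate map keys.
  def toMap(entries: Seq[Categories]): Map[Int, String] =
    entries.map(e => e.key -> e.value).toMap

  def main(args: Array[String]): Unit = {
    val original = Map(1 -> "test", 7 -> "other")
    val entries = toEntries(original)
    println(entries)
    // The round trip should give back the original map.
    assert(toMap(entries) == original)
  }
}
```

The repeated field no longer enforces key uniqueness, so duplicates have to be handled at conversion time, as `toMap` does here.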