I am trying to figure out whether this is an issue in sparksql-scalapb (https://github.com/scalapb/sparksql-scalapb) or something I am missing.
Consider a proto that generates a case class:

```scala
case class Person(name: Option[String], age: Option[Int]) extends GeneratedMessage .....
```
When we create a Spark job that uses the above proto class and writes it as a DataFrame to a file/filesystem, the output will only have two columns, `name` and `age`.
Now consider the proto evolving to add a `gender` field, which generates the case class:

```scala
case class Person(name: Option[String], age: Option[Int], gender: Option[String]) extends GeneratedMessage .....
```
If we use this new proto class to read the old data we have written to the file/filesystem:

```scala
import scalapb.spark.Implicits._

val df = spark.read(.....
df.as[Person]
```
This will fail with:

```
org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `gender` cannot be resolved. Did you mean one of the following? [`name`, `age`].
```
Shouldn't the ScalaPB encoder set the fields that are not available in the DataFrame to None and continue the conversion, instead of erroring out?
Author of sparksql-scalapb here. In binary parsing, field tags are encoded as numbers, so renames and missing fields have well-defined behavior. In DataFrames, the column name is a string, so we can't reliably detect renames, and a missing column might be a programming error that we shouldn't silently ignore by filling in nulls. Currently, it's the user's responsibility to migrate the DataFrames.

It should be possible (though a bit tricky) to write a generic function that takes a DataFrame and a protobuf descriptor and returns a new DataFrame with the missing columns added as nulls, but it's not a planned addition. It's tricky because a robust implementation has to handle all cases, for example when the missing column is in a message nested inside a repeated field.
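As a very rough sketch of the simple case only (not part of the library; the name `addMissingTopLevelColumns` is made up here, and it assumes the `Encoder[Person]` that `scalapb.spark.Implicits` puts in scope can be summoned with `implicitly`): the helper below fills in missing top-level columns with nulls, taking the target schema from the Spark encoder derived for the message. It deliberately ignores the nested/repeated cases mentioned above.

```scala
import org.apache.spark.sql.{DataFrame, Encoder}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType

// Sketch only: add null columns for top-level fields that exist in the target
// schema but are missing from the DataFrame. Nested structs and structs inside
// arrays are not handled.
def addMissingTopLevelColumns(df: DataFrame, target: StructType): DataFrame =
  target.fields.foldLeft(df) { (acc, field) =>
    if (acc.columns.contains(field.name)) acc
    else acc.withColumn(field.name, lit(null).cast(field.dataType))
  }
```

With that in place, reading the old data with the evolved message would look roughly like:

```scala
import scalapb.spark.Implicits._

val target  = implicitly[Encoder[Person]].schema // schema of the evolved Person
val persons = addMissingTopLevelColumns(df, target).as[Person]
```

The hard part is doing the same thing recursively, e.g. rebuilding an array-of-struct column so that each element gains the new null field.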
Feel free to experiment with that and a PR will be considered!