What is the status of schema evolution for arrays of structs (complex types) in Spark?
I know that schema evolution for regular simple types (adding a new column) works rather fine for both ORC and Parquet, but I could not find any documentation so far for my desired case.
My use case is to have a structure similar to this one:
user_id,date,[{event_time, foo, bar, baz, tag1, tag2, ... future_tag_n}, ...]
And I want to be able to add new fields to the struct in the array.
Would a Map (key-value) complex type instead cause any inefficiencies? There I would at least be sure that adding new fields (tags) would be flexible.
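For comparison, a map-typed events column could look like the sketch below. This is only a rough illustration; the MapEvent and FooMap names and fields are made up and not part of the setup above.

case class MapEvent(event_time: String, tags: Map[String, String])
case class FooMap(user_id: Int, date: String, events: Seq[MapEvent])

// New tags are just new map keys, so nothing changes in the schema on write,
// but every value shares a single type, and the per-key column layout
// (statistics, predicate pushdown) of dedicated struct fields is lost.
val dfMap = Seq(FooMap(1, "2019-01-01", Seq(MapEvent("12:00:00", Map("foo" -> "a", "tag1" -> "b"))))).toDF
dfMap.printSchema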
case class BarFirst(baz:Int, foo:String)
case class BarSecond(baz:Int, foo:String, moreColumns:Int, oneMore:String)
case class BarSecondNullable(baz:Int, foo:String, moreColumns:Option[Int], oneMore:Option[String])
case class Foo(i:Int, date:String, events:Seq[BarFirst])
case class FooSecond(i:Int, date:String, events:Seq[BarSecond])
case class FooSecondNullable(i:Int, date:String, events:Seq[BarSecondNullable])
val dfInitial = Seq(Foo(1, "2019-01-01", Seq(BarFirst(1, "asdf")))).toDF
dfInitial.printSchema
dfInitial.show
root
|-- i: integer (nullable = false)
|-- date: string (nullable = true)
|-- events: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- baz: integer (nullable = false)
| | |-- foo: string (nullable = true)
scala> dfInitial.show
+---+----------+----------+
| i| date| events|
+---+----------+----------+
| 1|2019-01-01|[[1,asdf]]|
+---+----------+----------+
dfInitial.write.partitionBy("date").parquet("my_df.parquet")
tree my_df.parquet
my_df.parquet
├── _SUCCESS
└── date=2019-01-01
└── part-00000-fd77f730-6539-4b51-b680-b7dd5ffc04f4.c000.snappy.parquet
val evolved = Seq(FooSecond(2, "2019-01-02", Seq(BarSecond(1, "asdf", 11, "oneMore")))).toDF
evolved.printSchema
evolved.show
scala> evolved.printSchema
root
|-- i: integer (nullable = false)
|-- date: string (nullable = true)
|-- events: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- baz: integer (nullable = false)
| | |-- foo: string (nullable = true)
| | |-- moreColumns: integer (nullable = false)
| | |-- oneMore: string (nullable = true)
scala> evolved.show
+---+----------+--------------------+
| i| date| events|
+---+----------+--------------------+
| 1|2019-01-02|[[1,asdf,11,oneMo...|
+---+----------+--------------------+
import org.apache.spark.sql._
evolved.write.mode(SaveMode.Append).partitionBy("date").parquet("my_df.parquet")
my_df.parquet
├── _SUCCESS
├── date=2019-01-01
│ └── part-00000-fd77f730-6539-4b51-b680-b7dd5ffc04f4.c000.snappy.parquet
└── date=2019-01-02
└── part-00000-64e65d05-3f33-430e-af66-f1f82c23c155.c000.snappy.parquet
val df = spark.read.parquet("my_df.parquet")
df.printSchema
scala> df.printSchema
root
|-- i: integer (nullable = true)
|-- events: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- baz: integer (nullable = true)
| | |-- foo: string (nullable = true)
|-- date: date (nullable = true)
df.show
df.as[FooSecond].collect // AnalysisException: No such struct field moreColumns in baz, foo
df.as[FooSecondNullable].collect // AnalysisException: No such struct field moreColumns in baz, foo
This behaviour was evaluated for Spark 2.2.3_2.11 and 2.4.2_2.12.
When executing the code above, schema merging is off by default and the new columns are not loaded. When enabling schema merging:
val df = spark.read.option("mergeSchema", "true").parquet("my_df.parquet")
scala> df.printSchema
root
|-- i: integer (nullable = true)
|-- events: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- baz: integer (nullable = true)
| | |-- foo: string (nullable = true)
| | |-- moreColumns: integer (nullable = true)
| | |-- oneMore: string (nullable = true)
|-- date: date (nullable = true)
df.as[FooSecond].collect         // obviously fails with a NullPointerException; the Option-based case class must be used
df.as[FooSecondNullable].collect // works fine
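An alternative to mergeSchema that is not part of the snippets above (just a sketch, assuming Spark's Parquet reader clips the file schema to the requested one) is to pass the evolved schema explicitly when reading, so that fields missing from the older files come back as null:

import org.apache.spark.sql.Encoders

// Sketch: derive the target schema from the evolved, nullable case class and
// supply it at read time; older files without moreColumns/oneMore are expected
// to yield null for those struct fields, so the Option-based class is required.
val targetSchema = Encoders.product[FooSecondNullable].schema
val dfExplicit = spark.read.schema(targetSchema).parquet("my_df.parquet")
dfExplicit.as[FooSecondNullable].collect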
evolved.write.mode(SaveMode.Append).partitionBy("date").saveAsTable("my_df")
seems to work fine (no exception; my_df here is a Hive table that already contains data with the initial schema), but when trying to read the data back in:
spark.sql("describe my_df").show(false)
+-----------------------+---------------------------------+-------+
|col_name |data_type |comment|
+-----------------------+---------------------------------+-------+
|i |int |null |
|events |array<struct<baz:int,foo:string>>|null |
|date |string |null |
|# Partition Information| | |
|# col_name |data_type |comment|
|date |string |null |
+-----------------------+---------------------------------+-------+
When using only basic types instead of an array of structs:
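The case classes for this basic-types example are not listed above; reconstructed from the error message further below, they presumably look like this (note that Foo is redefined here and shadows the earlier definition):

// Assumed definitions, reconstructed from the AnalysisException below.
case class Foo(first: Int, dt: String)
case class FooEvolved(first: Int, second: Int, dt: String)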
val first = Seq(Foo(1, "2019-01-01")).toDF
first.printSchema
first.write.partitionBy("dt").saveAsTable("df")
val evolved = Seq(FooEvolved(1,2, "2019-01-02")).toDF
evolved.printSchema
evolved.write.mode(SaveMode.Append).partitionBy("dt").saveAsTable("df")
org.apache.spark.sql.AnalysisException: The column number of the existing table default.df(struct<first:int,dt:string>) doesn't match the data schema(struct<first:int,second:int,dt:string>);
Here there is a clear error message. Question: is it still possible to evolve the schema in Hive, or is a manual adaptation of the schema required?
Schema evolution for arrays of structs is supported, but the merge option must be turned on when reading the files, and it seems to work out of the box only when reading the files directly, without Hive.
When reading through Hive, only the old schema is returned; the new columns seem to be dropped silently when writing.
Schema evolution in Parquet format via manually created views looks like an interesting alternative: the mergeSchema option is quite resource heavy, views work for all SQL engines on Hadoop, and as an additional benefit they allow schema changes that Parquet itself does not support (renaming columns, changing data types).
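As a minimal sketch of the view-based approach, using the my_df table created above (the view name and the renamed columns are only illustrative):

// A view can rename columns and change data types on top of the stored Parquet
// schema, two changes that Parquet schema evolution itself does not support.
spark.sql("""
  CREATE OR REPLACE VIEW my_df_view AS
  SELECT i                    AS user_id,  -- rename
         CAST(`date` AS DATE) AS event_dt, -- data type change
         events
  FROM my_df
""")
spark.sql("describe my_df_view").show(false)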