I often use the map function on Spark Dataset rows to do transformations in Scala on typed objects. My usual pattern is to take intermediate results created by DataFrame transformations (withColumn, groupBy, etc.) and create a typed Dataset of the intermediate result so I can use map.
This works well but leads to a lot of 'temporary' case classes for intermediate results or unwieldy tuple types.
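For concreteness, a minimal sketch of that pattern (the Order/OrderWithBand names and the when expression are invented purely for illustration, they are not from my actual code):

// Hypothetical illustration only: an untyped DataFrame step, then a one-off
// "temporary" case class so the intermediate result can be mapped as a typed Dataset.
import org.apache.spark.sql.functions.when
import spark.implicits._

final case class Order(id: Long, amount: Double)
final case class OrderWithBand(id: Long, amount: Double, band: String) // exists only for this step

val banded = Seq(Order(1L, 5.0), Order(2L, 50.0)).toDS
  .withColumn("band", when($"amount" < 10, "small").otherwise("large")) // untyped transformation
  .as[OrderWithBand]                                                    // back to a typed Dataset
  .map(o => o.copy(band = o.band.toUpperCase))                          // typed map on case-class objects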
An alternative would be to run map on a DataFrame and retrieve typed fields from the row using getAs[T], but this doesn't seem to work with spark.implicits if T is a case class.
For example, this fails with ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Person:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window
import spark.implicits._

final case class Person(name: String, age: Integer)

val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS

val df = people.alias("p")
  .select($"p.name", struct($"p.*").alias("person"))

val ds = df.map(row => {
  val name = row.getAs[String]("name")
  val person = row.getAs[Person]("person") // fails at runtime: the struct arrives as a GenericRowWithSchema, not a Person
  (name, person)
})

display(ds)
whereas this works fine:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window
import spark.implicits._

final case class Person(name: String, age: Integer)

val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS

val df = people.alias("p")
  .select($"p.name", struct($"p.*").alias("person"))
  .as[Tuple2[String, Person]] // the tuple encoder decodes the struct column into a Person

val ds = df.map(row => {
  val name = row._1
  val person = row._2
  (name, person)
})

display(ds)
So Spark happily converts the DataFrame person struct to the Person case class in the second example, but won't do it in the first. Does anyone know a simple way to fix this?
Thanks,
David
"Simple", possibly :), but very much using internal api's that are subject to change (and have done). This code won't work as-is on Spark 4 either (tested on 3.5.1).
As an approach it's also likely slower than the second example you provide using tuples as the Spark code translates from InternalRow to the user land Row before entering your map code. The below code then converts back to InternalRow before calling the decoder.
resolveAndBind is typically ok in this kind of example but it's also not guaranteed to work in all cases as resolution of field names etc. typically needs to happen as part of the full analysis of the query plan.
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import spark.implicits._

final case class Person(name: String, age: Integer) // same Person as in the question

// Get at the ExpressionEncoder behind the implicit Encoder[Person] and build its deserializer expression.
implicit val pEnc = implicitly[Encoder[Person]].asInstanceOf[ExpressionEncoder[Person]]
val decoder = pEnc.resolveAndBind().objDeserializer

val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS

val df = people.alias("p")
  .select($"p.name", struct($"p.*").alias("person"))

val ds = df.map(row => {
  val name = row.getAs[String]("name")
  val personRow = row.getAs[Row]("person")
  // Convert the user-land Row back to Catalyst's InternalRow, then evaluate the deserializer to get a Person.
  val person = decoder.eval(CatalystTypeConverters.convertToCatalyst(personRow).asInstanceOf[InternalRow]).asInstanceOf[Person]
  (name, person)
})

ds.show
In summary, you are better off using a tuple wrapper and the built-in encoding wherever possible: it's faster, and it's designed and tested to work that way.
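If you do end up holding a plain Row, a hypothetical sketch of an alternative that stays on public APIs (reusing the df and Person defined above) is to read the nested struct as a Row and build the case class by hand; more verbose, but nothing internal:

// Sketch only: decode the nested struct manually via public Row accessors.
val manual = df.map { row =>
  val name      = row.getAs[String]("name")
  val personRow = row.getAs[Row]("person")
  val person    = Person(personRow.getAs[String]("name"), personRow.getAs[Integer]("age"))
  (name, person)
}
manual.show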