Tags: scala, apache-spark, frameless

Implicit Encoder for TypedDataset and Type Bounds in Scala


My objective is to create a MyDataFrame class that knows how to fetch data at a given path while providing type safety. I'm having some trouble using a frameless.TypedDataset with type bounds on remote data. For example:

sealed trait Schema
final case class TableA(id: String) extends Schema
final case class TableB(id: String) extends Schema

class MyDataFrame[T <: Schema](path: String, implicit val spark: SparkSession) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
} 

But I keep getting the error could not find implicit value for evidence parameter of type frameless.TypedEncoder[org.apache.spark.sql.Row]. I know that TypedDataset.create needs an Injection for this to work, but I'm not sure how I would write one for a generic T. I thought the compiler would be able to deduce that, since all subtypes of Schema are case classes, it would work.

Anybody ever run into this?


Solution

  • All implicit parameters should be in the last parameter list and this parameter list should be separate from non-implicit ones.

    If you try to compile

    class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession) {
      def read = TypedDataset.create(spark.read.parquet(path)).as[T]
    }
    

    you'll see error

    Error:(11, 35) could not find implicit value for evidence parameter of type frameless.TypedEncoder[org.apache.spark.sql.Row]
        def read = TypedDataset.create(spark.read.parquet(path)).as[T]
    

    So let's just add the corresponding implicit parameter

    class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row]) {
      def read = TypedDataset.create(spark.read.parquet(path)).as[T]
    }
    

    Now we'll get the error

    Error:(11, 64) could not find implicit value for parameter as: frameless.ops.As[org.apache.spark.sql.Row,T]
        def read = TypedDataset.create(spark.read.parquet(path)).as[T]
    

    So let's add one more implicit parameter

    import frameless.ops.As
    import frameless.{TypedDataset, TypedEncoder}
    import org.apache.spark.sql.{Row, SparkSession}
    
    class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row], as: As[Row, T]) {
      def read = TypedDataset.create(spark.read.parquet(path)).as[T]
    }
    

    or with kind-projector

    class MyDataFrame[T <: Schema : As[Row, ?]](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row]) {
      def read = TypedDataset.create(spark.read.parquet(path)).as[T]
    }
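
    Note that the ? placeholder in As[Row, ?] comes from the kind-projector compiler plugin, which has to be enabled in the build. A typical sbt setup (the version here is just illustrative) looks like

    addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full)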
    

    You can also create a custom type class that bundles both implicits

    trait Helper[T] {
      implicit def te: TypedEncoder[Row]
      implicit def as: As[Row, T]
    }

    object Helper {
      implicit def mkHelper[T](implicit te0: TypedEncoder[Row], as0: As[Row, T]): Helper[T] = new Helper[T] {
        override implicit def te: TypedEncoder[Row] = te0
        override implicit def as: As[Row, T] = as0
      }
    }

    class MyDataFrame[T <: Schema : Helper](path: String)(implicit spark: SparkSession) {
      val h = implicitly[Helper[T]]
      import h._
      def read = TypedDataset.create(spark.read.parquet(path)).as[T]
    }
    

    or

    class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, h: Helper[T]) {
      import h._
      def read = TypedDataset.create(spark.read.parquet(path)).as[T]
    }
    

    or

    import org.apache.spark.sql.DataFrame

    trait Helper[T] {
      def create(dataFrame: DataFrame): TypedDataset[T]
    }

    object Helper {
      implicit def mkHelper[T](implicit te: TypedEncoder[Row], as: As[Row, T]): Helper[T] =
        (dataFrame: DataFrame) => TypedDataset.create(dataFrame).as[T]
    }

    class MyDataFrame[T <: Schema : Helper](path: String)(implicit spark: SparkSession) {
      def read = implicitly[Helper[T]].create(spark.read.parquet(path))
    }
    

    or

    class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, h: Helper[T]) {
      def read = h.create(spark.read.parquet(path))
    }
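
    Whichever variant you choose, the call site looks the same. A hypothetical usage, assuming the required TypedEncoder[Row] and As[Row, TableA] instances (and hence Helper[TableA]) can actually be resolved:

    val tableA = new MyDataFrame[TableA]("path/to/parquet/file") // Helper[TableA] found implicitly
    val ds: TypedDataset[TableA] = tableA.read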
    

    The catch is that frameless doesn't provide a TypedEncoder[Row] instance out of the box (that's exactly what the original error was complaining about), so the signatures above merely move the problem to the call site. A corrected version sidesteps Row entirely by reading the DataFrame as a Dataset[T] first:

    import org.apache.spark.sql.{Encoder, SparkSession}
    import frameless.{TypedDataset, TypedEncoder}
    
    class MyDataFrame[T <: Schema](path: String)(implicit
      spark: SparkSession,
      e: Encoder[T],
      te: TypedEncoder[T]
    ) {
      def read: TypedDataset[T] = TypedDataset.create[T](spark.read.parquet(path).as[T])
    }
    

    or using context bounds

    class MyDataFrame[T <: Schema : Encoder : TypedEncoder](path: String)(implicit
      spark: SparkSession
    ) {
      def read: TypedDataset[T] = TypedDataset.create[T](spark.read.parquet(path).as[T])
    }
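
    Both instances are derivable for case classes like TableA: spark.implicits._ supplies Spark's Encoder for Product types, and frameless derives TypedEncoder from its companion object. A quick resolution check (a sketch, assuming an implicit SparkSession named spark as in the test below):

    import org.apache.spark.sql.Encoder
    import frameless.TypedEncoder
    import spark.implicits._           // brings Encoder[TableA] into scope for Product types

    implicitly[Encoder[TableA]]        // Spark's encoder, from spark.implicits._
    implicitly[TypedEncoder[TableA]]   // frameless' encoder, derived for case classes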
    

    Testing: I converted a JSON file {"id": "xyz"} into a parquet file and then ran

    sealed trait Schema
    final case class TableA(id: String) extends Schema
    final case class TableB(id: String) extends Schema
    
    import org.apache.spark.sql.SparkSession
    
    implicit val spark: SparkSession = SparkSession.builder
      .master("local")
      .appName("Spark SQL basic example")
      .getOrCreate()
    
    import spark.implicits._
    import frameless.syntax._
    
    val res: TypedDataset[TableA] = new MyDataFrame[TableA]("path/to/parquet/file").read
    println(res) // [id: string]
    res.foreach(println).run() // TableA(xyz)
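
    For reference, one way to produce such a parquet fixture from the JSON file (the paths are placeholders):

    spark.read.json("path/to/json/file").write.parquet("path/to/parquet/file")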