apache-spark, apache-spark-dataset, kryo, apache-spark-encoders

Impossible to operate on custom type after it is encoded? Spark Dataset


Say you have this (the approach for encoding a custom type is taken from this thread):

// assume we need to handle a custom type
class MyObj(val i: Int, val j: String)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val ds = spark.createDataset(Seq(new MyObj(1, "a"), new MyObj(2, "b"), new MyObj(3, "c")))

When I do a ds.show, I get:

+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

I understand that this is because the contents are encoded into Spark SQL's internal binary representation. But how can I display the decoded content, like this? (One possible workaround is sketched right after the table.)

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+
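
For display purposes, one workaround (a sketch of mine, not part of the original question; it assumes spark.implicits._ is in scope) is to map each object to a tuple, which Spark encodes with a regular product encoder instead of opaque bytes:

// tuples get a structural encoder, so show() can render the fields
ds.map(o => (o.i, o.j)).show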

UPDATE 1

Displaying the content is not the biggest issue; more importantly, the encoding can lead to problems when processing the dataset. Consider this example:

// continue with the above code
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"), new MyObj(6, "b"), new MyObj(5, "c")))

ds.joinWith(ds2, ds("i") === ds2("i"), "inner") 
// this gives a Runtime error: org.apache.spark.sql.AnalysisException: Cannot resolve column name "i" among (value); 

Does this mean that a Kryo-encoded type cannot conveniently be used in operations like joinWith?

How do we process a custom type in a Dataset, then?
If we cannot process it after it's encoded, what's the point of this Kryo encoding solution for custom types?! (One possible workaround is sketched below.)

(The solution provided by @jacek below is good to know for case classes, but it still cannot decode a custom type that is not a case class.)
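
One possible workaround for the join (again a sketch of mine, not from the original post; it assumes spark.implicits._ is in scope): map both Datasets to tuples, so that Spark derives real columns, then join on those:

// tuples are encoded structurally, so the Datasets get real
// columns (_1, _2) that the join condition can resolve
val dsT  = ds.map(o => (o.i, o.j))
val ds2T = ds2.map(o => (o.i, o.j))

dsT.joinWith(ds2T, dsT("_1") === ds2T("_1"), "inner").show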


Solution

  • The following worked for me, but it feels like using a high-level API to do low-level (deserialization) work.

    This is not to say it should be done this way, but it shows that it is possible.

    I don't know why KryoDeserializer does not deserialize the bytes back to the object they came from. It is just this way.

    One major difference between your class definition and mine is the case keyword, which is what let me use the following trick. Again, I have no idea exactly why it makes this possible.

    scala> println(spark.version)
    3.0.1
    
    // Note that case keyword
    case class MyObj(val i: Int, val j: String)
    import org.apache.spark.sql.Encoders
    implicit val myObjEncoder = Encoders.kryo[MyObj]
    // myObjEncoder: org.apache.spark.sql.Encoder[MyObj] = class[value[0]: binary]
    
    val ds = Seq(new MyObj(1, "a"), new MyObj(2, "b"), new MyObj(3, "c")).toDS
    // the Kryo deserializer gives bytes
    scala> ds.printSchema
    root
     |-- value: binary (nullable = true)
    
    scala> :type sc
    org.apache.spark.SparkContext
    
    // Let's deserialize the bytes into an object
    import org.apache.spark.serializer.KryoSerializer
    val ks = new KryoSerializer(sc.getConf)
    // that begs for a generic UDF
    val deserMyObj = udf { value: Array[Byte] => 
      import java.nio.ByteBuffer
      ks.newInstance.deserialize(ByteBuffer.wrap(value)).asInstanceOf[MyObj] }
    
    val solution = ds.select(deserMyObj('value) as "result").select($"result.*")
    scala> solution.show
    +---+---+
    |  i|  j|
    +---+---+
    |  1|  a|
    |  2|  b|
    |  3|  c|
    +---+---+
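
    As the comment in the code above says, this begs for a generic UDF. A minimal sketch of what that could look like (kryoDeserUdf is my own name, not part of the original answer; it assumes T is a case class whose schema Spark can reflect):

    import java.nio.ByteBuffer
    import scala.reflect.ClassTag
    import scala.reflect.runtime.universe.TypeTag
    import org.apache.spark.sql.functions.{col, udf}

    // generic variant of deserMyObj: turn a Kryo-encoded binary column
    // back into a struct column for any reflectable Product type T
    def kryoDeserUdf[T <: Product : TypeTag : ClassTag](ks: KryoSerializer) =
      udf { bytes: Array[Byte] => ks.newInstance.deserialize[T](ByteBuffer.wrap(bytes)) }

    // usage, mirroring the answer's last step
    val solution2 = ds.select(kryoDeserUdf[MyObj](ks)(col("value")) as "result").select($"result.*")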