
Question regarding Kryo and Java encoders in Datasets


I am using Spark 2.4 and referring to https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

Bean class:

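import java.io.Serializable;

public class EmployeeBean implements Serializable {

    private Long id;
    private String name;
    private Long salary;
    private Integer age;

    // Encoders.bean builds the schema from these getter/setter pairs
    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Long getSalary() { return salary; }
    public void setSalary(Long salary) { this.salary = salary; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
}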

Spark Example:

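    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.storage.StorageLevel;

    SparkSession spark = SparkSession.builder().master("local[4]").appName("play-with-spark").getOrCreate();

    // populateEmployees is the asker's own helper (not shown)
    List<EmployeeBean> employees1 = populateEmployees(1, 1_000_000);

    // Kryo encoder: each object is stored as one opaque binary "value" column
    Dataset<EmployeeBean> ds1 = spark.createDataset(employees1, Encoders.kryo(EmployeeBean.class));
    // Bean encoder: each bean property becomes its own column
    Dataset<EmployeeBean> ds2 = spark.createDataset(employees1, Encoders.bean(EmployeeBean.class));

    ds1.persist(StorageLevel.MEMORY_ONLY());
    long ds1Count = ds1.count(); // action that materializes the cache

    ds2.persist(StorageLevel.MEMORY_ONLY());
    long ds2Count = ds2.count();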

I checked the Storage tab of the Spark Web UI. The relevant part:

ID  RDD Name                                           Size in Memory   
2   LocalTableScan [value#0]                           56.5 MB  
13  LocalTableScan [age#6, id#7L, name#8, salary#9L]   23.3 MB
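Dividing the reported sizes by the row count gives a rough per-row footprint. This assumes `populateEmployees(1, 1_000_000)` yields 1,000,000 rows and that "MB" in the UI means 2^20 bytes; the ratio on the right holds regardless of either assumption.

```latex
\frac{56.5 \times 2^{20}\ \text{B}}{10^{6}\ \text{rows}} \approx 59.2\ \text{B/row},
\qquad
\frac{23.3 \times 2^{20}\ \text{B}}{10^{6}\ \text{rows}} \approx 24.4\ \text{B/row},
\qquad
\frac{56.5}{23.3} \approx 2.42
```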

A few questions:


Solution

  • Shouldn't the size of the Kryo-serialized RDD be smaller than that of the Java-serialized RDD, rather than more than double?

    That would be true if you were actually using Java serialization (or RDDs, for that matter). However, that's not the case here. Java serialization is used only when you apply Encoders.javaSerialization, which, like Encoders.kryo, is a binary serializer.

    Binary serializers take a whole object, serialize it using general-purpose serialization tools, and store the resulting byte array as a single DataFrame column. The result is opaque to the optimizer (with no real storage optimization, as blobs don't compress well) and usable only with the functional ("strongly typed") API.
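To make the idea of a per-row blob concrete, here is a minimal stdlib-only sketch. It uses plain Java serialization (`ObjectOutputStream`) as a stand-in for Kryo, and the names `BinaryBlobSketch` and `Employee` are hypothetical. The exact bytes differ between serializers, but in both cases the whole object ends up as one byte array; in this sketch that array also carries Java's class descriptors alongside the field values.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration of what a binary encoder stores per row:
// the whole object becomes one opaque byte[] (the single "value" column).
public class BinaryBlobSketch {

    // Minimal serializable class mirroring the question's EmployeeBean fields.
    static class Employee implements Serializable {
        private static final long serialVersionUID = 1L;
        Long id;
        String name;
        Long salary;
        Integer age;

        Employee(Long id, String name, Long salary, Integer age) {
            this.id = id;
            this.name = name;
            this.salary = salary;
            this.age = age;
        }
    }

    // Serialize one object into a single byte array using plain Java
    // serialization (a stand-in for Kryo; the byte layout is not Kryo's).
    static byte[] toBlob(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Employee e = new Employee(1L, "emp-1", 50_000L, 30);
        byte[] blob = toBlob(e);
        // Raw payload: two longs, one int, and the name's characters.
        int rawPayload = Long.BYTES + Long.BYTES + Integer.BYTES + e.name.length();
        System.out.println("blob bytes = " + blob.length + ", raw field bytes = " + rawPayload);
    }
}
```

Running `main` prints the blob size next to the raw field size. The blob is considerably larger, and none of its contents are visible to Spark as typed columns.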

    Encoders.bean is a completely different beast, closely resembling Encoders.product. It leverages the structure of the class, which is reflected in the schema. Because it encodes individual fields, columns can be efficiently compressed using standard Spark methods, hence the lower storage memory requirements.
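The compression side can be illustrated with a toy run-length encoder over a single field. This is a hypothetical sketch, not Spark's code: Spark's in-memory columnar cache chooses among several per-column schemes (run-length, dictionary and delta encodings among them). Run-length encoding only pays off when equal values are adjacent, and the synthetic `ageColumn` below is deliberately built that way.

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration (not Spark's implementation) of why per-field columns
// compress: values of one field sit next to each other, so repeats collapse.
public class ColumnarSketch {

    // Run-length encode an int column into (value, runLength) pairs.
    static List<int[]> runLengthEncode(int[] column) {
        List<int[]> runs = new ArrayList<>();
        int i = 0;
        while (i < column.length) {
            int value = column[i];
            int j = i;
            while (j < column.length && column[j] == value) {
                j++;
            }
            runs.add(new int[] {value, j - i});
            i = j;
        }
        return runs;
    }

    public static void main(String[] args) {
        int rows = 1_000;
        // Hypothetical "age" column pulled out of the rows; a columnar cache
        // would hold one such vector per field (id, name, salary, age).
        int[] ageColumn = new int[rows];
        for (int r = 0; r < rows; r++) {
            ageColumn[r] = 20 + (r / 100); // 10 runs of 100 identical ages
        }
        List<int[]> runs = runLengthEncode(ageColumn);
        System.out.println(rows + " values -> " + runs.size() + " runs"); // prints "1000 values -> 10 runs"
    }
}
```

A Kryo blob, by contrast, interleaves all fields of a row inside one byte array, so this kind of per-field repetition is never exposed to the cache.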

    Closely related: Spark Encoders: when to use beans()