I am using Spark 2.4 and referring to https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
Bean class:
```java
import java.io.Serializable;

public class EmployeeBean implements Serializable {
    private Long id;
    private String name;
    private Long salary;
    private Integer age;
    // getters and setters
}
```
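Spark example:
```java
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;

SparkSession spark = SparkSession.builder().master("local[4]").appName("play-with-spark").getOrCreate();
// populateEmployees(...) is my own helper (not shown) that builds the beans
List<EmployeeBean> employees1 = populateEmployees(1, 1_000_000);
Dataset<EmployeeBean> ds1 = spark.createDataset(employees1, Encoders.kryo(EmployeeBean.class));
Dataset<EmployeeBean> ds2 = spark.createDataset(employees1, Encoders.bean(EmployeeBean.class));
ds1.persist(StorageLevel.MEMORY_ONLY());
long ds1Count = ds1.count();
ds2.persist(StorageLevel.MEMORY_ONLY());
long ds2Count = ds2.count();
```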
I looked at the Storage tab of the Spark Web UI. The relevant part:
| ID | RDD Name | Size in Memory |
|----|----------|----------------|
| 2 | LocalTableScan [value#0] | 56.5 MB |
| 13 | LocalTableScan [age#6, id#7L, name#8, salary#9L] | 23.3 MB |
A few questions:

1. Shouldn't the size of the Kryo-serialized RDD be smaller than the Java-serialized RDD, instead of more than double?
2. I also tried `MEMORY_ONLY_SER()` mode and the RDD sizes are the same. An RDD stored as serialized Java objects should be kept as one byte array per partition. Shouldn't persisted serialized RDDs be smaller than deserialized ones?
3. What exactly do the Kryo and bean encoders do when creating the Dataset?
4. Can I rename persisted RDDs for better readability?
> Shouldn't the size of the Kryo-serialized RDD be smaller than the Java-serialized RDD, instead of more than double?
That would be true if you were actually using Java serialization (or RDDs, for that matter). However, that's not the case here. Java serialization is used when you apply `Encoders.javaSerialization`, which, like `Encoders.kryo`, uses binary serialization.
Binary serializers take the whole object, serialize it using general-purpose serialization tools, and store the resulting byte array as a single `DataFrame` column. The result is opaque to the optimizer (and gets no real storage optimization, since blobs don't compress well), and it is usable only with the functional ("strongly typed") API.
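To make the "one opaque blob per row" point concrete, here is a rough, stdlib-only analogue, not Spark's actual encoder code: it Java-serializes a single hypothetical `Employee` row on its own, as a per-row binary encoder would, and compares the blob length with the bytes the field values themselves need. The class and method names are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class BlobPerRowDemo {
    // Hypothetical stand-in for the question's EmployeeBean (same field types).
    static class Employee implements Serializable {
        private static final long serialVersionUID = 1L;
        final Long id;
        final String name;
        final Long salary;
        final Integer age;

        Employee(Long id, String name, Long salary, Integer age) {
            this.id = id;
            this.name = name;
            this.salary = salary;
            this.age = age;
        }
    }

    // Serialize one object into its own byte array: one opaque blob per row.
    static byte[] toBlob(Object row) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(row);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Employee e = new Employee(42L, "name42", 50_000L, 30);
        byte[] blob = toBlob(e);
        // Bytes needed for the values alone: two longs, one int, six ASCII chars.
        int rawFieldBytes = Long.BYTES + Long.BYTES + Integer.BYTES + e.name.length();
        // Each blob carries its own stream header and class descriptors, and
        // reading just `age` back requires deserializing the whole object.
        System.out.println("blob bytes: " + blob.length + ", raw field bytes: " + rawFieldBytes);
    }
}
```

The blob comes out much larger than the 26 raw field bytes; the exact length depends on the class name and JDK, so treat it as indicative only.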
`Encoders.bean` is a completely different beast, closely resembling `Encoders.product`. It leverages the structure of the class and reflects it in the schema. Because it encodes individual fields, columns can be efficiently compressed using standard Spark methods. Hence the lower storage memory requirements.
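Once rows are split into typed columns, each column can be encoded on its own. The sketch below is a simplified dictionary encoding of a hypothetical `age` column, similar in spirit to (but not the same code as) the compression schemes Spark's in-memory columnar cache applies. The class names and the 2-byte code width are assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class DictionaryEncodingDemo {
    // One dictionary-encoded int column: distinct values plus a small code per row.
    static final class Encoded {
        final int[] dictionary; // distinct values, indexed by code
        final short[] codes;    // one 2-byte code per row (assumed width)

        Encoded(int[] dictionary, short[] codes) {
            this.dictionary = dictionary;
            this.codes = codes;
        }

        int sizeInBytes() {
            return dictionary.length * Integer.BYTES + codes.length * Short.BYTES;
        }
    }

    // Assign codes in order of first appearance and replace each value by its code.
    static Encoded encode(int[] column) {
        Map<Integer, Short> codeOf = new HashMap<>();
        int[] dictionary = new int[column.length]; // upper bound on distinct values
        short[] codes = new short[column.length];
        for (int row = 0; row < column.length; row++) {
            Short code = codeOf.get(column[row]);
            if (code == null) {
                if (codeOf.size() > Short.MAX_VALUE) {
                    throw new IllegalStateException("too many distinct values for 2-byte codes");
                }
                code = (short) codeOf.size();
                codeOf.put(column[row], code);
                dictionary[code] = column[row];
            }
            codes[row] = code;
        }
        return new Encoded(Arrays.copyOf(dictionary, codeOf.size()), codes);
    }

    // Decoding one cell touches only the dictionary and a single code.
    static int decode(Encoded encoded, int row) {
        return encoded.dictionary[encoded.codes[row]];
    }

    public static void main(String[] args) {
        // Hypothetical age column: 1,000,000 rows, ages 20..64 (45 distinct values).
        int[] ages = new int[1_000_000];
        for (int row = 0; row < ages.length; row++) {
            ages[row] = 20 + row % 45;
        }
        Encoded encoded = encode(ages);
        System.out.println("plain bytes:   " + ages.length * Integer.BYTES);
        System.out.println("encoded bytes: " + encoded.sizeInBytes());
    }
}
```

Here the plain column needs 4,000,000 bytes and the encoded one 2,000,180 (45 × 4 dictionary bytes plus 1,000,000 × 2 code bytes). A blob-per-row layout cannot do this, because the age values are buried inside each serialized object.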
Closely related: Spark Encoders: when to use beans()