Tags: scala, apache-spark, apache-spark-sql, size, spark-streaming

How to calculate the size of dataframe in bytes in Spark?


I want to write out one large DataFrame with repartitioning, so I need to calculate the number of partitions for my source DataFrame:

number_of_partitions = size of dataframe / default_blocksize

How to calculate the dataframe size in bytes?
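
In Scala, that formula is roughly the following (128 MB is an assumed default block size; the real value comes from dfs.blocksize on your cluster):

    // Assumed default HDFS block size of 128 MB; check dfs.blocksize on your cluster.
    val defaultBlockSize: Long = 128L * 1024 * 1024

    // Hypothetical helper: at least one partition, each roughly one block in size.
    def numberOfPartitions(dataframeSizeInBytes: Long): Int =
      math.max(1, math.ceil(dataframeSizeInBytes.toDouble / defaultBlockSize).toInt)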


Solution

  • Using spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes we can get the size of the actual DataFrame once it is loaded into memory. Check the code below.

    scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
    df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]
    
    scala> import org.apache.commons.io.FileUtils
    import org.apache.commons.io.FileUtils
    
    scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
    bytes: BigInt = 763275709
    
    scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
    res5: String = 727 MB
    
    scala> import sys.process._
    import sys.process._
    
    scala> "hdfs dfs -ls -h /tmp/srinivas/".!
    Found 2 items
    -rw-r-----   3 svcmxns hdfs          0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
    -rw-r-----   3 svcmxns hdfs    727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
    res6: Int = 0
    
    val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
    val dataSize = bytes.toLong
    // Bytes -> MB, then one partition per 10240 MB (10 GB). Adjust the divisor to get the partition size you need.
    val numPartitions = (dataSize / 1024.0 / 1024.0 / 10240).ceil.toInt

    df.repartition(if (numPartitions == 0) 1 else numPartitions)
      .[...]
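
    As a usage sketch, the whole flow can be wrapped in a helper. The name writeRepartitioned, the ORC output format, and the ~10 GB target per partition are illustrative assumptions, not part of the answer above; it uses the Spark 2.4 stats API.

    import org.apache.spark.sql.{DataFrame, SaveMode}

    // Hypothetical helper: estimate the in-memory size, derive a partition
    // count (one partition per targetPartitionBytes, 10 GB assumed here),
    // and write the result out as ORC. Spark 2.4 stats API.
    def writeRepartitioned(df: DataFrame, path: String,
                           targetPartitionBytes: Long = 10L * 1024 * 1024 * 1024): Unit = {
      val spark = df.sparkSession
      val bytes = spark.sessionState.executePlan(df.queryExecution.logical)
        .optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
      val numPartitions = math.max(1, math.ceil(bytes.toDouble / targetPartitionBytes).toInt)
      df.repartition(numPartitions).write.mode(SaveMode.Overwrite).format("orc").save(path)
    }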
    

    Edit 1: Use one of the following, depending on your Spark version.

    Spark 3.0.2

    val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes
    

    Spark 2.4

    val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
    

    Spark 2.3

    val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes
    

    PySpark

    spark._jsparkSession.sessionState().executePlan(df._jdf.queryExecution().logical()).optimizedPlan().stats().sizeInBytes()
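
    On Spark 2.3 and later, the same optimized-plan statistics should also be reachable directly from the DataFrame's own query execution, without building a second plan; a shorter Scala sketch of the same idea:

    // Shorter equivalent on Spark 2.3+: read the statistics of the
    // DataFrame's already-optimized logical plan.
    val bytes = df.queryExecution.optimizedPlan.stats.sizeInBytes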