Tags: scala, performance, apache-spark, apache-spark-sql, apache-spark-dataset

Why is dataset.count() faster than rdd.count()?


I created a Spark Dataset[Long]:

scala> val ds = spark.range(100000000)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]

When I ran ds.count, it returned the result in 0.2 s (on a 4-core, 8 GB machine). The DAG it created is as follows:

[DAG screenshot: ds.count runs in two stages]

But when I ran ds.rdd.count, it took 4 s to return the result (same machine), and the DAG it created is as follows:

[DAG screenshot: ds.rdd.count runs in a single stage]

So, my questions are:

  1. Why does ds.rdd.count create only one stage, whereas ds.count creates two stages?
  2. And if ds.rdd.count has only one stage, why is it slower than ds.count, which has two stages?

Solution

  • Why does ds.rdd.count create only one stage, whereas ds.count creates two stages?

    Both counts are effectively two-step operations. The difference is that in the case of ds.count, the final aggregation is performed by one of the executors, while ds.rdd.count aggregates the final result on the driver, so that step is not reflected in the DAG.
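    A minimal sketch of the difference, assuming a local spark-shell session (the plans printed by explain() vary by Spark version):

        val ds = spark.range(100000000L)

        // ds.count is planned as a SQL aggregation: a partial count per
        // partition, a shuffle exchange, and a final HashAggregate that runs
        // on an executor -- hence the second stage in the DAG.
        ds.selectExpr("count(*)").explain()

        // ds.rdd.count runs a single job that returns each partition's size
        // to the driver, which sums them locally; that driver-side sum is
        // not a stage, so the DAG shows only one.
        val total = ds.rdd.count()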

  • Also, when ds.rdd.count has only one stage, why is it slower than ds.count, which has two stages?

    The same explanation applies. Moreover, ds.rdd.count has to initialize (and later garbage-collect) 100 million JVM objects, one per record, which is hardly free and probably accounts for the majority of the time difference here.
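    That per-record cost comes from converting Spark's internal binary rows into JVM objects when the Dataset is turned into an RDD. A small sketch of how to see the extra step (toDebugString output varies by Spark version):

        // The lineage shows extra MapPartitionsRDDs inserted to decode each
        // internal row into a java.lang.Long before the elements are counted.
        println(spark.range(100000000L).rdd.toDebugString)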

    Finally, range-like objects are not a good benchmarking tool unless used with a lot of caution. Depending on the context, a count over a range can be expressed as a constant-time operation, and even without explicit optimizations it can be extremely fast (see, for example, spark.sparkContext.range(0, 100000000).count), but it doesn't reflect performance with a real workload.
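    If you do want rough numbers anyway, here is a minimal timing sketch; the time helper is ad hoc (not a Spark API), and single-run wall-clock figures are only indicative:

        // Ad hoc timing helper (for illustration only, not a Spark API).
        def time[T](label: String)(body: => T): T = {
          val start = System.nanoTime()
          val result = body
          println(f"$label: ${(System.nanoTime() - start) / 1e9}%.2f s")
          result
        }

        time("ds.count")(spark.range(100000000L).count())
        time("ds.rdd.count")(spark.range(100000000L).rdd.count())
        time("sc.range.count")(spark.sparkContext.range(0L, 100000000L).count())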

    Related to: How to know which count query is the fastest?