Temp table caching with spark-sql


Is a table registered with registerTempTable (createOrReplaceTempView in Spark 2.x) cached?

Using Zeppelin, I register a DataFrame in my Scala code after a heavy computation, and then, in a %pyspark paragraph, I want to access it and filter it further.

Will it use a memory-cached version of the table? Or will it be rebuilt each time?


Solution

  • Registered tables are not cached in memory.

    The registerTempTable method (createOrReplaceTempView in Spark 2.x) only creates or replaces a view of the given DataFrame with its query plan; no data is materialized.

    If a permanent view is created instead, the query plan is converted to a canonicalized SQL string and stored as view text in the metastore.
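    Because the view holds only a plan, every query against it runs that plan again. A minimal sketch of this, assuming Spark 2.0+ and a local SparkSession, counts how often an "expensive" map function runs; the object UncachedViewSketch, the helper evaluationCounts, the view name heavy_view and the evaluations accumulator are all hypothetical. The counter should keep growing with each collect() on the uncached view.

    ```scala
    import org.apache.spark.sql.SparkSession

    object UncachedViewSketch {
      // Hypothetical helper: queries an uncached temp view twice and returns the
      // number of map-function executions seen after the first and second query.
      def evaluationCounts(spark: SparkSession): (Long, Long) = {
        import spark.implicits._
        // Counts how often the map function below is executed on the executors.
        val evaluations = spark.sparkContext.longAccumulator("evaluations")
        // Stand-in for a heavy computation (hypothetical).
        val heavy = spark.range(0, 5).map { x => evaluations.add(1L); x.longValue * 2 }
        heavy.createOrReplaceTempView("heavy_view") // registers the plan only, nothing is cached

        spark.table("heavy_view").collect()
        val afterFirst = evaluations.sum
        spark.table("heavy_view").collect() // the whole plan, including the map, runs again
        val afterSecond = evaluations.sum
        (afterFirst, afterSecond)
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("uncached-view").master("local[*]").getOrCreate()
        val (first, second) = evaluationCounts(spark)
        println(s"map evaluations after 1st query: $first, after 2nd query: $second")
        spark.stop()
      }
    }
    ```

    Accumulator updates made inside transformations can be applied more than once if a task is retried, so the counter is best treated as a rough diagnostic rather than an exact measure.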

    You'll need to cache your DataFrame explicitly, e.g.:

    df.createOrReplaceTempView("my_table")  # df.registerTempTable("my_table") for Spark < 2.0
    spark.catalog.cacheTable("my_table")    # sqlContext.cacheTable("my_table") for Spark < 2.0
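    The same steps can be checked from Scala through the catalog API. This is a minimal sketch, assuming Spark 2.0+ and a local SparkSession; the object CacheTableSketch and its helper cacheStatus are hypothetical names. spark.catalog.isCached reports false right after the view is registered and true after cacheTable. Note that cacheTable on the catalog is lazy: the data is actually stored on the first action that reads the view.

    ```scala
    import org.apache.spark.sql.SparkSession

    object CacheTableSketch {
      // Hypothetical helper: registers a temp view and reports whether Spark
      // considers it cached before and after an explicit cacheTable call.
      def cacheStatus(spark: SparkSession, viewName: String): (Boolean, Boolean) = {
        import spark.implicits._
        val df = Seq(("1", 2), ("b", 3)).toDF()
        df.createOrReplaceTempView(viewName)
        val beforeCache = spark.catalog.isCached(viewName) // registering alone does not cache
        spark.catalog.cacheTable(viewName)                 // marks the view for in-memory caching
        val afterCache = spark.catalog.isCached(viewName)
        (beforeCache, afterCache)
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("cache-table").master("local[*]").getOrCreate()
        val (before, after) = cacheStatus(spark, "my_table")
        println(s"cached before cacheTable: $before, after: $after")
        spark.table("my_table").count()        // first action fills the in-memory cache
        spark.catalog.uncacheTable("my_table") // release the memory when it is no longer needed
        spark.stop()
      }
    }
    ```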
    

    EDIT:

    Let's illustrate this with an example:

    Using cacheTable:

    scala> val df = Seq(("1",2),("b",3)).toDF
    // df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
    
    scala> sc.getPersistentRDDs
    // res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
    
    scala> df.createOrReplaceTempView("my_table")
    
    scala> sc.getPersistentRDDs
    // res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
    
    scala> spark.catalog.cacheTable("my_table") // sqlContext.cacheTable("...") before Spark 2.0
    
    scala> sc.getPersistentRDDs
    // res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
    // Map(2 -> In-memory table my_table MapPartitionsRDD[2] at
    // cacheTable at <console>:26)
    

    Now the same example using df.cache.createOrReplaceTempView (df.cache.registerTempTable before Spark 2.0):

    scala> sc.getPersistentRDDs
    // res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
    
    scala> val df = Seq(("1",2),("b",3)).toDF
    // df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
    
    scala> df.createOrReplaceTempView("my_table")
    
    scala> sc.getPersistentRDDs
    // res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
    
    scala> df.cache.createOrReplaceTempView("my_table")
    
    scala> sc.getPersistentRDDs
    // res6: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = 
    // Map(2 -> ConvertToUnsafe
    // +- LocalTableScan [_1#0,_2#1], [[1,2],[b,3]]
    //  MapPartitionsRDD[2] at cache at <console>:28)
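    For the Zeppelin case in the question, what matters is that further queries through the view, such as a filter, are planned against the cached data. A minimal sketch, assuming Spark 2.0+ and a local SparkSession (CachedViewPlanSketch and filteredPlan are hypothetical names), registers a cached DataFrame as my_table and looks for an InMemoryTableScan node in the physical plan of a filter over the view; calling .explain() on the same DataFrame prints that plan. It also assumes, as the question implies, that the %pyspark paragraph shares the same SparkSession, so a filter on spark.table("my_table") there would be planned the same way.

    ```scala
    import org.apache.spark.sql.SparkSession

    object CachedViewPlanSketch {
      // Hypothetical helper: returns the physical plan of a filter over a temp view
      // that was registered from a cached DataFrame.
      def filteredPlan(spark: SparkSession): String = {
        import spark.implicits._
        val df = Seq(("1", 2), ("b", 3)).toDF()
        df.cache().createOrReplaceTempView("my_table") // cache first, then register the view
        // A further filter through the view is planned against the cached data.
        val filtered = spark.table("my_table").filter($"_2" > 2)
        filtered.queryExecution.executedPlan.toString
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("cached-view-plan").master("local[*]").getOrCreate()
        val plan = filteredPlan(spark)
        println(plan)
        println(s"reads from cache: ${plan.contains("InMemoryTableScan")}")
        spark.stop()
      }
    }
    ```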