Is a table registered with registerTempTable (createOrReplaceTempView with Spark 2.+) cached?
Using Zeppelin, I register a DataFrame in my Scala code after a heavy computation, and then in a %pyspark paragraph I want to access it and filter it further.
Will it use a memory-cached version of the table, or will it be rebuilt each time?
Registered tables are not cached in memory.
The registerTempTable (createOrReplaceTempView in Spark 2.+) method just creates or replaces a view of the given DataFrame with a given query plan.
It converts the query plan to a canonicalized SQL string and stores it as view text in the metastore only if a permanent view needs to be created.
You'll need to cache your DataFrame explicitly, e.g.:
df.createOrReplaceTempView("my_table") # df.registerTempTable("my_table") for Spark < 2.0
spark.catalog.cacheTable("my_table") # sqlContext.cacheTable("my_table") for Spark < 2.0
EDIT:
Let's illustrate this with an example.
Using cacheTable:
scala> val df = Seq(("1",2),("b",3)).toDF
// df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
scala> sc.getPersistentRDDs
// res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
scala> df.createOrReplaceTempView("my_table")
scala> sc.getPersistentRDDs
// res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
scala> spark.catalog.cacheTable("my_table") // sqlContext.cacheTable("...") before Spark 2.0
scala> sc.getPersistentRDDs
// res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
// Map(2 -> In-memory table my_table MapPartitionsRDD[2] at
// cacheTable at <console>:26)
Now the same example using cache.registerTempTable (cache.createOrReplaceTempView in Spark 2.+):
scala> sc.getPersistentRDDs
// res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
scala> val df = Seq(("1",2),("b",3)).toDF
// df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
scala> df.createOrReplaceTempView("my_table")
scala> sc.getPersistentRDDs
// res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()
scala> df.cache.createOrReplaceTempView("my_table")
scala> sc.getPersistentRDDs
// res6: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
// Map(2 -> ConvertToUnsafe
// +- LocalTableScan [_1#0,_2#1], [[1,2],[b,3]]
// MapPartitionsRDD[2] at cache at <console>:28)