This code works as expected: it reads only the column i (notice ReadSchema: struct<i:bigint> at the end of the last line of the plan).
import org.apache.spark.sql.Dataset
import spark.implicits._ // provides $"..." and encoders; already in scope in spark-shell and notebooks
// Define the case class
case class Foo(i: Long, j: String)
// Create a Dataset of Foo
val ds: Dataset[Foo] = spark.createDataset(Seq(
  Foo(1, "Q"),
  Foo(10, "W"),
  Foo(100, "E")
))
// Filter and select the column
val result = ds.filter($"i" === 2).select($"i")
// Explain the query plan
result.explain()
// It prints:
//== Physical Plan ==
//*(1) Filter (isnotnull(i#225L) AND (i#225L = 2))
//+- *(1) ColumnarToRow
//   +- FileScan parquet [i#225L] Batched: true, DataFilters: [isnotnull(i#225L), (i#225L = 2)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], PushedFilters: [IsNotNull(i), EqualTo(i,2)], ReadSchema: struct<i:bigint>
However, if I use val result = ds.filter(_.i == 10).map(_.i), the physical plan reads all columns, including j (notice ReadSchema: struct<i:bigint,j:string> at the end of the last line):
//== Physical Plan ==
//*(1) SerializeFromObject [input[0, bigint, false] AS value#336L]
//+- *(1) MapElements $line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8811/2079839768@1028cff, obj#335: bigint
//   +- *(1) Filter $line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8810/701415521@212ee011.apply
//      +- *(1) DeserializeToObject newInstance(class $line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo), obj#334: $line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo
//         +- *(1) ColumnarToRow
//            +- FileScan parquet [i#225L,j#226] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:bigint,j:string>
Why does Spark handle this differently when I use the Scala lambda syntax _.i inside filter?
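Passing a Scala lambda such as _.i to filter or map forces Spark to process whole Foo objects row by row, much like an RDD. You can see this in the physical plan: DeserializeToObject sits on top of ColumnarToRow, so every row is turned into a Foo before the lambda runs, and building a Foo needs all of its fields.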
Catalyst has little insight into Scala code. This is a long-standing limitation, and you will find plenty of discussion of it if you search online.
A field access like _.x is logically the same as col("x"), but Spark Catalyst still does nothing to take advantage of that.
Note that the Spark developers are aware of this, but it has not been catered for.
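If the goal is to keep a typed Dataset[Long] while still letting Catalyst prune columns, one option is to express the filter and projection as column expressions and attach the type at the end. The following is only a minimal sketch under stated assumptions: the local SparkSession, the temporary Parquet path, and the names ColumnPruningSketch, loadFoo, withLambdas, and withColumns are hypothetical and not part of the question.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{Dataset, SparkSession}

// Same shape as the case class in the question.
case class Foo(i: Long, j: String)

object ColumnPruningSketch {
  // Assumption: a local session; in spark-shell or a notebook `spark` already exists.
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("pruning-sketch").getOrCreate()

  import spark.implicits._

  // Hypothetical helper: writes the sample rows to a temporary Parquet
  // directory and reads them back, so that the plan contains a FileScan.
  def loadFoo(): Dataset[Foo] = {
    val path = Files.createTempDirectory("foo-sketch").resolve("data").toString
    Seq(Foo(1, "Q"), Foo(10, "W"), Foo(100, "E")).toDS().write.parquet(path)
    spark.read.parquet(path).as[Foo]
  }

  // Opaque lambdas: Spark has to build Foo objects before calling them,
  // so the scan needs both i and j.
  def withLambdas(ds: Dataset[Foo]): Dataset[Long] =
    ds.filter(_.i == 10).map(_.i)

  // Column expressions: Catalyst can see that only i is referenced.
  // `.as[Long]` turns the column into a TypedColumn, so the result stays typed.
  def withColumns(ds: Dataset[Foo]): Dataset[Long] =
    ds.filter($"i" === 10).select($"i".as[Long])

  def main(args: Array[String]): Unit = {
    val ds = loadFoo()
    withLambdas(ds).explain() // compare ReadSchema in the two plans
    withColumns(ds).explain()
    spark.stop()
  }
}
```

Both variants return the same rows. With the column-based version, the plan should no longer contain a DeserializeToObject step, and ReadSchema should list only i, although the exact plan text can vary between Spark versions.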