apache-spark apache-spark-sql avro orc

Reading ORC does not trigger projection pushdown and predicate pushdown


I have a file fileA in ORC with the following format:

key
    id_1
    id_2
value
    value_1
    ...
    value_30
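
In Spark DDL terms, the layout is roughly the following (int is just a placeholder; the actual field types are not relevant to the question):

// Sketch of fileA's layout as a Spark DDL schema string; int stands in for the
// real field types, and value_2 .. value_29 are omitted for brevity.
val fileASchema =
  "key struct<id_1:int,id_2:int>, value struct<value_1:int,value_30:int>"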

If I use the following config:

'spark.sql.orc.filterPushdown'                : true
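
For reference, one way to set this on the session (any equivalent mechanism such as --conf works the same):

// Enable ORC filter pushdown on the active SparkSession; the same setting can
// also be passed via --conf or the session builder.
spark.conf.set("spark.sql.orc.filterPushdown", "true")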

And my code looks like this:

val filter_A = fileA_DF
  .filter(fileA_DF("value.value_1") > lit(some_value))
  .select("key.id_1")

the amount of data read will be the same as with

val filter_A = fileA_DF
  .filter(fileA_DF("value.value_1") > lit(some_value))
  .select("*")

Shouldn't Spark only:

  1. predicate pushdown - read the files and stripes that can satisfy the filter
  2. projection pushdown - read the columns that are actually used?

I also checked with a similarly sized Avro file and found no improvement in selection speed.

Am I measuring ORC the wrong way?
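
For reference, the bytes actually read can be compared with a task-metrics listener along these lines (just a sketch; the counter below is only illustrative):

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sketch: sum the input bytes of every finished task, then trigger the query
// and compare the counter between the two variants of filter_A above.
val bytesRead = new AtomicLong(0L)
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    Option(taskEnd.taskMetrics).foreach(m => bytesRead.addAndGet(m.inputMetrics.bytesRead))
})

filter_A.count()   // force the scan
println(s"bytes read: ${bytesRead.get()}")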


Solution

  • Let's take the following reproducible example:

    import spark.implicits._
    import org.apache.spark.sql.functions.{col, lit}

    // Build a small DataFrame with two struct columns, mirroring the
    // key/value layout from the question.
    val df = Seq(
      ((1,1),(2,2)),
      ((1,1),(2,2)),
      ((1,1),(2,2))
    ).toDF("key", "value")
    
    val keySchema = "struct<id_1:int,id_2:int>"
    val valueSchema = "struct<value_1:int,value_2:int>"
    
    val input = df.select(col("key").cast(keySchema), col("value").cast(valueSchema))
    
    scala> input.show
    +------+------+
    |   key| value|
    +------+------+
    |{1, 1}|{2, 2}|
    |{1, 1}|{2, 2}|
    |{1, 1}|{2, 2}|
    +------+------+
    
    // Write it out as ORC so we can read it back with pushdown enabled.
    input.write.mode("overwrite").orc("myFile.orc")
    

    If we now read this file back, apply the filters from your question, and use the explain method, we see the following:

    val output = spark.read.orc("myFile.orc")
      .filter(col("value.value_1") > lit(1))
      .select("key.id_1")
    
    scala> output.explain
    == Physical Plan ==
    *(1) Project [key#68.id_1 AS id_1#73]
    +- *(1) Filter (isnotnull(value#69) AND (value#69.value_1 > 1))
       +- FileScan orc [key#68,value#69] Batched: false, DataFilters: [isnotnull(value#69), (value#69.value_1 > 1)], Format: ORC, Location: InMemoryFileIndex[file:/C:/Users/(C)KurtHoman/myFile.orc], PartitionFilters: [], PushedFilters: [IsNotNull(value), GreaterThan(value.value_1,1)], ReadSchema: struct<key:struct<id_1:int>,value:struct<value_1:int>>
    

    We see that there are DataFilters/PushedFilters at work, so predicate pushdown is working. If you really want to avoid reading full files, you need to make sure your input file is properly partitioned. Some more info about those filters can be found here.
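
    As an illustration only (it assumes a top-level partition column such as a load date, which the original schema does not have), a partitioned write could look like this:

    // Hypothetical example: add a top-level column and partition the ORC output
    // on it; filters on that column then show up as PartitionFilters in the plan
    // and prune whole directories instead of reading every file.
    input
      .withColumn("load_date", lit("2023-01-01"))   // hypothetical partition column
      .write
      .partitionBy("load_date")
      .mode("overwrite")
      .orc("myFilePartitioned.orc")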

    Now, we do indeed see that both the key and the value column are being read in, but that is because a PushedFilter alone does not guarantee that you never read a value for which the filter predicate is false; it only applies a pre-filter at the file/stripe level (more info in this SO answer). So we still have to apply that filter in the Spark DAG as well (which you can see in the output of explain). Note the ReadSchema in the plan as well: only key.id_1 and value.value_1 are requested, not the full structs, so projection pushdown is happening too.

    So, to wrap it up: