pyspark

Partitioning with pyspark


I have a csv file A.csv with a given format and a column ID having the value A for each line.

ID,X,Y,Z
A,xx1,yy1,zz1
A,xx2,yy2,zz2
A,xx3,yy3,zz3

I have another csv file B.csv with the same format and a column ID having the value B for each line.

ID,X,Y,Z
B,xx4,yy4,zz4
B,xx5,yy5,zz5
B,xx6,yy6,zz6

Same thing with a third file C.csv with value C as ID.

ID,X,Y,Z
C,xx7,yy7,zz7
C,xx8,yy8,zz8
C,xx9,yy9,zz9

So I have three files in the same format, each with a single unique value in the ID column. My goal is to write each file separately into a partitioned parquet directory structure, without merging the three files first.

I use pyspark to write parquet files from each csv file with partitionBy('ID'). My pyspark script loops over the three csv files and creates the three directories ID=A, ID=B and ID=C, each containing parquet files:

from pyspark.sql import SparkSession

# create a SparkSession with an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

for mycsv in ["A.csv", "B.csv", "C.csv"]:
    df = spark.read.option("header", True).csv(mycsv)
    # write parquet, one partition directory per distinct ID value
    df.write.partitionBy("ID").mode("append").parquet("ID")

This creates three directories:

ID/ID=A/ with parquet files inside
ID/ID=B/ with parquet files inside
ID/ID=C/ with parquet files inside

Is this partitioning actually used when I query these files with the condition WHERE ID = 'B'?

SELECT * from 'ID/*/*.parquet' WHERE ID = 'B'

If the partitioning is really used, the query should scan only the ID/ID=B directory...

Or should I first merge my three csv files into a single ABC.csv and then create the parquet files with partitionBy?

ID,X,Y,Z
A,xx1,yy1,zz1
A,xx2,yy2,zz2
A,xx3,yy3,zz3
B,xx4,yy4,zz4
B,xx5,yy5,zz5
B,xx6,yy6,zz6
C,xx7,yy7,zz7
C,xx8,yy8,zz8
C,xx9,yy9,zz9

And the script is now:

from pyspark.sql import SparkSession

# create a SparkSession with an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

mycsv = 'ABC.csv'
df = spark.read.option("header", True).csv(mycsv)
# write parquet, one partition directory per distinct ID value
df.write.partitionBy("ID").mode("append").parquet("ID")

This also creates the same three directories:

ID/ID=A/ with parquet files inside
ID/ID=B/ with parquet files inside
ID/ID=C/ with parquet files inside

Do both of these methods really take advantage of the partitioning when I query with:

SELECT * from 'ID/*/*.parquet' WHERE ID = 'B'

Solution

  • Yes, in both cases it will look up just the partition you need. This can be verified by looking at the physical execution plan.

    For example, if you did

    import shutil

    output_path = "partitioned_data"
    shutil.rmtree(output_path, ignore_errors=True)

    for filename in ["A.csv", "B.csv", "C.csv"]:
        df = spark.read.option("header", True).csv(filename)
        df.write.partitionBy("ID").mode("append").parquet(output_path)

    filtered_df = spark.sql(f"SELECT * FROM parquet.`{output_path}` WHERE ID = 'B'")
    print("EXECUTION PLAN:")
    filtered_df.explain(True)  # explain() prints the plan itself and returns None
    

    It will show something like

    == Physical Plan ==
    *(1) ColumnarToRow
    +- FileScan parquet [X#2854,Y#2855,Z#2856,ID#2857] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/partitioned_data], PartitionFilters: [isnotnull(ID#2857), (ID#2857 = B)], PushedFilters: [], ReadSchema: struct<X:string,Y:string,Z:string>
    

    where you see PartitionFilters: [isnotnull(ID#2857), (ID#2857 = B)].

    In contrast, try the same thing without applying partitionBy before storing the data, and you will see that the physical plan no longer prunes on ID: the condition appears only as an ordinary data filter, not under PartitionFilters.

    Here is a source that talks about this.