I have a csv file A.csv with a given format and a column ID having the value A for each line.
ID,X,Y,Z
A,xx1,yy1,zz1
A,xx2,yy2,zz2
A,xx3,yy3,zz3
I have another csv file B.csv with the same format and a column ID having the value B for each line.
ID,X,Y,Z
B,xx4,yy4,zz4
B,xx5,yy5,zz5
B,xx6,yy6,zz6
Same thing with a third file C.csv with value C as ID.
ID,X,Y,Z
C,xx7,yy7,zz7
C,xx8,yy8,zz8
C,xx9,yy9,zz9
So I have three different files in the same format, each with a unique value in the ID column. My goal is to use each file separately to create partitioned parquet directories, without merging the three files.
I use pyspark to create parquet files from each csv file with the option partitionBy('ID'). I launch my pyspark script with a loop over the three csv files to create the three directories ID=A, ID=B and ID=C, with parquet files inside:
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
for mycsv in ["A.csv", "B.csv", "C.csv"]:
    df = spark.read.option("header", True).csv(mycsv)
    # write parquet, partitioned by the ID column
    df.write.option("header", True).partitionBy("ID").mode("append").parquet("ID")
This creates three directories:
ID/ID=A/ with parquet files inside
ID/ID=B/ with parquet files inside
ID/ID=C/ with parquet files inside
Is this partitioning really used when I query these files with the condition "WHERE ID='B'"?
SELECT * from 'ID/*/*.parquet' WHERE ID = 'B'
In that case, the query should search only the ID/ID=B directory if the partitioning is really used...
Or should I merge my 3 csv files into one ABC.csv and then create my parquet files with the option "partitionBy"?
ID,X,Y,Z
A,xx1,yy1,zz1
A,xx2,yy2,zz2
A,xx3,yy3,zz3
B,xx4,yy4,zz4
B,xx5,yy5,zz5
B,xx6,yy6,zz6
C,xx7,yy7,zz7
C,xx8,yy8,zz8
C,xx9,yy9,zz9
And the script is now:
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
mycsv = 'ABC.csv'
df = spark.read.option("header", True).csv(mycsv)
# write parquet
df.write.option("header", True).partitionBy("ID").mode("append").parquet("ID")
This will also create the same three directories:
ID/ID=A/ with parquet files inside
ID/ID=B/ with parquet files inside
ID/ID=C/ with parquet files inside
Do these two methods really use the partitioning when I query the data with:
SELECT * from 'ID/*/*.parquet' WHERE ID = 'B'
Yes, in both cases it will look up just the partition you need. This can be verified by looking at the physical execution plan.
For example, if you did
import shutil

output_path = "partitioned_data"
shutil.rmtree(output_path, ignore_errors=True)

for filename in ["A.csv", "B.csv", "C.csv"]:
    df = spark.read.option("header", True).csv(filename)
    df.write.partitionBy("ID").mode("append").parquet(output_path)
filtered_df = spark.sql(f"SELECT * FROM parquet.`{output_path}` WHERE ID = 'B'")
print("EXECUTION PLAN:")
filtered_df.explain(True)  # explain() prints the plan itself and returns None
It will show something like
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [X#2854,Y#2855,Z#2856,ID#2857] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/partitioned_data], PartitionFilters: [isnotnull(ID#2857), (ID#2857 = B)], PushedFilters: [], ReadSchema: struct<X:string,Y:string,Z:string>
where you see PartitionFilters: [isnotnull(ID#2857), (ID#2857 = B)].
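Note also that ReadSchema in the plan is struct&lt;X:string,Y:string,Z:string&gt;: with partitionBy("ID"), the ID column is not stored inside the parquet files at all, it is encoded in the directory name and reconstructed from the path at read time. A minimal pure-Python sketch of that mechanism (the helper name partition_value is hypothetical, just for illustration, not a Spark API):

```python
def partition_value(path, key):
    # Hive-style layout encodes the partition column as a "key=value"
    # path segment, e.g. ID/ID=B/part-0000.parquet; readers recover the
    # column value by parsing the directory name instead of the file data.
    for part in path.split("/"):
        if part.startswith(key + "="):
            return part.split("=", 1)[1]
    return None

print(partition_value("ID/ID=B/part-0000.parquet", "ID"))  # B
```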
In contrast, try it without applying partitionBy before storing the data, and you will see that the physical plan no longer contains any PartitionFilters, so every file has to be scanned.
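Conceptually, the PartitionFilters entry means pruning happens at file-listing time: only the directories whose name matches the predicate are visited at all. A minimal stdlib sketch of that idea (pure Python on a throwaway directory tree, not Spark; pruned_dirs is a hypothetical helper):

```python
import os
import tempfile

def pruned_dirs(root, key, value):
    # keep only the subdirectories whose name encodes key=value;
    # this is the directory-level pruning that PartitionFilters performs
    return [d for d in sorted(os.listdir(root)) if d == f"{key}={value}"]

# build a mock hive-partitioned layout: ID=A/, ID=B/, ID=C/
root = tempfile.mkdtemp()
for v in ("A", "B", "C"):
    os.makedirs(os.path.join(root, f"ID={v}"))

print(pruned_dirs(root, "ID", "B"))  # ['ID=B']
```

Only ID=B survives the listing step, so the files under ID=A and ID=C are never opened.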
Here is a source that talks about this.