I have a very large amount of data in my S3 bucket, partitioned by two columns, MODULE and DATE, such that the file structure of my parquet files is:
s3://my_bucket/path/file.parquet/MODULE='XYZ'/DATE=2020-01-01
I have 7 MODULE values, and DATE ranges from 2020-01-01 to 2020-09-01.
I found a discrepancy in the data and need to correct the MODULE entries for one of the modules. Basically, I need to move all data for a particular index number from MODULE XYZ to MODULE ABC.
I can do this in pyspark by loading the data frame and doing something like:
df=df.withColumn('MODULE', when(col('index')==34, "ABC").otherwise(col('MODULE')))
But how do I repartition it so that only the entries that changed get moved to the ABC MODULE partition? If I do something like:
df.write.mode('append').partitionBy('MODULE','DATE').parquet("s3://my_bucket/path/file.parquet")
I would be adding the corrected data alongside the erroneous MODULE data. Plus, I have almost a year's worth of data and don't want to repartition the entire dataset, as it would take a very long time.
Is there a way to do this?
If I understand correctly, you have data in partition MODULE=XYZ that should be moved to MODULE=ABC.
First, identify the impacted files.
from pyspark.sql import functions as F

# Each collected Row holds the path of one file containing an index == 34 entry
file_list = df.where(F.col("index") == 34).select(
    F.input_file_name()
).distinct().collect()
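Before rewriting anything, it may be worth checking that every impacted file really sits under MODULE=XYZ. The following is a minimal sketch in plain Python, assuming Hive-style key=value path segments as in the layout from the question; partition_values is a hypothetical helper, not a Spark API.

```python
from urllib.parse import unquote


def partition_values(path):
    """Return the key=value segments of a Hive-style partitioned path as a dict."""
    values = {}
    for segment in path.split("/"):
        key, sep, value = segment.partition("=")
        if sep and key:
            # Undo percent-encoding and drop quotes around values such as MODULE='XYZ'
            values[key] = unquote(value).strip("'\"")
    return values
```

For instance, {partition_values(row[0])["MODULE"] for row in file_list} should contain only "XYZ" if no other partition is affected.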
Then create a dataframe based only on these files and use it to complete both MODULE partitions.
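# collect() returns Row objects; pull out the plain path strings
paths = [row[0] for row in file_list]
# basePath keeps the MODULE and DATE partition columns when reading individual files
df = spark.read.option("basePath", "s3://my_bucket/path/file.parquet").parquet(*paths).withColumn(
    "MODULE", F.when(F.col("index") == 34, "ABC").otherwise(F.col("MODULE"))
)
# Append to the dataset root so partitionBy routes rows to MODULE=ABC or MODULE=XYZ
df.write.parquet(
    "s3://my_bucket/path/file.parquet", mode="append", partitionBy=["MODULE", "DATE"]
)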
At this point, ABC should be OK (you just added the missing data), but XYZ will be wrong because it now contains duplicate data. To recover XYZ, you just need to delete the files listed in file_list.
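The cleanup step could be sketched as follows. This assumes a boto3-style S3 client (its delete_objects call accepts up to 1000 keys per request) and that the paths returned by input_file_name() are usable as object keys as-is, without percent-encoding or characters such as ? or #. The helper name delete_s3_files and the batch_size parameter are hypothetical.

```python
from urllib.parse import urlparse


def delete_s3_files(client, uris, batch_size=1000):
    """Delete the given S3 URIs, grouped per bucket, in batches of at most batch_size keys."""
    keys_by_bucket = {}
    for uri in uris:
        parsed = urlparse(uri)
        # netloc is the bucket name; the path without its leading slash is the object key
        keys_by_bucket.setdefault(parsed.netloc, []).append(parsed.path.lstrip("/"))
    for bucket, keys in keys_by_bucket.items():
        for start in range(0, len(keys), batch_size):
            batch = keys[start:start + batch_size]
            client.delete_objects(
                Bucket=bucket,
                Delete={"Objects": [{"Key": key} for key in batch]},
            )
```

With boto3 this might be called as delete_s3_files(boto3.client("s3"), [row[0] for row in file_list]), ideally only after confirming that the appended files were written successfully, since the deletion cannot be undone on an unversioned bucket.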