amazon-s3, pyspark, parquet, hadoop-partitioning

Moving files from one parquet partition to another


I have a very large amount of data in my S3 bucket, partitioned by two columns, MODULE and DATE, such that the file structure of my parquet data is:

s3://my_bucket/path/file.parquet/MODULE='XYZ'/DATE=2020-01-01

I have 7 MODULE values and the DATE ranges from 2020-01-01 to 2020-09-01. I found a discrepancy in the data and need to correct the MODULE entries for one of the modules. Basically, I need to change all data for a particular index number belonging to MODULE XYZ to MODULE ABC. I can do this in PySpark by loading the data frame and doing something like:

df=df.withColumn('MODULE', when(col('index')==34, "ABC").otherwise(col('MODULE')))

But how do I repartition it so that only those entries that are changed get moved to the ABC MODULE partition? If I do something like:

df.write.mode('append').partitionBy('MODULE','DATE').parquet("s3://my_bucket/path/file.parquet")

I would be adding the data along with the erroneous MODULE data. Plus, I have almost a year's worth of data and don't want to repartition the entire dataset, as that would take a very long time.

Is there a way to do this?


Solution

  • If I understand correctly, you have data in the MODULE=XYZ partition that should be moved to MODULE=ABC.

    First, identify the impacted files.

    from pyspark.sql import functions as F
    
    # df is the full dataset loaded from s3://my_bucket/path/file.parquet/;
    # input_file_name() tags each row with the parquet file it was read from.
    file_rows = df.where(F.col("index") == 34).select(
        F.input_file_name()
    ).distinct().collect()
    file_list = [row[0] for row in file_rows]  # keep the plain s3 path strings
    

    Then, create a dataframe based only on these files and use it to append the corrected rows for both MODULE values back to the dataset.

    # basePath lets Spark recover the MODULE and DATE partition columns
    # when reading individual files instead of the whole directory tree.
    df = spark.read.option("basePath", "s3://my_bucket/path/file.parquet/").parquet(
        *file_list
    ).withColumn("MODULE", F.when(F.col("index") == 34, "ABC").otherwise(F.col("MODULE")))

    # Append the corrected rows back into the partitioned dataset.
    df.write.parquet(
        "s3://my_bucket/path/file.parquet/", mode="append", partitionBy=["MODULE", "DATE"]
    )
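
    As a quick sanity check (a hypothetical snippet, reusing the same base path), you can confirm that the rows with index 34 now show up under MODULE=ABC:

    check = spark.read.parquet("s3://my_bucket/path/file.parquet/")
    check.where((F.col("index") == 34) & (F.col("MODULE") == "ABC")).count()
    # The same rows are still counted under MODULE=XYZ until the old files are deleted.
    check.where((F.col("index") == 34) & (F.col("MODULE") == "XYZ")).count()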
    

    At this point, ABC should be OK (you just added the missing data), but XYZ is now wrong because of the duplicated data. To recover XYZ, you just need to delete the files listed in file_list.
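
    A minimal sketch of that cleanup step, assuming boto3 is available and that file_list holds the s3:// (or s3a://) URIs collected above; the bucket/key parsing and batching shown here are illustrative, not part of the original answer:

    import boto3
    from urllib.parse import urlparse

    s3 = boto3.client("s3")

    # Group the impacted part files by bucket, then delete them in batches
    # (delete_objects accepts at most 1000 keys per request).
    objects_by_bucket = {}
    for path in file_list:
        parsed = urlparse(path)  # netloc is the bucket, path is the object key
        objects_by_bucket.setdefault(parsed.netloc, []).append({"Key": parsed.path.lstrip("/")})

    for bucket, objects in objects_by_bucket.items():
        for i in range(0, len(objects), 1000):
            s3.delete_objects(Bucket=bucket, Delete={"Objects": objects[i : i + 1000]})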