python, python-3.x, apache-spark, pyspark, apache-spark-sql

How to Efficiently Read Multiple CSV Files in PySpark While Skipping Leading Rows and Footer Rows?


I have several CSV files, each with a varying number of data rows and no header row, and I want to read them into a single PySpark DataFrame. The structure of the CSV files is as follows (the raw file on the left, the rows I want to keep on the right):

data1,data2
data1,data2,data3
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6       =>        data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2
data1,data2
data1,data2,data3

I want to skip the first two rows and the last three rows of each file before combining them into one DataFrame. I am using the following approach, which works, but I am looking for a more optimized solution because the datasets are quite large.

from typing import Any, Dict, List

from pyspark.sql import DataFrame, SparkSession


def concat(df_list: list):
    # union all DataFrames by column name, filling missing columns with NULL
    df = df_list[0]
    for i in df_list[1:]:
        df = df.unionByName(i, allowMissingColumns=True)
    return df


def __read_with_separators(self, spark: SparkSession, field_details: List[Dict[str, Any]], file_path_list: List[str], kwargs: dict) -> DataFrame:
    # `delimiter` and `schema` are defined elsewhere in the class
    df_list = []
    for file_path in file_path_list:
        rdd = spark.sparkContext.textFile(file_path)

        total_rows = rdd.count()
        start_index = kwargs.get("skiprows", 0)
        end_index = total_rows - kwargs.get("skipfooter", 0)

        # keep only the rows between skiprows and skipfooter, then split each line into fields
        rdd_filtered = (
            rdd.zipWithIndex()
            .filter(lambda x: start_index <= x[1] < end_index)
            .map(lambda x: x[0])
            .map(lambda line: line.split(delimiter))
        )
        temp_df = rdd_filtered.toDF(schema)
        df_list.append(temp_df)

    return concat(df_list)

Questions:

  1. Is there a way to read multiple CSV files at once while skipping leading rows and footer rows more efficiently?
  2. Are there any optimizations or improvements to the current approach to handle large datasets more effectively?

Solution

  • After loading your DataFrame with spark.read.text, you can take the 3rd row of the DataFrame (it will be the 3rd line of one of your input files, before any data shuffling), run schema_of_csv on it to get the schema, and then apply the from_csv function:

    from pyspark.sql import functions as F
    from functools import reduce
    
    csv_path = "/path/to/files"
    df = spark.read.text(csv_path)
    
    sample = df.take(3)[-1].value
    
    # get the schema of this CSV row
    schema = F.schema_of_csv(sample)
    
    # convert csv strings into columns based on the above schema
    df = df.select(F.from_csv('value', schema).alias('value')).select("value.*")
    
    # filter out rows if all the values of column 4,5,6 are NULL
    df_new = df.where(~reduce(lambda x,y: x&y, map(F.isnull, df.columns[3:])))
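
    This filter works because the first two and last three rows of each file have fewer than four fields, so columns 4-6 parse as NULL for them; note the assumption that genuine data rows never have all of those columns NULL at once. A quick hypothetical check of the result:

    # the surviving rows should be the full 6-column data rows from every file
    df_new.show(truncate=False)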
    

    If some of your CSV fields contain embedded newlines (which is common in many production environments), the above approach will not work and you need to read in multiLine mode. In that case, you can try:

    # get a full sample CSV row and infer the schema from it.
    # you can program this to read the top N lines of any file on your storage engine (AWS, Hadoop, etc.)
    # that cover a full valid row, discard the first 2 lines, and then run spark.read.csv; Spark will
    # infer the CSV schema from the topmost lines.
    # example: csv_sample = '\n'.join(spark.sparkContext.textFile('/path/to/a/csv/file').take(10)[2:])
    csv_sample = """data1,data2,data3,",data4 
                       ",data5,data6"""
    
    csv_schema = spark.read.csv(spark.sparkContext.parallelize([csv_sample]), multiLine=True).schema
    
    # then load the dataframe using spark.read.csv
    df = spark.read.csv(csv_path, schema=csv_schema, multiLine=True)
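
    The multiLine load gives you the same wide DataFrame as before, so you can reuse the NULL-based filter from the first snippet to drop the short header and footer rows. A minimal sketch, assuming the inferred schema has at least four columns and that real data rows never have all of their trailing columns NULL:

    from functools import reduce
    from pyspark.sql import functions as F

    # header/footer rows have fewer fields, so their trailing columns parse as NULL;
    # keep only rows where at least one of columns 4+ is populated
    df_new = df.where(~reduce(lambda x, y: x & y, map(F.isnull, df.columns[3:])))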