I have several CSV files, each with an inconsistent number of data rows and no header row, and I want to read them into a single PySpark DataFrame. The structure of the CSV files is as follows:
data1,data2
data1,data2,data3
data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6
data1,data2
data1,data2
data1,data2,data3
I want to skip the first two rows and the last three rows of each file before combining them into one DataFrame. I am using the following approach, which works, but I am looking for a more optimized solution because the datasets are quite large.
def concat(df_list: list):
    # union a list of DataFrames by column name, filling missing columns with NULLs
    df = df_list[0]
    for i in df_list[1:]:
        df = df.unionByName(i, allowMissingColumns=True)
    return df
def __read_with_separators(self, spark: SparkSession, field_details: List[Dict[str, Any]], file_path_list: List[str], kwargs: dict) -> DataFrame:
    df_list = []
    for file_path in file_path_list:
        rdd = spark.sparkContext.textFile(file_path)
        total_rows = rdd.count()
        start_index = kwargs.get("skiprows", 0)
        end_index = total_rows - kwargs.get("skipfooter", 0)
        # drop the header/footer rows by index, then split each line on the delimiter
        # (`delimiter` and `schema` are defined elsewhere in the class)
        rdd_filtered = (
            rdd.zipWithIndex()
            .filter(lambda x: start_index <= x[1] < end_index)
            .map(lambda x: x[0])
            .map(lambda line: line.split(delimiter))
        )
        temp_df = rdd_filtered.toDF(schema)
        df_list.append(temp_df)
    return concat(df_list)
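For illustration, a call might look roughly like this (the surrounding class, field details and file paths are placeholders; only the skiprows/skipfooter values matter here):

# illustrative only: invoking the method above from inside the class,
# skipping the first 2 and last 3 rows of each file as described
combined_df = self.__read_with_separators(
    spark,
    field_details=field_details,  # column definitions used to build `schema`
    file_path_list=["/path/to/a.csv", "/path/to/b.csv"],  # placeholder paths
    kwargs={"skiprows": 2, "skipfooter": 3},
)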
Question: Is there a more efficient way to skip these rows and combine the files into a single DataFrame?
After loading your data with spark.read.text, you can take the 3rd row from the DataFrame (it will be the 3rd line of one of your input files, before any data shuffling), run schema_of_csv to get the schema, and then apply the from_csv function:
from pyspark.sql import functions as F
from functools import reduce
csv_path = "/path/to/files"
df = spark.read.text(csv_path)
# the 3rd line is the first full-width data row in the example above
sample = df.take(3)[-1].value
# get the schema of this CSV row
schema = F.schema_of_csv(sample)
# convert csv strings into columns based on the above schema
df = df.select(F.from_csv('value', schema).alias('value')).select("value.*")
# filter out rows where the values of columns 4, 5 and 6 are all NULL (the short header/footer rows)
df_new = df.where(~reduce(lambda x, y: x & y, map(F.isnull, df.columns[3:])))
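The field names inferred by schema_of_csv are typically the generic _c0 ... _c5; if you want friendlier names you can rename the columns afterwards. A minimal sketch, with placeholder names:

# rename the inferred generic columns; the target names below are placeholders
df_new = df_new.toDF("field1", "field2", "field3", "field4", "field5", "field6")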
In case some of your CSV fields contain newlines (which is common in many production environments), the above approach will not work; reading in multiLine mode is required. In that case, you can try:
# get a full sample CSV row and infer the schema from it.
# you can program this to read the top N lines of any file on your storage engine (AWS S3, HDFS, etc.)
# that cover a full valid row: discard the first 2 lines, then run spark.read.csv and
# Spark will infer the CSV schema from the topmost lines.
# example: csv_sample = '\n'.join(spark.sparkContext.textFile('/path/to/a/csv/file').take(10)[2:])
csv_sample = """data1,data2,data3,",data4
",data5,data6"""
csv_schema = spark.read.csv(spark.sparkContext.parallelize([csv_sample]), multiLine=True).schema
# then load the dataframe using spark.read.csv
df = spark.read.csv(csv_path, schema=csv_schema, multiLine=True)
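Note that the multiLine read by itself does not skip the short header/footer rows; assuming they parse with NULLs in the trailing columns (as in the sample above), you can reuse the same filter as before:

from functools import reduce
from pyspark.sql import functions as F

# drop rows where columns 4, 5 and 6 are all NULL (the short header/footer rows)
df_new = df.where(~reduce(lambda x, y: x & y, map(F.isnull, df.columns[3:])))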