apache-sparkpysparkapache-spark-sqldatabricksazure-databricks

Spark reading CSV with bad records


I am trying to read a csv file in spark using a pre-defined schema. For which I use:

df = (spark.read.format("csv")
        .schema(schema)
        .option("sep", ";")
        .load(
            file_path,
            header=True,
            encoding="utf-8"))

In this case the data gets loaded without any issues. Now when I provide a bad records path, I get no records:

df = (
        spark.read.format("csv")
        .schema(schema)
        .option("sep", ";")
        .option(
            "badRecordsPath",
            bad_records_path,
        )
        .load(
            file_path,
            header=True,
            encoding="utf-8",
        ))

All the records are dumped into the bad records path with the error. MALFORMED_CSV_RECORD (SQLSTATE: KD000) even though the schema used is exactly the same. Why would I be getting this error?


Solution

  • The solution that worked for me was adding an additional random StructField in the schema at the end(since order of fields matter for csv). I used StringType but any type should work. Spark will try to parse an additional column after the last delimeter at the end of the line and populate that column with nulls. The original data would be validated by the schema provided and bad records will be moved to quarantine. After reading the data, extra column can be dropped.

    Edit: adding an example

    Suppose we have a csv file with the following data:

    col1;col2;col3;col4;col5;
    3088548;"1166263";"something";"something";;
    

    In the above file we have 5 columns with a delimeter at the end. If we define our schema with just 5 columns and try to read the file we won't get any results and all records will be dumped in badRecords with MALFORMED error:

    enter image description here

    But if we define our schema with one extra random column at the end then we would be able to read the file while validating the schema:

    enter image description here

    After reading we can drop the extra column(col6).