scalaapache-spark

Handling schema mismatches in Spark


I am reading a csv file using Spark in Scala. The schema is predefined and i am using it for reading. This is the esample code:

// create the schema
val schema= StructType(Array(
      StructField("col1", IntegerType,false),
      StructField("col2", StringType,false),
      StructField("col3", StringType,true)))

// Initialize Spark session
val spark: SparkSession = SparkSession.builder
    .appName("Parquet Converter")
    .getOrCreate

// Create a data frame from a csv file
val dataFrame: DataFrame =
spark.read.format("csv").schema(schema).option("header", false).load(inputCsvPath)

From what i read when reading cav with Spark using a schema there are 3 options:

  1. Set mode to DROPMALFORMED --> this will drop the lines that don't match the schema
  2. Set mode to PERMISSIVE --> this will set the whole line to null values
  3. Set mode to FAILFAST --> this will throw an exception when a mismatch is discovered

What is the best way to combine the options? The behaviour I want is to get the mismatches in the schema, print them as errors and ignoring the lines in my data frame. Basically, I want a combination of FAILFAST and DROPMALFORMED.

Thanks in advance


Solution

  • This is what I eventually did:
    I added to the schema the "_corrupt_record" column, for example:

    val schema= StructType(Array(
        StructField("col1", IntegerType,true),    
        StructField("col2", StringType,false),
        StructField("col3", StringType,true),
        StructField("_corrupt_record", StringType, true)))
    

    Then I read the CSV using PERMISSIVE mode (it is Spark default):

    val dataFrame: DataFrame = spark.read.format("csv")
                                    .schema(schema)
                                    .option("header", false)
                                    .option("mode", "PERMISSIVE")
                                    .load(inputCsvPath)
    

    Now my data frame holds an additional column that holds the rows with schema mismatches. I filtered the rows that have mismatched data and printed it:

    val badRows = dataFrame.filter("_corrupt_record is not null")
    badRows.cache()
    badRows.show()