apache-spark | pyspark | apache-spark-xml

How to identify or reroute bad XMLs when reading XML files with Spark


Using Spark, I am trying to read a bunch of XML files from a path. One of the files is a dummy file which is not an XML.

I would like Spark to tell me, in some way, that this one particular file is not valid.

Adding "badRecordsPath" otiton writes the bad data into specified location for JSON files, but the same is not working for xml, is there some other way?

df = (spark.read.format('json')
      .option('badRecordsPath', '/tmp/data/failed')
      .load('/tmp/data/dummy.json'))
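
For reference, this is roughly what I am attempting for XML, assuming the spark-xml package is available (the rowTag value here is just an example); the badRecordsPath option seems to have no effect:

# Attempted XML equivalent (spark-xml package); 'book' rowTag is a placeholder.
# Unlike the JSON reader, badRecordsPath does not report or redirect the bad file here.
df = (spark.read.format('xml')
      .option('rowTag', 'book')
      .option('badRecordsPath', '/tmp/data/failed')
      .load('/tmp/data/*.xml'))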

Solution

  • As far as I know, this is unfortunately not available in the XML package of Spark today in a declarative way, i.e. in the way you are expecting.

    For JSON it works because FailureSafeParser is wired into DataFrameReader, as shown below:

    /**
       * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
       * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
       *
       * Unless the schema is specified using `schema` function, this function goes through the
       * input once to determine the input schema.
       *
       * @param jsonDataset input Dataset with one JSON object per record
       * @since 2.2.0
       */
      def json(jsonDataset: Dataset[String]): DataFrame = {
        val parsedOptions = new JSONOptions(
          extraOptions.toMap,
          sparkSession.sessionState.conf.sessionLocalTimeZone,
          sparkSession.sessionState.conf.columnNameOfCorruptRecord)
    
        val schema = userSpecifiedSchema.getOrElse {
          TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
        }
    
        ExprUtils.verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
        val actualSchema =
          StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
    
        val createParser = CreateJacksonParser.string _
        val parsed = jsonDataset.rdd.mapPartitions { iter =>
          val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
          val parser = new FailureSafeParser[String](
            input => rawParser.parse(input, createParser, UTF8String.fromString),
            parsedOptions.parseMode,
            schema,
            parsedOptions.columnNameOfCorruptRecord)
          iter.flatMap(parser.parse)
        }
        sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
      }
    

    You can, however, implement the feature programmatically: read all the files in the folder using sc.textFile (or sc.wholeTextFiles, so that each file stays intact as a single record), and for each file parse the contents with an XML parser.

    If it is valid, redirect it to another path.

    If it is invalid, write it to the bad-record path. A minimal sketch of this approach is shown below.
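
    Here is a rough PySpark sketch of that idea, assuming Python's xml.etree.ElementTree as the parser; the /tmp/data/valid and /tmp/data/failed paths are placeholders, not part of the original setup.

    from xml.etree import ElementTree

    def is_valid_xml(content):
        # Try to parse the whole file; a ParseError means the file is not well-formed XML.
        try:
            ElementTree.fromstring(content)
            return True
        except ElementTree.ParseError:
            return False

    # wholeTextFiles yields (path, content) pairs, one per file.
    files = spark.sparkContext.wholeTextFiles('/tmp/data/*.xml')

    valid = files.filter(lambda kv: is_valid_xml(kv[1]))
    invalid = files.filter(lambda kv: not is_valid_xml(kv[1]))

    # Redirect good file contents to one location and record the bad file paths
    # in the bad-record path.
    valid.map(lambda kv: kv[1]).saveAsTextFile('/tmp/data/valid')
    invalid.map(lambda kv: kv[0]).saveAsTextFile('/tmp/data/failed')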