scala, apache-spark, xml-parsing, apache-spark-xml

Importing Manually Declared Nested Schema from Package Causes NullPointerException


I'm trying to parse XML files into DataFrames using Databricks' spark-xml with the following code:

val xmlDF = spark
    .read
    .option("rowTag", "MeterReadingDocument")
    .option("valueTag", "foo") // meaningless, used to parse tags with no child elements
    .option("inferSchema", "false")
    .schema(schema)
    .xml(connectionString)

As you can see, I have provided a schema in order to avoid the costly operation of schema inference. This schema is defined as

 val schema = MyProjectUtils.Schemas.meterReadingDocumentSchema

Where MyProjectUtils is a package containing an object Schemas with the schema definition:

object Schemas {
...
// nested schemas 
...

val meterReadingDocumentSchema = StructType(
    Array(
      StructField("ReadingStatusRefTable", readingStatusRefTableSchema, nullable = true),
      StructField("Header", headerSchema, nullable = true),
      StructField("ImportExportParameters", importExportParametersSchema, nullable = true),
      StructField("Channels", channelsSchema, nullable = true),
      StructField("_xmlns:xsd", StringType, nullable = true),
      StructField("_xmlns:xsi", StringType, nullable = true)
    )
  )
}

You will notice readingStatusRefTableSchema, headerSchema and the other custom schemas, which are StructTypes corresponding to nested elements within the XML. These are, in turn, nested as well, for example:

val headerSchema = StructType(
    Array(
      StructField("Creation_Datetime", creationDatetimeSchema, nullable = true),
      StructField("Export_Template", exportTemplateSchema, nullable = true),
      StructField("System", SystemSchema, nullable = true),
      StructField("Path", pathSchema, nullable = true),
      StructField("Timezone", timezoneSchema, nullable = true)
    )
  )

with

val creationDatetimeSchema = StructType(
    Array(
      StructField("_Datetime", TimestampType, nullable = true),
      StructField("foo", StringType, nullable = true)
    )
  )

(I can provide more details on the nested nature of the schema if relevant)

If I declare these nested schemas in a notebook, or as an object within the notebook I use to read the data, this works and loads the data. But when I build a jar from this project and execute it, I get the following stack trace:


INFO ApplicationMaster [shutdown-hook-0]: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.NullPointerException
    at org.apache.spark.sql.types.ArrayType.existsRecursively(ArrayType.scala:102)
    at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1(StructType.scala:508)
    at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1$adapted(StructType.scala:508)
    at scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41)
    at scala.collection.IndexedSeqOptimized.exists(IndexedSeqOptimized.scala:49)
    at scala.collection.IndexedSeqOptimized.exists$(IndexedSeqOptimized.scala:49)
    at scala.collection.mutable.ArrayOps$ofRef.exists(ArrayOps.scala:198)
    at org.apache.spark.sql.types.StructType.existsRecursively(StructType.scala:508)
    at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1(StructType.scala:508)
    at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1$adapted(StructType.scala:508)
    at scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41)
    at scala.collection.IndexedSeqOptimized.exists(IndexedSeqOptimized.scala:49)
    at scala.collection.IndexedSeqOptimized.exists$(IndexedSeqOptimized.scala:49)
    at scala.collection.mutable.ArrayOps$ofRef.exists(ArrayOps.scala:198)
    at org.apache.spark.sql.types.StructType.existsRecursively(StructType.scala:508)
    at org.apache.spark.sql.catalyst.util.CharVarcharUtils$.hasCharVarchar(CharVarcharUtils.scala:56)
    at org.apache.spark.sql.catalyst.util.CharVarcharUtils$.failIfHasCharVarchar(CharVarcharUtils.scala:63)
    at org.apache.spark.sql.DataFrameReader.schema(DataFrameReader.scala:76)
    at com.mycompany.DataIngestion$.delayedEndpoint$com$mycompany$DataIngestion$1(DataIngestion.scala:44)
    at com.mycompany.DataIngestion$delayedInit$body.apply(DataIngestion.scala:10)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1$adapted(App.scala:80)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at scala.App.main(App.scala:80)
    at scala.App.main$(App.scala:78)
    at com.mycompany.DataIngestion$.main(DataIngestion.scala:10)
    at com.mycompany.DataIngestion.main(DataIngestion.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:739)
)

I added another, simpler CSV file and created a schema for it in the Schemas object. This schema has no nested structs from the same Schemas object:

 val simplerDocSchema = MyProjectUtils.Schemas.anotherDocSchema

spark
      .read
      .format("csv")
      .schema(simplerDocSchema)
      .load(connectionString)

object Schemas {
 ...
val anotherDocSchema: StructType = StructType(
    Array(
      StructField("ID", StringType, nullable = true),
      StructField("DATE", StringType, nullable = true),
      StructField("CODE", StringType, nullable = true),
      StructField("AD", StringType, nullable = true),
      StructField("ACCOUNT", StringType, nullable = true)
    )
  )
}

I expected this to fail as well, but it ran OK both in the compiled project and in a notebook.


Solution

  • Although you don't state which Spark version you are using, this code doesn't seem to have changed in 8 years:

    override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = {
      f(this) || elementType.existsRecursively(f)
    }
    

    Most probably, elementType is null. As you don't provide the whole code, I'd guess you have an ArrayType(someVal, ..) where someVal is not yet defined at the point it is used: vals in an object are initialized in declaration order, so a forward reference to a val declared further down is still null when the enclosing schema is built. That would also explain why the same definitions work when declared directly in a notebook.

    Swap your vals for defs and try again; a minimal sketch of the problem and the fix follows below.
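
    A minimal sketch of the initialization-order trap, with hypothetical schema and field names (Channel and _Value are placeholders, not taken from your project). During object initialization a val that forward-references a val declared later captures null; DataFrameReader.schema then walks the StructType via existsRecursively and hits the NullPointerException on the null nested type. Turning the members into defs sidesteps the declaration order:

    import org.apache.spark.sql.types._

    object BrokenSchemas {
      // channelsSchema is declared before intervalSchema, so while the object is
      // being initialized intervalSchema is still null and gets captured inside
      // the ArrayType; this is exactly the null that existsRecursively trips over.
      val channelsSchema: StructType = StructType(
        Array(StructField("Channel", ArrayType(intervalSchema), nullable = true))
      )
      val intervalSchema: StructType = StructType(
        Array(StructField("_Value", DoubleType, nullable = true))
      )
    }

    object FixedSchemas {
      // As defs the nested schema is built on demand, so declaration order
      // no longer matters and nothing is captured as null.
      def channelsSchema: StructType = StructType(
        Array(StructField("Channel", ArrayType(intervalSchema), nullable = true))
      )
      def intervalSchema: StructType = StructType(
        Array(StructField("_Value", DoubleType, nullable = true))
      )
    }

    Reordering the vals so that every nested schema is declared before the schemas that use it (or making them lazy vals) achieves the same effect; defs are simply the least order-sensitive option.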