I'm trying to parse XML files into DataFrames using Databricks' spark-xml with the following code:
val xmlDF = spark
  .read
  .option("rowTag", "MeterReadingDocument")
  .option("valueTag", "foo") // meaningless, used to parse tags with no child elements
  .option("inferSchema", "false")
  .schema(schema)
  .xml(connectionString)
As you can see, I have provided a schema in order to avoid the costly operation of schema inference. This schema is defined as
val schema = MyProjectUtils.Schemas.meterReadingDocumentSchema
where MyProjectUtils is a package containing an object Schemas with the schema definition:
object Schemas {
  ...
  // nested schemas
  ...
  val meterReadingDocumentSchema = StructType(
    Array(
      StructField("ReadingStatusRefTable", readingStatusRefTableSchema, nullable = true),
      StructField("Header", headerSchema, nullable = true),
      StructField("ImportExportParameters", importExportParametersSchema, nullable = true),
      StructField("Channels", channelsSchema, nullable = true),
      StructField("_xmlns:xsd", StringType, nullable = true),
      StructField("_xmlns:xsi", StringType, nullable = true)
    )
  )
}
You will notice readingStatusRefTableSchema, headerSchema and other custom schemas, which are StructTypes corresponding to nested elements within the XML. These are, in turn, nested as well; for example:
val headerSchema = StructType(
  Array(
    StructField("Creation_Datetime", creationDatetimeSchema, nullable = true),
    StructField("Export_Template", exportTemplateSchema, nullable = true),
    StructField("System", SystemSchema, nullable = true),
    StructField("Path", pathSchema, nullable = true),
    StructField("Timezone", timezoneSchema, nullable = true)
  )
)
with
val creationDatetimeSchema = StructType(
  Array(
    StructField("_Datetime", TimestampType, nullable = true),
    StructField("foo", StringType, nullable = true)
  )
)
(I can provide more details on the nested nature of the schema if relevant)
If I declare these nested schemas in a notebook, or as an object within the notebook I use to read the data, this works and the data loads. But when I build a jar from this project and execute it, I get the following stack trace:
INFO ApplicationMaster [shutdown-hook-0]: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.NullPointerException
at org.apache.spark.sql.types.ArrayType.existsRecursively(ArrayType.scala:102)
at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1(StructType.scala:508)
at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1$adapted(StructType.scala:508)
at scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41)
at scala.collection.IndexedSeqOptimized.exists(IndexedSeqOptimized.scala:49)
at scala.collection.IndexedSeqOptimized.exists$(IndexedSeqOptimized.scala:49)
at scala.collection.mutable.ArrayOps$ofRef.exists(ArrayOps.scala:198)
at org.apache.spark.sql.types.StructType.existsRecursively(StructType.scala:508)
at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1(StructType.scala:508)
at org.apache.spark.sql.types.StructType.$anonfun$existsRecursively$1$adapted(StructType.scala:508)
at scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41)
at scala.collection.IndexedSeqOptimized.exists(IndexedSeqOptimized.scala:49)
at scala.collection.IndexedSeqOptimized.exists$(IndexedSeqOptimized.scala:49)
at scala.collection.mutable.ArrayOps$ofRef.exists(ArrayOps.scala:198)
at org.apache.spark.sql.types.StructType.existsRecursively(StructType.scala:508)
at org.apache.spark.sql.catalyst.util.CharVarcharUtils$.hasCharVarchar(CharVarcharUtils.scala:56)
at org.apache.spark.sql.catalyst.util.CharVarcharUtils$.failIfHasCharVarchar(CharVarcharUtils.scala:63)
at org.apache.spark.sql.DataFrameReader.schema(DataFrameReader.scala:76)
at com.mycompany.DataIngestion$.delayedEndpoint$com$mycompany$DataIngestion$1(DataIngestion.scala:44)
at com.mycompany.DataIngestion$delayedInit$body.apply(DataIngestion.scala:10)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at scala.Function0.apply$mcV$sp$(Function0.scala:39)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
at scala.App.$anonfun$main$1$adapted(App.scala:80)
at scala.collection.immutable.List.foreach(List.scala:431)
at scala.App.main(App.scala:80)
at scala.App.main$(App.scala:78)
at com.mycompany.DataIngestion$.main(DataIngestion.scala:10)
at com.mycompany.DataIngestion.main(DataIngestion.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:739)
)
I added another, simpler CSV file and created a schema for it in the same Schemas object. This schema has no nested structs, and reading the file with it works:
val simplerDocSchema = MyProjectUtils.Schemas.anotherDocSchema
spark
  .read
  .format("csv")
  .schema(simplerDocSchema)
  .load(connectionString)
object Schemas {
  ...
  val anotherDocSchema: StructType = StructType(
    Array(
      StructField("ID", StringType, nullable = true),
      StructField("DATE", StringType, nullable = true),
      StructField("CODE", StringType, nullable = true),
      StructField("AD", StringType, nullable = true),
      StructField("ACCOUNT", StringType, nullable = true)
    )
  )
}
I expected this to fail as well, but it ran OK both in the compiled project and in a notebook.
Although you don't state which Spark version you are using, this code doesn't seem to have changed in 8 years:
override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = {
  f(this) || elementType.existsRecursively(f)
}
Most probably, elementType is null. Since you don't provide the whole code, my guess is that you have an ArrayType(someVal, ...) where someVal is not yet initialized at the moment the schema is constructed. Scala object vals are initialized in declaration order, so a val that references another val declared further down the same object picks up null.
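Here is a minimal, self-contained sketch of that trap (the names are illustrative, not taken from your code):

import org.apache.spark.sql.types._

object Repro {
  // `inner` is declared below, so it is still null while `outer` is being
  // initialized: the ArrayType silently captures elementType = null.
  val outer: StructType = StructType(
    Array(StructField("items", ArrayType(inner, containsNull = true), nullable = true))
  )

  val inner: StructType = StructType(
    Array(StructField("x", StringType, nullable = true))
  )
}

Passing Repro.outer to spark.read.schema(...) then fails inside StructType.existsRecursively with exactly the NullPointerException in your stack trace. In a notebook the definitions are evaluated cell by cell in whatever order you run them, which would explain why the same schemas work there but break in the packaged jar.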
Swap your vals to defs and try again.
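For example, sketching only the fields shown above (everything else would stay as in your Schemas object):

import org.apache.spark.sql.types._

object Schemas {
  // With def (or lazy val), each schema is built on first use, so the
  // declaration order inside the object no longer matters.
  def creationDatetimeSchema: StructType = StructType(
    Array(
      StructField("_Datetime", TimestampType, nullable = true),
      StructField("foo", StringType, nullable = true)
    )
  )

  def headerSchema: StructType = StructType(
    Array(
      StructField("Creation_Datetime", creationDatetimeSchema, nullable = true)
      // ... remaining nested fields as in the original
    )
  )
}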