xmlapache-sparkpysparkdatabricksapache-spark-xml

Spark-xml crashes on reading processing instructions


I'm attempting to read in an XML file to a Spark dataframe using the Databricks spark-xml package. However, when it comes across processing instructions Spark raises an error claiming an unexpected event.

I'm attempting to import the XML files into dataframes which I can then manipulate into flat files to write to CSV. The datasets are large enough that we need some sort of handler such as Spark. I've looked through the spark-xml documentation and can't find any mention of processing instructions. I don't actually need any of the information from the instructions, so I'd be happy to just pass over them if that were an option, but as it is they are jamming the whole file. Any suggestions would be appreciated.

Here is an XML snippet that reproduces the problem:

<?xml version="1.0" encoding="UTF-8"?>
<row>
<description>
<?issue?>
<text>foo</text>
</description>
</row>

Here is how I am attempting to read the XML in Python:

sc = SparkContext()
sql = SQLContext(sc)
xml = sql.read.format("com.databricks.spark.xml").option("rowTag", "row").load("example.xml")

And just for completeness, here's how I'm loading databricks and submitting the script to Spark:

spark-submit --packages com.databricks:spark-csv_2.11:1.5.0,com.databricks:spark-xml_2.10:0.4.1 example.py

When I attempt to read in the XML using the code above, Spark raises an exception claiming an "unexpected event." Find the exact error message below.

2019-08-20 13:47:03 ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: Failed to parse data with unexpected event <?issue ?>
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:151)
    at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:178)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:101)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:89)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
2019-08-20 13:47:03 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: Failed to parse data with unexpected event <?issue ?>
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:151)
    at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:178)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:101)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:89)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

2019-08-20 13:47:03 ERROR TaskSetManager:70 - Task 0 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/oak/stanford/groups/hlwill/gsmoore/projects/parser_new/stackoverflow/example.py", line 10, in <module>
    xml = sql.read.format("com.databricks.spark.xml").option("rowTag", "row").load("example.xml")
  File "/share/software/user/open/spark/2.3.0/python/pyspark/sql/readwriter.py", line 166, in load
    return self._df(self._jreader.load(path))
  File "/share/software/user/open/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/share/software/user/open/spark/2.3.0/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/share/software/user/open/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o27.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: Failed to parse data with unexpected event <?issue ?>
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:151)
    at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:178)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:101)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:89)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
    at com.databricks.spark.xml.util.InferSchema$.infer(InferSchema.scala:109)
    at com.databricks.spark.xml.XmlRelation$$anonfun$1.apply(XmlRelation.scala:46)
    at com.databricks.spark.xml.XmlRelation$$anonfun$1.apply(XmlRelation.scala:46)
    at scala.Option.getOrElse(Option.scala:121)
    at com.databricks.spark.xml.XmlRelation.<init>(XmlRelation.scala:45)
    at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:65)
    at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:43)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to parse data with unexpected event <?issue ?>
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:151)
    at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:178)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:101)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:89)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

Solution

  • Ended up figuring it out--turns out I'd been using an oudated version of spark-xml. At least at the moment, the proper way to load the databricks packages is as follows:

    spark-submit --packages com.databricks:spark-csv_2.11:1.5.0,com.databricks:spark-xml_2.11:0.6.0 example.py

    This way two things are true:

    1. All packages are running in the same version of Scala, 2.11 (which should match the version used to run Spark). You can see the version of Spark you are running by typing spark-shell --version.
    2. I am using the most up-to-date version of each package according to their github.