I have a set of .xml documents that I want to parse.
I previously tried to parse them using methods that take the file contents and dump them into a single cell. This doesn't work in practice: I'm seeing slower and slower run times, often with a single task taking tens of hours to run.
My first transform takes the .xml contents and puts them into a single cell, and a second transform takes this string and uses Python's xml library to parse it into a document. I can then extract properties from this document and return a DataFrame.
I'm using a UDF to conduct the process of mapping the string contents to the fields I want.
How can I make this faster / work better with large .xml files?
For this problem, we're going to combine a couple of different techniques to make this code both testable and highly scalable.
When parsing raw files, you have a couple of options you can consider:
In our case, we can use the Databricks parser to great effect.
In general, you should also avoid using the .udf method, as it is likely being used in place of functionality already available in the Spark API. UDFs are not as performant as native methods and should be used only when no other option is available.
A good example of UDFs covering up hidden problems would be string manipulations of column contents; while you technically can use a UDF to do things like splitting and trimming strings, these things already exist in the Spark API and will be orders of magnitude faster than your own code.
Our design is going to use the following:
First, we need to add the .jar to the spark_session available inside Transforms. Thanks to recent improvements, this argument, when configured, will allow you to use the .jar in both Preview/Test and at full build time. Previously, this would have required a full build, but that is no longer the case.
We need to go to our transforms-python/build.gradle file and add 2 blocks of config:

- the pytest plugin
- the condaJars argument, which declares the .jar dependency

My /transforms-python/build.gradle now looks like the following:
buildscript {
    repositories {
        // some other things
    }

    dependencies {
        classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"
    }
}

apply plugin: 'com.palantir.transforms.lang.python'
apply plugin: 'com.palantir.transforms.lang.python-defaults'

dependencies {
    condaJars "com.databricks:spark-xml_2.13:0.14.0"
}

// Apply the testing plugin
apply plugin: 'com.palantir.transforms.lang.pytest-defaults'

// ... some other awesome features you should enable
After applying this config, you'll want to restart your Code Assist session by clicking on the bottom ribbon and hitting Refresh.
After refreshing Code Assist, we have the low-level functionality available to parse our .xml files. Now we need to test it!
If we adopt the same style of test-driven development as here, we end up with /transforms-python/src/myproject/datasets/xml_parse_transform.py with the following contents:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    # Parse each file with the spark-xml reader; every <tag> element becomes one row.
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.format('xml').options(rowTag="tag").load(file_name)
        parsed_dfs.append(parsed_df)
    # Combine the per-file DataFrames into a single output.
    output_df = union_many(*parsed_dfs, how="wide")
    return output_df


@transform(
    the_output=Output("my.awesome.output"),
    the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    # Build a full path for every file in the input dataset.
    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)
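The path-building step in my_compute_function can be pulled out into a plain function in the same way as read_files, so it can be unit tested without a Spark session. The following is only a sketch: `build_xml_paths` is a hypothetical helper (not part of the Transforms API), and skipping files that do not end in `.xml` is an assumption about what you want, not something the transform above does.

```python
def build_xml_paths(hadoop_path, file_names):
    """Join a dataset root with the relative names of its .xml files.

    `build_xml_paths` is a hypothetical helper, not part of the Transforms API.
    """
    root = hadoop_path.rstrip("/")
    paths = []
    for name in file_names:
        # Assumption: anything that is not an XML document should be skipped.
        if not name.lower().endswith(".xml"):
            continue
        # Normalise slashes so the result never contains "//" at the join.
        paths.append(root + "/" + name.lstrip("/"))
    return paths
```

Inside the transform, it would be called with `input_filesystem.hadoop_path` and `[f.path for f in input_filesystem.ls()]`, and the result passed to read_files as before.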
We also add an example file /transforms-python/test/myproject/datasets/sample.xml with contents:
<tag>
  <field1>
    my_value
  </field1>
</tag>
And a test file /transforms-python/test/myproject/datasets/test_xml_parse_transform.py:
from myproject.datasets import xml_parse_transform
from pkg_resources import resource_filename


def test_parse_xml(spark_session):
    file_path = resource_filename(__name__, "sample.xml")
    parsed_df = xml_parse_transform.read_files(spark_session, [file_path])
    assert parsed_df.count() == 1
    assert set(parsed_df.columns) == {"field1"}
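The expected values in this test come straight from the sample file: one `<tag>` element means one row, and the names of its child elements become the columns. As a cross-check that needs no Spark session, the same facts can be read with Python's standard `xml.etree.ElementTree`. This is a sketch only: `describe_rows` is a hypothetical helper, and looking only at direct children of each row element is an assumption that fits the flat sample, not a description of how spark-xml infers schemas.

```python
import xml.etree.ElementTree as ET

# Same content as sample.xml, inlined so the sketch is self-contained.
SAMPLE_XML = """
<tag>
  <field1>
    my_value
  </field1>
</tag>
"""


def describe_rows(xml_text, row_tag):
    """Return (row_count, child_names) for elements named `row_tag`.

    `describe_rows` is a hypothetical helper used only for illustration.
    """
    root = ET.fromstring(xml_text.strip())
    # iter() also yields the root itself, so a document whose root element
    # is the row tag counts as exactly one row.
    rows = list(root.iter(row_tag))
    child_names = {child.tag for row in rows for child in row}
    return len(rows), child_names


row_count, columns = describe_rows(SAMPLE_XML, "tag")
print(row_count, sorted(columns))  # prints: 1 ['field1']
```

A check like this is handy when writing new fixtures, since it tells you what count and column set to assert on before the Spark test runs.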
We now have an .xml parser that is highly scalable.

Cheers