pyspark, palantir-foundry, foundry-code-repositories, foundry-python-transform

How do I parse XML documents in Palantir Foundry?


I have a set of .xml documents that I want to parse.

I previously tried to parse them using methods that take the file contents and dump them into a single cell, but I've noticed this doesn't work well in practice: run times get slower and slower, often with a single task taking tens of hours to run.

My first transform takes the .xml contents and puts them into a single cell, and a second transform takes this string and uses Python's xml library to parse it into a document. I'm then able to extract properties from this document and return a DataFrame.

I'm using a UDF to map the string contents to the fields I want.
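
For reference, the parsing step looks roughly like this (a simplified sketch; the dataset, column, and field names are placeholders rather than my real ones):

    import xml.etree.ElementTree as ET

    from pyspark.sql import functions as F, types as T
    from transforms.api import transform_df, Input, Output


    # Pull a single field out of the XML string stored in each row
    @F.udf(returnType=T.StringType())
    def extract_field1(xml_string):
        root = ET.fromstring(xml_string)
        return root.findtext("field1")


    @transform_df(
        Output("my.parsed.output"),
        raw=Input("my.raw.xml_strings"),
    )
    def parse_xml_strings(raw):
        # 'xml_contents' is the column holding an entire file as one string
        return raw.withColumn("field1", extract_field1(F.col("xml_contents")))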

How can I make this faster / work better with large .xml files?


Solution

  • For this problem, we're going to combine a couple of different techniques to make this code both testable and highly scalable.

    Theory

    When parsing raw files, you have a few options to consider:

    1. ❌ You can write your own parser to read bytes from files and convert them into data Spark can understand.
      • This should be avoided whenever possible due to the engineering time required and the unscalable architecture that results. It doesn't take advantage of distributed compute, because you must bring the entire raw file to your parsing method before you can use it; this is not an effective use of your resources.
    2. ⚠ You can use a parser library not made for Spark, such as Python's xml library mentioned in the question
      • While this is less work than writing your own parser, it still does not take advantage of distributed computation in Spark. It is easier to get something running, but it will eventually hit a performance ceiling because it cannot use the low-level Spark functionality that only a Spark-native library exposes.
    3. ✅ You can use a Spark-native raw file parser
      • This is the preferred option in all cases as it takes advantage of low-level Spark functionality and doesn't require you to write your own code. If a low-level Spark parser exists, you should use it.

    In our case, we can use the Databricks spark-xml parser to great effect.
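
    For illustration, once the library is wired in (below), parsing a file is a single reader call (a minimal sketch, assuming an existing SparkSession named spark and documents whose repeating element is <record>):

    # Each <record> element in the file becomes one row of the resulting DataFrame
    df = spark.read.format("xml").option("rowTag", "record").load("/path/to/file.xml")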

    In general, you should also avoid using the .udf method, as it is usually a sign that functionality already available in the Spark API is being reimplemented. UDFs are not as performant as native methods and should be used only when no other option is available.

    A good example of UDFs covering up hidden problems is string manipulation of column contents; while you technically can use a UDF to split and trim strings, those operations already exist in the Spark API and will be orders of magnitude faster than your own code, as the sketch below shows.
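
    For instance, a trim-and-split that could be written as a UDF is better expressed with the built-in column functions (a small sketch; the column name is made up):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("  a, b ,c ",)], ["raw_text"])

    # Native column functions: trim the surrounding whitespace, then split on commas
    df = df.withColumn("parts", F.split(F.trim(F.col("raw_text")), ","))

    # The UDF equivalent, e.g. F.udf(lambda s: s.strip().split(",")), forces
    # row-by-row Python execution and bypasses Spark's built-in optimizations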

    Design

    Our design is going to use the following:

    1. Low-level Spark-optimized file parsing done via the Databricks XML Parser
    2. Test-driven raw file parsing as explained here

    Wire the Parser

    First, we need to make the .jar available to the spark_session used inside Transforms. Thanks to recent improvements, the condaJars argument, once configured, lets you use the .jar both in Preview/Test and at full build time; previously, this required a full build.

    We need to go to our transforms-python/build.gradle file and add 2 blocks of config:

    1. Enable the pytest plugin
    2. Enable the condaJars argument and declare the .jar dependency

    My /transforms-python/build.gradle now looks like the following:

    buildscript {
        repositories {
           // some other things
        }
    
        dependencies {
            classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"
        }
    }
    
    apply plugin: 'com.palantir.transforms.lang.python'
    apply plugin: 'com.palantir.transforms.lang.python-defaults'
    
    dependencies {
        condaJars "com.databricks:spark-xml_2.13:0.14.0"
    }
    
    // Apply the testing plugin
    apply plugin: 'com.palantir.transforms.lang.pytest-defaults'
    
    // ... some other awesome features you should enable
    

    After applying this config, you'll want to restart your Code Assist session by clicking on the bottom ribbon and hitting Refresh.

    After refreshing Code Assist, we now have the low-level functionality needed to parse our .xml files; now we need to test it!

    Testing the Parser

    If we adopt the same style of test-driven development as here, we end up with /transforms-python/src/myproject/datasets/xml_parse_transform.py with the following contents:

    from transforms.api import transform, Output, Input
    from transforms.verbs.dataframes import union_many
    
    
    def read_files(spark_session, paths):
        # Parse each .xml file with the spark-xml reader, then union the results into one DataFrame
        parsed_dfs = []
        for file_name in paths:
            parsed_df = spark_session.read.format('xml').options(rowTag="tag").load(file_name)
            parsed_dfs += [parsed_df]
        output_df = union_many(*parsed_dfs, how="wide")
        return output_df
    
    
    @transform(
        the_output=Output("my.awesome.output"),
        the_input=Input("my.awesome.input"),
    )
    def my_compute_function(the_input, the_output, ctx):
        session = ctx.spark_session
        input_filesystem = the_input.filesystem()
        hadoop_path = input_filesystem.hadoop_path
        # Build fully-qualified paths for every raw file in the input dataset
        files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
        output_df = read_files(session, files)
        the_output.write_dataframe(output_df)
    

    ... an example file /transforms-python/test/myproject/datasets/sample.xml with contents:

    <tag>
    <field1>
    my_value
    </field1>
    </tag>
    

    And a test file /transforms-python/test/myproject/datasets/test_xml_parse_transform.py:

    from myproject.datasets import xml_parse_transform
    from pkg_resources import resource_filename
    
    
    def test_parse_xml(spark_session):
        # spark_session is a pytest fixture provided by the transforms testing plugin
        file_path = resource_filename(__name__, "sample.xml")
        parsed_df = xml_parse_transform.read_files(spark_session, [file_path])
        assert parsed_df.count() == 1
        assert set(parsed_df.columns) == {"field1"}
    

    We now have:

    1. A distributed-compute, low-level .xml parser that is highly scalable
    2. A test-driven setup that we can quickly iterate on to get our exact functionality right

    Cheers