For this problem, we're going to combine a couple of different techniques to make this code both testable and highly scalable.
Theory
When parsing raw files, you have a couple of options you can consider:
- ❌ You can write your own parser to read bytes from files and convert them into data Spark can understand.
- This is strongly discouraged: it costs significant engineering time and results in an architecture that does not scale. It cannot take advantage of distributed compute, because the entire raw file must be brought to your parsing method before it can be processed. This is not an effective use of your resources.
- ⚠ You can use your own parser library not made for Spark, such as the XML Python library mentioned in the question
- While this is less difficult than writing your own parser, it still does not take advantage of distributed computation in Spark. It is easier to get something running, but it will eventually hit a performance limit because it cannot use the low-level Spark functionality that is only exposed when writing a Spark library.
- ✅ You can use a Spark-native raw file parser
- This is the preferred option in all cases as it takes advantage of low-level Spark functionality and doesn't require you to write your own code. If a low-level Spark parser exists, you should use it.
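To make the second option concrete, here is a minimal sketch of parsing with a plain Python XML library. It assumes the library in question is the standard `xml.etree.ElementTree` module; `SAMPLE_XML`, the `records` wrapper element, and `parse_rows` are hypothetical names used only for illustration.

```python
import xml.etree.ElementTree as ET

# Hypothetical sample document; the <records> wrapper is an assumption.
SAMPLE_XML = """<records>
  <tag><field1>my_value</field1></tag>
  <tag><field1>other_value</field1></tag>
</records>"""


def parse_rows(xml_text, row_tag="tag"):
    """Parse every <row_tag> element into a dict of child tag -> text."""
    # The whole document is parsed inside this single Python process.
    root = ET.fromstring(xml_text)
    rows = []
    for element in root.iter(row_tag):
        rows.append({child.tag: (child.text or "").strip() for child in element})
    return rows


rows = parse_rows(SAMPLE_XML)
print(rows)
```

This works, but each file has to be loaded and parsed in full by one Python process, which is the scaling limit described above.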
In our case, we can use the Databricks parser to great effect.
In general, you should also avoid using the .udf method, as it is likely being used in place of functionality already available in the Spark API. UDFs are not as performant as native methods and should be used only when no other option is available.
A good example of UDFs covering up hidden problems would be string manipulations of column contents; while you technically can use a UDF to do things like splitting and trimming strings, these things already exist in the Spark API and will be orders of magnitude faster than your own code.
Design
Our design is going to use the following:
- Low-level Spark-optimized file parsing done via the Databricks XML Parser
- Test-driven raw file parsing as explained here
Wire the Parser
First, we need to add the .jar to our spark_session available inside Transforms. Thanks to recent improvements, the condaJars argument (configured below) will allow you to use the .jar in both Preview/Test and at full build time. Previously, this would have required a full build, but that is no longer the case.
We need to go to our transforms-python/build.gradle file and add 2 blocks of config:
- Enable the pytest plugin
- Enable the condaJars argument and declare the .jar dependency
My /transforms-python/build.gradle now looks like the following:
buildscript {
    repositories {
        // some other things
    }

    dependencies {
        classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"
    }
}

apply plugin: 'com.palantir.transforms.lang.python'
apply plugin: 'com.palantir.transforms.lang.python-defaults'

dependencies {
    condaJars "com.databricks:spark-xml_2.13:0.14.0"
}

// Apply the testing plugin
apply plugin: 'com.palantir.transforms.lang.pytest-defaults'

// ... some other awesome features you should enable
After applying this config, you'll want to restart your Code Assist session by clicking on the bottom ribbon and hitting Refresh.

After refreshing Code Assist, we have low-level functionality available to parse our .xml files. Now we need to test it!
Testing the Parser
If we adopt the same style of test-driven development as here, we end up with /transforms-python/src/myproject/datasets/xml_parse_transform.py with the following contents:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    # Parse each file with the Spark XML reader; every <tag> element becomes a row
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.format('xml').options(rowTag="tag").load(file_name)
        parsed_dfs += [parsed_df]
    # Combine the per-file DataFrames into a single output
    output_df = union_many(*parsed_dfs, how="wide")
    return output_df


@transform(
    the_output=Output("my.awesome.output"),
    the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    # Build the full path of every file in the input dataset
    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)
... an example file /transforms-python/test/myproject/datasets/sample.xml with contents:
<tag>
<field1>
my_value
</field1>
</tag>
And a test file /transforms-python/test/myproject/datasets/test_xml_parse_transform.py:
from myproject.datasets import xml_parse_transform
from pkg_resources import resource_filename


def test_parse_xml(spark_session):
    file_path = resource_filename(__name__, "sample.xml")
    parsed_df = xml_parse_transform.read_files(spark_session, [file_path])
    assert parsed_df.count() == 1
    assert set(parsed_df.columns) == {"field1"}
We now have:
- A distributed-compute, low-level .xml parser that is highly scalable
- A test-driven setup that we can quickly iterate on to get our exact functionality right
Cheers