Don't be misled by the Databricks spark-xml docs, which steer you towards using an uncompressed XML file as the input. That is very inefficient; it is much faster to load the downloaded XML strings directly into a Spark data frame. The PySpark side of spark-xml doesn't expose the needed functions, but there is a workaround that calls the JVM ones:
from pyspark.sql import functions as F
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string

def ext_from_xml(xml_column, schema, options={}):
    # wraps the JVM-side com.databricks.spark.xml.functions.from_xml for PySpark
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    # infers the XML schema from a single-column data frame of XML strings
    assert len(df.columns) == 1
    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())
The XMLs downloaded into a list of (url, content) tuples (a sketch of the download step follows the sample data):
xml = [('url',"""<Level_0 Id0="Id0_value_file1">
<Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
<Level_2_A>A</Level_2_A>
<Level_2>
<Level_3>
<Level_4>
<Date>2021-01-01</Date>
<Value>4_1</Value>
</Level_4>
<Level_4>
<Date>2021-01-02</Date>
<Value>4_2</Value>
</Level_4>
</Level_3>
</Level_2>
</Level_1>
</Level_0>"""),
('url',"""<Level_0 I"d0="Id0_value_file2">
<Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
<Level_2_A>A</Level_2_A>
<Level_2>
<Level_3>
<Level_4>
<Date>2021-01-01</Date>
<Value>4_1</Value>
</Level_4>
<Level_4>
<Date>2021-01-02</Date>
<Value>4_2</Value>
</Level_4>
</Level_3>
</Level_2>
</Level_1>
</Level_0>""")]
Transformation of the XML strings into a Spark data frame:
# create a data frame with the XML strings
sc = spark.sparkContext
rdd = sc.parallelize(xml)
df = spark.createDataFrame(rdd, "url string, content string")
# infer the XML schema
payloadSchema = ext_schema_of_xml_df(df.select("content"))
# parse the XML
parsed = df.withColumn("parsed", ext_from_xml(df.content, payloadSchema, {"rowTag": "Level_0"}))
# select the required data
df2 = parsed.select(
    F.col('parsed._Id0').alias('parsed._Id0'),
    F.explode_outer('parsed.Level_1.Level_2.Level_3.Level_4').alias('Level_4')
).select(
    '`parsed._Id0`',
    'Level_4.*'
)
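To sanity-check the result (a quick check, not part of the original snippet), the inferred schema and the parsed rows can be inspected; with the sample data above, df2 should end up with the _Id0, Date and Value columns:
# quick inspection of the parsed output
df2.printSchema()
df2.show(truncate=False)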
If the downloaded content arrives as bytes rather than str, decode it first: b'string'.decode('utf-8')
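A tiny illustration, assuming the raw bytes come from an HTTP response body (e.g. response.content instead of response.text in the download sketch above):
# decode raw bytes to a str before adding it to the xml list
raw = b'<Level_0 Id0="Id0_value_file1"></Level_0>'  # stand-in for downloaded bytes
xml_string = raw.decode('utf-8')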
See @mck's answer for more info about the XMLs:
How to transform to spark Data Frame data from multiple nested XML files with attributes