Suppose I have this XML file:
<root>
  <id>1</id>
  <tags>
    <tagA>
      <value>A1</value>
    </tagA>
    <tagA>
      <value>A2</value>
    </tagA>
    <tagB>
      <value>B</value>
    </tagB>
    <tagC>
      <value>C</value>
    </tagC>
  </tags>
</root>
which I load into Spark with spark-xml in Python, and I get this DataFrame:
+---+----------------------------------+
|id |tags |
+---+----------------------------------+
|1 |[WrappedArray([A1], [A2]),[B],[C]]|
+---+----------------------------------+
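For reference, the load looks roughly like this (the file name is an assumption for this example; rowTag='root' follows from id and tags showing up as top-level columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# rowTag tells spark-xml which element becomes one DataFrame row
df = spark.read.format('com.databricks.spark.xml') \
    .option('rowTag', 'root') \
    .load('test.xml')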
What would be the best approach to get to this desired output:
+---+-------+-----+
| id|tagName|value|
+---+-------+-----+
| 1| tagA| A1|
| 1| tagA| A2|
| 1| tagB| B|
| 1| tagC| C|
+---+-------+-----+
One solution I came up with (not finished yet, so not tested, but it should work) was to use a Python UDF which gets the tags column as input, goes through each tag, and returns a new list:
def myf(tags):
    # ... build one [id, tagName, value] row per tag ...
    return [[1, 'tagA', 'A1'], [1, 'tagA', 'A2'], [1, 'tagB', 'B'], [1, 'tagC', 'C']]
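As a sketch of what that body might look like (this assumes spark-xml hands the UDF the tags column as a Row whose fields are the tag names, with repeated tags such as tagA arriving as a list of structs and single tags as one struct; it also assumes id is passed in as a second argument, since it appears in the output rows):

def myf(id, tags):
    rows = []
    # asDict() keeps the schema's field order, so the tag order is preserved
    for tag_name, entry in tags.asDict().items():
        if isinstance(entry, list):
            # repeated tag (e.g. tagA): one output row per occurrence
            rows.extend([id, tag_name, e['value']] for e in entry)
        elif entry is not None:
            rows.append([id, tag_name, entry['value']])
    return rows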
over which I can use explode() to create separate rows for each entry in this array. However, for this I need to register the UDF with an explicit schema as its return type, e.g.:
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, StringType

format = ArrayType(
    StructType([
        StructField('id', IntegerType()),
        StructField('tagName', StringType()),
        StructField('value', StringType())
    ])
)
spark.udf.register('myf', myf, format)
and then use it, e.g.:
df.selectExpr('explode(myf(tags)) AS cols').select('cols.id','cols.tagName','cols.value')
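Note that for cols.id to actually be populated, the UDF needs id as an input as well (as in the two-argument sketch above), so the call would become:

df.selectExpr('explode(myf(id, tags)) AS cols').select('cols.id', 'cols.tagName', 'cols.value')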
The problem is, first, I'm not sure whether this is the best approach and, more importantly, whether it is performant, because I will be dealing with a lot of rows. Second, I don't want to define a static schema for the function's return value; it needs to be as dynamic as possible. Is there any better way to achieve this, preferably using UDFs and DataFrames rather than RDDs, and, hoping I'm not asking too much, possibly in one single query run?
PS: It would also be best if the order of the tags under tags is kept.