This is the schema of a Spark DataFrame that I've created:
root
|-- id: double (nullable = true)
|-- sim_scores: struct (nullable = true)
| |-- scores: map (nullable = true)
| | |-- key: string
| | |-- value: map (valueContainsNull = true)
| | | |-- key: integer
| | | |-- value: vector (valueContainsNull = true)
The 'sim_scores' struct represents a Scala case class that I am using for aggregation purposes. I have a custom UDAF designed to merge these structs, and to make them merge-safe for all the edge cases they have to look the way they do. Let's assume for this question that they have to stay this way.
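For context, the struct corresponds roughly to a case class of this shape (the names are illustrative, and the vector is the MLlib type that Spark 1.6 exposes in DataFrames):

import org.apache.spark.mllib.linalg.Vector

// Illustrative only; the real class carries whatever extra state the UDAF needs.
case class SimScores(scores: Map[String, Map[Int, Vector]])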
I would like to 'flatten' this DataFrame into something like:
root
|-- id: double (nullable = true)
|-- score_1: map (valueContainsNull = true)
| |-- key: integer
| |-- value: vector (valueContainsNull = true)
|-- score_2: map (valueContainsNull = true)
| |-- key: integer
| |-- value: vector (valueContainsNull = true)
|-- score_3: map (valueContainsNull = true)
| |-- key: integer
| |-- value: vector (valueContainsNull = true)
...
The outer MapType in the 'scores' struct maps score topics to documents; the inner maps, each representing a document, map sentence position within the document to a vector score. The 'score_1', 'score_2', ... columns represent all possible keys of the 'scores' MapType in the initial DataFrame.
In JSON-ish terms, if I had an input that looked like:
{ "id": 739874.0,
"sim_scores": {
"firstTopicName": {
1: [1,9,1,0,1,1,4,6],
2: [5,7,8,2,4,3,1,3],
...
},
"anotherTopic": {
1: [6,8,4,1,3,4,2,0],
2: [0,1,3,2,4,5,6,2],
...
}
}
}
then I would get an output like:
{ "id": 739874.0,
"firstTopicName": {
1: [1,9,1,0,1,1,4,6],
2: [5,7,8,2,4,3,1,3],
...
},
"anotherTopic": {
1: [6,8,4,1,3,4,2,0],
2: [0,1,3,2,4,5,6,2],
...
}
}
If I knew the total number of topic columns this would be easy, but I do not. The number of topics is set by the user at runtime, so the output DataFrame has a variable number of columns. It is guaranteed to be >= 1, but I need to design this so that it could work with 100 different topic columns if necessary.
How can I implement this?
Last note: I'm stuck on Spark 1.6.3, so solutions that work with that version are best. However, I'll take any way of doing it in the hope of a future implementation.
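For what it's worth, the closest approach I can imagine is a two-pass one: collect the distinct topic keys to the driver, then build one column per key with getItem. A rough, untested sketch against the 1.6 API (df is the aggregated DataFrame from above):

import org.apache.spark.sql.functions._

// Pass 1: find every topic key that appears in the outer map.
// In 1.6, explode on a MapType column yields "key" and "value" columns.
val topics: Array[String] = df
  .select(explode(col("sim_scores.scores")))
  .select("key")
  .distinct()
  .collect()
  .map(_.getString(0))

// Pass 2: pull one column per topic out of the map by key.
val topicCols = topics.map(t => col("sim_scores.scores").getItem(t).alias(t))
val flattened = df.select(col("id") +: topicCols: _*)

Rows that don't contain a given topic would simply end up with null in that column. I don't know whether this is the idiomatic way to do it, or whether it even holds up on 1.6.3.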