1

This is the schema of a Spark DataFrame that I've created:

root
 |-- id: double (nullable = true)
 |-- sim_scores: struct (nullable = true)
 |    |-- scores: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: map (valueContainsNull = true)
 |    |    |    |-- key: integer
 |    |    |    |-- value: vector (valueContainsNull = true)

The 'sim_scores' struct represents a Scala case-class that I am using for aggregation purposes. I have custom a UDAF designed to merge these structs. To make them merge-safe for all the edge-cases, they look like they do. Lets assume for this question, they have to stay this way.

I would like to 'flatten' this DataFrame into something like:

root
 |-- id: double (nullable = true)
 |-- score_1: map (valueContainsNull = true)
 |    |-- key: integer
 |    |-- value: vector (valueContainsNull = true)
 |-- score_2: map (valueContainsNull = true)
 |    |-- key: integer
 |    |-- value: vector (valueContainsNull = true)
 |-- score_3: map (valueContainsNull = true)
 |    |-- key: integer
 |    |-- value: vector (valueContainsNull = true)
...

The outer MapType in the 'scores' struct maps score topics to documents; the inner maps, representing a document, map sentence position within a document to a vector score. The 'score_1', 'score_2', ... represent all possible keys of the 'scores' MapType in the initial DF.

In json-ish terms, if I had an input that looks like:

{ "id": 739874.0,
  "sim_scores": {
    "firstTopicName": {
      1: [1,9,1,0,1,1,4,6],
      2: [5,7,8,2,4,3,1,3],
      ...
    },
    "anotherTopic": {
      1: [6,8,4,1,3,4,2,0],
      2: [0,1,3,2,4,5,6,2],
      ...
    }
  }
}

then I would get an output

{ "id": 739874.0,
  "firstTopicName": {
    1: [1,9,1,0,1,1,4,6],
    2: [5,7,8,2,4,3,1,3],
    ...
  }
  "anotherTopic": {
    1: [6,8,4,1,3,4,2,0],
    2: [0,1,3,2,4,5,6,2],
    ...
  }
}

If I knew the total number of topic columns, this would be easy; but I do not. The number of topics is set by the user at runtime, the output DataFrame has a variable number of columns. It is guarantees to be >=1, but I need to design this so that it could work with 100 different topic columns, if necessary.

How can I implement this?

Last note: I'm stuck using Spark 1.6.3; so solutions that work with that version are best. However, I'll take any way of doing it in hopes of future implementation.

kingledion
  • 2,263
  • 3
  • 25
  • 39
  • Possible duplicate of [How to get keys and values from MapType column in SparkSQL DataFrame](https://stackoverflow.com/questions/40602606/how-to-get-keys-and-values-from-maptype-column-in-sparksql-dataframe) – 10465355 Feb 26 '19 at 19:25

1 Answers1

1

At a high level, I think you have two options here:

  1. Using the dataframe API
  2. Switch to an RDD

If you want to keep using spark SQL, then you could use selectExpr and generate the select query:

it("should flatten using dataframes and spark sql") {
  val sqlContext = new SQLContext(sc)
  val df = sqlContext.createDataFrame(sc.parallelize(rows), schema)
  df.printSchema()
  df.show()
  val numTopics = 3 // input from user
  // fancy logic to generate the select expression
  val selectColumns: Seq[String] = "id" +: 1.to(numTopics).map(i => s"sim_scores['scores']['topic${i}']")
  val df2 = df.selectExpr(selectColumns:_*)
  df2.printSchema()
  df2.show()
}

Given this sample data:

val schema = sql.types.StructType(List(
  sql.types.StructField("id", sql.types.DoubleType, nullable = true),
  sql.types.StructField("sim_scores", sql.types.StructType(List(
    sql.types.StructField("scores", sql.types.MapType(sql.types.StringType, sql.types.MapType(sql.types.IntegerType, sql.types.StringType)), nullable = true)
  )), nullable = true)
))
val rows = Seq(
  sql.Row(1d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2")))),
  sql.Row(2d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2")))),
  sql.Row(3d, sql.Row(Map("topic1" -> Map(1 -> "scores1"), "topic2" -> Map(1 -> "scores2"), "topic3" -> Map(1 -> "scores3"))))
)

You get this result:

root
 |-- id: double (nullable = true)
 |-- sim_scores.scores[topic1]: map (nullable = true)
 |    |-- key: integer
 |    |-- value: string (valueContainsNull = true)
 |-- sim_scores.scores[topic2]: map (nullable = true)
 |    |-- key: integer
 |    |-- value: string (valueContainsNull = true)
 |-- sim_scores.scores[topic3]: map (nullable = true)
 |    |-- key: integer
 |    |-- value: string (valueContainsNull = true)

+---+-------------------------+-------------------------+-------------------------+
| id|sim_scores.scores[topic1]|sim_scores.scores[topic2]|sim_scores.scores[topic3]|
+---+-------------------------+-------------------------+-------------------------+
|1.0|        Map(1 -> scores1)|        Map(1 -> scores2)|                     null|
|2.0|        Map(1 -> scores1)|        Map(1 -> scores2)|                     null|
|3.0|        Map(1 -> scores1)|        Map(1 -> scores2)|        Map(1 -> scores3)|
+---+-------------------------+-------------------------+-------------------------+

The other option is to switch to processing an RDD where you could add more powerful flattening logic based on the keys in the map.

Kit Menke
  • 7,046
  • 1
  • 32
  • 54