1

Suppose we have a DataFrame with a column of map type.

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")
df.show()
// +--------------------+
// |           mapColumn|
// +--------------------+
// |{foo -> 1, bar -> 2}|
// +--------------------+

What is the most straightforward way to convert it to a struct (or, equivalently, define a new column with the same keys and values but as a struct type)? See the following spark-shell (2.4.5) session for an insanely inefficient way of going about it:

val jsonStr = df.select(to_json($"mapColumn")).collect()(0)(0).asInstanceOf[String]

spark.read.json(Seq(jsonStr).toDS()).show()
// +---+---+
// |bar|foo|
// +---+---+
// |  2|  1|
// +---+---+

Now, obviously collect() is very inefficient, and this is generally an awful way to do things in Spark. But what is the preferred way to accomplish this conversion? named_struct and struct both take a sequence of parameter values to construct the results, but I can't find any way to "unwrap" the map key/values to pass them to these functions.

ZygD
Jeff Evans

4 Answers

2

I would use the explode function:

+--------------------+
|           mapColumn|
+--------------------+
|[foo -> 1, bar -> 2]|
+--------------------+

df.select(explode('mapColumn)).select(struct('*).as("struct"))

output:

+--------+
|  struct|
+--------+
|[foo, 1]|
|[bar, 2]|
+--------+

root
 |-- struct: struct (nullable = false)
 |    |-- key: string (nullable = false)
 |    |-- value: integer (nullable = false)
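Note that explode yields one row per map entry, with the fixed field names key and value. If the goal is instead a single struct whose field names are the map keys, one possible sketch is to look up each key and pass the results to struct. This assumes the key names are known in advance (the keys list below is an assumption for illustration, as is the local session; spark-shell already provides spark):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("map-to-struct").getOrCreate()

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")

// Assumption: the key names are known up front.
val keys = Seq("foo", "bar")

// Look up each key in the map and alias the extracted value with the key name,
// then wrap all of the extracted columns in a single struct column.
val structDf = df.select(
  struct(keys.map(k => col("mapColumn").getItem(k).as(k)): _*).as("struct")
)

structDf.printSchema()
```

This keeps one output row per input row and needs no collect(), at the cost of hard-coding the keys.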
chlebek
  • This does exactly what I want, but the weird, seemingly unbalanced single quotes are throwing me off... – Jeff Evans Jun 05 '20 at 16:19
  • In case it's not obvious, in a real-life scenario with multiple columns you would use `.select('*', struct('mapCol'))` – Topde Jun 06 '20 at 10:09
  • I finally learned that my confusion was due to the Symbol syntax, i.e. this: https://stackoverflow.com/a/918613/375670 – Jeff Evans Jun 19 '20 at 17:19
1

I see @chlebek's answer, but if the result should be kept in one row, you can use a UDF:

scala> val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")
df: org.apache.spark.sql.DataFrame = [mapColumn: map<string,int>]

scala> df.show
+--------------------+
|           mapColumn|
+--------------------+
|[foo -> 1, bar -> 2]|
+--------------------+

scala> case class KeyValue(key: String, value: String)
defined class KeyValue

scala> val toArrayOfStructs = udf((value: Map[String, String]) => value.map {
     |   case (k, v) => KeyValue(k, v)
     | }.toArray )
toArrayOfStructs: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StructType(StructField(key,StringType,true), StructField(value,StringType,true)),true),Some(List(MapType(StringType,StringType,true))))

scala> df.withColumn("alfa", toArrayOfStructs(col("mapColumn")))
res4: org.apache.spark.sql.DataFrame = [mapColumn: map<string,int>, alfa: array<struct<key:string,value:string>>]

scala> res4.show
+--------------------+--------------------+
|           mapColumn|                alfa|
+--------------------+--------------------+
|[foo -> 1, bar -> 2]|[[foo, 1], [bar, 2]]|
+--------------------+--------------------+


scala> res4.printSchema
root
 |-- mapColumn: map (nullable = false)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)
 |-- alfa: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)
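In the session above the UDF takes a Map[String, String], so the integer values end up as strings in the resulting structs. A minimal variant that keeps them as integers might look like the following. The names KeyValueInt, toEntries and entries are hypothetical, and the local session is an assumption (spark-shell already provides spark):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical case class; the value field is Int to match map<string,int>.
case class KeyValueInt(key: String, value: Int)

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("map-to-entries").getOrCreate()

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")

// Convert each map into a sequence of (key, value) structs, keeping Int values.
// Note: this sketch does not handle a null map; such rows would need a guard.
val toEntries = udf((m: Map[String, Int]) => m.toSeq.map { case (k, v) => KeyValueInt(k, v) })

val withEntries = df.withColumn("entries", toEntries(col("mapColumn")))
withEntries.printSchema()
```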
1

Your method doesn't seem to work with more rows, especially if they have different maps, like this one:

val df = Seq(
    (Map("foo"->1, "bar"->2)),
    (Map("foo"->3, "baz"->4))
).toDF("mapColumn")

df.show()
// +--------------------+
// |           mapColumn|
// +--------------------+
// |{foo -> 1, bar -> 2}|
// |{foo -> 3, baz -> 4}|
// +--------------------+

Your script would return only the first row, because collect()(0)(0) reads just the first value:

val jsonStr = df.select(to_json($"mapColumn")).collect()(0)(0).asInstanceOf[String]
spark.read.json(Seq(jsonStr).toDS()).show()
// +---+---+
// |bar|foo|
// +---+---+
// |  2|  1|
// +---+---+

Solutions

  • map to columns:

    val json_col = to_json($"mapColumn")
    val json_schema = spark.read.json(df.select(json_col).as[String]).schema
    val df2 = df.withColumn("_c", from_json(json_col, json_schema)).select("_c.*")
    
    df2.show()
    // +----+----+---+
    // | bar| baz|foo|
    // +----+----+---+
    // |   2|null|  1|
    // |null|   4|  3|
    // +----+----+---+
    
  • map to struct (field names: "key", "value"):

    val df2 = df.select(explode(map_entries($"mapColumn")).as("struct"))
    df2.show()
    // +--------+
    // |  struct|
    // +--------+
    // |{foo, 1}|
    // |{bar, 2}|
    // |{foo, 3}|
    // |{baz, 4}|
    // +--------+
    
  • map to struct (field names: "foo", "bar", "baz"):

    val json_col = to_json($"mapColumn")
    val json_schema = spark.read.json(df.select(json_col).as[String]).schema
    val df2 = df.select(from_json(json_col, json_schema).as("struct"))
    
    df2.show()
    // +------------+
    // |      struct|
    // +------------+
    // |{2, null, 1}|
    // |{null, 4, 3}|
    // +------------+
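As a possible alternative to the JSON round trip, the distinct keys can be gathered with map_keys and the struct built by looking up each key. This is only a sketch: it still needs one extra pass over the data to find the keys, and the local session is an assumption (spark-shell already provides spark and its implicits):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, map_keys, struct}

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("map-keys-to-struct").getOrCreate()
import spark.implicits._

val df = Seq(
  Map("foo" -> 1, "bar" -> 2),
  Map("foo" -> 3, "baz" -> 4)
).toDF("mapColumn")

// One pass over the data to collect every key that occurs in any row.
val keys = df.select(explode(map_keys(col("mapColumn"))))
  .distinct()
  .as[String]
  .collect()
  .sorted

// Build one struct field per key; a key missing from a row is expected to come back as null.
val df2 = df.select(
  struct(keys.map(k => col("mapColumn").getItem(k).as(k)): _*).as("struct")
)

df2.printSchema()
```

Unlike the JSON approach, the value type is taken directly from the map column rather than re-inferred from JSON text.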
    
ZygD
0

Define a case class and map each row to it:

import org.apache.spark.sql.Encoders

case class Bean56(foo: Int, bar: Int)
// Bean56 is a case class rather than a JavaBean, so use a product encoder instead of Encoders.bean
val bean56Encoder = Encoders.product[Bean56]

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")

// Map each row to the case class, falling back to -1 for a missing key
val Bean56s = df.map(row => {
  val map = row.getMap[String, Int](0)
  Bean56(map.getOrElse("foo", -1), map.getOrElse("bar", -1))
})(bean56Encoder) // Supply the encoder explicitly
Bean56s.collect().foreach(println) // Print the results on the driver
QuickSilver