
I have a Spark job with a DataFrame containing the following values:

{
  "id": "abchchd",
  "test_id": "ndsbsb",
  "props": {
    "type": {
      "isMale": true,
      "id": "dd",
      "mcc": 1234,
      "name": "Adam"
    }
  }
}

{
  "id": "abc",
  "test_id": "asf",
  "props": {
    "type2": {
      "isMale": true,
      "id": "dd",
      "mcc": 12134,
      "name": "Perth"
    }
  }
}

and I want to flatten it elegantly (since the number of keys and their types are unknown) in such a way that props remains a struct but everything inside it is flattened, irrespective of the level of nesting.

The desired output is:

{
  "id": "abchchd",
  "test_id": "ndsbsb",
  "props": {
    "type.isMale": true,
    "type.id": "dd",
    "type.mcc": 1234,
    "type.name": "Adam"
  }
}

{
  "id": "abc",
  "test_id": "asf",
  "props": {
      "type2.isMale": true,
      "type2.id": "dd",
      "type2.mcc": 12134,
      "type2.name": "Perth"
  }
}

I used the solution mentioned in Automatically and Elegantly flatten DataFrame in Spark SQL

However, I'm unable to keep the props field intact; it gets flattened as well. Can somebody help me extend this solution?
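
For reference, the recursive helper from that answer looks roughly like this (my paraphrase from memory, so treat it as a sketch of the approach): it flattens every struct it meets, which is why props doesn't survive:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Collects one flat Column per leaf field; ALL structs (including
// "props") are recursed into, so none of them survive the select.
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap { f =>
    val colName = if (prefix == null) f.name else s"$prefix.${f.name}"
    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _              => Array(col(colName))
    }
  }
}

// df.select(flattenSchema(df.schema): _*)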

The final schema should be something like:

root
 |-- id: string (nullable = true)
 |-- props: struct (nullable = true)
 |    |-- type.id: string (nullable = true)
 |    |-- type.isMale: boolean (nullable = true)
 |    |-- type.mcc: long (nullable = true)
 |    |-- type.name: string (nullable = true)
 |    |-- type2.id: string (nullable = true)
 |    |-- type2.isMale: boolean (nullable = true)
 |    |-- type2.mcc: long (nullable = true)
 |    |-- type2.name: string (nullable = true)
 |-- test_id: string (nullable = true)
mythic

1 Answer


I've been able to achieve this with the RDD API:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, StructType}
import spark.implicits._

val jsonRDD = df.rdd.map{row =>
  // Recursively converts a Row into a Map, flattening everything under
  // "props" into dot-separated keys while leaving other structs nested.
  def unnest(r: Row): Map[String, Any] = {
    r.schema.fields.zipWithIndex.flatMap{case (f, i) =>
      (f.name, f.dataType) match {
        // Special case: flatten the children of "props" into
        // "<attribute>.<field>" keys.
        case ("props", _: StructType) =>
          val propsObject = r.getAs[Row](f.name)
          Map(f.name -> propsObject.schema.fields.flatMap{propsAttr =>
            val subObject = propsObject.getAs[Row](propsAttr.name)
            subObject.schema.fields.map{subField =>
              s"${propsAttr.name}.${subField.name}" -> subObject.get(subObject.fieldIndex(subField.name))
            }
          }.toMap)
        // Any other struct: recurse, preserving its nesting.
        case (fname, _: StructType) => Map(fname -> unnest(r.getAs[Row](fname)))
        // Arrays of structs: recurse into each element.
        case (fname, ArrayType(_: StructType, _)) => Map(fname -> r.getAs[Seq[Row]](fname).map(unnest))
        // Leaf values are kept as-is.
        case _ => Map(f.name -> r.get(i))
      }
    }.toMap
  }

  val asMap = unnest(row)
  // Serialize each Map back to a JSON string.
  new ObjectMapper().registerModule(DefaultScalaModule).writeValueAsString(asMap)
}

// Re-read the JSON strings so Spark infers the new, flattened schema.
val finalDF = spark.read.json(jsonRDD.toDS).cache
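
To reproduce this end-to-end, here's a minimal way to build the input df from the first sample record in the question (with both sample records you'd need the fixed version from the EDIT below; the variable names are just illustrative):

import spark.implicits._

// One JSON document per string; spark.read.json infers the schema.
val sampleJson = Seq(
  """{"id":"abchchd","test_id":"ndsbsb","props":{"type":{"isMale":true,"id":"dd","mcc":1234,"name":"Adam"}}}"""
)
val df = spark.read.json(sampleJson.toDS)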

The solution handles deeply nested inputs, thanks to recursion.

With your data, here's what we get:

finalDF.printSchema()
finalDF.show(false)
finalDF.select("props.*").show()

Outputs:

root
 |-- id: string (nullable = true)
 |-- props: struct (nullable = true)
 |    |-- type.id: string (nullable = true)
 |    |-- type.isMale: boolean (nullable = true)
 |    |-- type.mcc: long (nullable = true)
 |    |-- type.name: string (nullable = true)
 |-- test_id: string (nullable = true)

+-------+----------------------+-------+
|id     |props                 |test_id|
+-------+----------------------+-------+
|abchchd|[dd, true, 1234, Adam]|ndsbsb |
+-------+----------------------+-------+

+-------+-----------+--------+---------+
|type.id|type.isMale|type.mcc|type.name|
+-------+-----------+--------+---------+
|     dd|       true|    1234|     Adam|
+-------+-----------+--------+---------+

But we can also pass more deeply nested/complex structures, for instance:

val str2 = """{"newroot":[{"mystruct":{"id":"abchchd","test_id":"ndsbsb","props":{"type":{"isMale":true,"id":"dd","mcc":1234,"name":"Adam"}}}}]}"""

...

finalDF.printSchema()
finalDF.show(false)

This gives the following output:

root
 |-- newroot: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- mystruct: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- props: struct (nullable = true)
 |    |    |    |    |-- type.id: string (nullable = true)
 |    |    |    |    |-- type.isMale: boolean (nullable = true)
 |    |    |    |    |-- type.mcc: long (nullable = true)
 |    |    |    |    |-- type.name: string (nullable = true)
 |    |    |    |-- test_id: string (nullable = true)

+---------------------------------------------+
|newroot                                      |
+---------------------------------------------+
|[[[abchchd, [dd, true, 1234, Adam], ndsbsb]]]|
+---------------------------------------------+

EDIT: As you mentioned, if you have records with different structures, you need to wrap the subObject value above in an Option.
Here's the fixed unnest function:

def unnest(r: Row): Map[String, Any] = {
  r.schema.fields.zipWithIndex.flatMap{case (f, i) =>
    (f.name, f.dataType) match {
      case ("props", _: StructType) =>
        val propsObject = r.getAs[Row](f.name)
        Map(f.name -> propsObject.schema.fields.flatMap{propsAttr =>
          // Wrap in Option: attributes missing from a given record
          // (e.g. "type" vs "type2") come back as null, and Option(null)
          // is None, so they are skipped instead of throwing an NPE.
          val subObjectOpt = Option(propsObject.getAs[Row](propsAttr.name))
          subObjectOpt.toSeq.flatMap{subObject => subObject.schema.fields.map{subField =>
            s"${propsAttr.name}.${subField.name}" -> subObject.get(subObject.fieldIndex(subField.name))
          }}
        }.toMap)
      case (fname, _: StructType) => Map(fname -> unnest(r.getAs[Row](fname)))
      case (fname, ArrayType(_: StructType, _)) => Map(fname -> r.getAs[Seq[Row]](fname).map(unnest))
      case _ => Map(f.name -> r.get(i))
    }
  }.toMap
}

The new printSchema gives:

root
 |-- id: string (nullable = true)
 |-- props: struct (nullable = true)
 |    |-- type.id: string (nullable = true)
 |    |-- type.isMale: boolean (nullable = true)
 |    |-- type.mcc: long (nullable = true)
 |    |-- type.name: string (nullable = true)
 |    |-- type2.id: string (nullable = true)
 |    |-- type2.isMale: boolean (nullable = true)
 |    |-- type2.mcc: long (nullable = true)
 |    |-- type2.name: string (nullable = true)
 |-- test_id: string (nullable = true)
baitmbarek
  • Hi @baitmbarek, this seems to work! Looks simple and powerful, thank you so much. Just one question about the nested fields: if I do finalDF.select("props.type.id"), it throws an error saying the field was not found, because Spark looks for a struct called type and fails. One workaround is to rename the subfields to type_id, replacing the dot with an underscore. Is there another way to keep the dot without it being treated as a struct field? – mythic Jan 10 '20 at 03:46
  • Hi @mythic, you need to "escape" the dot: ```finalDF.select("props.`type.isMale`").show()```. This renders the `type.isMale` column (see the sketch after this thread). – baitmbarek Jan 10 '20 at 07:09
  • Hi @baitmbarek, this solution seems to break when the df has differently named fields. I have a field called "type" and also a field called "type1" in another row of the df, and it's throwing a NullPointerException! :( – mythic Jan 13 '20 at 15:55
  • Hi @mythic, can you share the structure so I can fix/adapt the method? – baitmbarek Jan 13 '20 at 15:57
  • Hi @baitmbarek, I've added two rows from the dataframe and also the final desired schema. Please let me know if you require anything else. – mythic Jan 13 '20 at 16:03
  • Thank you @mythic I'll take a look at it this evening ;) – baitmbarek Jan 13 '20 at 16:06
  • @mythic I just edited the answer with an improvement to handle potentially missing attributes – baitmbarek Jan 13 '20 at 19:27
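
To make the backtick-escaping from the comments concrete, here is a minimal sketch (assuming the finalDF built in the answer above):

import org.apache.spark.sql.functions.col

// A dot inside a single field name must be backtick-escaped, otherwise
// Spark parses it as struct navigation (props -> type -> isMale).
finalDF.select("props.`type.isMale`").show()
finalDF.select(col("props.`type.mcc`")).show()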