
I am trying to parse a JSON file and write it out as a CSV file.

The structure is a little bit complex and I wrote a Spark program in Scala to accomplish this task. Since the document does not contain one JSON object per line, I decided to use the wholeTextFiles method, as suggested in some answers and posts I've found.

val jsonRDD  = spark.sparkContext.wholeTextFiles(fileInPath).map(x => x._2)

Then I read the JSON content into a DataFrame:

val dwdJson = spark.read.json(jsonRDD)

Then I would like to navigate the JSON and flatten out the data. This is the schema from dwdJson:

root
 |-- meta: struct (nullable = true)
 |    |-- dimensions: struct (nullable = true)
 |    |    |-- lat: long (nullable = true)
 |    |    |-- lon: long (nullable = true)
 |    |-- directory: string (nullable = true)
 |    |-- filename: string (nullable = true)
 |-- records: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- grids: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- gPt: array (nullable = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |-- time: string (nullable = true)

This is my best approach:

val dwdJson_e1 = dwdJson.select($"meta.filename", explode($"records").as("records_flat"))
val dwdJson_e2 = dwdJson_e1.select($"filename", $"records_flat.time",explode($"records_flat.grids").as("gPt"))
val dwdJson_e3 = dwdJson_e2.select($"filename", $"time", $"gPt.gPt")
val dwdJson_flat = dwdJson_e3.select($"filename"
      ,$"time"
      ,$"gPt".getItem(0).as("lat1")
      ,$"gPt".getItem(1).as("long1")
      ,$"gPt".getItem(2).as("lat2")
      ,$"gPt".getItem(3).as("long2")
      ,$"gPt".getItem(4).as("value"))

I am a Scala rookie and I am wondering if I can avoid creating the intermediate DataFrames (dwdJson_e1, dwdJson_e2, dwdJson_e3), which seems inefficient; the program also runs very slowly (compared with a Java parser running on a laptop).

On the other hand, I could not find a way to unwind these nested arrays more directly.

Spark version: 2.0.0, Scala: 2.11.8, Java: 1.8

**Edit 1: Sample JSON file and CSV output**

This is a sample JSON file I want to convert:

{
  "meta" : {
    "directory" : "weather/cosmo/de/grib/12/aswdir_s",
    "filename" : "COSMODE_single_level_elements_ASWDIR_S_2018022312_000.grib2.bz2",
    "dimensions" : {
      "lon" : 589,
      "time" : 3,
      "lat" : 441
    }
   },
  "records" : [ {
    "grids" : [ {
      "gPt" : [ 45.175, 13.55, 45.2, 13.575, 3.366295E-7 ]
    }, {
      "gPt" : [ 45.175, 13.575, 45.2, 13.6, 3.366295E-7 ]
    }, {
      "gPt" : [ 45.175, 13.6, 45.2, 13.625, 3.366295E-7 ]
    } ],
    "time" : "2018-02-23T12:15:00Z"
  }, {
    "grids" : [ {
      "gPt" : [ 45.175, 13.55, 45.2, 13.575, 4.545918E-7 ]
    }, {
      "gPt" : [ 45.175, 13.575, 45.2, 13.6, 4.545918E-7 ]
    }, {
      "gPt" : [ 45.175, 13.6, 45.2, 13.625, 4.545918E-7 ]
    }
    ],
    "time" : "2018-02-23T12:30:00Z"
    }
    ]
}

This is a sample output from the json above:

filename, time, lat1, long1, lat2, long2, value
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.55,45.2,13.575,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.575,45.2,13.6,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.6,45.2,13.625,3.366295E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.55,45.2,13.575,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.575,45.2,13.6,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.6,45.2,13.625,4.545918E-7

Any help will be appreciated. Kind regards,

Playing With BI

2 Answers


You can try the code below. It worked for me for a complex JSON document.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

def flattenDataframe(df: DataFrame): DataFrame = {

  val fields = df.schema.fields
  val fieldNames = fields.map(_.name)

  for (i <- fields.indices) {
    val field = fields(i)
    val fieldName = field.name
    field.dataType match {
      case _: ArrayType =>
        // Explode the array column, keep all other columns, and recurse
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        return flattenDataframe(explodedDf)
      case structType: StructType =>
        // Promote struct fields to top-level columns (dots become underscores) and recurse
        val childFieldNames = structType.fieldNames.map(childName => fieldName + "." + childName)
        val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
        val renamedCols = newFieldNames.map(x => col(x).as(x.replace(".", "_")))
        val flattenedDf = df.select(renamedCols: _*)
        return flattenDataframe(flattenedDf)
      case _ =>
    }
  }
  df
}
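
Used on the dwdJson DataFrame from the question, it would be called roughly like this (a sketch). Note that gPt is itself an array of doubles, so this fully recursive version will also explode it into one row per value instead of the five lat1/long1/... columns; that last level may be better handled with a positional select on gPt, as in the question:

// Sketch: flatten the question's dwdJson and inspect the result.
val flatDf = flattenDataframe(dwdJson)
flatDf.printSchema()
flatDf.show(false)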

Theo

I think your approach is completely right in Spark. As for avoiding the intermediate DataFrames, you can actually chain your statements consecutively without breaking them into intermediate DataFrames, like:

val df = dwdJson.select($"meta.filename", explode($"records").as("record")).
  select($"filename", $"record.time", explode($"record.grids").as("grids")).
  select($"filename", $"time", $"grids.gPt").
  select($"filename", $"time",
    $"gPt"(0).as("lat1"),
    $"gPt"(1).as("long1"),
    $"gPt"(2).as("lat2"),
    $"gPt"(3).as("long2"),
    $"gPt"(4).as("value"))

And I have some thoughts on the performance issue. Spark uses the Jackson library internally to parse JSON, and it has to infer the schema itself by sampling records from the input (the default sampling ratio is 1.0, i.e. all records). So if you have a big input, large files (the wholeTextFiles operation) and a complex schema, it will hurt the Spark program's performance.
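
If schema inference turns out to be the bottleneck, one option (a sketch based on the schema printed in the question, not something I have benchmarked; note it omits the dimensions.time field that appears in the sample file) is to pass an explicit schema so Spark can skip the sampling pass:

import org.apache.spark.sql.types._

// Explicit schema matching the printed schema, so Spark does not have to infer it.
val dwdSchema = StructType(Seq(
  StructField("meta", StructType(Seq(
    StructField("dimensions", StructType(Seq(
      StructField("lat", LongType),
      StructField("lon", LongType)
    ))),
    StructField("directory", StringType),
    StructField("filename", StringType)
  ))),
  StructField("records", ArrayType(StructType(Seq(
    StructField("grids", ArrayType(StructType(Seq(
      StructField("gPt", ArrayType(DoubleType))
    )))),
    StructField("time", StringType)
  ))))
))

val dwdJsonWithSchema = spark.read.schema(dwdSchema).json(jsonRDD)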

Thang Nguyen
  • Many thanks for the useful answer. I am comparing this Spark parser with a Java parser a colleague wrote using Jackson. He is running the Java parser locally as a regular Java application and it takes around 8 seconds to parse 350 files. For the same input my Spark parser needs around 5 minutes. I think there is some parallelization problem, since I get the same performance with "--master local[8]" as with "--master local[1]" when I submit the job. – Playing With BI Mar 06 '18 at 12:27