
When I look for ways to parse JSON within a string column of a DataFrame, I keep finding results that instead read JSON from file sources. My source is actually a Hive ORC table, and one of its columns contains strings in a JSON format. I'd really like to convert that into something parsed, like a map.

I'm having trouble finding a way to do this:

import java.util.Date
import org.apache.spark.sql.Row
import scala.util.parsing.json.JSON

val items = sql("select * from db.items limit 10")
//items.printSchema
val internal = items.map {
  case Row(externalid: Long, itemid: Long, locale: String,
           internalitem: String, version: Long,
           createdat: Date, modifiedat: Date)
       => JSON.parseFull(internalitem)
}

I thought this would work, but maybe there's a more Spark-idiomatic way of doing it, because I get the following error:

java.lang.ClassNotFoundException: scala.Any
 at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)

Specifically, my input data looks approximately like this:

externalid, itemid, locale, internalitem, version, createdat, modifiedat
       123,    321, "en_us", "{'name':'thing','attr':{
                               21:{'attrname':'size','attrval':'big'},
                               42:{'attrname':'color','attrval':'red'}
                              }}",              1, 2017-05-05…, 2017-05-06…

Yes, it's not exactly RFC 7159 JSON.

The attr keys can be any 5 to 30 of about 80,000 possible values, so I wanted to get to something like this instead:

externalid, itemid, locale, internalitem, version, createdat, modifiedat
       123,    321, "en_us", "{"name":"thing","attr":[
                               {"id":21,"attrname":"size","attrval":"big"},
                               {"id":42,"attrname":"color","attrval":"red"}
                              ]}",              1, 2017-05-05…, 2017-05-06…

Then flatten the internalitem to fields and explode the attr array:

externalid, itemid, locale, name, attrid, attrname, attrval, version, createdat, modifiedat
       123,    321, "en_us", "thing",  21,   "size",   "big",       1, 2017-05-05…, 2017-05-06…
       123,    321, "en_us", "thing",  42,  "color",   "red",       1, 2017-05-05…, 2017-05-06…
dlamblin
  • this code creates a new dataframe by parsing the data field in the source dataframe (a fuller sketch appears after these comments): `spark.sqlContext.read.json(df.select("col1").rdd.map(_.getAs[String](0)))` – rogue-one May 25 '17 at 20:58
  • @rogue-one yes, so when you do that you no longer can refer to the other columns after the json is parsed though, right? EG if I flattened what's in the json and wanted to output all the columns along with the new ones how would that get done? – dlamblin May 25 '17 at 21:10
  • Have you looked at this: https://stackoverflow.com/questions/35068792/spark-1-4-1-dataframe-explode-list-of-json-objects -- I think the explode method should help you do just that. Of course - it may not work, when you come from orc. This could help you extract the JSON https://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes – Rick Moritz May 25 '17 at 21:25
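
A minimal sketch of the approach from the first comment, assuming a Spark 2.x SparkSession named spark; as the second comment points out, the resulting DataFrame contains only the fields parsed out of the JSON, so the other columns are lost:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val items = spark.sql("select * from db.items limit 10")

// Feed the JSON string column back through the JSON reader, which infers a
// schema; note that the other columns of items are not carried along.
val parsed = spark.read.json(items.select("internalitem").rdd.map(_.getString(0)))
parsed.printSchema()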

1 Answer


I've never used this kind of computation myself, but I have some advice for you:

Before doing any operation on columns on your own, check the sql.functions package, which contains a whole bunch of helpful functions for working with columns, such as date extraction and formatting, string concatenation and splitting, and so on. It also provides a couple of functions for working with JSON objects: from_json and json_tuple.
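
For instance, json_tuple can pull named top-level fields out of a JSON string column without defining a schema at all, provided the strings are valid JSON. A minimal sketch, assuming df is the source DataFrame and using the field names from the question's sample data:

import spark.implicits._
import org.apache.spark.sql.functions.json_tuple

// Extract the top-level "name" and "attr" fields from the JSON string column;
// json_tuple returns generated column names (c0, c1, ...).
val extracted = df.select($"externalid", $"itemid",
  json_tuple($"internalitem", "name", "attr"))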

To use those methods you simply need to import them and call them inside a select, like this:

import spark.implicits._
import org.apache.spark.sql.functions._
val transformedDf = df.select($"externalid", $"itemid", … from_json($"internalitem", schema), $"version" …)

First of all, you have to create a schema for your JSON column and put it in the schema variable.
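
A minimal sketch of what that schema and the full select might look like, assuming the internalitem strings are in the normalized array form shown in the question (the field names come from the sample data, not from this answer):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Schema for: {"name": "...", "attr": [{"id": ..., "attrname": "...", "attrval": "..."}]}
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("attr", ArrayType(StructType(Seq(
    StructField("id", LongType),
    StructField("attrname", StringType),
    StructField("attrval", StringType)
  ))))
))

// Keep the other columns alongside the parsed struct.
val transformedDf = df.select(
  $"externalid", $"itemid", $"locale",
  from_json($"internalitem", schema).as("internalitem"),
  $"version", $"createdat", $"modifiedat")

From there the attr array could be flattened with explode to get one row per attribute, as in the last table in the question.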

Hope it helps.

Haroun Mohammedi
  • Thanks. So if I write out transformedDf it would contain the parsed json according to the schema, but not the other columns. I'm kind of new to the Spark DataFrame; is it possible to say: `df.select($"externalid", $"itemid", … from_json($"internalitem", schema), $"version" …)`? – dlamblin May 26 '17 at 00:20
  • @dlamblin Sure, it is obviously possible ! – Haroun Mohammedi May 26 '17 at 06:39
  • Okay that answers the main initial question. The next part was mapping what's in the json to either the "wanted to get something like" or the "flatten…explode" view. I think I'll make that a separate question. https://stackoverflow.com/q/44192994/459 – dlamblin May 26 '17 at 07:21