0

I was trying to create a spark dataframe(using scala) from a csv that has entries like this: This is a single row entry of 5 columns:

{"username":"john_doe, "id":"123411"} true 0 5 {"country":"IN", "city":"BOM"}

So, some columns are in JSON format are some are not. I did write some code to process the JSON columns as a String and directly add them row-wise as a String to a dataframe, but the other columns which are left I have to manually add them to the dataframe, which is turning out to be a costly process since I create a separate dataframe, add an "id" column to it, do a natural join, drop the "id" column every time.

Any help will be appreciated!

zero323
  • 322,348
  • 103
  • 959
  • 935
J.Doe
  • 183
  • 1
  • 13
  • 1
    can we see what you've tried so far? – Ramesh Maharjan Jan 29 '18 at 07:16
  • My code is a little messy. I am doing a lot of unrelated things in between related to janusgraph. To process json columns, It's mostly manipulating column entries which start with '{' and removing, trimming, splitting strings to get key, value pairs and adding it to a big string that cumulates all this. After every row, I am using read.json to convert the string to a temporary dataframe and finally doing a union with a main_df, to accumulate all the rows. – J.Doe Jan 29 '18 at 07:28
  • I am similar situation, the only difference is my json schema is not fixed. – smishra Apr 15 '19 at 17:43

1 Answers1

1

First you need to read the csv correctly such that the json columns are read as string. So you would do something like this:

val csvSchema = StructType(Seq(StructField("info", StringType), StructField("boolean", BooleanType), ....))
val df = spark.read.option(...).schema(csvSChema).csv(...)

build the full schema into csvSchema, set the relevant options (delimiter, header) set the schema and read the csv.

The resulting dataframe would have string in the json column. Now we can convert the json to real columns:

 val s = StructType(StructField("username", StringType), StructField("id",StringType))

df.withColumn("info", from_json(df("info"), s))

the result would be a struct in the new column.

Assaf Mendelson
  • 12,701
  • 5
  • 47
  • 56
  • How can I process this if json schema is not fixed? – smishra Apr 15 '19 at 17:44
  • @smishra Dataframe requires a fixed schema so if you want to use it as columns you need to fix the schema somehow. You can build the schema in the code as long as it is the same for the entire column or you can create a schema where some of the values are null but at the end of the day, dataframe is based on consistent schema. – Assaf Mendelson Apr 16 '19 at 07:39