
I would like to create JSON from a Spark 1.6 DataFrame (using Scala). I know there is the simple solution of doing `df.toJSON`.

However, my problem looks a bit different. Consider for instance a dataframe with the following columns:

|  A  |     B     |  C1  |  C2  |    C3   |
-------------------------------------------
|  1  | test      |  ab  |  22  |  TRUE   |
|  2  | mytest    |  gh  |  17  |  FALSE  |

I would like to have at the end a dataframe with

|  A  |     B     |                        C                   |
----------------------------------------------------------------
|  1  | test      | { "c1" : "ab", "c2" : 22, "c3" : TRUE }    |
|  2  | mytest    | { "c1" : "gh", "c2" : 17, "c3" : FALSE }   |

where C is a JSON containing C1, C2, C3. Unfortunately, at compile time I do not know what the dataframe looks like (except for the columns A and B, which are always "fixed").

As for why I need this: I am using Protobuf to send the results around. Unfortunately, my dataframe sometimes has more columns than expected, and I would still like to send those via Protobuf, but I do not want to specify all columns in the definition.

How can I achieve this?

navige

4 Answers


Spark 2.1 should have native support for this use case (see #15354).

import org.apache.spark.sql.functions.{struct, to_json}
df.select(to_json(struct($"c1", $"c2", $"c3")))
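If the remaining columns are not known at compile time, the same two functions can be applied to whatever is left after the fixed A and B. A minimal sketch (assuming Spark 2.1+ and that only A and B are fixed):

import org.apache.spark.sql.functions.{col, struct, to_json}

// every column except the fixed A and B, resolved at runtime
val dynamicCols = df.columns.filterNot(Set("A", "B")).map(col)

df.select(
  col("A"),
  col("B"),
  to_json(struct(dynamicCols: _*)).alias("C")
)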
Michael Armbrust
  • Is `struct` the same as `StructType` in Java? Can you give me a Java implementation? – Yashwanth Kambala Sep 13 '19 at 09:15
  • While using the above solution I am getting the JSON result with an extra **\** character: ```\"SIGNAL\":[{\"TIME\":1569382072016,\"VALUE\":-9}],\"SIGNAL01\":[{\"TIME\":1569382099654,\"VALUE\":8.0}]}"}``` How to remove that extra character from the result? @Michael Armbrust – Antony Sep 25 '19 at 04:42
  • Does to_json preserve order? – ron_ron Aug 02 '22 at 20:24

I use this command to solve the to_json problem:

output_df = (df.select(to_json(struct(col("*"))).alias("content")))
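This appears to be PySpark; the same idea in Scala might look like the sketch below (assuming Spark 2.1+ for `to_json`):

import org.apache.spark.sql.functions.{col, struct, to_json}

// pack every column of the row into a single JSON string column
val outputDf = df.select(to_json(struct(col("*"))).alias("content"))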
Cyanny

Here is a version with no JSON parser, and it adapts to your schema:

import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}

df.select(
  col(df.columns(0)),
  col(df.columns(1)),
  concat(
    lit("{"),
    // one "name":value pair per column after the first two,
    // quoting the value only for string columns
    concat_ws(",", df.dtypes.drop(2).map { case (c, t) =>
      val quote = if (t == "StringType") "\"" else ""
      concat(lit("\"" + c + "\":" + quote), col(c), lit(quote))
    }: _*),
    lit("}")
  ) as "C"
).collect()
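For the example data in the question, this should yield something like the following (non-string columns are concatenated unquoted, so the boolean renders as true/false):

// Array[org.apache.spark.sql.Row] = Array(
//   [1,test,{"C1":"ab","C2":22,"C3":true}],
//   [2,mytest,{"C1":"gh","C2":17,"C3":false}])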
David Griffin

First, let's convert the C columns to a struct:

import org.apache.spark.sql.functions.struct

val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C"))

This structure can be converted to JSONL using toJSON as before:

dfStruct.toJSON.collect
// Array[String] = Array(
//   {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}}, 
//   {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}})

I am not aware of any built-in method that can convert a single column, but you can either convert it individually and join, or use your favorite JSON parser in a UDF.

case class C(C1: String, C2: Int, C3: Boolean)

object CJsonizer {
  import org.json4s._
  import org.json4s.JsonDSL._
  import org.json4s.jackson.Serialization
  import org.json4s.jackson.Serialization.write

  implicit val formats = Serialization.formats(org.json4s.NoTypeHints)

  def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
}


import org.apache.spark.sql.functions.udf

val cToJSON = udf((c1: String, c2: Int, c3: Boolean) =>
  CJsonizer.toJSON(c1, c2, c3))

df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3"))
zero323
  • Actually, my question is really about the second part of how to convert the individual columns to JSON. You are mentioning `join`-ing the columns, but that does not really work as I have on the one hand a `RDD[String]` and on the other hand a `DataFrame` – navige Mar 22 '16 at 15:51
  • Like he says, just use a `UDF`. You don't even have to use a full-blown JSON parser in the `UDF` -- you can just craft a JSON string on the fly using `map` and `mkString`. You will probably need to use `DataFrame.columns` or possibly `DataFrame.dtypes` to both craft the `select` statement and as the basis of the `map` in the `UDF`. – David Griffin Mar 22 '16 at 16:05
  • I agree with @DavidGriffin - udf can be the simplest solution here. And Jackson and json4s are already dragged with other dependencies. – zero323 Mar 22 '16 at 21:47
  • My problem with all of the JSON parsers I have seen is that you need to know in advance what the schema looks like -- like with your solution @zero323 -- it only works for those specific columns. What if the names were different? What if there were more than 3 columns? – David Griffin Mar 23 '16 at 00:04
  • The only problem I see is that `Row` is an extremely ugly data structure. Otherwise you can simply build an arbitrarily complex AST with Lift / json4s and convert it to JSON. But truth be told it is too much effort to put into a SO answer. – zero323 Mar 23 '16 at 00:32
  • `Row` is ugly for the same reason I hate dealing with JSON in Scala -- it's a clash of cultures: loosey-goosey vs. strong, static typing. SQL is loosey-goosey -- you are a `select` away from defining a new type -- hence `Row` is messy. Avro's `GenericRecord` has the same problem. – David Griffin Mar 23 '16 at 00:59
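Following up on the map/mkString idea from the comments above, a schema-agnostic variant can pack the non-fixed columns into a struct and render it with a single UDF. A rough sketch (assumptions: the first two columns are the fixed A and B, string values contain no quotes that would need escaping, and the Spark version in use accepts a Row-typed UDF argument fed by `struct`):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}

// Render a struct (received by the UDF as a Row) as a JSON string,
// quoting string values only; embedded quotes are not escaped.
val rowToJson = udf { (row: Row) =>
  row.schema.fields.zipWithIndex.map { case (field, i) =>
    val rendered = row.get(i) match {
      case s: String => "\"" + s + "\""
      case other     => String.valueOf(other)   // numbers, booleans, null
    }
    "\"" + field.name + "\":" + rendered
  }.mkString("{", ",", "}")
}

// Pack every column after the fixed A and B into the struct at runtime.
val extraCols = df.columns.drop(2).map(col)
df.select(col("A"), col("B"), rowToJson(struct(extraCols: _*)).alias("C"))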