
I need to write valid JSON, but Spark only writes one JSON object per row, like this:

{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
{"name":"Michael", "address":{"city":null, "state":"California"}}

The JSON above is not valid. Instead I need a single valid document like this:

[
{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}},
{"name":"Michael", "address":{"city":null, "state":"California"}}
]

How can I achieve this in Java?

pavan yadav
  • It's hard to tell what you're asking without knowing what you have already, but does this help: https://stackoverflow.com/questions/39392044/spark-sqlcontext-output-json-format ? – Jamey Jun 25 '17 at 08:24
  • I need to convert a Hive table to XML, but I was facing various issues with that. So first I am converting the Hive table to JSON, and then I will convert the JSON directly to XML. But when I convert Hive to JSON, I get invalid JSON, so I just need to convert it into valid JSON. – pavan yadav Jun 25 '17 at 08:33
  • Possible duplicate of [Spark SqlContext output JSON format](https://stackoverflow.com/questions/39392044/spark-sqlcontext-output-json-format) – always-a-learner Jun 25 '17 at 11:18

2 Answers


First, start by converting the DataFrame rows to JSON:

Scala

val jsonDs = df.toJSON

Java

Dataset<String> jsonDs = df.toJSON();

Scala example:

case class Data(name: String, age: Int)
case class DataObj(id: String, seq: Seq[Data])

val df = session.createDataFrame(Seq(
 DataObj("1", Seq(Data("n1", 1), Data("n2", 2))),
 DataObj("2", Seq(Data("n1", 1), Data("n2", 2), Data("n3", 3))),
 DataObj("3", Seq(Data("n1", 1))),
 DataObj("4", Seq(Data("n4", 44))),
 DataObj("5", Seq(Data("n5", 55)))
))

val jsonDs = df.toJSON
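
For completeness, here is a comparable end-to-end sketch in Java (the Person bean, the class name, and the local master setting are illustrative assumptions, not something from the question):

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class ToJsonExample {
    // Hypothetical bean standing in for your Hive table's schema.
    public static class Person implements Serializable {
        private String name;
        private String city;

        public Person() {}
        public Person(String name, String city) { this.name = name; this.city = city; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("toJsonExample").master("local[*]").getOrCreate();

        Dataset<Person> df = spark.createDataset(
            Arrays.asList(new Person("Yin", "Columbus"), new Person("Michael", null)),
            Encoders.bean(Person.class));

        // Each element of jsonDs is one JSON object, e.g. {"city":"Columbus","name":"Yin"}
        Dataset<String> jsonDs = df.toJSON();
        jsonDs.show(false);

        spark.stop();
    }
}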

The next steps depend on whether you want to save to one file or to multiple files, one per partition.

Save to one JSON file

Scala

val count = jsonDs.count()
jsonDs
  .repartition(1) // make sure there is only one partition and, in consequence, one output file
  .rdd
  .zipWithIndex()
  .map { case (json, idx) =>
      if (count == 1) "[\n" + json + "\n]" // only row: open and close the array
      else if (idx == 0) "[\n" + json + "," // first row: open the array
      else if (idx == count - 1) json + "\n]" // last row: close the array
      else json + ","
  }
  .saveAsTextFile("path")

Java

long count = jsonDs.count();
jsonDs
  .repartition(1) // make sure there is only one partition and, in consequence, one output file
  .javaRDD()
  .zipWithIndex()
  .map(t -> count == 1 ? "[\n" + t._1() + "\n]" // only row: open and close the array
          : t._2() == 0 ? "[\n" + t._1() + "," // first row: open the array
          : t._2() == count - 1 ? t._1() + "\n]" // last row: close the array
          : t._1() + ",")
  .saveAsTextFile("path");

Save to multiple JSON files, one per partition

Scala

jsonDs
  .mapPartitions(vals => Iterator("[" + vals.mkString(",") + "]"))
  .write
  .text("path")

Java

import java.util.Arrays;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Encoders;

jsonDs
  .mapPartitions(
      // the cast avoids an ambiguous-overload error with Java lambdas
      (MapPartitionsFunction<String, String>) input ->
          Arrays.asList("[" + StringUtils.join(input, ",") + "]").iterator(),
      Encoders.STRING())
  .write()
  .text("path");
Piotr Kalański
  • If you repartition to 1, then your solution for multiple JSON files should work for one file (and it would be better to use coalesce(1) instead of repartition(1)). Note that when changing to one partition, everything would have to fit on that one executor. – Assaf Mendelson Jun 25 '17 at 09:25
  • Can you please provide Java code for "Save to one JSON file", as I am new to lambda expressions in Java. – pavan yadav Jun 25 '17 at 11:22
  • Hi, I've added Java implementation. – Piotr Kalański Jun 25 '17 at 12:18

In Python

def write_valid_json(df, path):
    """Write df to JSON files, one per partition, with each file being a
    valid JSON array of objects (instead of Spark's JSON Lines format).

    Each part file will have the format:
        [{json-version-rec-1},
        {json-version-rec-2},
        ...,
        {json-version-rec-N}]

    Note: if a partition is empty, an empty part file will be created.
    """

    def add_delimiters(js):
        # Open the array at the first record, append "," after every
        # record except the last, and close the array at the last one.
        try:
            curr = "[" + next(js)
        except StopIteration:
            yield ""  # empty partition -> empty part file
            return

        while True:
            try:
                nxt = next(js)
            except StopIteration:
                yield curr + "]"
                return
            yield curr + ","
            curr = nxt

    df.toJSON().mapPartitions(add_delimiters).saveAsTextFile(path)