First, convert the DataFrame rows to JSON strings:
Scala
val jsonDs = df.toJSON
Java
Dataset<String> jsonDs = df.toJSON();
Scala example:
case class Data(name: String, age: Int)
case class DataObj(id: String, seq: Seq[Data])
val df = session.createDataFrame(Seq(
  DataObj("1", Seq(Data("n1", 1), Data("n2", 2))),
  DataObj("2", Seq(Data("n1", 1), Data("n2", 2), Data("n3", 3))),
  DataObj("3", Seq(Data("n1", 1))),
  DataObj("4", Seq(Data("n4", 44))),
  DataObj("5", Seq(Data("n5", 55)))
))
val jsonDs = df.toJSON
The next steps depend on whether you want to save everything to a single file or to one file per partition.
Save to one JSON file
Scala
val count = jsonDs.count()
jsonDs
  .repartition(1) // force a single partition and hence a single output file
  .rdd
  .zipWithIndex()
  .map { case (json, idx) =>
    val prefix = if (idx == 0) "[\n" else ""          // open the array on the first row
    val suffix = if (idx == count - 1) "\n]" else "," // close it on the last row
    prefix + json + suffix
  }
  .saveAsTextFile("path")
Java
long count = jsonDs.count();
jsonDs
  .repartition(1) // force a single partition and hence a single output file
  .javaRDD()
  .zipWithIndex()
  .map(t -> {
    String prefix = t._2 == 0 ? "[\n" : "";          // open the array on the first row
    String suffix = t._2 == count - 1 ? "\n]" : ","; // close it on the last row
    return prefix + t._1 + suffix;
  })
  .saveAsTextFile("path");
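The first-row/last-row bracket logic above is plain string manipulation; a minimal local sketch of the same indexed map step (no Spark required, using hypothetical sample rows) is:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simulate the zipWithIndex + map step locally on a list of JSON strings.
List<String> rows = Arrays.asList("{\"id\":\"1\"}", "{\"id\":\"2\"}", "{\"id\":\"3\"}");
long count = rows.size();

List<String> wrapped = new ArrayList<>();
for (int idx = 0; idx < count; idx++) {
    String prefix = idx == 0 ? "[\n" : "";          // open the array on the first row
    String suffix = idx == count - 1 ? "\n]" : ","; // close it on the last row
    wrapped.add(prefix + rows.get(idx) + suffix);
}

// Joining the lines yields one valid JSON array, matching the single-file output.
System.out.println(String.join("\n", wrapped));
```

Note that computing the prefix and suffix independently also handles the one-row case, where the same line must both open and close the array.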
Save to multiple JSON files, one per partition
Scala
jsonDs
  .mapPartitions(vals => Iterator("[" + vals.mkString(",") + "]"))
  .write
  .text("path")
Java
import java.util.Arrays;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoders;

jsonDs
  .mapPartitions(input -> Arrays.asList("[" + StringUtils.join(input, ",") + "]").iterator(), Encoders.STRING())
  .write()
  .text("path");
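Each partition is collapsed into a single string that is itself a valid JSON array, so every output file can be parsed independently. A local sketch of that per-partition join (hypothetical sample data, no Spark required):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Simulate two partitions, each collapsed into one JSON array string
// (one output file per partition).
List<List<String>> partitions = Arrays.asList(
    Arrays.asList("{\"id\":\"1\"}", "{\"id\":\"2\"}"),
    Arrays.asList("{\"id\":\"3\"}")
);

List<String> perPartition = partitions.stream()
    .map(vals -> "[" + String.join(",", vals) + "]") // same idea as mkString(",")
    .collect(Collectors.toList());
```

The trade-off versus the single-file approach: no repartition(1) bottleneck, but a consumer must read each file as a separate JSON array rather than one combined document.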