13

I would like to write my Spark DataFrame as a set of JSON files, where each file contains a single JSON array. Let me explain with a simple (reproducible) example.

We have:

import numpy as np
import pandas as pd
df = spark.createDataFrame(pd.DataFrame({'x': np.random.rand(100), 'y': np.random.rand(100)}))

Saving the dataframe as:

df.write.json('s3://path/to/json')

each file just created has one JSON object per line, something like:

{"x":0.9953802385540144,"y":0.476027611419198}
{"x":0.929599290575914,"y":0.72878523939521}
{"x":0.951701684432855,"y":0.8008064729546504}

but I would like each file to contain an array of those JSON objects:

[
   {"x":0.9953802385540144,"y":0.476027611419198},
   {"x":0.929599290575914,"y":0.72878523939521},
   {"x":0.951701684432855,"y":0.8008064729546504}
]
enneppi
  • each executor writes its data in parallel. You could concatenate all of the part files and add the brackets yourself. – pault Oct 04 '19 at 14:55
  • This is only a workaround, and it isn't feasible for me anyway, since a Lambda is triggered as soon as the file is written to S3... – enneppi Oct 04 '19 at 15:08
  • Are you okay with having each file be an array of `json`, or do you want the whole contents in one file? If you're using Spark 2.4, `df.coalesce(1).write.json("path", lineSep="\n,")` would almost get you there... – pault Oct 04 '19 at 15:15
  • @pault, okay with having each file be an array of json not only a file – enneppi Oct 04 '19 at 15:23
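
For readers who can post-process the part files (unlike the asker, whose Lambda fires as soon as a file lands), the "add the brackets yourself" idea from the first comment can be sketched in plain Python. This is only an illustration: `json_lines_to_array` is a hypothetical helper name, and reading from and writing back to S3 is left out.

```python
import json


def json_lines_to_array(text: str) -> str:
    """Convert line-delimited JSON text into one JSON array string."""
    # Parse each non-empty line so that a malformed record fails loudly
    records = [json.loads(line) for line in text.splitlines() if line.strip()]
    return json.dumps(records)


# Made-up contents of one part file written by df.write.json(...)
part_file = '{"x":0.5,"y":0.25}\n{"x":0.75,"y":1.0}\n'
array_text = json_lines_to_array(part_file)
print(array_text)
```

Parsing and re-serializing is slower than gluing commas and brackets onto the raw lines, but it validates every record along the way.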

2 Answers

8

It is not currently possible to have spark "natively" write a single file in your desired format because spark works in a distributed (parallel) fashion, with each executor writing its part of the data independently.

However, since you are okay with having each file be an array of json not only [one] file, here is one workaround that you can use to achieve your desired output:

from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

(
    df.select(to_json(struct(*df.columns)).alias("json"))  # one JSON string per row
    .groupBy(spark_partition_id())                          # one group per partition
    .agg(collect_list("json").alias("json_list"))           # gather that partition's rows
    .select(col("json_list").cast("string"))                # render the list as one string
    .write.text("s3://path/to/json")                        # one line of text per group
)

First, you create a JSON string from all of the columns in df. Then you group by the Spark partition ID and aggregate using collect_list, which puts all the JSON strings on that partition into a list. Since you're aggregating within the partition, there should be no shuffling of data required.

Now select the list column, cast it to a string, and write it as a text file.

Here's an example of how one file looks:

[{"x":0.1420523746714616,"y":0.30876114874052263}, ... ]

Note you may get some empty files.
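
To see why the cast yields a parseable array, here is a small pure-Python stand-in for the two Spark steps. It assumes that casting an array of strings to a string renders it as "[a, b]" with a comma and a space between elements, which matches the sample file above but is worth checking on your Spark version. The `rows` values are made up for illustration.

```python
import json

# Made-up rows standing in for one Spark partition
rows = [{"x": 0.1, "y": 0.3}, {"x": 0.9, "y": 0.4}]

# Stand-in for to_json(struct(...)): one compact JSON string per row
json_strings = [json.dumps(row, separators=(",", ":")) for row in rows]

# Stand-in for collect_list(...).cast("string"), ASSUMED to render as "[a, b]"
file_line = "[" + ", ".join(json_strings) + "]"

# The elements are unquoted JSON objects, so the whole line parses as a JSON array
parsed = json.loads(file_line)
print(file_line)
```

The line parses only because each element is itself a JSON object; the same cast on arbitrary strings would not produce valid JSON.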


Presumably you can force Spark to write the data in ONE file if you specify an empty groupBy, but this would force all of the data into a single partition, which could result in an out-of-memory error.

pault
  • Thanks @pault. The solution you wrote is again a workaround, but I think this is the only way to reach the goal. In fact, Spark has no way to write the file other than concatenating the individual JSON objects: each executor just writes its own set of JSON objects, knowing nothing about the other executors (as they work in parallel). So adding leading/trailing brackets or a comma to separate the JSON objects would go against Spark's "philosophy". Could you add this reasoning to your answer so I can accept it? – enneppi Oct 09 '19 at 15:28
  • @enneppi I added some clarification based on your feedback. – pault Oct 09 '19 at 16:03
  • hi @pault - How would you add a root element to this? Before the json_list array – SqlKindaGuy Feb 25 '22 at 07:47
1

If the data is not super huge and it's okay to have the list as one JSON file, the following workaround is also valid. First, convert the PySpark DataFrame to pandas and then to a list of dicts. Then, the list can be dumped as JSON.

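import json

# Collect the whole DataFrame to the driver as a list of row dicts
list_of_dicts = df.toPandas().to_dict('records')

# Write the list as a single JSON array; the file is closed automatically
with open('path/to/file.json', 'w') as json_file:
    json.dump(list_of_dicts, json_file)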
emjeexyz