
I need to create one JSON file for each row of the DataFrame. I'm using partitionBy, which creates a subfolder for each file. Is there a way to avoid creating the subfolders and to rename the JSON files with the unique key? Or are there any other alternatives? It's a huge DataFrame with ~300K unique values, so repartitioning is eating up a lot of resources and taking time. Thanks.

df.select(Seq(col("UniqueField").as("UniqueField_Copy")) ++ df.columns.map(col): _*)
  .write
  .partitionBy("UniqueField")
  .mode("overwrite")
  .format("json")
  .save("c:/temp/json/")   // forward slashes avoid Scala string-escape problems with "c:\temp\json\"

1 Answer


Putting all the output in one directory

Your example code is calling partitionBy on a DataFrameWriter object. The documentation tells us that this function:

Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:

year=2016/month=01/

year=2016/month=02/

This is the reason you're getting subdirectories. Simply removing the call to partitionBy will get all your output in one directory.
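For example, a minimal sketch of the write without partitionBy, assuming the same df and output path as in your question (forward slashes sidestep Scala string escaping on Windows):

df.write
  .mode("overwrite")
  .format("json")
  .save("c:/temp/json/")

This puts one part-*.json file per partition of df directly into c:/temp/json/.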

Getting one row per file

Spark SQL

You had the right idea partitioning your data by UniqueField, since Spark writes one file per partition. Rather than using DataFrameWriter's partitionBy, you can use

df.repartitionByRange(numberOfJson, $"UniqueField")

to get the desired number of partitions, with one JSON file per partition. Note that this requires you to know in advance how many JSON files you will end up with. You can compute it with

val numberOfJson = df.select(count($"UniqueField")).first.getAs[Long](0)

However, this adds an additional action to your query, which will cause your entire dataset to be computed again. It sounds like your dataset is too big to fit in memory, so you'll need to consider carefully whether caching (or checkpointing) with df.cache (or df.checkpoint) actually saves you computation time. (For large datasets that don't require intensive computation to create, recomputation can actually be faster.)
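Putting the pieces together, here is a rough sketch of the whole flow; it assumes a SparkSession named spark is in scope (for the $ column syntax) and reuses the df and output path from your question:

import org.apache.spark.sql.functions.count
import spark.implicits._   // enables the $"UniqueField" column syntax

// Optionally cache so df isn't recomputed for both the count and the write.
val cached = df.cache()

// One partition (and therefore one output file) per UniqueField row.
val numberOfJson = cached.select(count($"UniqueField")).first.getAs[Long](0)

cached
  .repartitionByRange(numberOfJson.toInt, $"UniqueField")
  .write
  .mode("overwrite")
  .format("json")
  .save("c:/temp/json/")

Whether the cache() call pays off depends on how expensive df is to recompute, as noted above; drop it if recomputation is cheaper.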

RDD

An alternative to using the Spark SQL API is to drop down to the lower-level RDD API. Partitioning an RDD by key (in PySpark) was discussed thoroughly in the answer to this question. In Scala, you'd have to specify a custom Partitioner, as described in this question.
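As a rough illustration only (not taken from the linked answers): a custom Partitioner that gives each distinct UniqueField value its own partition, so that saveAsTextFile writes one file per key. The KeyPartitioner class and keyToIndex map are names made up for this sketch, and collecting the distinct keys to the driver assumes the ~300K keys fit comfortably in driver memory:

import org.apache.spark.Partitioner
import org.apache.spark.sql.functions.{col, struct, to_json}

// Maps every distinct key to its own partition index.
class KeyPartitioner(keyToIndex: Map[String, Int]) extends Partitioner {
  override def numPartitions: Int = keyToIndex.size
  override def getPartition(key: Any): Int = keyToIndex(key.asInstanceOf[String])
}

// Build the key -> partition index lookup on the driver.
val keyToIndex = df.select("UniqueField").distinct.collect
  .map(_.getString(0)).zipWithIndex.toMap

// Pair each row's key with its JSON representation, shuffle with the custom
// partitioner, then drop the key and write one text file per partition.
df.select(col("UniqueField"), to_json(struct(df.columns.map(col): _*)).as("json"))
  .rdd
  .map(r => (r.getString(0), r.getString(1)))
  .partitionBy(new KeyPartitioner(keyToIndex))
  .values
  .saveAsTextFile("c:/temp/json")

Be aware that a shuffle into ~300K partitions is itself expensive, which is the same resource problem you're seeing with repartition, so this is more of a structural alternative than a performance fix.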

Renaming Spark's output files

This is a fairly common question, and AFAIK the consensus is that it's not possible.

Hope this helps, and welcome to Stack Overflow!
