5

When writing a dataframe to parquet using partitionBy :

df.write.partitionBy("col1","col2","col3").parquet(path)

It would be my expectation that each partition being written were done independently by a separate task and in parallel to the extent of the number of workers assigned to the current spark job.

However there is actually only one worker/task running at a time when writing to the parquet. That one worker is cycling through each of the partitions and writing out the .parquet files serially. Why would this be the case - and is there a way to compel concurrency in this spark.write.parquet operation?

The following is not what I want to see (should be 700%+ ..)

enter image description here

From this other post I also tried adding repartition in front

Spark parquet partitioning : Large number of files

df.repartition("col1","col2","col3").write.partitionBy("col1","col2","col3").parquet(path)

This unfortunately had no effect: still one worker only..

Note: I am running in local mode with local[8] and have seen other spark operations run with as many as eight concurrent workers and using up to 750% of the cpus.

WestCoastProjects
  • 58,982
  • 91
  • 316
  • 560

1 Answers1

4

In short, writing the multiple output files from a single task is not parallelized, but assuming you have multiple tasks (multiple input splits) each one of those will get their own core on a worker.

The goal of writing out partitioned data isn't to parallelize your writing operation. Spark is already doing that by simultaneously writing out multiple tasks at once. The goal is just to optimize future read operations where you want only one partition of the saved data.

The logic to write partitions in Spark is designed to read all of the records from the previous stage only once when writing them out to their destination. I believe part of the design choice is also to protect against the case where a partition key has many many values.

EDIT: Spark 2.x method

In Spark 2.x, it sorts the records in each task by their partition keys, then it iterates through them writing to one file handle at a time. I assume they are doing this to ensure they never open a huge number of file handles if there are a lot of distinct values in your partition keys.

For reference, here is the sorting:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

Scroll down a little and you will see it calling write(iter.next()) looping through each row.

And here is the actual writing (one file/partition key at a time):

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

There you can see it only holds one file handle open at a time.

EDIT: Spark 1.x method

What spark 1.x does is for a given task is loop through all the records, opening a new file handle whenever it encounters a new output partition it hasn't seen before for this task. It then immediately writes the record to that file handle and goes onto the next one. This means at any given time while processing a single task it can have up to N file handles open just for that one task where N is the maximum number of output partitions. To make it clearer, here is some python psuedo-code to show the general idea:

# To write out records in a single InputSplit/Task
handles = {}
for row in input_split:
    partition_path = determine_output_path(row, partition_keys)
    if partition_path not in handles:
        handles[partition_path] = open(partition_path, 'w')

    handles[partition_path].write(row)

There is a caveat to the above strategy for writing out records. In spark 1.x the setting spark.sql.sources.maxConcurrentWrites put an upper limit on the mask file handles that could be open per task. After that was reached, Spark would instead sort the data by the partition key, so it could iterate through the records writing out one file at a time.

Ryan Widmaier
  • 7,948
  • 2
  • 30
  • 32
  • I'm not getting this as a "strategy": `hive` for example uses the partitions to drive concurrency both on read and write. These are distinct directories and afford no obstacle to parallel writing. I mean in the end - if this were going to be the case then I could write my own routine to parallelize the writing of each partition in a different worker via `mapPartitions`. – WestCoastProjects Jun 26 '18 at 21:21
  • Even if they are distinct folders, there are still considerations about number of concurrent file handles, and max total write IO you can do. You could use mapPartitions to parallelize writing of tasks manually in theory, but you could also just increase the number of executors or cores to achieve the same thing (assuming your job has enough tasks) – Ryan Widmaier Jun 26 '18 at 21:28
  • Updated with more specific info for Spark 2 – Ryan Widmaier Jun 26 '18 at 21:46
  • 1
    This is nice research. Holding the question open a bit longer in hopes of someone dropping in a workaround. afa # of workers: the OP mentions they are 8 but only one is being used. I'm more than a little surprised that it would be considered acceptable to use a single thread for all this. – WestCoastProjects Jun 26 '18 at 22:05
  • Nice research, but how does it explain CPU usage with more than one partition. Even in `local` mode, each worker thread should work independently of others, shouldn't it? – Alper t. Turker Jun 26 '18 at 22:27
  • 1
    Only one worker is writing the parquet. As mentioned in the OP the `local[8]` is working fine (700+%) for *other* spark tasks : so it is parquet writing that is the problem. It makes no sense to serialize this work. – WestCoastProjects Jun 26 '18 at 22:34
  • @javadba Do you see "Only one worker (...) writing" (by monitoring task execution) or just see low CPU utilization? These are not the same things. And "In short, writing the multiple output files from a single task is not parallelized" doesn't explain why it would happen with multiple tasks. – Alper t. Turker Jun 27 '18 at 10:04
  • Its not low cpu utiliation -it's 100% of *one* Cpu. For tens of minutes. – WestCoastProjects Jun 27 '18 at 15:16
  • @javadba I doesn't really answer the question, does it? CPU activity pattern like this can be caused by many unrelated reasons. – Alper t. Turker Jun 27 '18 at 22:18
  • If you think I can not distinguish between one process causing cpu usage and multiple ones adding up to 100% then suit yourself. – WestCoastProjects Jun 28 '18 at 01:03