5

I have a Spark application that will need to make heavy use of unions whereby I'll be unioning lots of DataFrames together at different times, under different circumstances. I'm trying to make this run as efficiently as I can. I'm still pretty much brand-spanking-new to Spark, and something occurred to me:

If I have DataFrame 'A' (dfA) that has X number of partitions (numAPartitions), and I union that to DataFrame 'B' (dfB) which has Y number of partitions (numBPartitions), then what will the resultant unioned DataFrame (unionedDF) look like, with result to partitions?

// How many partitions will unionedDF have?
// X * Y ?
// Something else?
val unionedDF : DataFrame = dfA.unionAll(dfB)

To me, this seems like its very important to understand, seeing that Spark performance seems to rely heavily on the partitioning strategy employed by DataFrames. So if I'm unioning DataFrames left and right, I need to make sure I'm constantly managing the partitions of the resultant unioned DataFrames.

The only thing I can think of (so as to properly manage partitions of unioned DataFrames) would be to repartition them and then subsequently persist the DataFrames to memory/disk as soon as I union them:

val unionedDF : DataFrame = dfA.unionAll(dfB)
unionedDF.repartition(optimalNumberOfPartitions).persist(StorageLevel.MEMORY_AND_DISK)

This way, as soon as they are unioned, we repartition them so as to spread them over the available workers/executors properly, and then the persist(...) call tells to Spark to not evict the DataFrame from memory, so we can continue working on it.

The problem is, repartitioning sounds expensive, but it may not be as expensive as the alternative (not managing partitions at all). Are there generally-accepted guidelines about how to efficiently manage unions in Spark-land?

gsamaras
  • 71,951
  • 46
  • 188
  • 305
smeeb
  • 27,777
  • 57
  • 250
  • 447

2 Answers2

2

Yes, Partitions are important for .

I am wondering if you could find that out yourself by calling:

yourResultedRDD.getNumPartitions()

Do I have to persist, post union?

In general, you have to persist/cache an RDD (no matter if it is the result of a union, or a potato :) ), if you are going to use it multiple times. Doing so will prevent from fetching it again in memory and can increase the performance of your application by 15%, in some cases!

For example if you are going to use the resulted RDD just once, it would be safe not to do persist it.

Do I have to repartition?

Since you don't care about finding the number of partitions, you can read in my memoryOverhead issue in Spark about how the number of partitions affects your application.

In general, the more partitions you have, the smaller the chunk of data every executor will process.

Recall that a worker can host multiple executors, you can think of it like the worker to be the machine/node of your cluster and the executor to be a process (executing in a core) that runs on that worker.

Isn't the Dataframe always in memory?

Not really. And that's something really lovely with , since when you handle you don't want unnecessary things to lie in the memory, since this will threaten the safety of your application.

A DataFrame can be stored in temporary files that creates for you, and is loaded in the memory of your application only when needed.

For more read: Should I always cache my RDD's and DataFrames?

gsamaras
  • 71,951
  • 46
  • 188
  • 305
  • Thanks @gsamaras (+1) - I'm not really asking how to find out how many partitions my DataFrames have, what I'm really asking is whether or not I have to repartition and persist so as to manage them, post-union. Any thoughts? – smeeb Sep 08 '16 at 01:25
  • 1
    @smeeb I updated my answer with some thoughts, since you didn't have any other answer..Hope that helps! :) – gsamaras Sep 08 '16 at 05:29
  • Thanks again @gsamaras (+1) - two quick followup questions if you don't mind: **(1)** When you say "*Doing so will prevent spark from fetching it again in memory and can increase the performance of your application...*", I guess I don't understand what you mean by "*...prevent spark from fetching it **in memory** again...*". Isn't the DataFrame *always* in-memory? And if Spark is fetching a DataFrame *from* memory, where else could it possibly be loading the DataFrame to? Isn't memory the fastest place to store a DataFrame anyways?!? – smeeb Sep 08 '16 at 12:08
  • And **(2)** How do I decide how many executors each worker should get, and how many partitions each executor should get? Thanks again! – smeeb Sep 08 '16 at 12:09
  • 1
    @smeeb I updated my answer. As for the (2), damn that's too broad...Ask a new question if you want.. ;) – gsamaras Sep 08 '16 at 16:03
  • Thanks @gsamaras (+1 and green check) - also I [did ask that in another question](http://stackoverflow.com/questions/39381041/determining-optimal-number-of-spark-partitions-based-on-workers-cores-and-dataf), if you were so inclined to have a peek! Thanks again! – smeeb Sep 08 '16 at 18:00
  • @gsamaras Should I cache a data frame even if it is reused within the same spark job? – ravi malhotra Sep 29 '19 at 08:40
2

Union just add up the number of partitions in dataframe 1 and dataframe 2. Both dataframe have same number of columns and same order to perform union operation. So no worries, if partition columns different in both the dataframes, there will be max m + n partitions.

You doesn't need to repartition your dataframe after join, my suggestion is to use coalesce in place of repartition, coalesce combine common partitions or merge some small partitions and avoid/reduce shuffling data within partitions.

If you cache/persist dataframe after each union, you will reduce performance and lineage is not break by cache/persist, in that case, garbage collection will clean cache/memory in case of some heavy memory intensive operation and recomputing will increase computation time for the same, may be this time partial computation is required for clear/removed data.

As spark transformation are lazy, i.e; unionAll is lazy operation and coalesce/repartition is also lazy operation and come in action at the time of first action, so try to coalesce unionall result after an interval like counter of 8 and reduce partition in resulting dataframe. Use checkpoints to break lineage and store data, if there is lots of memory intensive operation in your solution.

Rahul Gupta
  • 716
  • 7
  • 14